import pytest import struct import time import asyncio import logging import random from asyncio import StreamReader, StreamWriter from math import isclose from async_stream import AsyncIfcImpl, StreamPtr from gen3plus.solarman_v5 import SolarmanV5, SolarmanBase from cnf.config import Config from infos import Infos, Register from modbus import Modbus from messages import State, Message from proxy import Proxy from test_inverter_g3p import FakeReader, FakeWriter, patch_open_connection from inverter_base import InverterBase from test_modbus_tcp import test_port, test_hostname pytest_plugins = ('pytest_asyncio',) # initialize the proxy statistics Infos.static_init() timestamp = int(time.time()) # 1712861197 heartbeat = 60 class Mqtt(): def __init__(self): self.clear() def clear(self): self.key = '' self.data = '' async def publish(self, key, data): self.key = key self.data = data class FakeIfc(AsyncIfcImpl): def __init__(self): super().__init__() self.remote = StreamPtr(None) async def create_remote(self): await asyncio.sleep(0) class FakeInverter(): def __init__(self): self.forward_at_cmd_resp = False class MemoryStream(SolarmanV5): def __init__(self, msg, chunks = (0,), server_side: bool = True, inverter=FakeInverter()): _ifc = FakeIfc() super().__init__(inverter, ('test.local', 1234), _ifc, server_side, client_mode=False) if server_side: self.mb.timeout = 0.4 # overwrite for faster testing self.mb_first_timeout = 0.5 self.mb_timeout = 0.5 self.sent_pdu = b'' self.ifc.tx_fifo.reg_trigger(self.write_cb) self.__msg = msg self.__msg_len = len(msg) self.__chunks = chunks self.__offs = 0 self.__chunk_idx = 0 self.msg_count = 0 self.addr = 'Test: SrvSide' self.db.stat['proxy']['Invalid_Msg_Format'] = 0 self.db.stat['proxy']['AT_Command'] = 0 self.db.stat['proxy']['AT_Command_Blocked'] = 0 self.test_exception_async_write = False self.at_acl = {'mqtt': {'allow': ['AT+'], 'block': ['AT+WEBU']}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE', 'AT+TIME'], 'block': ['AT+WEBU']}} self.key = '' self.data = '' self.msg_recvd = [] def write_cb(self): if self.test_exception_async_write: raise RuntimeError("Peer closed.") self.sent_pdu = self.ifc.tx_fifo.get() def _timestamp(self): return timestamp def _heartbeat(self) -> int: return heartbeat def append_msg(self, msg): self.__msg += msg self.__msg_len += len(msg) self.__chunk_idx = 0 def publish_mqtt(self, key, data): Proxy.mqtt.key = key Proxy.mqtt.data = data def _read(self) -> int: copied_bytes = 0 try: if (self.__offs < self.__msg_len): chunk_len = self.__chunks[self.__chunk_idx] self.__chunk_idx += 1 if chunk_len!=0: self.ifc.rx_fifo += self.__msg[self.__offs:chunk_len] copied_bytes = chunk_len - self.__offs self.__offs = chunk_len else: self.ifc.rx_fifo += self.__msg[self.__offs:] copied_bytes = self.__msg_len - self.__offs self.__offs = self.__msg_len except Exception: pass # ignore exceptions here return copied_bytes def createClientStream(self, msg, chunks = (0,)): c = MemoryStream(msg, chunks, False) self.ifc.remote.stream = c c.ifc.remote.stream = self return c def _SolarmanBase__flush_recv_msg(self) -> None: self.msg_recvd.append( { 'control': self.control, 'seq': str(self.seq), 'data_len': self.data_len } ) super()._SolarmanBase__flush_recv_msg() self.msg_count += 1 def get_sn() -> bytes: return b'\x21\x43\x65\x7b' def get_sn_int() -> int: return 2070233889 def get_dcu_sn() -> bytes: return b'\x20\x43\x65\x7b' def get_dcu_sn_int() -> int: return 2070233888 def get_inv_no() -> bytes: return b'T170000000000001' def get_invalid_sn(): return b'R170000000000002' def total(): ts = timestamp # convert int to little-endian bytes return struct.pack(' None: """forward handler transmits data over the remote connection""" # dst.ifc.update_header_cb(src.fwd_fifo.peek()) dst.ifc.tx_add(src.ifc.fwd_fifo.get()) @pytest.mark.asyncio async def test_proxy_at_cmd(my_loop, config_tsun_inv1, patch_open_connection, at_command_ind_msg, at_command_rsp_msg): _ = config_tsun_inv1 _ = patch_open_connection assert asyncio.get_running_loop() with InverterTest(FakeReader(), FakeWriter(), client_mode=False) as inverter: await inverter.create_remote() await asyncio.sleep(0) r = inverter.remote.stream l = inverter.local.stream l.db.stat['proxy']['AT_Command'] = 0 l.db.stat['proxy']['Unknown_Ctrl'] = 0 l.db.stat['proxy']['AT_Command_Blocked'] = 0 l.db.stat['proxy']['Modbus_Command'] = 0 inverter.forward_at_cmd_resp = False r.append_msg(at_command_ind_msg) r.read() # read complete msg, and dispatch msg assert inverter.forward_at_cmd_resp inverter.forward(r,l) assert l.ifc.tx_fifo.get()==at_command_ind_msg assert l.db.stat['proxy']['Invalid_Msg_Format'] == 0 assert l.db.stat['proxy']['AT_Command'] == 1 assert l.db.stat['proxy']['AT_Command_Blocked'] == 0 assert l.db.stat['proxy']['Modbus_Command'] == 0 l.append_msg(at_command_rsp_msg) l.read() # read at resp assert l.ifc.fwd_fifo.peek()==at_command_rsp_msg inverter.forward(l,r) assert r.ifc.tx_fifo.get()==at_command_rsp_msg assert Proxy.mqtt.key == '' assert Proxy.mqtt.data == "" @pytest.mark.asyncio async def test_proxy_at_blocked(my_loop, config_tsun_inv1, patch_open_connection, at_command_ind_msg_block, at_command_rsp_msg): _ = config_tsun_inv1 _ = patch_open_connection assert asyncio.get_running_loop() with InverterTest(FakeReader(), FakeWriter(), client_mode=False) as inverter: await inverter.create_remote() await asyncio.sleep(0) r = inverter.remote.stream l = inverter.local.stream l.db.stat['proxy']['AT_Command'] = 0 l.db.stat['proxy']['Unknown_Ctrl'] = 0 l.db.stat['proxy']['AT_Command_Blocked'] = 0 l.db.stat['proxy']['Modbus_Command'] = 0 inverter.forward_at_cmd_resp = False r.append_msg(at_command_ind_msg_block) r.read() # read complete msg, and dispatch msg assert not inverter.forward_at_cmd_resp inverter.forward(r,l) assert l.ifc.tx_fifo.get()==b'' assert l.db.stat['proxy']['Invalid_Msg_Format'] == 0 assert l.db.stat['proxy']['AT_Command'] == 0 assert l.db.stat['proxy']['AT_Command_Blocked'] == 1 assert l.db.stat['proxy']['Modbus_Command'] == 0 l.append_msg(at_command_rsp_msg) l.read() # read at resp assert l.ifc.fwd_fifo.peek()==b'' inverter.forward(l,r) assert r.ifc.tx_fifo.get()==b'' assert Proxy.mqtt.key == 'tsun/inv1/at_resp' assert Proxy.mqtt.data == "+ok" @pytest.mark.asyncio async def test_dcu_cmd(my_loop, config_tsun_allow_all, dcu_dev_ind_msg, dcu_dev_rsp_msg, dcu_data_ind_msg, dcu_data_rsp_msg, dcu_command_ind_msg, dcu_command_rsp_msg): '''test dcu_power command fpr a DCU device with sensor 0x3026''' _ = config_tsun_allow_all m = MemoryStream(dcu_dev_ind_msg, (0,), True) m.read() # read device ind assert m.control == 0x4110 assert str(m.seq) == '01:92' assert m.ifc.tx_fifo.get()==dcu_dev_rsp_msg assert m.ifc.fwd_fifo.get()==dcu_dev_ind_msg m.send_dcu_cmd(b'\x01\x01\x06\x01\x00\x01\x03\xe8') assert m.ifc.tx_fifo.get()==b'' assert m.ifc.fwd_fifo.get()==b'' assert m.sent_pdu == b'' assert str(m.seq) == '01:92' assert Proxy.mqtt.key == '' assert Proxy.mqtt.data == "" m.append_msg(dcu_data_ind_msg) m.read() # read inverter ind assert m.control == 0x4210 assert str(m.seq) == '02:93' assert m.ifc.tx_fifo.get()==dcu_data_rsp_msg assert m.ifc.fwd_fifo.get()==dcu_data_ind_msg m.send_dcu_cmd(b'\x01\x01\x06\x01\x00\x01\x03\xe8') assert m.ifc.fwd_fifo.get() == b'' assert m.ifc.tx_fifo.get()== b'' assert m.sent_pdu == dcu_command_ind_msg m.sent_pdu = bytearray() assert str(m.seq) == '02:94' assert Proxy.mqtt.key == '' assert Proxy.mqtt.data == "" m.append_msg(dcu_command_rsp_msg) m.read() # read at resp assert m.control == 0x1510 assert str(m.seq) == '03:94' assert m.ifc.rx_get()==b'' assert m.ifc.tx_fifo.get()==b'' assert m.ifc.fwd_fifo.get()==b'' assert Proxy.mqtt.key == 'tsun/dcu_resp' assert Proxy.mqtt.data == "+ok" Proxy.mqtt.clear() # clear last test result @pytest.mark.asyncio async def test_dcu_cmd_not_supported(my_loop, config_tsun_allow_all, device_ind_msg, device_rsp_msg, inverter_ind_msg, inverter_rsp_msg): '''test that an inverter don't accept the dcu_power command''' _ = config_tsun_allow_all m = MemoryStream(device_ind_msg, (0,), True) m.read() # read device ind assert m.control == 0x4110 assert str(m.seq) == '01:01' assert m.ifc.tx_fifo.get()==device_rsp_msg assert m.ifc.fwd_fifo.get()==device_ind_msg m.send_dcu_cmd(b'\x01\x01\x06\x01\x00\x01\x03\xe8') assert m.ifc.tx_fifo.get()==b'' assert m.ifc.fwd_fifo.get()==b'' assert m.sent_pdu == b'' assert str(m.seq) == '01:01' assert Proxy.mqtt.key == '' assert Proxy.mqtt.data == "" m.append_msg(inverter_ind_msg) m.read() # read inverter ind assert m.control == 0x4210 assert str(m.seq) == '02:02' assert m.ifc.tx_fifo.get()==inverter_rsp_msg assert m.ifc.fwd_fifo.get()==inverter_ind_msg m.send_dcu_cmd(b'\x01\x01\x06\x01\x00\x01\x03\xe8') assert m.ifc.fwd_fifo.get() == b'' assert m.ifc.tx_fifo.get()== b'' assert m.sent_pdu == b'' Proxy.mqtt.clear() # clear last test result @pytest.mark.asyncio async def test_proxy_dcu_cmd(my_loop, config_tsun_dcu1, patch_open_connection, dcu_command_ind_msg, dcu_command_rsp_msg): _ = config_tsun_inv1 _ = patch_open_connection assert asyncio.get_running_loop() with InverterTest(FakeReader(), FakeWriter(), client_mode=False) as inverter: await inverter.create_remote() await asyncio.sleep(0) r = inverter.remote.stream l = inverter.local.stream l.db.stat['proxy']['DCU_Command'] = 0 l.db.stat['proxy']['AT_Command'] = 0 l.db.stat['proxy']['Unknown_Ctrl'] = 0 l.db.stat['proxy']['AT_Command_Blocked'] = 0 l.db.stat['proxy']['Modbus_Command'] = 0 inverter.forward_dcu_cmd_resp = False r.append_msg(dcu_command_ind_msg) r.read() # read complete msg, and dispatch msg assert inverter.forward_dcu_cmd_resp inverter.forward(r,l) assert l.ifc.tx_fifo.get()==dcu_command_ind_msg assert l.db.stat['proxy']['Invalid_Msg_Format'] == 0 assert l.db.stat['proxy']['DCU_Command'] == 1 assert l.db.stat['proxy']['AT_Command'] == 0 assert l.db.stat['proxy']['AT_Command_Blocked'] == 0 assert l.db.stat['proxy']['Modbus_Command'] == 0 l.append_msg(dcu_command_rsp_msg) l.read() # read at resp assert l.ifc.fwd_fifo.peek()==dcu_command_rsp_msg inverter.forward(l,r) assert r.ifc.tx_fifo.get()==dcu_command_rsp_msg assert Proxy.mqtt.key == '' assert Proxy.mqtt.data == ""