diff --git a/app/src/async_stream.py b/app/src/async_stream.py index b023c23..ca642b2 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -305,6 +305,14 @@ class AsyncStream(AsyncIfcImpl): f"Fwd Exception for {self.r_addr}:\n" f"{traceback.format_exc()}") + async def publish_outstanding_mqtt(self): + '''Publish all outstanding MQTT topics''' + try: + await self.async_publ_mqtt() + await Proxy._async_publ_mqtt_proxy_stat('proxy') + except Exception: + pass + class AsyncStreamServer(AsyncStream): def __init__(self, reader: StreamReader, writer: StreamWriter, @@ -354,14 +362,6 @@ class AsyncStreamServer(AsyncStream): self.remote.ifc._writer.write(self.fwd_fifo.get()) await self.remote.ifc._writer.drain() - async def publish_outstanding_mqtt(self): - '''Publish all outstanding MQTT topics''' - try: - await self.async_publ_mqtt() - await Proxy._async_publ_mqtt_proxy_stat('proxy') - except Exception: - pass - class AsyncStreamClient(AsyncStream): def __init__(self, reader: StreamReader, writer: StreamWriter, @@ -381,7 +381,11 @@ class AsyncStreamClient(AsyncStream): async def client_loop(self, _: str) -> None: '''Loop for receiving messages from the TSUN cloud (client-side)''' + Infos.inc_counter('Cloud_Conn_Cnt') + await self.publish_outstanding_mqtt() await self.loop() + Infos.dec_counter('Cloud_Conn_Cnt') + await self.publish_outstanding_mqtt() logger.info(f'[{self.node_id}:{self.conn_no}] ' 'Client loop stopped for' f' l{self.l_addr}') diff --git a/app/src/config.py b/app/src/config.py index 02138e7..3424bd9 100644 --- a/app/src/config.py +++ b/app/src/config.py @@ -57,7 +57,8 @@ class Config(): Optional('client_mode'): { 'host': Use(str), Optional('port', default=8899): - And(Use(int), lambda n: 1024 <= n <= 65535) + And(Use(int), lambda n: 1024 <= n <= 65535), + Optional('forward', default=False): Use(bool), }, Optional('modbus_polling', default=True): Use(bool), Optional('suggested_area', default=""): Use(str), diff --git a/app/src/gen3/infos_g3.py b/app/src/gen3/infos_g3.py index 480fc94..950a956 100644 --- a/app/src/gen3/infos_g3.py +++ b/app/src/gen3/infos_g3.py @@ -75,7 +75,15 @@ class RegisterMap: 0x00000258: {'reg': Register.EVENT_BF1}, 0x000002bc: {'reg': Register.EVENT_BF2}, 0x00000064: {'reg': Register.INVERTER_STATUS}, + + 0x00000fa0: {'reg': Register.BOOT_STATUS}, + 0x00001004: {'reg': Register.DSP_STATUS}, + 0x000010cc: {'reg': Register.WORK_MODE}, + 0x000011f8: {'reg': Register.OUTPUT_SHUTDOWN}, 0x0000125c: {'reg': Register.MAX_DESIGNED_POWER}, + 0x000012c0: {'reg': Register.RATED_LEVEL}, + 0x00001324: {'reg': Register.INPUT_COEFFICIENT, 'ratio': 100/1024}, + 0x00001388: {'reg': Register.GRID_VOLT_CAL_COEF}, 0x00003200: {'reg': Register.OUTPUT_COEFFICIENT, 'ratio': 100/1024}, } diff --git a/app/src/gen3plus/infos_g3p.py b/app/src/gen3plus/infos_g3p.py index 443dfac..aec8bfb 100644 --- a/app/src/gen3plus/infos_g3p.py +++ b/app/src/gen3plus/infos_g3p.py @@ -17,17 +17,17 @@ class RegisterMap: map = { # 0x41020007: {'reg': Register.DEVICE_SNR, 'fmt': ' None: + logging.info('SolarmanEmu.close()') + # we have references to methods of this class in self.switch + # so we have to erase self.switch, otherwise this instance can't be + # deallocated by the garbage collector ==> we get a memory leak + self.switch.clear() + self.log_lvl.clear() + self.hb_timer.close() + self.data_timer.close() + self.db = None + super().close() + + def _set_serial_no(self, snr: int): + logging.debug(f'SolarmanEmu._set_serial_no, snr: {snr}') + self.unique_id = str(snr) + + def _init_new_client_conn(self) -> bool: + logging.debug('SolarmanEmu.init_new()') + self.data_timer.start(self.data_up_inv) + return False + + def next_pkt_cnt(self): + '''get the next packet number''' + self.pkt_cnt = (self.pkt_cnt + 1) & 0xffffffff + return self.pkt_cnt + + def seconds_since_last_sync(self): + '''get seconds since last 0x4110 message was sent''' + return self._emu_timestamp() - self.last_sync + + def send_heartbeat_cb(self, exp_cnt): + '''send a heartbeat to the TSUN cloud''' + self._build_header(0x4710) + self.ifc.tx_add(struct.pack(' float: + '''process all received messages in the _recv_buffer''' + self._read() + while True: + if not self.header_valid: + self.__parse_header(self.ifc.rx_peek(), + self.ifc.rx_len()) + + if self.header_valid and self.ifc.rx_len() >= \ + (self.header_len + self.data_len+2): + self.__process_complete_received_msg() + self.__flush_recv_msg() + else: + return 0 # wait 0s before sending a response + ''' + Our public methods + ''' + def _flow_str(self, server_side: bool, type: str): # noqa: F821 + switch = { + 'rx': ' <', + 'tx': ' >', + 'forwrd': '<< ', + 'drop': ' xx', + 'rxS': '> ', + 'txS': '< ', + 'forwrdS': ' >>', + 'dropS': 'xx ', + } + if server_side: + type += 'S' + return switch.get(type, '???') + + def get_fnc_handler(self, ctrl): + fnc = self.switch.get(ctrl, self.msg_unknown) + if callable(fnc): + return fnc, repr(fnc.__name__) + else: + return self.msg_unknown, repr(fnc) + + def _build_header(self, ctrl) -> None: + '''build header for new transmit message''' + self.send_msg_ofs = self.ifc.tx_len() + + self.ifc.tx_add(struct.pack( + ' None: + '''finish the transmit message, set lenght and checksum''' + _len = self.ifc.tx_len() - self.send_msg_ofs + struct.pack_into(' None: + + if (buf_len < self.header_len): # enough bytes for complete header? + return + + result = struct.unpack_from(' bool: + crc = buf[self.data_len+11] + stop = buf[self.data_len+12] + if stop != 0x15: + hex_dump_memory(logging.ERROR, + 'Drop packet w invalid stop byte from ' + f'{self.addr}:', buf, buf_len) + self.inc_counter('Invalid_Msg_Format') + if self.ifc.rx_len() > (self.data_len+13): + next_start = buf[self.data_len+13] + if next_start != 0xa5: + # erase broken recv buffer + self.ifc.rx_clear() + + return False + + check = sum(buf[1:buf_len-2]) & 0xff + if check != crc: + self.inc_counter('Invalid_Msg_Format') + logger.debug(f'CRC {int(crc):#02x} {int(check):#08x}' + f' Stop:{int(stop):#02x}') + # start & stop byte are valid, discard only this message + return False + + return True + + def __flush_recv_msg(self) -> None: + self.ifc.rx_get(self.header_len + self.data_len+2) + self.header_valid = False + + def __dispatch_msg(self) -> None: + _fnc, _str = self.get_fnc_handler(self.control) + if self.unique_id: + logger.info(self._flow_str(self.server_side, 'rx') + + f' Ctl: {int(self.control):#04x}' + + f' Msg: {_str}') + _fnc() + else: + logger.info(self._flow_str(self.server_side, 'drop') + + f' Ctl: {int(self.control):#04x}' + + f' Msg: {_str}') + + ''' + Message handler methods + ''' + def msg_response(self): + data = self.ifc.rx_peek()[self.header_len:] + result = struct.unpack_from(' float: - '''process all received messages in the _recv_buffer''' - self._read() - while True: - if not self.header_valid: - self.__parse_header(self.ifc.rx_peek(), - self.ifc.rx_len()) - - if self.header_valid and self.ifc.rx_len() >= \ - (self.header_len + self.data_len+2): - self.__process_complete_received_msg() - self.__flush_recv_msg() - else: - return 0 # wait 0s before sending a response - - def __process_complete_received_msg(self): - log_lvl = self.log_lvl.get(self.control, logging.WARNING) - if callable(log_lvl): - log_lvl = log_lvl() - self.ifc.rx_log(log_lvl, f'Received from {self.addr}:') - # self._recv_buffer, self.header_len + - # self.data_len+2) - if self.__trailer_is_ok(self.ifc.rx_peek(), self.header_len - + self.data_len + 2): - if self.state == State.init: - self.state = State.received - self.__set_serial_no(self.snr) - self.__dispatch_msg() - def forward(self, buffer, buflen) -> None: '''add the actual receive msg to the forwarding queue''' if self.no_forwarding: @@ -252,158 +434,34 @@ class SolarmanV5(Message): self.ifc.fwd_add(buffer[:buflen]) self.ifc.fwd_log(logging.DEBUG, 'Store for forwarding:') - fnc = self.switch.get(self.control, self.msg_unknown) - logger.info(self.__flow_str(self.server_side, 'forwrd') + + _, _str = self.get_fnc_handler(self.control) + logger.info(self._flow_str(self.server_side, 'forwrd') + f' Ctl: {int(self.control):#04x}' - f' Msg: {fnc.__name__!r}') + f' Msg: {_str}') def _init_new_client_conn(self) -> bool: return False - ''' - Our private methods - ''' - def __flow_str(self, server_side: bool, type: str): # noqa: F821 - switch = { - 'rx': ' <', - 'tx': ' >', - 'forwrd': '<< ', - 'drop': ' xx', - 'rxS': '> ', - 'txS': '< ', - 'forwrdS': ' >>', - 'dropS': 'xx ', - } - if server_side: - type += 'S' - return switch.get(type, '???') - - def _timestamp(self): - # utc as epoche - return int(time.time()) # pragma: no cover - def _heartbeat(self) -> int: return 60 # pragma: no cover - def __parse_header(self, buf: bytes, buf_len: int) -> None: - - if (buf_len < self.header_len): # enough bytes for complete header? - return - - result = struct.unpack_from(' bool: - crc = buf[self.data_len+11] - stop = buf[self.data_len+12] - if stop != 0x15: - hex_dump_memory(logging.ERROR, - 'Drop packet w invalid stop byte from ' - f'{self.addr}:', buf, buf_len) - self.inc_counter('Invalid_Msg_Format') - if self.ifc.rx_len() > (self.data_len+13): - next_start = buf[self.data_len+13] - if next_start != 0xa5: - # erase broken recv buffer - self.ifc.rx_clear() - - return False - - check = sum(buf[1:buf_len-2]) & 0xff - if check != crc: - self.inc_counter('Invalid_Msg_Format') - logger.debug(f'CRC {int(crc):#02x} {int(check):#08x}' - f' Stop:{int(stop):#02x}') - # start & stop byte are valid, discard only this message - return False - - return True - - def __build_header(self, ctrl) -> None: - '''build header for new transmit message''' - self.send_msg_ofs = self.ifc.tx_len() - - self.ifc.tx_add(struct.pack( - ' None: - '''finish the transmit message, set lenght and checksum''' - _len = self.ifc.tx_len() - self.send_msg_ofs - struct.pack_into(' None: - fnc = self.switch.get(self.control, self.msg_unknown) - if self.unique_id: - logger.info(self.__flow_str(self.server_side, 'rx') + - f' Ctl: {int(self.control):#04x}' + - f' Msg: {fnc.__name__!r}') - fnc() - else: - logger.info(self.__flow_str(self.server_side, 'drop') + - f' Ctl: {int(self.control):#04x}' + - f' Msg: {fnc.__name__!r}') - - def __flush_recv_msg(self) -> None: - self.ifc.rx_get(self.header_len + self.data_len+2) - self.header_valid = False - def __send_ack_rsp(self, msgtype, ftype, ack=1): - self.__build_header(msgtype) + self._build_header(msgtype) self.ifc.tx_add(struct.pack(' 4: # logger.info(f'first byte modbus:{data[14]}') - inv_update = False - self.modbus_elms = 0 - for key, update, _ in self.mb.recv_resp(self.db, data[14:]): - self.modbus_elms += 1 - if update: - if key == 'inverter': - inv_update = True - self._set_mqtt_timestamp(key, self._timestamp()) - self.new_data[key] = True + inv_update = self.__parse_modbus_rsp(data) if inv_update: self.__build_model_name() + if self.establish_inv_emu and not self.ifc.remote.stream: + self.establish_emu() + def msg_hbeat_ind(self): data = self.ifc.rx_peek()[self.header_len:] result = struct.unpack_from(' None: + async def modbus_loop(self, host, port, + snr: int, forward: bool) -> None: '''Loop for receiving messages from the TSUN cloud (client-side)''' while True: try: async with ModbusConn(host, port) as inverter: stream = inverter.local.stream - await stream.send_start_cmd(snr, host) + await stream.send_start_cmd(snr, host, forward) await stream.ifc.loop() logger.info(f'[{stream.node_id}:{stream.conn_no}] ' f'Connection closed - Shutdown: ' diff --git a/app/tests/test_infos.py b/app/tests/test_infos.py index 18eb5e4..ed4e293 100644 --- a/app/tests/test_infos.py +++ b/app/tests/test_infos.py @@ -17,13 +17,13 @@ def test_statistic_counter(): assert val == None or val == 0 i.static_init() # initialize counter - assert json.dumps(i.stat) == json.dumps({"proxy": {"Inverter_Cnt": 0, "Unknown_SNR": 0, "Unknown_Msg": 0, "Invalid_Data_Type": 0, "Internal_Error": 0,"Unknown_Ctrl": 0, "OTA_Start_Msg": 0, "SW_Exception": 0, "Invalid_Msg_Format": 0, "AT_Command": 0, "AT_Command_Blocked": 0, "Modbus_Command": 0}}) + assert json.dumps(i.stat) == json.dumps({"proxy": {"Inverter_Cnt": 0, "Cloud_Conn_Cnt": 0, "Unknown_SNR": 0, "Unknown_Msg": 0, "Invalid_Data_Type": 0, "Internal_Error": 0,"Unknown_Ctrl": 0, "OTA_Start_Msg": 0, "SW_Exception": 0, "Invalid_Msg_Format": 0, "AT_Command": 0, "AT_Command_Blocked": 0, "Modbus_Command": 0}}) val = i.dev_value(Register.INVERTER_CNT) # valid and initiliazed addr assert val == 0 i.inc_counter('Inverter_Cnt') - assert json.dumps(i.stat) == json.dumps({"proxy": {"Inverter_Cnt": 1, "Unknown_SNR": 0, "Unknown_Msg": 0, "Invalid_Data_Type": 0, "Internal_Error": 0,"Unknown_Ctrl": 0, "OTA_Start_Msg": 0, "SW_Exception": 0, "Invalid_Msg_Format": 0, "AT_Command": 0, "AT_Command_Blocked": 0, "Modbus_Command": 0}}) + assert json.dumps(i.stat) == json.dumps({"proxy": {"Inverter_Cnt": 1, "Cloud_Conn_Cnt": 0, "Unknown_SNR": 0, "Unknown_Msg": 0, "Invalid_Data_Type": 0, "Internal_Error": 0,"Unknown_Ctrl": 0, "OTA_Start_Msg": 0, "SW_Exception": 0, "Invalid_Msg_Format": 0, "AT_Command": 0, "AT_Command_Blocked": 0, "Modbus_Command": 0}}) val = i.dev_value(Register.INVERTER_CNT) assert val == 1 diff --git a/app/tests/test_infos_g3.py b/app/tests/test_infos_g3.py index 1bab29e..18d5854 100644 --- a/app/tests/test_infos_g3.py +++ b/app/tests/test_infos_g3.py @@ -421,7 +421,7 @@ def test_must_incr_total(inv_data_seq2, inv_data_seq2_zero): if key == 'total' or key == 'inverter' or key == 'env': assert update == True tests +=1 - assert tests==8 + assert tests==12 assert json.dumps(i.db['total']) == json.dumps({'Daily_Generation': 1.7, 'Total_Generation': 17.36}) assert json.dumps(i.db['input']) == json.dumps({"pv1": {"Voltage": 33.6, "Current": 1.91, "Power": 64.5, "Daily_Generation": 1.08, "Total_Generation": 9.74}, "pv2": {"Voltage": 33.5, "Current": 1.36, "Power": 45.7, "Daily_Generation": 0.62, "Total_Generation": 7.62}, "pv3": {"Voltage": 0.0, "Current": 0.0, "Power": 0.0}, "pv4": {"Voltage": 0.0, "Current": 0.0, "Power": 0.0}}) assert json.dumps(i.db['env']) == json.dumps({"Inverter_Status": 1, "Inverter_Temp": 23}) @@ -435,7 +435,7 @@ def test_must_incr_total(inv_data_seq2, inv_data_seq2_zero): assert json.dumps(i.db['total']) == json.dumps({'Daily_Generation': 1.7, 'Total_Generation': 17.36}) assert json.dumps(i.db['input']) == json.dumps({"pv1": {"Voltage": 33.6, "Current": 1.91, "Power": 64.5, "Daily_Generation": 1.08, "Total_Generation": 9.74}, "pv2": {"Voltage": 33.5, "Current": 1.36, "Power": 45.7, "Daily_Generation": 0.62, "Total_Generation": 7.62}, "pv3": {"Voltage": 0.0, "Current": 0.0, "Power": 0.0}, "pv4": {"Voltage": 0.0, "Current": 0.0, "Power": 0.0}}) assert json.dumps(i.db['env']) == json.dumps({"Inverter_Status": 1, "Inverter_Temp": 23}) - assert json.dumps(i.db['inverter']) == json.dumps({"Rated_Power": 600, "Max_Designed_Power": -1, "Output_Coefficient": 100.0, "No_Inputs": 2}) + assert json.dumps(i.db['inverter']) == json.dumps({"Rated_Power": 600, "BOOT_STATUS": 0, "DSP_STATUS": 21930, "Work_Mode": 0, "Max_Designed_Power": -1, "Input_Coefficient": -0.1, "Output_Coefficient": 100.0, "No_Inputs": 2}) tests = 0 for key, update in i.parse (inv_data_seq2_zero): @@ -501,8 +501,8 @@ def test_new_data_types(inv_data_new): else: assert False - assert tests==5 - assert json.dumps(i.db['inverter']) == json.dumps({"Manufacturer": 0}) + assert tests==7 + assert json.dumps(i.db['inverter']) == json.dumps({"Manufacturer": 0, "DSP_STATUS": 0}) assert json.dumps(i.db['input']) == json.dumps({"pv1": {}}) assert json.dumps(i.db['events']) == json.dumps({"Inverter_Alarm": 0, "Inverter_Fault": 0}) diff --git a/app/tests/test_infos_g3p.py b/app/tests/test_infos_g3p.py index 3fbefa9..51af74d 100644 --- a/app/tests/test_infos_g3p.py +++ b/app/tests/test_infos_g3p.py @@ -110,7 +110,7 @@ def test_parse_4210(inverter_data: bytes): assert json.dumps(i.db) == json.dumps({ "controller": {"Sensor_List": "02b0", "Power_On_Time": 2051}, - "inverter": {"Serial_Number": "Y17E00000000000E", "Version": "V4.0.10", "Rated_Power": 600, "BOOT_STATUS": 0, "DSP_STATUS": 21930, "Max_Designed_Power": 2000, "Output_Coefficient": 100.0}, + "inverter": {"Serial_Number": "Y17E00000000000E", "Version": "V4.0.10", "Rated_Power": 600, "BOOT_STATUS": 0, "DSP_STATUS": 21930, "Work_Mode": 0, "Max_Designed_Power": 2000, "Input_Coefficient": 100.0, "Output_Coefficient": 100.0}, "env": {"Inverter_Status": 1, "Detect_Status_1": 2, "Detect_Status_2": 0, "Inverter_Temp": 14}, "events": {"Inverter_Alarm": 0, "Inverter_Fault": 0, "Inverter_Bitfield_1": 0, "Inverter_bitfield_2": 0}, "grid": {"Voltage": 224.8, "Current": 0.73, "Frequency": 50.05, "Output_Power": 165.8}, @@ -119,7 +119,8 @@ def test_parse_4210(inverter_data: bytes): "pv3": {"Voltage": 34.6, "Current": 1.89, "Power": 65.5, "Daily_Generation": 0.05, "Total_Generation": 31.89}, "pv4": {"Voltage": 1.7, "Current": 0.01, "Power": 0.0, "Total_Generation": 15.58}}, "total": {"Daily_Generation": 0.11, "Total_Generation": 101.36}, - "inv_unknown": {"Unknown_1": 512} + "inv_unknown": {"Unknown_1": 512}, + "other": {"Output_Shutdown": 65535, "Rated_Level": 3, "Grid_Volt_Cal_Coef": 1024} }) def test_build_4210(inverter_data: bytes): diff --git a/app/tests/test_inverter_g3p.py b/app/tests/test_inverter_g3p.py index 0f47cbe..d06ee80 100644 --- a/app/tests/test_inverter_g3p.py +++ b/app/tests/test_inverter_g3p.py @@ -144,7 +144,7 @@ async def test_mqtt_publish(config_conn, patch_open_connection): with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter: stream = inverter.local.stream await inverter.async_publ_mqtt() # check call with invalid unique_id - stream._SolarmanV5__set_serial_no(snr= 123344) + stream._set_serial_no(snr= 123344) stream.new_data['inverter'] = True stream.db.db['inverter'] = {} @@ -171,7 +171,7 @@ async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err): with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter: stream = inverter.local.stream - stream._SolarmanV5__set_serial_no(snr= 123344) + stream._set_serial_no(snr= 123344) stream.new_data['inverter'] = True stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() @@ -188,7 +188,7 @@ async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter: stream = inverter.local.stream - stream._SolarmanV5__set_serial_no(snr= 123344) + stream._set_serial_no(snr= 123344) stream.new_data['inverter'] = True stream.db.db['inverter'] = {} diff --git a/app/tests/test_modbus_tcp.py b/app/tests/test_modbus_tcp.py index 93ecfa0..029a6f3 100644 --- a/app/tests/test_modbus_tcp.py +++ b/app/tests/test_modbus_tcp.py @@ -52,6 +52,10 @@ def config_conn(test_hostname, test_port): 'proxy_node_id': 'test_1', 'proxy_unique_id': '' }, + 'solarman':{ + 'host': 'access1.solarmanpv.com', + 'port': 10000 + }, 'inverters':{ 'allow_all': True, "R170000000000001":{ @@ -65,7 +69,8 @@ def config_conn(test_hostname, test_port): 'sensor_list': 0x2b0, 'client_mode':{ 'host': '192.168.0.1', - 'port': 8899 + 'port': 8899, + 'forward': True } } } diff --git a/app/tests/test_solarman.py b/app/tests/test_solarman.py index a980744..52b94ca 100644 --- a/app/tests/test_solarman.py +++ b/app/tests/test_solarman.py @@ -6,7 +6,7 @@ import logging import random from math import isclose from app.src.async_stream import AsyncIfcImpl, StreamPtr -from app.src.gen3plus.solarman_v5 import SolarmanV5 +from app.src.gen3plus.solarman_v5 import SolarmanV5, SolarmanBase from app.src.config import Config from app.src.infos import Infos, Register from app.src.modbus import Modbus @@ -37,6 +37,9 @@ class FakeIfc(AsyncIfcImpl): super().__init__() self.remote = StreamPtr(None) + async def create_remote(self): + await asyncio.sleep(0) + class MemoryStream(SolarmanV5): def __init__(self, msg, chunks = (0,), server_side: bool = True): _ifc = FakeIfc() @@ -109,7 +112,7 @@ class MemoryStream(SolarmanV5): c.ifc.remote.stream = self return c - def _SolarmanV5__flush_recv_msg(self) -> None: + def _SolarmanBase__flush_recv_msg(self) -> None: self.msg_recvd.append( { 'control': self.control, @@ -117,7 +120,7 @@ class MemoryStream(SolarmanV5): 'data_len': self.data_len } ) - super()._SolarmanV5__flush_recv_msg() + super()._SolarmanBase__flush_recv_msg() self.msg_count += 1 @@ -1102,7 +1105,7 @@ def test_sync_start_ind(config_tsun_inv1, sync_start_ind_msg, sync_start_rsp_msg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.seq.server_side = False # simulate forawding to TSUN cloud - m._update_header(m.ifc.fwd_fifo.peek()) + m._SolarmanBase__update_header(m.ifc.fwd_fifo.peek()) assert str(m.seq) == '0d:0e' # value after forwarding indication assert m.ifc.fwd_fifo.get()==sync_start_fwd_msg @@ -1768,7 +1771,7 @@ async def test_start_client_mode(config_tsun_inv1, str_test_ip): assert m.no_forwarding == False assert m.mb_timer.tim == None assert asyncio.get_running_loop() == m.mb_timer.loop - await m.send_start_cmd(get_sn_int(), str_test_ip, m.mb_first_timeout) + await m.send_start_cmd(get_sn_int(), str_test_ip, False, m.mb_first_timeout) assert m.sent_pdu==bytearray(b'\xa5\x17\x00\x10E\x01\x00!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x030\x00\x000J\xde\xf1\x15') assert m.db.get_db_value(Register.IP_ADDRESS) == str_test_ip assert isclose(m.db.get_db_value(Register.POLLING_INTERVAL), 0.5) @@ -1803,3 +1806,30 @@ def test_timeout(config_tsun_inv1): assert SolarmanV5.MAX_DEF_IDLE_TIME == m._timeout() m.state = State.closed m.close() + +def test_fnc_dispatch(): + def msg(): + return + + _ = config_tsun_inv1 + m = MemoryStream(b'') + m.switch[1] = msg + m.switch[2] = "msg" + + _obj, _str = m.get_fnc_handler(1) + assert _obj == msg + assert _str == "'msg'" + + _obj, _str = m.get_fnc_handler(2) + assert _obj == m.msg_unknown + assert _str == "'msg'" + + _obj, _str = m.get_fnc_handler(3) + assert _obj == m.msg_unknown + assert _str == "'msg_unknown'" + +def test_timestamp(): + m = MemoryStream(b'') + ts = m._timestamp() + ts_emu = m._emu_timestamp() + assert ts == ts_emu + 24*60*60 \ No newline at end of file diff --git a/app/tests/test_solarman_emu.py b/app/tests/test_solarman_emu.py new file mode 100644 index 0000000..32787ab --- /dev/null +++ b/app/tests/test_solarman_emu.py @@ -0,0 +1,230 @@ +import pytest +import asyncio +from app.src.async_stream import AsyncIfcImpl, StreamPtr +from app.src.gen3plus.solarman_v5 import SolarmanV5, SolarmanBase +from app.src.gen3plus.solarman_emu import SolarmanEmu +from app.src.infos import Infos, Register +from app.tests.test_solarman import FakeIfc, MemoryStream, get_sn_int, get_sn, correct_checksum, config_tsun_inv1, msg_modbus_rsp +from app.tests.test_infos_g3p import str_test_ip, bytes_test_ip + +timestamp = 0x3224c8bc + +class InvStream(MemoryStream): + def __init__(self, msg=b''): + super().__init__(msg) + + def _emu_timestamp(self): + return timestamp + +class CldStream(SolarmanEmu): + def __init__(self, inv: InvStream): + _ifc = FakeIfc() + _ifc.remote.stream = inv + super().__init__(('test.local', 1234), _ifc, server_side=False, client_mode=False) + self.__msg = b'' + self.__msg_len = 0 + self.__offs = 0 + self.msg_count = 0 + self.msg_recvd = [] + + def _emu_timestamp(self): + return timestamp + + def append_msg(self, msg): + self.__msg += msg + self.__msg_len += len(msg) + + def _read(self) -> int: + copied_bytes = 0 + try: + if (self.__offs < self.__msg_len): + self.ifc.rx_fifo += self.__msg[self.__offs:] + copied_bytes = self.__msg_len - self.__offs + self.__offs = self.__msg_len + except Exception: + pass # ignore exceptions here + return copied_bytes + + def _SolarmanBase__flush_recv_msg(self) -> None: + self.msg_recvd.append( + { + 'control': self.control, + 'seq': str(self.seq), + 'data_len': self.data_len + } + ) + super()._SolarmanBase__flush_recv_msg() + self.msg_count += 1 + +@pytest.fixture +def device_ind_msg(bytes_test_ip): # 0x4110 + msg = b'\xa5\xd4\x00\x10\x41\x00\x01' +get_sn() +b'\x02\xbc\xc8\x24\x32' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x05\x3c\x78\x01\x00\x01\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + bytes_test_ip + msg += b'\x0f\x00\x01\xb0' + msg += b'\x02\x0f\x00\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xfe\xfe\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += correct_checksum(msg) + msg += b'\x15' + return msg + +@pytest.fixture +def inverter_ind_msg(): # 0x4210 + msg = b'\xa5\x99\x01\x10\x42\x00\x01' +get_sn() +b'\x01\xb0\x02\xbc\xc8' + msg += b'\x24\x32\x3c\x00\x00\x00\xa0\x47\xe4\x33\x01\x00\x03\x08\x00\x00' + msg += b'\x59\x31\x37\x30\x30\x30\x30\x30\x30\x30\x30\x30\x30\x30\x30\x31' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x40\x10\x08\xc8\x00\x49\x13\x8d\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00' + msg += b'\x04\x00\x00\x01\xff\xff\x00\x01\x00\x06\x00\x68\x00\x68\x05\x00' + msg += b'\x09\xcd\x07\xb6\x13\x9c\x13\x24\x00\x01\x07\xae\x04\x0f\x00\x41' + msg += b'\x00\x0f\x0a\x64\x0a\x64\x00\x06\x00\x06\x09\xf6\x12\x8c\x12\x8c' + msg += b'\x00\x10\x00\x10\x14\x52\x14\x52\x00\x10\x00\x10\x01\x51\x00\x05' + msg += b'\x00\x00\x00\x01\x13\x9c\x0f\xa0\x00\x4e\x00\x66\x03\xe8\x04\x00' + msg += b'\x09\xce\x07\xa8\x13\x9c\x13\x26\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x04\x00\x04\x00\x00\x00\x00\x00\xff\xff\x00\x00' + msg += b'\x00\x00\x00\x00' + msg += correct_checksum(msg) + msg += b'\x15' + return msg + +@pytest.fixture +def inverter_rsp_msg(): # 0x1210 + msg = b'\xa5\x0a\x00\x10\x12\x02\02' +get_sn() +b'\x01\x01' + msg += b'\x00\x00\x00\x00' + msg += b'\x3c\x00\x00\x00' + msg += correct_checksum(msg) + msg += b'\x15' + return msg + +@pytest.fixture +def heartbeat_ind(): + msg = b'\xa5\x01\x00\x10G\x00\x01\x00\x00\x00\x00\x00Y\x15' + return msg + +def test_emu_init_close(): + # received a message with wrong start byte plus an valid message + # the complete receive buffer must be cleared to + # find the next valid message + inv = InvStream() + cld = CldStream(inv) + cld.close() + + +@pytest.mark.asyncio +async def test_emu_start(config_tsun_inv1, msg_modbus_rsp, str_test_ip, device_ind_msg): + _ = config_tsun_inv1 + assert asyncio.get_running_loop() + inv = InvStream(msg_modbus_rsp) + + assert asyncio.get_running_loop() == inv.mb_timer.loop + await inv.send_start_cmd(get_sn_int(), str_test_ip, True, inv.mb_first_timeout) + inv.read() # read complete msg, and dispatch msg + assert not inv.header_valid # must be invalid, since msg was handled and buffer flushed + assert inv.msg_count == 1 + assert inv.control == 0x1510 + + cld = CldStream(inv) + cld.ifc.update_header_cb(inv.ifc.fwd_fifo.peek()) + assert inv.ifc.fwd_fifo.peek() == device_ind_msg + cld.close() + +def test_snd_hb(config_tsun_inv1, heartbeat_ind): + _ = config_tsun_inv1 + inv = InvStream() + cld = CldStream(inv) + + # await inv.send_start_cmd(get_sn_int(), str_test_ip, False, inv.mb_first_timeout) + cld.send_heartbeat_cb(0) + assert cld.ifc.tx_fifo.peek() == heartbeat_ind + cld.close() + +@pytest.mark.asyncio +async def test_snd_inv_data(config_tsun_inv1, inverter_ind_msg, inverter_rsp_msg): + _ = config_tsun_inv1 + inv = InvStream() + inv.db.set_db_def_value(Register.INVERTER_STATUS, 1) + inv.db.set_db_def_value(Register.DETECT_STATUS_1, 2) + inv.db.set_db_def_value(Register.VERSION, 'V4.0.10') + inv.db.set_db_def_value(Register.GRID_VOLTAGE, 224.8) + inv.db.set_db_def_value(Register.GRID_CURRENT, 0.73) + inv.db.set_db_def_value(Register.GRID_FREQUENCY, 50.05) + assert asyncio.get_running_loop() == inv.mb_timer.loop + await inv.send_start_cmd(get_sn_int(), str_test_ip, False, inv.mb_first_timeout) + inv.db.set_db_def_value(Register.DATA_UP_INTERVAL, 17) # set test value + + cld = CldStream(inv) + cld.time_ofs = 0x33e447a0 + cld.last_sync = cld._emu_timestamp() - 60 + cld.pkt_cnt = 0x802 + assert cld.data_up_inv == 17 # check test value + cld.data_up_inv = 0.1 # speedup test first data msg + cld._init_new_client_conn() + cld.data_up_inv = 0.5 # timeout for second data msg + await asyncio.sleep(0.2) + assert cld.ifc.tx_fifo.get() == inverter_ind_msg + + cld.append_msg(inverter_rsp_msg) + cld.read() # read complete msg, and dispatch msg + + assert not cld.header_valid # must be invalid, since msg was handled and buffer flushed + assert cld.msg_count == 1 + assert cld.header_len==11 + assert cld.snr == 2070233889 + assert cld.unique_id == '2070233889' + assert cld.msg_recvd[0]['control']==0x1210 + assert cld.msg_recvd[0]['seq']=='02:02' + assert cld.msg_recvd[0]['data_len']==0x0a + assert '02b0' == cld.db.get_db_value(Register.SENSOR_LIST, None) + assert cld.db.stat['proxy']['Unknown_Msg'] == 0 + + cld.close() + +@pytest.mark.asyncio +async def test_rcv_invalid(config_tsun_inv1, inverter_ind_msg, inverter_rsp_msg): + _ = config_tsun_inv1 + inv = InvStream() + assert asyncio.get_running_loop() == inv.mb_timer.loop + await inv.send_start_cmd(get_sn_int(), str_test_ip, False, inv.mb_first_timeout) + inv.db.set_db_def_value(Register.DATA_UP_INTERVAL, 17) # set test value + + cld = CldStream(inv) + cld._init_new_client_conn() + + cld.append_msg(inverter_ind_msg) + cld.read() # read complete msg, and dispatch msg + + assert not cld.header_valid # must be invalid, since msg was handled and buffer flushed + assert cld.msg_count == 1 + assert cld.header_len==11 + assert cld.snr == 2070233889 + assert cld.unique_id == '2070233889' + assert cld.msg_recvd[0]['control']==0x4210 + assert cld.msg_recvd[0]['seq']=='00:01' + assert cld.msg_recvd[0]['data_len']==0x199 + assert '02b0' == cld.db.get_db_value(Register.SENSOR_LIST, None) + assert cld.db.stat['proxy']['Unknown_Msg'] == 1 + + + cld.close()