From 3fda08bd25d756cd36f00626af3d2a3049b4de70 Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Sat, 11 May 2024 20:48:57 +0200 Subject: [PATCH] add more unit tests --- app/tests/test_solarman.py | 245 ++++++++++++++++++++++++++++++++++++- 1 file changed, 242 insertions(+), 3 deletions(-) diff --git a/app/tests/test_solarman.py b/app/tests/test_solarman.py index 3092e45..befb159 100644 --- a/app/tests/test_solarman.py +++ b/app/tests/test_solarman.py @@ -461,6 +461,19 @@ def MsgModbusCmd(): msg += b'\x15' return msg +@pytest.fixture +def MsgModbusRsp(): # 0x1510 + msg = b'\xa5\x3b\x00\x10\x15\x03\x03' +get_sn() +b'\x02\x01' + msg += total() + msg += hb() + msg += b'\x0a\xe2\xfa\x33\x01\x03\x28\x40\x10\x08\xd8' + msg += b'\x00\x00\x13\x87\x00\x31\x00\x68\x02\x58\x00\x00\x01\x53\x00\x02' + msg += b'\x00\x00\x01\x52\x00\x02\x00\x00\x01\x53\x00\x03\x00\x00\x00\x04' + msg += b'\x00\x01\x00\x00\x6c\x68' + msg += correct_checksum(msg) + msg += b'\x15' + return msg + @pytest.fixture def ConfigTsunAllowAll(): Config.config = {'solarman':{'enabled': True}, 'inverters':{'allow_all':True}} @@ -911,7 +924,7 @@ def test_sync_end_rsp(ConfigTsunInv1, SyncEndRspMsg): assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() -def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg, AtCommandRspMsg): +def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg): ConfigTsunInv1 m = MemoryStream(AtCommandIndMsg, (0,), False) m.read() # read complete msg, and dispatch msg @@ -921,10 +934,10 @@ def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg, AtCommandRspMsg): assert m.snr == 2070233889 # assert m.unique_id == '2070233889' assert m.control == 0x4510 - assert str(m.seq) == '03:03' + assert str(m.seq) == '03:02' assert m.data_len == 39 assert m._recv_buffer==b'' - assert m._send_buffer==AtCommandRspMsg + assert m._send_buffer==b'' assert m._forward_buffer==AtCommandIndMsg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 assert m.db.stat['proxy']['AT_Command'] == 1 @@ -1005,6 +1018,44 @@ def test_build_logger_modell(ConfigTsunAllowAll, DeviceIndMsg): assert 'V1.1.00.0B' == m.db.get_db_value(Register.COLLECTOR_FW_VERSION, 0).rstrip('\00') m.close() +def test_msg_iterator(): + m1 = SolarmanV5(server_side=True) + m2 = SolarmanV5(server_side=True) + m3 = SolarmanV5(server_side=True) + m3.close() + del m3 + test1 = 0 + test2 = 0 + for key in SolarmanV5: + if key == m1: + test1+=1 + elif key == m2: + test2+=1 + elif type(key) != SolarmanV5: + continue + else: + assert False + assert test1 == 1 + assert test2 == 1 + +def test_proxy_counter(): + m = SolarmanV5(server_side=True) + assert m.new_data == {} + m.db.stat['proxy']['Unknown_Msg'] = 0 + Infos.new_stat_data['proxy'] = False + + m.inc_counter('Unknown_Msg') + assert m.new_data == {} + assert Infos.new_stat_data == {'proxy': True} + assert 1 == m.db.stat['proxy']['Unknown_Msg'] + + Infos.new_stat_data['proxy'] = False + m.dec_counter('Unknown_Msg') + assert m.new_data == {} + assert Infos.new_stat_data == {'proxy': True} + assert 0 == m.db.stat['proxy']['Unknown_Msg'] + m.close() + @pytest.mark.asyncio async def test_msg_build_modbus_req(ConfigTsunInv1, DeviceIndMsg, DeviceRspMsg, InverterIndMsg, InverterRspMsg, MsgModbusCmd): ConfigTsunInv1 @@ -1091,3 +1142,191 @@ async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIn assert str(m.seq) == '02:04' m.close() + +def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd): + ConfigTsunInv1 + m = MemoryStream(MsgModbusCmd, (0,), False) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['Modbus_Command'] = 0 + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.control == 0x4510 + assert str(m.seq) == '03:02' + assert m.header_len==11 + assert m.data_len==23 + assert m._forward_buffer==MsgModbusCmd + assert m._send_buffer==b'' + assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 + assert m.db.stat['proxy']['Modbus_Command'] == 1 + m.close() + +def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp): + ConfigTsunInv1 + m = MemoryStream(MsgModbusRsp, (0,), False) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['Modbus_Command'] = 0 + m.forward_modbus_resp = False + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.control == 0x1510 + assert str(m.seq) == '03:03' + assert m.header_len==11 + assert m.data_len==59 + assert m._forward_buffer==b'' + assert m._send_buffer==b'' + assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 + assert m.db.stat['proxy']['Modbus_Command'] == 0 + m.close() + +def test_msg_modbus_rsp2(ConfigTsunInv1, MsgModbusRsp): + ConfigTsunInv1 + m = MemoryStream(MsgModbusRsp, (0,), False) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['Modbus_Command'] = 0 + m.forward_modbus_resp = True + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.control == 0x1510 + assert str(m.seq) == '03:03' + assert m.header_len==11 + assert m.data_len==59 + assert m._forward_buffer==MsgModbusRsp + assert m._send_buffer==b'' + assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 + assert m.db.stat['proxy']['Modbus_Command'] == 0 + m.close() + +def test_msg_modbus_rsp3(ConfigTsunInv1, MsgModbusRsp): + ConfigTsunInv1 + m = MemoryStream(MsgModbusRsp, (0,), False) + m.append_msg(MsgModbusRsp) + + m.forward_modbus_resp = True + m.mb.last_fcode = 3 + m.mb.last_len = 20 + m.mb.last_reg = 0x3008 + # assert m.db.db == {'inverter': {'Manufacturer': 'TSUN', 'Equipment_Model': 'TSOL-MSxx00'}} + m.new_data['inverter'] = False + + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.mb.err == 0 + assert m.msg_count == 1 + assert m._forward_buffer==MsgModbusRsp + assert m._send_buffer==b'' + # assert m.db.db == {'inverter': {'Version': 'V5.1.09', 'Rated_Power': 300}, 'grid': {'Voltage': 225.9, 'Current': 0.41, 'Frequency': 49.99, 'Output_Power': 94.8}, 'env': {'Inverter_Temp': 22}, 'input': {'pv1': {'Voltage': 0.8, 'Current': 0.0, 'Power': 0.0}, 'pv2': {'Voltage': 34.5, 'Current': 2.89, 'Power': 99.8}, 'pv3': {'Voltage': 0.0, 'Current': 0.0, 'Power': 0.0}, 'pv4': {'Voltage': 0.0, 'Current': 0.0, 'Power': 0.0}}} + assert m.db.get_db_value(Register.VERSION) == 'V4.0.10' + assert m.new_data['inverter'] == True + m.new_data['inverter'] = False + + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.mb.err == 0 + assert m.msg_count == 2 + assert m._forward_buffer==MsgModbusRsp + assert m._send_buffer==b'' + # assert m.db.db == {'inverter': {'Version': 'V5.1.09', 'Rated_Power': 300}, 'grid': {'Voltage': 225.9, 'Current': 0.41, 'Frequency': 49.99, 'Output_Power': 94.8}, 'env': {'Inverter_Temp': 22}, 'input': {'pv1': {'Voltage': 0.8, 'Current': 0.0, 'Power': 0.0}, 'pv2': {'Voltage': 34.5, 'Current': 2.89, 'Power': 99.8}, 'pv3': {'Voltage': 0.0, 'Current': 0.0, 'Power': 0.0}, 'pv4': {'Voltage': 0.0, 'Current': 0.0, 'Power': 0.0}}} + assert m.db.get_db_value(Register.VERSION) == 'V4.0.10' + assert m.new_data['inverter'] == False + + m.close() +''' +def test_msg_modbus_invalid(ConfigTsunInv1, MsgModbusInv): + ConfigTsunInv1 + m = MemoryStream(MsgModbusInv, (0,), False) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['Modbus_Command'] = 0 + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.id_str == b"R170000000000001" + assert m.unique_id == 'R170000000000001' + assert int(m.ctrl)==153 + assert m.msg_id==119 + assert m.header_len==23 + assert m.data_len==13 + assert m._forward_buffer==MsgModbusInv + assert m._send_buffer==b'' + assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 + assert m.db.stat['proxy']['Modbus_Command'] == 0 + m.close() + +def test_msg_modbus_fragment(ConfigTsunInv1, MsgModbusResp20): + ConfigTsunInv1 + # receive more bytes than expected (7 bytes from the next msg) + m = MemoryStream(MsgModbusResp20+b'\x00\x00\x00\x45\x10\x52\x31', (0,)) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['Modbus_Command'] = 0 + m.forward_modbus_resp = True + m.mb.last_fcode = 3 + m.mb.last_len = 20 + m.mb.last_reg = 0x3008 + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.id_str == b"R170000000000001" + assert m.unique_id == 'R170000000000001' + assert int(m.ctrl) == 0x91 + assert m.msg_id == 119 + assert m.header_len == 23 + assert m.data_len == 50 + assert m._forward_buffer==MsgModbusResp20 + assert m._send_buffer == b'' + assert m.mb.err == 0 + assert m.modbus_elms == 20-1 # register 0x300d is unknown, so one value can't be mapped + assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 + assert m.db.stat['proxy']['Modbus_Command'] == 0 + m.close() + +@pytest.mark.asyncio +async def test_msg_build_modbus_req(ConfigTsunInv1, MsgModbusCmd): + ConfigTsunInv1 + m = MemoryStream(b'', (0,), True) + m.id_str = b"R170000000000001" + await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0) + assert 0 == m.send_msg_ofs + assert m._forward_buffer == b'' + assert m._send_buffer == b'' + + m.state = m.STATE_UP + await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0) + assert 0 == m.send_msg_ofs + assert m._forward_buffer == b'' + assert m._send_buffer == MsgModbusCmd + + m._send_buffer = bytearray(0) # clear send buffer for next test + m.test_exception_async_write = True + await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0) + assert 0 == m.send_msg_ofs + assert m._forward_buffer == b'' + assert m._send_buffer == b'' + m.close() + +def test_zombie_conn(ConfigTsunInv1, MsgInverterInd): + ConfigTsunInv1 + tracer.setLevel(logging.DEBUG) + m1 = MemoryStream(MsgInverterInd, (0,)) + m2 = MemoryStream(MsgInverterInd, (0,)) + m3 = MemoryStream(MsgInverterInd, (0,)) + assert m1.state == m1.STATE_INIT + assert m2.state == m2.STATE_INIT + assert m3.state == m3.STATE_INIT + m1.read() # read complete msg, and set unique_id + assert m1.state == m1.STATE_INIT + assert m2.state == m2.STATE_INIT + assert m3.state == m3.STATE_INIT + m2.read() # read complete msg, and set unique_id + assert m1.state == m1.STATE_CLOSED + assert m2.state == m2.STATE_INIT + assert m3.state == m3.STATE_INIT + m3.read() # read complete msg, and set unique_id + assert m1.state == m1.STATE_CLOSED + assert m2.state == m2.STATE_CLOSED + assert m3.state == m3.STATE_INIT + m1.close() + m2.close() + m3.close() +''' \ No newline at end of file