From bf0f152d5a26ac79e8549fb50d136fd8a0955ce1 Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Sun, 5 May 2024 20:20:19 +0200 Subject: [PATCH] add unit tests for modbus --- app/tests/test_modbus.py | 20 ++++++++++++++++++++ app/tests/test_solarman.py | 21 ++++++++++++++------- app/tests/test_talent.py | 4 ++-- system_tests/test_tcp_socket.py | 2 +- 4 files changed, 37 insertions(+), 10 deletions(-) diff --git a/app/tests/test_modbus.py b/app/tests/test_modbus.py index 0e9cf5b..fcec232 100644 --- a/app/tests/test_modbus.py +++ b/app/tests/test_modbus.py @@ -1,7 +1,12 @@ # test_with_pytest.py # import pytest, logging from app.src.modbus import Modbus +from app.src.infos import Infos +class TestHelper(Modbus): + def __init__(self): + super().__init__() + self.db = Infos() def test_modbus_crc(): mb = Modbus() @@ -19,3 +24,18 @@ def test_build_modbus_pdu(): assert pdu == b'\x01\x06\x20\x00\x00\x12\x02\x07' assert mb.check_crc(pdu) +def test_build_recv(): + mb = TestHelper() + pdu = mb.build_msg(1,3,0x300e,0x2) + assert pdu == b'\x01\x03\x30\x0e\x00\x02\xaa\xc8' + assert mb.check_crc(pdu) + call = 0 + for key, update in mb.recv_resp(mb.db, b'\x01\x03\x04\x01\x2c\x00\x46\xbb\xf4'): + if key == 'grid': + assert update == True + elif key == 'inverter': + assert update == True + else: + assert False + call += 1 + assert 2 == call diff --git a/app/tests/test_solarman.py b/app/tests/test_solarman.py index 48f5509..e699ae7 100644 --- a/app/tests/test_solarman.py +++ b/app/tests/test_solarman.py @@ -6,6 +6,9 @@ from app.src.gen3plus.solarman_v5 import SolarmanV5 from app.src.config import Config from app.src.infos import Infos, Register + +pytest_plugins = ('pytest_asyncio',) + # initialize the proxy statistics Infos.static_init() @@ -54,6 +57,9 @@ class MemoryStream(SolarmanV5): pass return copied_bytes + async def async_write(self, headline=''): + pass + def _SolarmanV5__flush_recv_msg(self) -> None: super()._SolarmanV5__flush_recv_msg() self.msg_count += 1 @@ -725,7 +731,7 @@ def test_device_rsp(ConfigTsunInv1, DeviceRspMsg): assert m.data_len == 0x0a assert m._recv_buffer==b'' assert m._send_buffer==b'' - assert m._forward_buffer==b'' # DeviceRspMsg + assert m._forward_buffer==DeviceRspMsg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -743,7 +749,7 @@ def test_inverter_rsp(ConfigTsunInv1, InverterRspMsg): assert m.data_len == 0x0a assert m._recv_buffer==b'' assert m._send_buffer==b'' - assert m._forward_buffer==b'' # InverterRspMsg + assert m._forward_buffer==InverterRspMsg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -779,7 +785,7 @@ def test_heartbeat_rsp(ConfigTsunInv1, HeartbeatRspMsg): assert m.data_len == 0x0a assert m._recv_buffer==b'' assert m._send_buffer==b'' - assert m._forward_buffer==b'' # HeartbeatRspMsg + assert m._forward_buffer==HeartbeatRspMsg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -820,7 +826,7 @@ def test_sync_start_rsp(ConfigTsunInv1, SyncStartRspMsg): assert m.data_len == 0x0a assert m._recv_buffer==b'' assert m._send_buffer==b'' - assert m._forward_buffer==b'' # HeartbeatRspMsg + assert m._forward_buffer==SyncStartRspMsg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -856,7 +862,7 @@ def test_sync_end_rsp(ConfigTsunInv1, SyncEndRspMsg): assert m.data_len == 0x0a assert m._recv_buffer==b'' assert m._send_buffer==b'' - assert m._forward_buffer==b'' # HeartbeatRspMsg + assert m._forward_buffer==SyncEndRspMsg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -942,7 +948,8 @@ def test_build_logger_modell(ConfigTsunAllowAll, DeviceIndMsg): assert 'V1.1.00.0B' == m.db.get_db_value(Register.COLLECTOR_FW_VERSION, 0).rstrip('\00') m.close() -def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, AtCommandIndMsg): +@pytest.mark.asyncio +async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, AtCommandIndMsg): ConfigTsunAllowAll m = MemoryStream(DeviceIndMsg, (0,), True) m.read() @@ -954,7 +961,7 @@ def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, AtCommandIndMsg) m._send_buffer = bytearray(0) # clear send buffer for next test m._forward_buffer = bytearray(0) # clear send buffer for next test - m.send_at_cmd('AT+TIME=214028,1,60,120') + await m.send_at_cmd('AT+TIME=214028,1,60,120') assert m._recv_buffer==b'' assert m._send_buffer==AtCommandIndMsg assert m._forward_buffer==b'' diff --git a/app/tests/test_talent.py b/app/tests/test_talent.py index 7c996ab..4b1de2f 100644 --- a/app/tests/test_talent.py +++ b/app/tests/test_talent.py @@ -795,7 +795,7 @@ def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp): m = MemoryStream(MsgModbusRsp, (0,), False) m.db.stat['proxy']['Unknown_Ctrl'] = 0 m.db.stat['proxy']['Modbus_Command'] = 0 - m.forward_modbus_rep = False + m.forward_modbus_resp = False m.read() # read complete msg, and dispatch msg assert not m.header_valid # must be invalid, since msg was handled and buffer flushed assert m.msg_count == 1 @@ -816,7 +816,7 @@ def test_msg_modbus_rsp2(ConfigTsunInv1, MsgModbusRsp): m = MemoryStream(MsgModbusRsp, (0,), False) m.db.stat['proxy']['Unknown_Ctrl'] = 0 m.db.stat['proxy']['Modbus_Command'] = 0 - m.forward_modbus_rep = True + m.forward_modbus_resp = True m.read() # read complete msg, and dispatch msg assert not m.header_valid # must be invalid, since msg was handled and buffer flushed assert m.msg_count == 1 diff --git a/system_tests/test_tcp_socket.py b/system_tests/test_tcp_socket.py index 606ea68..f01a0a0 100644 --- a/system_tests/test_tcp_socket.py +++ b/system_tests/test_tcp_socket.py @@ -224,7 +224,7 @@ def test_send_inv_data(ClientConnection, MsgTimeStampReq, MsgTimeStampResp, MsgI data = s.recv(1024) except TimeoutError: pass - # time.sleep(32.5) + time.sleep(32.5) # assert data == MsgTimeStampResp try: s.sendall(MsgInvData)