From 65973b28351f6c941778d4945a14fb899cafc834 Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Thu, 9 May 2024 18:48:59 +0200 Subject: [PATCH] fix unit tests --- app/tests/test_solarman.py | 58 +++++++++++++++++++++++++++++++++----- 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/app/tests/test_solarman.py b/app/tests/test_solarman.py index e699ae7..2b9f39f 100644 --- a/app/tests/test_solarman.py +++ b/app/tests/test_solarman.py @@ -5,6 +5,7 @@ from datetime import datetime from app.src.gen3plus.solarman_v5 import SolarmanV5 from app.src.config import Config from app.src.infos import Infos, Register +from app.src.modbus import Modbus pytest_plugins = ('pytest_asyncio',) @@ -363,7 +364,7 @@ def SyncStartFwdMsg(): # 0x4310 @pytest.fixture def AtCommandIndMsg(): # 0x4510 - msg = b'\xa5\x27\x00\x10\x45\x02\x01' +get_sn() +b'\x01\x02\x00' + msg = b'\xa5\x27\x00\x10\x45\x03\x02' +get_sn() +b'\x01\x02\x00' msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' msg += b'AT+TIME=214028,1,60,120\r' msg += correct_checksum(msg) @@ -372,7 +373,7 @@ def AtCommandIndMsg(): # 0x4510 @pytest.fixture def AtCommandRspMsg(): # 0x1510 - msg = b'\xa5\x0a\x00\x10\x15\x02\x02' +get_sn() +b'\x01\x01' + msg = b'\xa5\x0a\x00\x10\x15\x03\x03' +get_sn() +b'\x01\x01' msg += total() msg += hb() msg += correct_checksum(msg) @@ -416,6 +417,15 @@ def SyncEndRspMsg(): # 0x1810 msg += b'\x15' return msg +@pytest.fixture +def MsgModbusCmd(): + msg = b'\xa5\x17\x00\x10\x45\x03\x02' +get_sn() +b'\x02\xb0\x02' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x06\x20\x08' + msg += b'\x00\x00\x03\xc8' + msg += correct_checksum(msg) + msg += b'\x15' + return msg + @pytest.fixture def ConfigTsunAllowAll(): Config.config = {'solarman':{'enabled': True}, 'inverters':{'allow_all':True}} @@ -876,7 +886,7 @@ def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg, AtCommandRspMsg): assert m.snr == 2070233889 # assert m.unique_id == '2070233889' assert m.control == 0x4510 - assert str(m.seq) == '02:02' + assert str(m.seq) == '03:03' assert m.data_len == 39 assert m._recv_buffer==b'' assert m._send_buffer==AtCommandRspMsg @@ -949,15 +959,49 @@ def test_build_logger_modell(ConfigTsunAllowAll, DeviceIndMsg): m.close() @pytest.mark.asyncio -async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, AtCommandIndMsg): - ConfigTsunAllowAll +async def test_msg_build_modbus_req(ConfigTsunInv1, DeviceIndMsg, DeviceRspMsg, InverterIndMsg, InverterRspMsg, MsgModbusCmd): + ConfigTsunInv1 m = MemoryStream(DeviceIndMsg, (0,), True) + m.append_msg(InverterIndMsg) m.read() assert m.control == 0x4110 assert str(m.seq) == '01:01' - assert m._recv_buffer==b'' + assert m._recv_buffer==InverterIndMsg # unhandled next message assert m._send_buffer==DeviceRspMsg assert m._forward_buffer==DeviceIndMsg + + m._send_buffer = bytearray(0) # clear send buffer for next test + m._forward_buffer = bytearray(0) # clear send buffer for next test + await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0) + assert 0 == m.send_msg_ofs + assert m._forward_buffer == b'' + assert m._send_buffer == b'' # modbus command must be ignore, cause connection is still not up + + m.read() + assert m.control == 0x4210 + assert str(m.seq) == '02:02' + assert m._recv_buffer==b'' + assert m._send_buffer==InverterRspMsg + assert m._forward_buffer==InverterIndMsg + + m._send_buffer = bytearray(0) # clear send buffer for next test + m._forward_buffer = bytearray(0) # clear send buffer for next test + await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0) + assert 0 == m.send_msg_ofs + assert m._forward_buffer == b'' + assert m._send_buffer == MsgModbusCmd + m.close() + +@pytest.mark.asyncio +async def test_AT_cmd(ConfigTsunAllowAll, InverterIndMsg, InverterRspMsg, AtCommandIndMsg): + ConfigTsunAllowAll + m = MemoryStream(InverterIndMsg, (0,), True) + m.read() + assert m.control == 0x4210 + assert str(m.seq) == '02:02' + assert m._recv_buffer==b'' + assert m._send_buffer==InverterRspMsg + assert m._forward_buffer==InverterIndMsg m._send_buffer = bytearray(0) # clear send buffer for next test m._forward_buffer = bytearray(0) # clear send buffer for next test @@ -965,5 +1009,5 @@ async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, AtCommandI assert m._recv_buffer==b'' assert m._send_buffer==AtCommandIndMsg assert m._forward_buffer==b'' - assert str(m.seq) == '01:02' + assert str(m.seq) == '02:03' m.close()