From 1d9cbf314e96d0bd61d383ee3eba8fd03c11e37e Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Thu, 2 May 2024 23:56:42 +0200 Subject: [PATCH] add Modbus tests --- app/tests/test_talent.py | 122 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/app/tests/test_talent.py b/app/tests/test_talent.py index 89fd420..fc4ed4e 100644 --- a/app/tests/test_talent.py +++ b/app/tests/test_talent.py @@ -3,6 +3,10 @@ import pytest, logging from app.src.gen3.talent import Talent, Control from app.src.config import Config from app.src.infos import Infos +from app.src.modbus import Modbus + + +pytest_plugins = ('pytest_asyncio',) # initialize the proxy statistics Infos.static_init() @@ -19,6 +23,7 @@ class MemoryStream(Talent): self.__chunk_idx = 0 self.msg_count = 0 self.addr = 'Test: SrvSide' + self.send_msg_ofs = 0 def append_msg(self, msg): self.__msg += msg @@ -50,6 +55,10 @@ class MemoryStream(Talent): self.msg_count += 1 return + async def flush_send_msg(self): + pass + + @pytest.fixture def MsgContactInfo(): # Contact Info message @@ -170,6 +179,26 @@ def MsgOtaAck(): # Over the air update rewuest from tsun cloud def MsgOtaInvalid(): # Get Time Request message return b'\x00\x00\x00\x14\x10R170000000000001\x99\x13\x01' +@pytest.fixture +def MsgModbusCmd(): + msg = b'\x00\x00\x00\x20\x10R170000000000001' + msg += b'\x70\x77\x00\x01\xa3\x28\x08\x01\x06\x20\x08' + msg += b'\x00\x00\x03\xc8' + return msg + +@pytest.fixture +def MsgModbusRsp(): + msg = b'\x00\x00\x00\x20\x10R170000000000001' + msg += b'\x91\x77\x17\x18\x19\x1a\x08\x01\x06\x20\x08' + msg += b'\x00\x00\x03\xc8' + return msg + +@pytest.fixture +def MsgModbusInv(): + msg = b'\x00\x00\x00\x20\x10R170000000000001' + msg += b'\x99\x77\x17\x18\x19\x1a\x08\x01\x06\x20\x08' + msg += b'\x00\x00\x03\xc8' + return msg def test_read_message(MsgContactInfo): m = MemoryStream(MsgContactInfo, (0,)) @@ -740,3 +769,96 @@ def test_proxy_counter(): assert Infos.new_stat_data == {'proxy': True} assert 0 == m.db.stat['proxy']['Unknown_Msg'] m.close() + +def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd): + ConfigTsunInv1 + m = MemoryStream(MsgModbusCmd, (0,), False) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['Modbus_Command'] = 0 + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.id_str == b"R170000000000001" + assert m.unique_id == 'R170000000000001' + assert int(m.ctrl)==112 + assert m.msg_id==119 + assert m.header_len==23 + assert m.data_len==13 + assert m._forward_buffer==MsgModbusCmd + assert m._send_buffer==b'' + assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 + assert m.db.stat['proxy']['Modbus_Command'] == 1 + m.close() + +def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp): + ConfigTsunInv1 + m = MemoryStream(MsgModbusRsp, (0,), False) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['Modbus_Command'] = 0 + m.forward_modbus_rep = False + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.id_str == b"R170000000000001" + assert m.unique_id == 'R170000000000001' + assert int(m.ctrl)==145 + assert m.msg_id==119 + assert m.header_len==23 + assert m.data_len==13 + assert m._forward_buffer==b'' + assert m._send_buffer==b'' + assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 + assert m.db.stat['proxy']['Modbus_Command'] == 0 + m.close() + +def test_msg_modbus_rsp2(ConfigTsunInv1, MsgModbusRsp): + ConfigTsunInv1 + m = MemoryStream(MsgModbusRsp, (0,), False) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['Modbus_Command'] = 0 + m.forward_modbus_rep = True + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.id_str == b"R170000000000001" + assert m.unique_id == 'R170000000000001' + assert int(m.ctrl)==145 + assert m.msg_id==119 + assert m.header_len==23 + assert m.data_len==13 + assert m._forward_buffer==MsgModbusRsp + assert m._send_buffer==b'' + assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 + assert m.db.stat['proxy']['Modbus_Command'] == 0 + m.close() + +def test_msg_modbus_invalid(ConfigTsunInv1, MsgModbusInv): + ConfigTsunInv1 + m = MemoryStream(MsgModbusInv, (0,), False) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['Modbus_Command'] = 0 + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.id_str == b"R170000000000001" + assert m.unique_id == 'R170000000000001' + assert int(m.ctrl)==153 + assert m.msg_id==119 + assert m.header_len==23 + assert m.data_len==13 + assert m._forward_buffer==MsgModbusInv + assert m._send_buffer==b'' + assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 + assert m.db.stat['proxy']['Modbus_Command'] == 0 + m.close() + +@pytest.mark.asyncio +async def test_msg_build_modbus_req(ConfigTsunInv1, MsgModbusCmd): + ConfigTsunInv1 + m = MemoryStream(b'', (0,), False) + m.id_str = b"R170000000000001" + await m.send_modbus_cmd(Modbus.MB_WRITE_SINGLE_REG, 0x2008, 0) + assert 0==m.send_msg_ofs + assert m._forward_buffer==b'' + assert m._send_buffer==MsgModbusCmd + m.close()