add Modbus tests

This commit is contained in:
Stefan Allius
2024-05-02 23:56:42 +02:00
parent 58c3333fcc
commit 1d9cbf314e

View File

@@ -3,6 +3,10 @@ import pytest, logging
from app.src.gen3.talent import Talent, Control from app.src.gen3.talent import Talent, Control
from app.src.config import Config from app.src.config import Config
from app.src.infos import Infos from app.src.infos import Infos
from app.src.modbus import Modbus
pytest_plugins = ('pytest_asyncio',)
# initialize the proxy statistics # initialize the proxy statistics
Infos.static_init() Infos.static_init()
@@ -19,6 +23,7 @@ class MemoryStream(Talent):
self.__chunk_idx = 0 self.__chunk_idx = 0
self.msg_count = 0 self.msg_count = 0
self.addr = 'Test: SrvSide' self.addr = 'Test: SrvSide'
self.send_msg_ofs = 0
def append_msg(self, msg): def append_msg(self, msg):
self.__msg += msg self.__msg += msg
@@ -50,6 +55,10 @@ class MemoryStream(Talent):
self.msg_count += 1 self.msg_count += 1
return return
async def flush_send_msg(self):
pass
@pytest.fixture @pytest.fixture
def MsgContactInfo(): # Contact Info message def MsgContactInfo(): # Contact Info message
@@ -170,6 +179,26 @@ def MsgOtaAck(): # Over the air update rewuest from tsun cloud
def MsgOtaInvalid(): # Get Time Request message def MsgOtaInvalid(): # Get Time Request message
return b'\x00\x00\x00\x14\x10R170000000000001\x99\x13\x01' return b'\x00\x00\x00\x14\x10R170000000000001\x99\x13\x01'
@pytest.fixture
def MsgModbusCmd():
msg = b'\x00\x00\x00\x20\x10R170000000000001'
msg += b'\x70\x77\x00\x01\xa3\x28\x08\x01\x06\x20\x08'
msg += b'\x00\x00\x03\xc8'
return msg
@pytest.fixture
def MsgModbusRsp():
msg = b'\x00\x00\x00\x20\x10R170000000000001'
msg += b'\x91\x77\x17\x18\x19\x1a\x08\x01\x06\x20\x08'
msg += b'\x00\x00\x03\xc8'
return msg
@pytest.fixture
def MsgModbusInv():
msg = b'\x00\x00\x00\x20\x10R170000000000001'
msg += b'\x99\x77\x17\x18\x19\x1a\x08\x01\x06\x20\x08'
msg += b'\x00\x00\x03\xc8'
return msg
def test_read_message(MsgContactInfo): def test_read_message(MsgContactInfo):
m = MemoryStream(MsgContactInfo, (0,)) m = MemoryStream(MsgContactInfo, (0,))
@@ -740,3 +769,96 @@ def test_proxy_counter():
assert Infos.new_stat_data == {'proxy': True} assert Infos.new_stat_data == {'proxy': True}
assert 0 == m.db.stat['proxy']['Unknown_Msg'] assert 0 == m.db.stat['proxy']['Unknown_Msg']
m.close() m.close()
def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd):
ConfigTsunInv1
m = MemoryStream(MsgModbusCmd, (0,), False)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
m.db.stat['proxy']['Modbus_Command'] = 0
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.id_str == b"R170000000000001"
assert m.unique_id == 'R170000000000001'
assert int(m.ctrl)==112
assert m.msg_id==119
assert m.header_len==23
assert m.data_len==13
assert m._forward_buffer==MsgModbusCmd
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
assert m.db.stat['proxy']['Modbus_Command'] == 1
m.close()
def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp):
ConfigTsunInv1
m = MemoryStream(MsgModbusRsp, (0,), False)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
m.db.stat['proxy']['Modbus_Command'] = 0
m.forward_modbus_rep = False
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.id_str == b"R170000000000001"
assert m.unique_id == 'R170000000000001'
assert int(m.ctrl)==145
assert m.msg_id==119
assert m.header_len==23
assert m.data_len==13
assert m._forward_buffer==b''
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
assert m.db.stat['proxy']['Modbus_Command'] == 0
m.close()
def test_msg_modbus_rsp2(ConfigTsunInv1, MsgModbusRsp):
ConfigTsunInv1
m = MemoryStream(MsgModbusRsp, (0,), False)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
m.db.stat['proxy']['Modbus_Command'] = 0
m.forward_modbus_rep = True
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.id_str == b"R170000000000001"
assert m.unique_id == 'R170000000000001'
assert int(m.ctrl)==145
assert m.msg_id==119
assert m.header_len==23
assert m.data_len==13
assert m._forward_buffer==MsgModbusRsp
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
assert m.db.stat['proxy']['Modbus_Command'] == 0
m.close()
def test_msg_modbus_invalid(ConfigTsunInv1, MsgModbusInv):
ConfigTsunInv1
m = MemoryStream(MsgModbusInv, (0,), False)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
m.db.stat['proxy']['Modbus_Command'] = 0
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.id_str == b"R170000000000001"
assert m.unique_id == 'R170000000000001'
assert int(m.ctrl)==153
assert m.msg_id==119
assert m.header_len==23
assert m.data_len==13
assert m._forward_buffer==MsgModbusInv
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 1
assert m.db.stat['proxy']['Modbus_Command'] == 0
m.close()
@pytest.mark.asyncio
async def test_msg_build_modbus_req(ConfigTsunInv1, MsgModbusCmd):
ConfigTsunInv1
m = MemoryStream(b'', (0,), False)
m.id_str = b"R170000000000001"
await m.send_modbus_cmd(Modbus.MB_WRITE_SINGLE_REG, 0x2008, 0)
assert 0==m.send_msg_ofs
assert m._forward_buffer==b''
assert m._send_buffer==MsgModbusCmd
m.close()