S allius/issue320 (#324)

* at unit test for 0x4510 msg with frametype 5
This commit is contained in:
Stefan Allius
2025-03-26 18:56:01 +01:00
committed by GitHub
parent 32d7711ab7
commit 8d2dcb7212

View File

@@ -633,6 +633,15 @@ def msg_modbus_cmd_fwd():
msg += b'\x15'
return msg
@pytest.fixture
def msg_modbus_cmd_seq():
msg = b'\xa5\x17\x00\x10\x45\x03\x02' +get_sn() +b'\x05\x26\x30\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x01\x06\x01\x00\x01'
msg += b'\x03\xe8'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def msg_modbus_cmd_crc_err():
msg = b'\xa5\x17\x00\x10\x45\x03\x02' +get_sn() +b'\x02\xb0\x02'
@@ -655,6 +664,19 @@ def msg_modbus_rsp(): # 0x1510
msg += b'\x15'
return msg
@pytest.fixture
def msg_modbus_interim_rsp(): # 0x0510
msg = b'\xa5\x3b\x00\x10\x15\x03\x03' +get_sn() +b'\x02\x01'
msg += total()
msg += hb()
msg += b'\x0a\xe2\xfa\x33\x01\x03\x28\x40\x10\x08\xd8'
msg += b'\x00\x00\x13\x87\x00\x31\x00\x68\x02\x58\x00\x00\x01\x53\x00\x02'
msg += b'\x00\x00\x01\x52\x00\x02\x00\x00\x01\x53\x00\x03\x00\x00\x00\x04'
msg += b'\x00\x01\x00\x00\x6c\x68'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def msg_modbus_rsp_inv_id2(): # 0x1510
msg = b'\xa5\x3b\x00\x10\x15\x03\x03' +get_sn() +b'\x02\x01'
@@ -1714,6 +1736,34 @@ def test_msg_modbus_req(config_tsun_inv1, msg_modbus_cmd, msg_modbus_cmd_fwd):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_msg_modbus_req_seq(config_tsun_inv1, msg_modbus_cmd_seq):
_ = config_tsun_inv1
m = MemoryStream(b'')
m.snr = get_sn_int()
m.sensor_list = 0x2b0
m.state = State.up
c = m.createClientStream(msg_modbus_cmd_seq)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
m.db.stat['proxy']['AT_Command'] = 0
m.db.stat['proxy']['Modbus_Command'] = 0
m.db.stat['proxy']['Invalid_Msg_Format'] = 0
c.read() # read complete msg, and dispatch msg
assert not c.header_valid # must be invalid, since msg was handled and buffer flushed
assert c.msg_count == 1
assert c.control == 0x4510
assert str(c.seq) == '03:02'
assert c.header_len==11
assert c.data_len==23
assert c.ifc.fwd_fifo.get()==msg_modbus_cmd_seq
assert c.ifc.tx_fifo.get()==b''
assert m.sent_pdu == b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
assert m.db.stat['proxy']['AT_Command'] == 0
assert m.db.stat['proxy']['Modbus_Command'] == 0
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_msg_modbus_req2(config_tsun_inv1, msg_modbus_cmd_crc_err):
_ = config_tsun_inv1
m = MemoryStream(b'')