test inv response for a mb scan request
This commit is contained in:
@@ -709,6 +709,19 @@ def msg_modbus_rsp(): # 0x1510
|
||||
msg += b'\x15'
|
||||
return msg
|
||||
|
||||
@pytest.fixture
|
||||
def msg_modbus_rsp_mb_4(): # 0x1510, MODBUS Type:4
|
||||
msg = b'\xa5\x3b\x00\x10\x15\x03\x03' +get_sn() +b'\x02\x01'
|
||||
msg += total()
|
||||
msg += hb()
|
||||
msg += b'\x0a\xe2\xfa\x33\x01\x04\x28\x40\x10\x08\xd8'
|
||||
msg += b'\x00\x00\x13\x87\x00\x31\x00\x68\x02\x58\x00\x00\x01\x53\x00\x02'
|
||||
msg += b'\x00\x00\x01\x52\x00\x02\x00\x00\x01\x53\x00\x03\x00\x00\x00\x04'
|
||||
msg += b'\x00\x01\x00\x00\x9e\xa4'
|
||||
msg += correct_checksum(msg)
|
||||
msg += b'\x15'
|
||||
return msg
|
||||
|
||||
@pytest.fixture
|
||||
def msg_modbus_interim_rsp(): # 0x0510
|
||||
msg = b'\xa5\x3b\x00\x10\x15\x03\x03' +get_sn() +b'\x02\x01'
|
||||
@@ -2241,6 +2254,61 @@ async def test_modbus_scaning(config_tsun_scan, heartbeat_ind_msg, heartbeat_rsp
|
||||
assert next(m.mb_timer.exp_count) == 3
|
||||
m.close()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_modbus_scaning_inv_rsp(config_tsun_scan, heartbeat_ind_msg, heartbeat_rsp_msg, msg_modbus_rsp_mb_4):
|
||||
_ = config_tsun_scan
|
||||
assert asyncio.get_running_loop()
|
||||
|
||||
m = MemoryStream(heartbeat_ind_msg, (0x15,0x56,0))
|
||||
m.append_msg(msg_modbus_rsp_mb_4)
|
||||
assert m.mb_scan == False
|
||||
assert asyncio.get_running_loop() == m.mb_timer.loop
|
||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||
assert m.mb_timer.tim == None
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert m.mb_scan == True
|
||||
assert m.mb_start_reg == 0xff80
|
||||
assert m.mb_step == 0x40
|
||||
assert m.mb_bytes == 0x14
|
||||
assert asyncio.get_running_loop() == m.mb_timer.loop
|
||||
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
assert m.msg_count == 1
|
||||
assert m.snr == 2070233889
|
||||
assert m.control == 0x4710
|
||||
|
||||
assert m.msg_recvd[0]['control']==0x4710
|
||||
assert m.msg_recvd[0]['seq']=='84:11'
|
||||
assert m.msg_recvd[0]['data_len']==0x1
|
||||
|
||||
assert m.ifc.tx_fifo.get()==heartbeat_rsp_msg
|
||||
assert m.ifc.fwd_fifo.get()==heartbeat_ind_msg
|
||||
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
||||
|
||||
m.ifc.tx_clear() # clear send buffer for next test
|
||||
assert isclose(m.mb_timeout, 0.5)
|
||||
assert next(m.mb_timer.exp_count) == 0
|
||||
|
||||
await asyncio.sleep(0.5)
|
||||
assert m.sent_pdu==b'\xa5\x17\x00\x10E\x12\x84!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00' \
|
||||
b'\x00\x00\x00\x00\x00\x00\x01\x03\xff\xc0\x00\x14\x75\xed\x33\x15'
|
||||
assert m.ifc.tx_fifo.get()==b''
|
||||
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
assert m.msg_count == 2
|
||||
assert m.msg_recvd[1]['control']==0x1510
|
||||
assert m.msg_recvd[1]['seq']=='03:03'
|
||||
assert m.msg_recvd[1]['data_len']==0x3b
|
||||
assert m.mb.last_addr == 1
|
||||
assert m.mb.last_fcode == 3
|
||||
assert m.mb.last_reg == 0xffc0 # mb_start_reg + mb_step
|
||||
assert m.mb.last_len == 20
|
||||
assert m.mb.err == 3
|
||||
|
||||
assert next(m.mb_timer.exp_count) == 2
|
||||
m.close()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_client_mode(my_loop, config_tsun_inv1, str_test_ip):
|
||||
_ = config_tsun_inv1
|
||||
|
||||
Reference in New Issue
Block a user