adapt unit tests
This commit is contained in:
@@ -1277,7 +1277,6 @@ async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIn
|
|||||||
def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg):
|
def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg):
|
||||||
ConfigTsunInv1
|
ConfigTsunInv1
|
||||||
m = MemoryStream(AtCommandIndMsg, (0,), False)
|
m = MemoryStream(AtCommandIndMsg, (0,), False)
|
||||||
m.forward_modbus_resp = False
|
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
m.db.stat['proxy']['AT_Command'] = 0
|
m.db.stat['proxy']['AT_Command'] = 0
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
@@ -1296,7 +1295,6 @@ def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg):
|
|||||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||||
assert m.db.stat['proxy']['AT_Command'] == 1
|
assert m.db.stat['proxy']['AT_Command'] == 1
|
||||||
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||||
assert m.forward_modbus_resp == False
|
|
||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
def test_msg_at_command_rsp1(ConfigTsunInv1, AtCommandRspMsg):
|
def test_msg_at_command_rsp1(ConfigTsunInv1, AtCommandRspMsg):
|
||||||
@@ -1342,7 +1340,6 @@ def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd):
|
|||||||
m = MemoryStream(b'')
|
m = MemoryStream(b'')
|
||||||
c = m.createClientStream(MsgModbusCmd)
|
c = m.createClientStream(MsgModbusCmd)
|
||||||
|
|
||||||
m.forward_modbus_resp = False
|
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
m.db.stat['proxy']['AT_Command'] = 0
|
m.db.stat['proxy']['AT_Command'] = 0
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
@@ -1360,7 +1357,6 @@ def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd):
|
|||||||
assert m.db.stat['proxy']['AT_Command'] == 0
|
assert m.db.stat['proxy']['AT_Command'] == 0
|
||||||
assert m.db.stat['proxy']['Modbus_Command'] == 1
|
assert m.db.stat['proxy']['Modbus_Command'] == 1
|
||||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||||
assert m.forward_modbus_resp == True
|
|
||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
def test_msg_modbus_req2(ConfigTsunInv1, MsgModbusCmdCrcErr):
|
def test_msg_modbus_req2(ConfigTsunInv1, MsgModbusCmdCrcErr):
|
||||||
@@ -1368,7 +1364,6 @@ def test_msg_modbus_req2(ConfigTsunInv1, MsgModbusCmdCrcErr):
|
|||||||
m = MemoryStream(b'')
|
m = MemoryStream(b'')
|
||||||
c = m.createClientStream(MsgModbusCmdCrcErr)
|
c = m.createClientStream(MsgModbusCmdCrcErr)
|
||||||
|
|
||||||
m.forward_modbus_resp = False
|
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
m.db.stat['proxy']['AT_Command'] = 0
|
m.db.stat['proxy']['AT_Command'] = 0
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
@@ -1386,13 +1381,11 @@ def test_msg_modbus_req2(ConfigTsunInv1, MsgModbusCmdCrcErr):
|
|||||||
assert m.db.stat['proxy']['AT_Command'] == 0
|
assert m.db.stat['proxy']['AT_Command'] == 0
|
||||||
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1
|
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1
|
||||||
assert m.forward_modbus_resp == True
|
|
||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
def test_msg_unknown_cmd_req(ConfigTsunInv1, MsgUnknownCmd):
|
def test_msg_unknown_cmd_req(ConfigTsunInv1, MsgUnknownCmd):
|
||||||
ConfigTsunInv1
|
ConfigTsunInv1
|
||||||
m = MemoryStream(MsgUnknownCmd, (0,), False)
|
m = MemoryStream(MsgUnknownCmd, (0,), False)
|
||||||
m.forward_modbus_resp = False
|
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
m.db.stat['proxy']['AT_Command'] = 0
|
m.db.stat['proxy']['AT_Command'] = 0
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
@@ -1410,15 +1403,14 @@ def test_msg_unknown_cmd_req(ConfigTsunInv1, MsgUnknownCmd):
|
|||||||
assert m.db.stat['proxy']['AT_Command'] == 0
|
assert m.db.stat['proxy']['AT_Command'] == 0
|
||||||
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||||
assert m.forward_modbus_resp == False
|
|
||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp):
|
def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp):
|
||||||
|
'''Modbus response without a valid Modbus request must be dropped'''
|
||||||
ConfigTsunInv1
|
ConfigTsunInv1
|
||||||
m = MemoryStream(MsgModbusRsp)
|
m = MemoryStream(MsgModbusRsp)
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
m.forward_modbus_resp = False
|
|
||||||
m.read() # read complete msg, and dispatch msg
|
m.read() # read complete msg, and dispatch msg
|
||||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||||
assert m.msg_count == 1
|
assert m.msg_count == 1
|
||||||
@@ -1433,30 +1425,12 @@ def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp):
|
|||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
def test_msg_modbus_rsp2(ConfigTsunInv1, MsgModbusRsp):
|
def test_msg_modbus_rsp2(ConfigTsunInv1, MsgModbusRsp):
|
||||||
ConfigTsunInv1
|
'''Modbus response with a valid Modbus request must be forwarded'''
|
||||||
m = MemoryStream(MsgModbusRsp)
|
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
|
||||||
m.forward_modbus_resp = True
|
|
||||||
m.read() # read complete msg, and dispatch msg
|
|
||||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
|
||||||
assert m.msg_count == 1
|
|
||||||
assert m.control == 0x1510
|
|
||||||
assert str(m.seq) == '03:03'
|
|
||||||
assert m.header_len==11
|
|
||||||
assert m.data_len==59
|
|
||||||
assert m._forward_buffer==MsgModbusRsp
|
|
||||||
assert m._send_buffer==b''
|
|
||||||
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
|
||||||
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
|
||||||
m.close()
|
|
||||||
|
|
||||||
def test_msg_modbus_rsp3(ConfigTsunInv1, MsgModbusRsp):
|
|
||||||
ConfigTsunInv1
|
ConfigTsunInv1
|
||||||
m = MemoryStream(MsgModbusRsp)
|
m = MemoryStream(MsgModbusRsp)
|
||||||
m.append_msg(MsgModbusRsp)
|
m.append_msg(MsgModbusRsp)
|
||||||
|
|
||||||
m.forward_modbus_resp = True
|
m.mb.rsp_handler = m._SolarmanV5__forward_msg
|
||||||
m.mb.last_addr = 1
|
m.mb.last_addr = 1
|
||||||
m.mb.last_fcode = 3
|
m.mb.last_fcode = 3
|
||||||
m.mb.last_len = 20
|
m.mb.last_len = 20
|
||||||
@@ -1494,7 +1468,6 @@ def test_msg_unknown_rsp(ConfigTsunInv1, MsgUnknownCmdRsp):
|
|||||||
m = MemoryStream(MsgUnknownCmdRsp)
|
m = MemoryStream(MsgUnknownCmdRsp)
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
m.forward_modbus_resp = True
|
|
||||||
m.read() # read complete msg, and dispatch msg
|
m.read() # read complete msg, and dispatch msg
|
||||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||||
assert m.msg_count == 1
|
assert m.msg_count == 1
|
||||||
@@ -1528,7 +1501,7 @@ def test_msg_modbus_fragment(ConfigTsunInv1, MsgModbusRsp):
|
|||||||
m = MemoryStream(MsgModbusRsp+b'\x00\x00\x00\x45\x10\x52\x31', (0,))
|
m = MemoryStream(MsgModbusRsp+b'\x00\x00\x00\x45\x10\x52\x31', (0,))
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
m.forward_modbus_resp = True
|
m.mb.rsp_handler = m._SolarmanV5__forward_msg
|
||||||
m.mb.last_addr = 1
|
m.mb.last_addr = 1
|
||||||
m.mb.last_fcode = 3
|
m.mb.last_fcode = 3
|
||||||
m.mb.last_len = 20
|
m.mb.last_len = 20
|
||||||
|
|||||||
@@ -860,11 +860,11 @@ def test_msg_modbus_req2(ConfigTsunInv1, MsgModbusCmdCrcErr):
|
|||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp):
|
def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp):
|
||||||
|
'''Modbus response without a valid Modbus request must be dropped'''
|
||||||
ConfigTsunInv1
|
ConfigTsunInv1
|
||||||
m = MemoryStream(MsgModbusRsp)
|
m = MemoryStream(MsgModbusRsp)
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
m.forward_modbus_resp = False
|
|
||||||
m.read() # read complete msg, and dispatch msg
|
m.read() # read complete msg, and dispatch msg
|
||||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||||
assert m.msg_count == 1
|
assert m.msg_count == 1
|
||||||
@@ -880,33 +880,13 @@ def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp):
|
|||||||
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
def test_msg_modbus_rsp2(ConfigTsunInv1, MsgModbusRsp):
|
def test_msg_modbus_rsp2(ConfigTsunInv1, MsgModbusResp20):
|
||||||
ConfigTsunInv1
|
'''Modbus response with a valid Modbus request must be forwarded'''
|
||||||
m = MemoryStream(MsgModbusRsp)
|
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
|
||||||
m.forward_modbus_resp = True
|
|
||||||
m.read() # read complete msg, and dispatch msg
|
|
||||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
|
||||||
assert m.msg_count == 1
|
|
||||||
assert m.id_str == b"R170000000000001"
|
|
||||||
assert m.unique_id == 'R170000000000001'
|
|
||||||
assert int(m.ctrl)==145
|
|
||||||
assert m.msg_id==119
|
|
||||||
assert m.header_len==23
|
|
||||||
assert m.data_len==13
|
|
||||||
assert m._forward_buffer==MsgModbusRsp
|
|
||||||
assert m._send_buffer==b''
|
|
||||||
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
|
||||||
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
|
||||||
m.close()
|
|
||||||
|
|
||||||
def test_msg_modbus_rsp3(ConfigTsunInv1, MsgModbusResp20):
|
|
||||||
ConfigTsunInv1
|
ConfigTsunInv1
|
||||||
m = MemoryStream(MsgModbusResp20)
|
m = MemoryStream(MsgModbusResp20)
|
||||||
m.append_msg(MsgModbusResp20)
|
m.append_msg(MsgModbusResp20)
|
||||||
|
|
||||||
m.forward_modbus_resp = True
|
m.mb.rsp_handler = m.msg_forward
|
||||||
m.mb.last_addr = 1
|
m.mb.last_addr = 1
|
||||||
m.mb.last_fcode = 3
|
m.mb.last_fcode = 3
|
||||||
m.mb.last_len = 20
|
m.mb.last_len = 20
|
||||||
@@ -966,7 +946,7 @@ def test_msg_modbus_fragment(ConfigTsunInv1, MsgModbusResp20):
|
|||||||
m = MemoryStream(MsgModbusResp20+b'\x00\x00\x00\x45\x10\x52\x31', (0,))
|
m = MemoryStream(MsgModbusResp20+b'\x00\x00\x00\x45\x10\x52\x31', (0,))
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
m.forward_modbus_resp = True
|
m.mb.rsp_handler = m.msg_forward
|
||||||
m.mb.last_addr = 1
|
m.mb.last_addr = 1
|
||||||
m.mb.last_fcode = 3
|
m.mb.last_fcode = 3
|
||||||
m.mb.last_len = 20
|
m.mb.last_len = 20
|
||||||
|
|||||||
Reference in New Issue
Block a user