fix unit tests
This commit is contained in:
@@ -63,6 +63,12 @@ class MemoryStream(SolarmanV5):
|
|||||||
if self.test_exception_async_write:
|
if self.test_exception_async_write:
|
||||||
raise RuntimeError("Peer closed.")
|
raise RuntimeError("Peer closed.")
|
||||||
|
|
||||||
|
def createClientStream(self, msg, chunks = (0,)):
|
||||||
|
c = MemoryStream(msg, chunks, False)
|
||||||
|
self.remoteStream = c
|
||||||
|
c. remoteStream = self
|
||||||
|
return c
|
||||||
|
|
||||||
def _SolarmanV5__flush_recv_msg(self) -> None:
|
def _SolarmanV5__flush_recv_msg(self) -> None:
|
||||||
super()._SolarmanV5__flush_recv_msg()
|
super()._SolarmanV5__flush_recv_msg()
|
||||||
self.msg_count += 1
|
self.msg_count += 1
|
||||||
@@ -1174,20 +1180,22 @@ def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg):
|
|||||||
|
|
||||||
def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd):
|
def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd):
|
||||||
ConfigTsunInv1
|
ConfigTsunInv1
|
||||||
m = MemoryStream(MsgModbusCmd, (0,), False)
|
m = MemoryStream(b'')
|
||||||
|
c = m.createClientStream(MsgModbusCmd)
|
||||||
|
|
||||||
m.forward_modbus_resp = False
|
m.forward_modbus_resp = False
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
m.db.stat['proxy']['AT_Command'] = 0
|
m.db.stat['proxy']['AT_Command'] = 0
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
m.read() # read complete msg, and dispatch msg
|
c.read() # read complete msg, and dispatch msg
|
||||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
assert not c.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||||
assert m.msg_count == 1
|
assert c.msg_count == 1
|
||||||
assert m.control == 0x4510
|
assert c.control == 0x4510
|
||||||
assert str(m.seq) == '03:02'
|
assert str(c.seq) == '03:02'
|
||||||
assert m.header_len==11
|
assert c.header_len==11
|
||||||
assert m.data_len==23
|
assert c.data_len==23
|
||||||
assert m._forward_buffer==MsgModbusCmd
|
assert c._forward_buffer==MsgModbusCmd
|
||||||
assert m._send_buffer==b''
|
assert c._send_buffer==b''
|
||||||
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
||||||
assert m.db.stat['proxy']['AT_Command'] == 0
|
assert m.db.stat['proxy']['AT_Command'] == 0
|
||||||
assert m.db.stat['proxy']['Modbus_Command'] == 1
|
assert m.db.stat['proxy']['Modbus_Command'] == 1
|
||||||
|
|||||||
@@ -51,6 +51,12 @@ class MemoryStream(Talent):
|
|||||||
def _timestamp(self):
|
def _timestamp(self):
|
||||||
return 1700260990000
|
return 1700260990000
|
||||||
|
|
||||||
|
def createClientStream(self, msg, chunks = (0,)):
|
||||||
|
c = MemoryStream(msg, chunks, False)
|
||||||
|
self.remoteStream = c
|
||||||
|
c. remoteStream = self
|
||||||
|
return c
|
||||||
|
|
||||||
def _Talent__flush_recv_msg(self) -> None:
|
def _Talent__flush_recv_msg(self) -> None:
|
||||||
super()._Talent__flush_recv_msg()
|
super()._Talent__flush_recv_msg()
|
||||||
self.msg_count += 1
|
self.msg_count += 1
|
||||||
@@ -789,20 +795,22 @@ def test_proxy_counter():
|
|||||||
|
|
||||||
def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd):
|
def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd):
|
||||||
ConfigTsunInv1
|
ConfigTsunInv1
|
||||||
m = MemoryStream(MsgModbusCmd, (0,), False)
|
m = MemoryStream(b'')
|
||||||
|
c = m.createClientStream(MsgModbusCmd)
|
||||||
|
|
||||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
m.read() # read complete msg, and dispatch msg
|
c.read() # read complete msg, and dispatch msg
|
||||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
assert not c.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||||
assert m.msg_count == 1
|
assert c.msg_count == 1
|
||||||
assert m.id_str == b"R170000000000001"
|
assert c.id_str == b"R170000000000001"
|
||||||
assert m.unique_id == 'R170000000000001'
|
assert c.unique_id == 'R170000000000001'
|
||||||
assert int(m.ctrl)==112
|
assert int(c.ctrl)==112
|
||||||
assert m.msg_id==119
|
assert c.msg_id==119
|
||||||
assert m.header_len==23
|
assert c.header_len==23
|
||||||
assert m.data_len==13
|
assert c.data_len==13
|
||||||
assert m._forward_buffer==MsgModbusCmd
|
assert c._forward_buffer==MsgModbusCmd
|
||||||
assert m._send_buffer==b''
|
assert c._send_buffer==b''
|
||||||
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
||||||
assert m.db.stat['proxy']['Modbus_Command'] == 1
|
assert m.db.stat['proxy']['Modbus_Command'] == 1
|
||||||
m.close()
|
m.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user