fix unit tests
This commit is contained in:
@@ -17,8 +17,11 @@ timestamp = int(time.time()) # 1712861197
|
|||||||
heartbeat = 60
|
heartbeat = 60
|
||||||
|
|
||||||
class Writer():
|
class Writer():
|
||||||
|
def __init__(self):
|
||||||
|
self.sent_pdu = b''
|
||||||
|
|
||||||
def write(self, pdu: bytearray):
|
def write(self, pdu: bytearray):
|
||||||
pass
|
self.sent_pdu = pdu
|
||||||
|
|
||||||
class MemoryStream(SolarmanV5):
|
class MemoryStream(SolarmanV5):
|
||||||
def __init__(self, msg, chunks = (0,), server_side: bool = True):
|
def __init__(self, msg, chunks = (0,), server_side: bool = True):
|
||||||
@@ -1200,7 +1203,8 @@ async def test_msg_build_modbus_req(ConfigTsunInv1, DeviceIndMsg, DeviceRspMsg,
|
|||||||
assert m._recv_buffer==InverterIndMsg # unhandled next message
|
assert m._recv_buffer==InverterIndMsg # unhandled next message
|
||||||
assert 0 == m.send_msg_ofs
|
assert 0 == m.send_msg_ofs
|
||||||
assert m._forward_buffer == b''
|
assert m._forward_buffer == b''
|
||||||
assert m._send_buffer == b'' # modbus command must be ignore, cause connection is still not up
|
assert m.writer.sent_pdu == b'' # modbus command must be ignore, cause connection is still not up
|
||||||
|
assert m._send_buffer == b'' # modbus command must be ignore, cause connection is still not up
|
||||||
|
|
||||||
m.read()
|
m.read()
|
||||||
assert m.control == 0x4210
|
assert m.control == 0x4210
|
||||||
@@ -1214,7 +1218,8 @@ async def test_msg_build_modbus_req(ConfigTsunInv1, DeviceIndMsg, DeviceRspMsg,
|
|||||||
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
||||||
assert 0 == m.send_msg_ofs
|
assert 0 == m.send_msg_ofs
|
||||||
assert m._forward_buffer == b''
|
assert m._forward_buffer == b''
|
||||||
assert m._send_buffer == MsgModbusCmd
|
assert m.writer.sent_pdu == MsgModbusCmd
|
||||||
|
assert m._send_buffer == b''
|
||||||
|
|
||||||
m._send_buffer = bytearray(0) # clear send buffer for next test
|
m._send_buffer = bytearray(0) # clear send buffer for next test
|
||||||
m.test_exception_async_write = True
|
m.test_exception_async_write = True
|
||||||
|
|||||||
@@ -15,8 +15,11 @@ tracer = logging.getLogger('tracer')
|
|||||||
|
|
||||||
|
|
||||||
class Writer():
|
class Writer():
|
||||||
|
def __init__(self):
|
||||||
|
self.sent_pdu = b''
|
||||||
|
|
||||||
def write(self, pdu: bytearray):
|
def write(self, pdu: bytearray):
|
||||||
pass
|
self.sent_pdu = pdu
|
||||||
|
|
||||||
class MemoryStream(Talent):
|
class MemoryStream(Talent):
|
||||||
def __init__(self, msg, chunks = (0,), server_side: bool = True):
|
def __init__(self, msg, chunks = (0,), server_side: bool = True):
|
||||||
@@ -997,19 +1000,22 @@ async def test_msg_build_modbus_req(ConfigTsunInv1, MsgModbusCmd):
|
|||||||
assert 0 == m.send_msg_ofs
|
assert 0 == m.send_msg_ofs
|
||||||
assert m._forward_buffer == b''
|
assert m._forward_buffer == b''
|
||||||
assert m._send_buffer == b''
|
assert m._send_buffer == b''
|
||||||
|
assert m.writer.sent_pdu == b''
|
||||||
|
|
||||||
m.state = m.STATE_UP
|
m.state = m.STATE_UP
|
||||||
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
||||||
assert 0 == m.send_msg_ofs
|
assert 0 == m.send_msg_ofs
|
||||||
assert m._forward_buffer == b''
|
assert m._forward_buffer == b''
|
||||||
assert m._send_buffer == MsgModbusCmd
|
assert m._send_buffer == b''
|
||||||
|
assert m.writer.sent_pdu == MsgModbusCmd
|
||||||
|
|
||||||
m._send_buffer = bytearray(0) # clear send buffer for next test
|
m.writer.sent_pdu = bytearray(0) # clear send buffer for next test
|
||||||
m.test_exception_async_write = True
|
m.test_exception_async_write = True
|
||||||
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
||||||
assert 0 == m.send_msg_ofs
|
assert 0 == m.send_msg_ofs
|
||||||
assert m._forward_buffer == b''
|
assert m._forward_buffer == b''
|
||||||
assert m._send_buffer == b''
|
assert m._send_buffer == b''
|
||||||
|
assert m.writer.sent_pdu == b''
|
||||||
m.close()
|
m.close()
|
||||||
'''
|
'''
|
||||||
def test_zombie_conn(ConfigTsunInv1, MsgInverterInd):
|
def test_zombie_conn(ConfigTsunInv1, MsgInverterInd):
|
||||||
|
|||||||
Reference in New Issue
Block a user