add more unit tests

This commit is contained in:
Stefan Allius
2024-05-09 23:23:33 +02:00
parent 65973b2835
commit b3f0fc97d7
2 changed files with 98 additions and 4 deletions

View File

@@ -28,6 +28,7 @@ class MemoryStream(SolarmanV5):
self.addr = 'Test: SrvSide'
self.db.stat['proxy']['Invalid_Msg_Format'] = 0
self.db.stat['proxy']['AT_Command'] = 0
self.test_exception_async_write = False
def _timestamp(self):
return timestamp
@@ -59,7 +60,8 @@ class MemoryStream(SolarmanV5):
return copied_bytes
async def async_write(self, headline=''):
pass
if self.test_exception_async_write:
raise RuntimeError("Peer closed.")
def _SolarmanV5__flush_recv_msg(self) -> None:
super()._SolarmanV5__flush_recv_msg()
@@ -315,6 +317,39 @@ def InverterIndMsg2000(): # 0x4210 rated Power 2000W
msg += b'\x15'
return msg
@pytest.fixture
def InverterIndMsg800(): # 0x4210 rated Power 800W
msg = b'\xa5\x99\x01\x10\x42\xe6\x9e' +get_sn() +b'\x01\xb0\x02\xbc\xc8'
msg += b'\x24\x32\x6c\x1f\x00\x00\xa0\x47\xe4\x33\x01\x00\x03\x08\x00\x00'
msg += b'\x59\x31\x37\x45\x37\x41\x30\x46\x30\x31\x30\x42\x30\x31\x33\x45'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x01\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x40\x10\x08\xc8\x00\x49\x13\x8d\x00\x36\x00\x00\x03\x20\x06\x7a'
msg += b'\x01\x61\x00\xa8\x02\x54\x01\x5a\x00\x8a\x01\xe4\x01\x5a\x00\xbd'
msg += b'\x02\x8f\x00\x11\x00\x01\x00\x00\x00\x0b\x00\x00\x27\x98\x00\x04'
msg += b'\x00\x00\x0c\x04\x00\x03\x00\x00\x0a\xe7\x00\x05\x00\x00\x0c\x75'
msg += b'\x00\x00\x00\x00\x06\x16\x02\x00\x00\x00\x55\xaa\x00\x01\x00\x00'
msg += b'\x00\x00\x00\x00\xff\xff\x03\x20\x00\x03\x04\x00\x04\x00\x04\x00'
msg += b'\x04\x00\x00\x01\xff\xff\x00\x01\x00\x06\x00\x68\x00\x68\x05\x00'
msg += b'\x09\xcd\x07\xb6\x13\x9c\x13\x24\x00\x01\x07\xae\x04\x0f\x00\x41'
msg += b'\x00\x0f\x0a\x64\x0a\x64\x00\x06\x00\x06\x09\xf6\x12\x8c\x12\x8c'
msg += b'\x00\x10\x00\x10\x14\x52\x14\x52\x00\x10\x00\x10\x01\x51\x00\x05'
msg += b'\x04\x00\x00\x01\x13\x9c\x0f\xa0\x00\x4e\x00\x66\x03\xe8\x04\x00'
msg += b'\x09\xce\x07\xa8\x13\x9c\x13\x26\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x04\x00\x04\x00\x00\x00\x00\x00\xff\xff\x00\x00'
msg += b'\x00\x00\x00\x00'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def InverterRspMsg(): # 0x1210
msg = b'\xa5\x0a\x00\x10\x12\x02\02' +get_sn() +b'\x01\x01'
@@ -947,6 +982,18 @@ def test_build_modell_2000(ConfigTsunAllowAll, InverterIndMsg2000):
assert 'TSOL-MS2000' == m.db.get_db_value(Register.EQUIPMENT_MODEL, 0)
m.close()
def test_build_modell_800(ConfigTsunAllowAll, InverterIndMsg800):
ConfigTsunAllowAll
m = MemoryStream(InverterIndMsg800, (0,))
assert 0 == m.db.get_db_value(Register.MAX_DESIGNED_POWER, 0)
assert None == m.db.get_db_value(Register.RATED_POWER, None)
assert None == m.db.get_db_value(Register.INVERTER_TEMP, None)
m.read() # read complete msg, and dispatch msg
assert 800 == m.db.get_db_value(Register.MAX_DESIGNED_POWER, 0)
assert 800 == m.db.get_db_value(Register.RATED_POWER, 0)
assert 'TSOL-MSxx00' == m.db.get_db_value(Register.EQUIPMENT_MODEL, 0)
m.close()
def test_build_logger_modell(ConfigTsunAllowAll, DeviceIndMsg):
ConfigTsunAllowAll
m = MemoryStream(DeviceIndMsg, (0,))
@@ -973,6 +1020,7 @@ async def test_msg_build_modbus_req(ConfigTsunInv1, DeviceIndMsg, DeviceRspMsg,
m._send_buffer = bytearray(0) # clear send buffer for next test
m._forward_buffer = bytearray(0) # clear send buffer for next test
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
assert m._recv_buffer==InverterIndMsg # unhandled next message
assert 0 == m.send_msg_ofs
assert m._forward_buffer == b''
assert m._send_buffer == b'' # modbus command must be ignore, cause connection is still not up
@@ -990,12 +1038,35 @@ async def test_msg_build_modbus_req(ConfigTsunInv1, DeviceIndMsg, DeviceRspMsg,
assert 0 == m.send_msg_ofs
assert m._forward_buffer == b''
assert m._send_buffer == MsgModbusCmd
m._send_buffer = bytearray(0) # clear send buffer for next test
m.test_exception_async_write = True
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
assert 0 == m.send_msg_ofs
assert m._forward_buffer == b''
assert m._send_buffer == b''
m.close()
@pytest.mark.asyncio
async def test_AT_cmd(ConfigTsunAllowAll, InverterIndMsg, InverterRspMsg, AtCommandIndMsg):
async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIndMsg, InverterRspMsg, AtCommandIndMsg):
ConfigTsunAllowAll
m = MemoryStream(InverterIndMsg, (0,), True)
m = MemoryStream(DeviceIndMsg, (0,), True)
m.append_msg(InverterIndMsg)
m.read()
assert m.control == 0x4110
assert str(m.seq) == '01:01'
assert m._recv_buffer==InverterIndMsg # unhandled next message
assert m._send_buffer==DeviceRspMsg
assert m._forward_buffer==DeviceIndMsg
m._send_buffer = bytearray(0) # clear send buffer for next test
m._forward_buffer = bytearray(0) # clear send buffer for next test
await m.send_at_cmd('AT+TIME=214028,1,60,120')
assert m._recv_buffer==InverterIndMsg # unhandled next message
assert m._send_buffer==b''
assert m._forward_buffer==b''
assert str(m.seq) == '01:01'
m.read()
assert m.control == 0x4210
assert str(m.seq) == '02:02'
@@ -1010,4 +1081,13 @@ async def test_AT_cmd(ConfigTsunAllowAll, InverterIndMsg, InverterRspMsg, AtComm
assert m._send_buffer==AtCommandIndMsg
assert m._forward_buffer==b''
assert str(m.seq) == '02:03'
m._send_buffer = bytearray(0) # clear send buffer for next test
m.test_exception_async_write = True
await m.send_at_cmd('AT+TIME=214028,1,60,120')
assert m._recv_buffer==b''
assert m._send_buffer==b''
assert m._forward_buffer==b''
assert str(m.seq) == '02:04'
m.close()

View File

@@ -24,6 +24,7 @@ class MemoryStream(Talent):
self.msg_count = 0
self.addr = 'Test: SrvSide'
self.send_msg_ofs = 0
self.test_exception_async_write = False
def append_msg(self, msg):
self.__msg += msg
@@ -56,7 +57,8 @@ class MemoryStream(Talent):
return
async def async_write(self, headline=''):
pass
if self.test_exception_async_write:
raise RuntimeError("Peer closed.")
@@ -899,9 +901,21 @@ async def test_msg_build_modbus_req(ConfigTsunInv1, MsgModbusCmd):
ConfigTsunInv1
m = MemoryStream(b'', (0,), True)
m.id_str = b"R170000000000001"
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
assert 0 == m.send_msg_ofs
assert m._forward_buffer == b''
assert m._send_buffer == b''
m.state = m.STATE_UP
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
assert 0 == m.send_msg_ofs
assert m._forward_buffer == b''
assert m._send_buffer == MsgModbusCmd
m._send_buffer = bytearray(0) # clear send buffer for next test
m.test_exception_async_write = True
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
assert 0 == m.send_msg_ofs
assert m._forward_buffer == b''
assert m._send_buffer == b''
m.close()