add more unit tests
This commit is contained in:
@@ -92,6 +92,7 @@ class SolarmanV5(Message):
|
||||
0x4510: self.msg_command_req, # from server
|
||||
0x1510: self.msg_command_rsp, # from inverter
|
||||
}
|
||||
self.modbus_elms = 0 # for unit tests
|
||||
|
||||
'''
|
||||
Our puplic methods
|
||||
@@ -447,8 +448,11 @@ class SolarmanV5(Message):
|
||||
if valid == 1 and modbus_msg_len > 4:
|
||||
# logger.info(f'first byte modbus:{data[14]}')
|
||||
inv_update = False
|
||||
self.modbus_elms = 0
|
||||
|
||||
for key, update, _ in self.mb.recv_resp(self.db, data[14:],
|
||||
self.node_id):
|
||||
self.modbus_elms += 1
|
||||
if update:
|
||||
if key == 'inverter':
|
||||
inv_update = True
|
||||
|
||||
@@ -388,7 +388,7 @@ def SyncStartRspMsg(): # 0x1310
|
||||
|
||||
@pytest.fixture
|
||||
def SyncStartFwdMsg(): # 0x4310
|
||||
msg = b'\xa5\x2f\x00\x10\x43\x0e\x0d' +get_sn() +b'\x81\x7a\x0b\x2e\x32'
|
||||
msg = b'\xa5\x2f\x00\x10\x43\x0d\x0e' +get_sn() +b'\x81\x7a\x0b\x2e\x32'
|
||||
msg += b'\x39\x00\x00\x00\x00\x00\x00\x00\x0c\x00\x41\x6c\x6c\x69\x75\x73'
|
||||
msg += b'\x2d\x48\x6f\x6d\x65\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x61\x01'
|
||||
@@ -474,6 +474,28 @@ def MsgModbusRsp(): # 0x1510
|
||||
msg += b'\x15'
|
||||
return msg
|
||||
|
||||
@pytest.fixture
|
||||
def MsgModbusInvalid(): # 0x1510
|
||||
msg = b'\xa5\x3b\x00\x10\x15\x03\x03' +get_sn() +b'\x02\x00'
|
||||
msg += total()
|
||||
msg += hb()
|
||||
msg += b'\x0a\xe2\xfa\x33\x01\x03\x28\x40\x10\x08\xd8'
|
||||
msg += b'\x00\x00\x13\x87\x00\x31\x00\x68\x02\x58\x00\x00\x01\x53\x00\x02'
|
||||
msg += b'\x00\x00\x01\x52\x00\x02\x00\x00\x01\x53\x00\x03\x00\x00\x00\x04'
|
||||
msg += b'\x00\x01\x00\x00\x6c\x68'
|
||||
msg += correct_checksum(msg)
|
||||
msg += b'\x15'
|
||||
return msg
|
||||
|
||||
@pytest.fixture
|
||||
def MsgUnknownCmd():
|
||||
msg = b'\xa5\x17\x00\x10\x45\x03\x02' +get_sn() +b'\x03\xb0\x02'
|
||||
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x06\x20\x08'
|
||||
msg += b'\x00\x00\x03\xc8'
|
||||
msg += correct_checksum(msg)
|
||||
msg += b'\x15'
|
||||
return msg
|
||||
|
||||
@pytest.fixture
|
||||
def ConfigTsunAllowAll():
|
||||
Config.config = {'solarman':{'enabled': True}, 'inverters':{'allow_all':True}}
|
||||
@@ -864,6 +886,7 @@ def test_sync_start_ind(ConfigTsunInv1, SyncStartIndMsg, SyncStartRspMsg, SyncSt
|
||||
assert m._forward_buffer==SyncStartIndMsg
|
||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||
|
||||
m.seq.server_side = False # simulate forawding to TSUN cloud
|
||||
m._update_header(m._forward_buffer)
|
||||
assert str(m.seq) == '0d:0e' # value after forwarding indication
|
||||
assert m._forward_buffer==SyncStartFwdMsg
|
||||
@@ -924,25 +947,6 @@ def test_sync_end_rsp(ConfigTsunInv1, SyncEndRspMsg):
|
||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||
m.close()
|
||||
|
||||
def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg):
|
||||
ConfigTsunInv1
|
||||
m = MemoryStream(AtCommandIndMsg, (0,), False)
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
assert m.msg_count == 1
|
||||
assert m.header_len==11
|
||||
assert m.snr == 2070233889
|
||||
# assert m.unique_id == '2070233889'
|
||||
assert m.control == 0x4510
|
||||
assert str(m.seq) == '03:02'
|
||||
assert m.data_len == 39
|
||||
assert m._recv_buffer==b''
|
||||
assert m._send_buffer==b''
|
||||
assert m._forward_buffer==AtCommandIndMsg
|
||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||
assert m.db.stat['proxy']['AT_Command'] == 1
|
||||
m.close()
|
||||
|
||||
def test_build_modell_600(ConfigTsunAllowAll, InverterIndMsg):
|
||||
ConfigTsunAllowAll
|
||||
m = MemoryStream(InverterIndMsg, (0,))
|
||||
@@ -1143,10 +1147,37 @@ async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIn
|
||||
|
||||
m.close()
|
||||
|
||||
def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg):
|
||||
ConfigTsunInv1
|
||||
m = MemoryStream(AtCommandIndMsg, (0,), False)
|
||||
m.forward_modbus_resp = False
|
||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||
m.db.stat['proxy']['AT_Command'] = 0
|
||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
assert m.msg_count == 1
|
||||
assert m.header_len==11
|
||||
assert m.snr == 2070233889
|
||||
# assert m.unique_id == '2070233889'
|
||||
assert m.control == 0x4510
|
||||
assert str(m.seq) == '03:02'
|
||||
assert m.data_len == 39
|
||||
assert m._recv_buffer==b''
|
||||
assert m._send_buffer==b''
|
||||
assert m._forward_buffer==AtCommandIndMsg
|
||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||
assert m.db.stat['proxy']['AT_Command'] == 1
|
||||
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||
assert m.forward_modbus_resp == False
|
||||
m.close()
|
||||
|
||||
def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd):
|
||||
ConfigTsunInv1
|
||||
m = MemoryStream(MsgModbusCmd, (0,), False)
|
||||
m.forward_modbus_resp = False
|
||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||
m.db.stat['proxy']['AT_Command'] = 0
|
||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
@@ -1158,7 +1189,31 @@ def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd):
|
||||
assert m._forward_buffer==MsgModbusCmd
|
||||
assert m._send_buffer==b''
|
||||
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
||||
assert m.db.stat['proxy']['AT_Command'] == 0
|
||||
assert m.db.stat['proxy']['Modbus_Command'] == 1
|
||||
assert m.forward_modbus_resp == True
|
||||
m.close()
|
||||
|
||||
def test_msg_unknown_cmd_req(ConfigTsunInv1, MsgUnknownCmd):
|
||||
ConfigTsunInv1
|
||||
m = MemoryStream(MsgUnknownCmd, (0,), False)
|
||||
m.forward_modbus_resp = False
|
||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||
m.db.stat['proxy']['AT_Command'] = 0
|
||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
assert m.msg_count == 1
|
||||
assert m.control == 0x4510
|
||||
assert str(m.seq) == '03:02'
|
||||
assert m.header_len==11
|
||||
assert m.data_len==23
|
||||
assert m._forward_buffer==MsgUnknownCmd
|
||||
assert m._send_buffer==b''
|
||||
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
||||
assert m.db.stat['proxy']['AT_Command'] == 0
|
||||
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||
assert m.forward_modbus_resp == False
|
||||
m.close()
|
||||
|
||||
def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp):
|
||||
@@ -1233,31 +1288,25 @@ def test_msg_modbus_rsp3(ConfigTsunInv1, MsgModbusRsp):
|
||||
assert m.new_data['inverter'] == False
|
||||
|
||||
m.close()
|
||||
'''
|
||||
def test_msg_modbus_invalid(ConfigTsunInv1, MsgModbusInv):
|
||||
|
||||
def test_msg_modbus_invalid(ConfigTsunInv1, MsgModbusInvalid):
|
||||
ConfigTsunInv1
|
||||
m = MemoryStream(MsgModbusInv, (0,), False)
|
||||
m = MemoryStream(MsgModbusInvalid, (0,), False)
|
||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
assert m.msg_count == 1
|
||||
assert m.id_str == b"R170000000000001"
|
||||
assert m.unique_id == 'R170000000000001'
|
||||
assert int(m.ctrl)==153
|
||||
assert m.msg_id==119
|
||||
assert m.header_len==23
|
||||
assert m.data_len==13
|
||||
assert m._forward_buffer==MsgModbusInv
|
||||
assert m._forward_buffer==MsgModbusInvalid
|
||||
assert m._send_buffer==b''
|
||||
assert m.db.stat['proxy']['Unknown_Ctrl'] == 1
|
||||
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
||||
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||
m.close()
|
||||
|
||||
def test_msg_modbus_fragment(ConfigTsunInv1, MsgModbusResp20):
|
||||
def test_msg_modbus_fragment(ConfigTsunInv1, MsgModbusRsp):
|
||||
ConfigTsunInv1
|
||||
# receive more bytes than expected (7 bytes from the next msg)
|
||||
m = MemoryStream(MsgModbusResp20+b'\x00\x00\x00\x45\x10\x52\x31', (0,))
|
||||
m = MemoryStream(MsgModbusRsp+b'\x00\x00\x00\x45\x10\x52\x31', (0,))
|
||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||
m.forward_modbus_resp = True
|
||||
@@ -1267,44 +1316,14 @@ def test_msg_modbus_fragment(ConfigTsunInv1, MsgModbusResp20):
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
assert m.msg_count == 1
|
||||
assert m.id_str == b"R170000000000001"
|
||||
assert m.unique_id == 'R170000000000001'
|
||||
assert int(m.ctrl) == 0x91
|
||||
assert m.msg_id == 119
|
||||
assert m.header_len == 23
|
||||
assert m.data_len == 50
|
||||
assert m._forward_buffer==MsgModbusResp20
|
||||
assert m._forward_buffer==MsgModbusRsp
|
||||
assert m._send_buffer == b''
|
||||
assert m.mb.err == 0
|
||||
assert m.modbus_elms == 20-1 # register 0x300d is unknown, so one value can't be mapped
|
||||
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
||||
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||
m.close()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_msg_build_modbus_req(ConfigTsunInv1, MsgModbusCmd):
|
||||
ConfigTsunInv1
|
||||
m = MemoryStream(b'', (0,), True)
|
||||
m.id_str = b"R170000000000001"
|
||||
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
||||
assert 0 == m.send_msg_ofs
|
||||
assert m._forward_buffer == b''
|
||||
assert m._send_buffer == b''
|
||||
|
||||
m.state = m.STATE_UP
|
||||
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
||||
assert 0 == m.send_msg_ofs
|
||||
assert m._forward_buffer == b''
|
||||
assert m._send_buffer == MsgModbusCmd
|
||||
|
||||
m._send_buffer = bytearray(0) # clear send buffer for next test
|
||||
m.test_exception_async_write = True
|
||||
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
||||
assert 0 == m.send_msg_ofs
|
||||
assert m._forward_buffer == b''
|
||||
assert m._send_buffer == b''
|
||||
m.close()
|
||||
|
||||
'''
|
||||
def test_zombie_conn(ConfigTsunInv1, MsgInverterInd):
|
||||
ConfigTsunInv1
|
||||
tracer.setLevel(logging.DEBUG)
|
||||
|
||||
Reference in New Issue
Block a user