add more unit tests
This commit is contained in:
@@ -461,6 +461,19 @@ def MsgModbusCmd():
|
|||||||
msg += b'\x15'
|
msg += b'\x15'
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def MsgModbusRsp(): # 0x1510
|
||||||
|
msg = b'\xa5\x3b\x00\x10\x15\x03\x03' +get_sn() +b'\x02\x01'
|
||||||
|
msg += total()
|
||||||
|
msg += hb()
|
||||||
|
msg += b'\x0a\xe2\xfa\x33\x01\x03\x28\x40\x10\x08\xd8'
|
||||||
|
msg += b'\x00\x00\x13\x87\x00\x31\x00\x68\x02\x58\x00\x00\x01\x53\x00\x02'
|
||||||
|
msg += b'\x00\x00\x01\x52\x00\x02\x00\x00\x01\x53\x00\x03\x00\x00\x00\x04'
|
||||||
|
msg += b'\x00\x01\x00\x00\x6c\x68'
|
||||||
|
msg += correct_checksum(msg)
|
||||||
|
msg += b'\x15'
|
||||||
|
return msg
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def ConfigTsunAllowAll():
|
def ConfigTsunAllowAll():
|
||||||
Config.config = {'solarman':{'enabled': True}, 'inverters':{'allow_all':True}}
|
Config.config = {'solarman':{'enabled': True}, 'inverters':{'allow_all':True}}
|
||||||
@@ -911,7 +924,7 @@ def test_sync_end_rsp(ConfigTsunInv1, SyncEndRspMsg):
|
|||||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg, AtCommandRspMsg):
|
def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg):
|
||||||
ConfigTsunInv1
|
ConfigTsunInv1
|
||||||
m = MemoryStream(AtCommandIndMsg, (0,), False)
|
m = MemoryStream(AtCommandIndMsg, (0,), False)
|
||||||
m.read() # read complete msg, and dispatch msg
|
m.read() # read complete msg, and dispatch msg
|
||||||
@@ -921,10 +934,10 @@ def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg, AtCommandRspMsg):
|
|||||||
assert m.snr == 2070233889
|
assert m.snr == 2070233889
|
||||||
# assert m.unique_id == '2070233889'
|
# assert m.unique_id == '2070233889'
|
||||||
assert m.control == 0x4510
|
assert m.control == 0x4510
|
||||||
assert str(m.seq) == '03:03'
|
assert str(m.seq) == '03:02'
|
||||||
assert m.data_len == 39
|
assert m.data_len == 39
|
||||||
assert m._recv_buffer==b''
|
assert m._recv_buffer==b''
|
||||||
assert m._send_buffer==AtCommandRspMsg
|
assert m._send_buffer==b''
|
||||||
assert m._forward_buffer==AtCommandIndMsg
|
assert m._forward_buffer==AtCommandIndMsg
|
||||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||||
assert m.db.stat['proxy']['AT_Command'] == 1
|
assert m.db.stat['proxy']['AT_Command'] == 1
|
||||||
@@ -1005,6 +1018,44 @@ def test_build_logger_modell(ConfigTsunAllowAll, DeviceIndMsg):
|
|||||||
assert 'V1.1.00.0B' == m.db.get_db_value(Register.COLLECTOR_FW_VERSION, 0).rstrip('\00')
|
assert 'V1.1.00.0B' == m.db.get_db_value(Register.COLLECTOR_FW_VERSION, 0).rstrip('\00')
|
||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
|
def test_msg_iterator():
|
||||||
|
m1 = SolarmanV5(server_side=True)
|
||||||
|
m2 = SolarmanV5(server_side=True)
|
||||||
|
m3 = SolarmanV5(server_side=True)
|
||||||
|
m3.close()
|
||||||
|
del m3
|
||||||
|
test1 = 0
|
||||||
|
test2 = 0
|
||||||
|
for key in SolarmanV5:
|
||||||
|
if key == m1:
|
||||||
|
test1+=1
|
||||||
|
elif key == m2:
|
||||||
|
test2+=1
|
||||||
|
elif type(key) != SolarmanV5:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
assert False
|
||||||
|
assert test1 == 1
|
||||||
|
assert test2 == 1
|
||||||
|
|
||||||
|
def test_proxy_counter():
|
||||||
|
m = SolarmanV5(server_side=True)
|
||||||
|
assert m.new_data == {}
|
||||||
|
m.db.stat['proxy']['Unknown_Msg'] = 0
|
||||||
|
Infos.new_stat_data['proxy'] = False
|
||||||
|
|
||||||
|
m.inc_counter('Unknown_Msg')
|
||||||
|
assert m.new_data == {}
|
||||||
|
assert Infos.new_stat_data == {'proxy': True}
|
||||||
|
assert 1 == m.db.stat['proxy']['Unknown_Msg']
|
||||||
|
|
||||||
|
Infos.new_stat_data['proxy'] = False
|
||||||
|
m.dec_counter('Unknown_Msg')
|
||||||
|
assert m.new_data == {}
|
||||||
|
assert Infos.new_stat_data == {'proxy': True}
|
||||||
|
assert 0 == m.db.stat['proxy']['Unknown_Msg']
|
||||||
|
m.close()
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_msg_build_modbus_req(ConfigTsunInv1, DeviceIndMsg, DeviceRspMsg, InverterIndMsg, InverterRspMsg, MsgModbusCmd):
|
async def test_msg_build_modbus_req(ConfigTsunInv1, DeviceIndMsg, DeviceRspMsg, InverterIndMsg, InverterRspMsg, MsgModbusCmd):
|
||||||
ConfigTsunInv1
|
ConfigTsunInv1
|
||||||
@@ -1091,3 +1142,191 @@ async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIn
|
|||||||
assert str(m.seq) == '02:04'
|
assert str(m.seq) == '02:04'
|
||||||
|
|
||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
|
def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd):
|
||||||
|
ConfigTsunInv1
|
||||||
|
m = MemoryStream(MsgModbusCmd, (0,), False)
|
||||||
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
|
m.read() # read complete msg, and dispatch msg
|
||||||
|
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||||
|
assert m.msg_count == 1
|
||||||
|
assert m.control == 0x4510
|
||||||
|
assert str(m.seq) == '03:02'
|
||||||
|
assert m.header_len==11
|
||||||
|
assert m.data_len==23
|
||||||
|
assert m._forward_buffer==MsgModbusCmd
|
||||||
|
assert m._send_buffer==b''
|
||||||
|
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
||||||
|
assert m.db.stat['proxy']['Modbus_Command'] == 1
|
||||||
|
m.close()
|
||||||
|
|
||||||
|
def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp):
|
||||||
|
ConfigTsunInv1
|
||||||
|
m = MemoryStream(MsgModbusRsp, (0,), False)
|
||||||
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
|
m.forward_modbus_resp = False
|
||||||
|
m.read() # read complete msg, and dispatch msg
|
||||||
|
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||||
|
assert m.msg_count == 1
|
||||||
|
assert m.control == 0x1510
|
||||||
|
assert str(m.seq) == '03:03'
|
||||||
|
assert m.header_len==11
|
||||||
|
assert m.data_len==59
|
||||||
|
assert m._forward_buffer==b''
|
||||||
|
assert m._send_buffer==b''
|
||||||
|
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
||||||
|
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||||
|
m.close()
|
||||||
|
|
||||||
|
def test_msg_modbus_rsp2(ConfigTsunInv1, MsgModbusRsp):
|
||||||
|
ConfigTsunInv1
|
||||||
|
m = MemoryStream(MsgModbusRsp, (0,), False)
|
||||||
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
|
m.forward_modbus_resp = True
|
||||||
|
m.read() # read complete msg, and dispatch msg
|
||||||
|
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||||
|
assert m.msg_count == 1
|
||||||
|
assert m.control == 0x1510
|
||||||
|
assert str(m.seq) == '03:03'
|
||||||
|
assert m.header_len==11
|
||||||
|
assert m.data_len==59
|
||||||
|
assert m._forward_buffer==MsgModbusRsp
|
||||||
|
assert m._send_buffer==b''
|
||||||
|
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
||||||
|
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||||
|
m.close()
|
||||||
|
|
||||||
|
def test_msg_modbus_rsp3(ConfigTsunInv1, MsgModbusRsp):
|
||||||
|
ConfigTsunInv1
|
||||||
|
m = MemoryStream(MsgModbusRsp, (0,), False)
|
||||||
|
m.append_msg(MsgModbusRsp)
|
||||||
|
|
||||||
|
m.forward_modbus_resp = True
|
||||||
|
m.mb.last_fcode = 3
|
||||||
|
m.mb.last_len = 20
|
||||||
|
m.mb.last_reg = 0x3008
|
||||||
|
# assert m.db.db == {'inverter': {'Manufacturer': 'TSUN', 'Equipment_Model': 'TSOL-MSxx00'}}
|
||||||
|
m.new_data['inverter'] = False
|
||||||
|
|
||||||
|
m.read() # read complete msg, and dispatch msg
|
||||||
|
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||||
|
assert m.mb.err == 0
|
||||||
|
assert m.msg_count == 1
|
||||||
|
assert m._forward_buffer==MsgModbusRsp
|
||||||
|
assert m._send_buffer==b''
|
||||||
|
# assert m.db.db == {'inverter': {'Version': 'V5.1.09', 'Rated_Power': 300}, 'grid': {'Voltage': 225.9, 'Current': 0.41, 'Frequency': 49.99, 'Output_Power': 94.8}, 'env': {'Inverter_Temp': 22}, 'input': {'pv1': {'Voltage': 0.8, 'Current': 0.0, 'Power': 0.0}, 'pv2': {'Voltage': 34.5, 'Current': 2.89, 'Power': 99.8}, 'pv3': {'Voltage': 0.0, 'Current': 0.0, 'Power': 0.0}, 'pv4': {'Voltage': 0.0, 'Current': 0.0, 'Power': 0.0}}}
|
||||||
|
assert m.db.get_db_value(Register.VERSION) == 'V4.0.10'
|
||||||
|
assert m.new_data['inverter'] == True
|
||||||
|
m.new_data['inverter'] = False
|
||||||
|
|
||||||
|
m.read() # read complete msg, and dispatch msg
|
||||||
|
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||||
|
assert m.mb.err == 0
|
||||||
|
assert m.msg_count == 2
|
||||||
|
assert m._forward_buffer==MsgModbusRsp
|
||||||
|
assert m._send_buffer==b''
|
||||||
|
# assert m.db.db == {'inverter': {'Version': 'V5.1.09', 'Rated_Power': 300}, 'grid': {'Voltage': 225.9, 'Current': 0.41, 'Frequency': 49.99, 'Output_Power': 94.8}, 'env': {'Inverter_Temp': 22}, 'input': {'pv1': {'Voltage': 0.8, 'Current': 0.0, 'Power': 0.0}, 'pv2': {'Voltage': 34.5, 'Current': 2.89, 'Power': 99.8}, 'pv3': {'Voltage': 0.0, 'Current': 0.0, 'Power': 0.0}, 'pv4': {'Voltage': 0.0, 'Current': 0.0, 'Power': 0.0}}}
|
||||||
|
assert m.db.get_db_value(Register.VERSION) == 'V4.0.10'
|
||||||
|
assert m.new_data['inverter'] == False
|
||||||
|
|
||||||
|
m.close()
|
||||||
|
'''
|
||||||
|
def test_msg_modbus_invalid(ConfigTsunInv1, MsgModbusInv):
|
||||||
|
ConfigTsunInv1
|
||||||
|
m = MemoryStream(MsgModbusInv, (0,), False)
|
||||||
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
|
m.read() # read complete msg, and dispatch msg
|
||||||
|
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||||
|
assert m.msg_count == 1
|
||||||
|
assert m.id_str == b"R170000000000001"
|
||||||
|
assert m.unique_id == 'R170000000000001'
|
||||||
|
assert int(m.ctrl)==153
|
||||||
|
assert m.msg_id==119
|
||||||
|
assert m.header_len==23
|
||||||
|
assert m.data_len==13
|
||||||
|
assert m._forward_buffer==MsgModbusInv
|
||||||
|
assert m._send_buffer==b''
|
||||||
|
assert m.db.stat['proxy']['Unknown_Ctrl'] == 1
|
||||||
|
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||||
|
m.close()
|
||||||
|
|
||||||
|
def test_msg_modbus_fragment(ConfigTsunInv1, MsgModbusResp20):
|
||||||
|
ConfigTsunInv1
|
||||||
|
# receive more bytes than expected (7 bytes from the next msg)
|
||||||
|
m = MemoryStream(MsgModbusResp20+b'\x00\x00\x00\x45\x10\x52\x31', (0,))
|
||||||
|
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||||
|
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||||
|
m.forward_modbus_resp = True
|
||||||
|
m.mb.last_fcode = 3
|
||||||
|
m.mb.last_len = 20
|
||||||
|
m.mb.last_reg = 0x3008
|
||||||
|
m.read() # read complete msg, and dispatch msg
|
||||||
|
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||||
|
assert m.msg_count == 1
|
||||||
|
assert m.id_str == b"R170000000000001"
|
||||||
|
assert m.unique_id == 'R170000000000001'
|
||||||
|
assert int(m.ctrl) == 0x91
|
||||||
|
assert m.msg_id == 119
|
||||||
|
assert m.header_len == 23
|
||||||
|
assert m.data_len == 50
|
||||||
|
assert m._forward_buffer==MsgModbusResp20
|
||||||
|
assert m._send_buffer == b''
|
||||||
|
assert m.mb.err == 0
|
||||||
|
assert m.modbus_elms == 20-1 # register 0x300d is unknown, so one value can't be mapped
|
||||||
|
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
|
||||||
|
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||||
|
m.close()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_msg_build_modbus_req(ConfigTsunInv1, MsgModbusCmd):
|
||||||
|
ConfigTsunInv1
|
||||||
|
m = MemoryStream(b'', (0,), True)
|
||||||
|
m.id_str = b"R170000000000001"
|
||||||
|
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
||||||
|
assert 0 == m.send_msg_ofs
|
||||||
|
assert m._forward_buffer == b''
|
||||||
|
assert m._send_buffer == b''
|
||||||
|
|
||||||
|
m.state = m.STATE_UP
|
||||||
|
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
||||||
|
assert 0 == m.send_msg_ofs
|
||||||
|
assert m._forward_buffer == b''
|
||||||
|
assert m._send_buffer == MsgModbusCmd
|
||||||
|
|
||||||
|
m._send_buffer = bytearray(0) # clear send buffer for next test
|
||||||
|
m.test_exception_async_write = True
|
||||||
|
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
||||||
|
assert 0 == m.send_msg_ofs
|
||||||
|
assert m._forward_buffer == b''
|
||||||
|
assert m._send_buffer == b''
|
||||||
|
m.close()
|
||||||
|
|
||||||
|
def test_zombie_conn(ConfigTsunInv1, MsgInverterInd):
|
||||||
|
ConfigTsunInv1
|
||||||
|
tracer.setLevel(logging.DEBUG)
|
||||||
|
m1 = MemoryStream(MsgInverterInd, (0,))
|
||||||
|
m2 = MemoryStream(MsgInverterInd, (0,))
|
||||||
|
m3 = MemoryStream(MsgInverterInd, (0,))
|
||||||
|
assert m1.state == m1.STATE_INIT
|
||||||
|
assert m2.state == m2.STATE_INIT
|
||||||
|
assert m3.state == m3.STATE_INIT
|
||||||
|
m1.read() # read complete msg, and set unique_id
|
||||||
|
assert m1.state == m1.STATE_INIT
|
||||||
|
assert m2.state == m2.STATE_INIT
|
||||||
|
assert m3.state == m3.STATE_INIT
|
||||||
|
m2.read() # read complete msg, and set unique_id
|
||||||
|
assert m1.state == m1.STATE_CLOSED
|
||||||
|
assert m2.state == m2.STATE_INIT
|
||||||
|
assert m3.state == m3.STATE_INIT
|
||||||
|
m3.read() # read complete msg, and set unique_id
|
||||||
|
assert m1.state == m1.STATE_CLOSED
|
||||||
|
assert m2.state == m2.STATE_CLOSED
|
||||||
|
assert m3.state == m3.STATE_INIT
|
||||||
|
m1.close()
|
||||||
|
m2.close()
|
||||||
|
m3.close()
|
||||||
|
'''
|
||||||
Reference in New Issue
Block a user