diff --git a/app/tests/test_infos.py b/app/tests/test_infos.py index c3e6ddf..4b23bfb 100644 --- a/app/tests/test_infos.py +++ b/app/tests/test_infos.py @@ -17,13 +17,13 @@ def test_statistic_counter(): assert val == None or val == 0 i.static_init() # initialize counter - assert json.dumps(i.stat) == json.dumps({"proxy": {"Inverter_Cnt": 0, "Unknown_SNR": 0, "Unknown_Msg": 0, "Invalid_Data_Type": 0, "Internal_Error": 0,"Unknown_Ctrl": 0, "OTA_Start_Msg": 0, "SW_Exception": 0, "Invalid_Msg_Format": 0, "AT_Command": 0, "Modbus_Command": 0}}) + assert json.dumps(i.stat) == json.dumps({"proxy": {"Inverter_Cnt": 0, "Unknown_SNR": 0, "Unknown_Msg": 0, "Invalid_Data_Type": 0, "Internal_Error": 0,"Unknown_Ctrl": 0, "OTA_Start_Msg": 0, "SW_Exception": 0, "Invalid_Msg_Format": 0, "AT_Command": 0, "AT_Command_Blocked": 0, "Modbus_Command": 0}}) val = i.dev_value(Register.INVERTER_CNT) # valid and initiliazed addr assert val == 0 i.inc_counter('Inverter_Cnt') - assert json.dumps(i.stat) == json.dumps({"proxy": {"Inverter_Cnt": 1, "Unknown_SNR": 0, "Unknown_Msg": 0, "Invalid_Data_Type": 0, "Internal_Error": 0,"Unknown_Ctrl": 0, "OTA_Start_Msg": 0, "SW_Exception": 0, "Invalid_Msg_Format": 0, "AT_Command": 0, "Modbus_Command": 0}}) + assert json.dumps(i.stat) == json.dumps({"proxy": {"Inverter_Cnt": 1, "Unknown_SNR": 0, "Unknown_Msg": 0, "Invalid_Data_Type": 0, "Internal_Error": 0,"Unknown_Ctrl": 0, "OTA_Start_Msg": 0, "SW_Exception": 0, "Invalid_Msg_Format": 0, "AT_Command": 0, "AT_Command_Blocked": 0, "Modbus_Command": 0}}) val = i.dev_value(Register.INVERTER_CNT) assert val == 1 diff --git a/app/tests/test_solarman.py b/app/tests/test_solarman.py index 52a66a8..06a57f0 100644 --- a/app/tests/test_solarman.py +++ b/app/tests/test_solarman.py @@ -39,9 +39,10 @@ class MemoryStream(SolarmanV5): self.addr = 'Test: SrvSide' self.db.stat['proxy']['Invalid_Msg_Format'] = 0 self.db.stat['proxy']['AT_Command'] = 0 + self.db.stat['proxy']['AT_Command_Blocked'] = 0 self.test_exception_async_write = False self.entity_prfx = '' - self.at_acl = {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE', 'AT+TIME'], 'block': []}} + self.at_acl = {'mqtt': {'allow': ['AT+'], 'block': ['AT+WEBU']}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE', 'AT+TIME'], 'block': ['AT+WEBU']}} def _timestamp(self): return timestamp @@ -466,6 +467,15 @@ def AtCommandIndMsg(): # 0x4510 msg += b'\x15' return msg +@pytest.fixture +def AtCommandIndMsgBlock(): # 0x4510 + msg = b'\xa5\x17\x00\x10\x45\x03\x02' +get_sn() +b'\x01\x02\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'AT+WEBU\r' + msg += correct_checksum(msg) + msg += b'\x15' + return msg + @pytest.fixture def AtCommandRspMsg(): # 0x1510 msg = b'\xa5\x0a\x00\x10\x15\x03\x03' +get_sn() +b'\x01\x01' @@ -1277,11 +1287,49 @@ async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIn assert m.forward_at_cmd_resp == False m.close() -def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg): +@pytest.mark.asyncio +async def test_AT_cmd_blocked(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIndMsg, InverterRspMsg, AtCommandIndMsg): + ConfigTsunAllowAll + m = MemoryStream(DeviceIndMsg, (0,), True) + m.append_msg(InverterIndMsg) + m.read() + assert m.control == 0x4110 + assert str(m.seq) == '01:01' + assert m._recv_buffer==InverterIndMsg # unhandled next message + assert m._send_buffer==DeviceRspMsg + assert m._forward_buffer==DeviceIndMsg + + m._send_buffer = bytearray(0) # clear send buffer for next test + m._forward_buffer = bytearray(0) # clear send buffer for next test + await m.send_at_cmd('AT+WEBU') + assert m._recv_buffer==InverterIndMsg # unhandled next message + assert m._send_buffer==b'' + assert m._forward_buffer==b'' + assert str(m.seq) == '01:01' + + m.read() + assert m.control == 0x4210 + assert str(m.seq) == '02:02' + assert m._recv_buffer==b'' + assert m._send_buffer==InverterRspMsg + assert m._forward_buffer==InverterIndMsg + + m._send_buffer = bytearray(0) # clear send buffer for next test + m._forward_buffer = bytearray(0) # clear send buffer for next test + await m.send_at_cmd('AT+WEBU') + assert m._recv_buffer==b'' + assert m._send_buffer==b'' + assert m._forward_buffer==b'' + assert str(m.seq) == '02:02' + assert m.forward_at_cmd_resp == False + m.close() + +def test_AT_cmd_ind(ConfigTsunInv1, AtCommandIndMsg): ConfigTsunInv1 m = MemoryStream(AtCommandIndMsg, (0,), False) m.db.stat['proxy']['Unknown_Ctrl'] = 0 m.db.stat['proxy']['AT_Command'] = 0 + m.db.stat['proxy']['AT_Command_Blocked'] = 0 m.db.stat['proxy']['Modbus_Command'] = 0 m.read() # read complete msg, and dispatch msg assert not m.header_valid # must be invalid, since msg was handled and buffer flushed @@ -1297,6 +1345,32 @@ def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg): assert m._forward_buffer==AtCommandIndMsg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 assert m.db.stat['proxy']['AT_Command'] == 1 + assert m.db.stat['proxy']['AT_Command_Blocked'] == 0 + assert m.db.stat['proxy']['Modbus_Command'] == 0 + m.close() + +def test_AT_cmd_ind_block(ConfigTsunInv1, AtCommandIndMsgBlock): + ConfigTsunInv1 + m = MemoryStream(AtCommandIndMsgBlock, (0,), False) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['AT_Command'] = 0 + m.db.stat['proxy']['AT_Command_Blocked'] = 0 + m.db.stat['proxy']['Modbus_Command'] = 0 + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.header_len==11 + assert m.snr == 2070233889 + # assert m.unique_id == '2070233889' + assert m.control == 0x4510 + assert str(m.seq) == '03:02' + assert m.data_len == 23 + assert m._recv_buffer==b'' + assert m._send_buffer==b'' + assert m._forward_buffer==b'' + assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 + assert m.db.stat['proxy']['AT_Command'] == 0 + assert m.db.stat['proxy']['AT_Command_Blocked'] == 1 assert m.db.stat['proxy']['Modbus_Command'] == 0 m.close()