add missing testcases
This commit is contained in:
@@ -17,13 +17,13 @@ def test_statistic_counter():
|
||||
assert val == None or val == 0
|
||||
|
||||
i.static_init() # initialize counter
|
||||
assert json.dumps(i.stat) == json.dumps({"proxy": {"Inverter_Cnt": 0, "Unknown_SNR": 0, "Unknown_Msg": 0, "Invalid_Data_Type": 0, "Internal_Error": 0,"Unknown_Ctrl": 0, "OTA_Start_Msg": 0, "SW_Exception": 0, "Invalid_Msg_Format": 0, "AT_Command": 0, "Modbus_Command": 0}})
|
||||
assert json.dumps(i.stat) == json.dumps({"proxy": {"Inverter_Cnt": 0, "Unknown_SNR": 0, "Unknown_Msg": 0, "Invalid_Data_Type": 0, "Internal_Error": 0,"Unknown_Ctrl": 0, "OTA_Start_Msg": 0, "SW_Exception": 0, "Invalid_Msg_Format": 0, "AT_Command": 0, "AT_Command_Blocked": 0, "Modbus_Command": 0}})
|
||||
|
||||
val = i.dev_value(Register.INVERTER_CNT) # valid and initiliazed addr
|
||||
assert val == 0
|
||||
|
||||
i.inc_counter('Inverter_Cnt')
|
||||
assert json.dumps(i.stat) == json.dumps({"proxy": {"Inverter_Cnt": 1, "Unknown_SNR": 0, "Unknown_Msg": 0, "Invalid_Data_Type": 0, "Internal_Error": 0,"Unknown_Ctrl": 0, "OTA_Start_Msg": 0, "SW_Exception": 0, "Invalid_Msg_Format": 0, "AT_Command": 0, "Modbus_Command": 0}})
|
||||
assert json.dumps(i.stat) == json.dumps({"proxy": {"Inverter_Cnt": 1, "Unknown_SNR": 0, "Unknown_Msg": 0, "Invalid_Data_Type": 0, "Internal_Error": 0,"Unknown_Ctrl": 0, "OTA_Start_Msg": 0, "SW_Exception": 0, "Invalid_Msg_Format": 0, "AT_Command": 0, "AT_Command_Blocked": 0, "Modbus_Command": 0}})
|
||||
val = i.dev_value(Register.INVERTER_CNT)
|
||||
assert val == 1
|
||||
|
||||
|
||||
@@ -39,9 +39,10 @@ class MemoryStream(SolarmanV5):
|
||||
self.addr = 'Test: SrvSide'
|
||||
self.db.stat['proxy']['Invalid_Msg_Format'] = 0
|
||||
self.db.stat['proxy']['AT_Command'] = 0
|
||||
self.db.stat['proxy']['AT_Command_Blocked'] = 0
|
||||
self.test_exception_async_write = False
|
||||
self.entity_prfx = ''
|
||||
self.at_acl = {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE', 'AT+TIME'], 'block': []}}
|
||||
self.at_acl = {'mqtt': {'allow': ['AT+'], 'block': ['AT+WEBU']}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE', 'AT+TIME'], 'block': ['AT+WEBU']}}
|
||||
|
||||
def _timestamp(self):
|
||||
return timestamp
|
||||
@@ -466,6 +467,15 @@ def AtCommandIndMsg(): # 0x4510
|
||||
msg += b'\x15'
|
||||
return msg
|
||||
|
||||
@pytest.fixture
|
||||
def AtCommandIndMsgBlock(): # 0x4510
|
||||
msg = b'\xa5\x17\x00\x10\x45\x03\x02' +get_sn() +b'\x01\x02\x00'
|
||||
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
msg += b'AT+WEBU\r'
|
||||
msg += correct_checksum(msg)
|
||||
msg += b'\x15'
|
||||
return msg
|
||||
|
||||
@pytest.fixture
|
||||
def AtCommandRspMsg(): # 0x1510
|
||||
msg = b'\xa5\x0a\x00\x10\x15\x03\x03' +get_sn() +b'\x01\x01'
|
||||
@@ -1277,11 +1287,49 @@ async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIn
|
||||
assert m.forward_at_cmd_resp == False
|
||||
m.close()
|
||||
|
||||
def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg):
|
||||
@pytest.mark.asyncio
|
||||
async def test_AT_cmd_blocked(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIndMsg, InverterRspMsg, AtCommandIndMsg):
|
||||
ConfigTsunAllowAll
|
||||
m = MemoryStream(DeviceIndMsg, (0,), True)
|
||||
m.append_msg(InverterIndMsg)
|
||||
m.read()
|
||||
assert m.control == 0x4110
|
||||
assert str(m.seq) == '01:01'
|
||||
assert m._recv_buffer==InverterIndMsg # unhandled next message
|
||||
assert m._send_buffer==DeviceRspMsg
|
||||
assert m._forward_buffer==DeviceIndMsg
|
||||
|
||||
m._send_buffer = bytearray(0) # clear send buffer for next test
|
||||
m._forward_buffer = bytearray(0) # clear send buffer for next test
|
||||
await m.send_at_cmd('AT+WEBU')
|
||||
assert m._recv_buffer==InverterIndMsg # unhandled next message
|
||||
assert m._send_buffer==b''
|
||||
assert m._forward_buffer==b''
|
||||
assert str(m.seq) == '01:01'
|
||||
|
||||
m.read()
|
||||
assert m.control == 0x4210
|
||||
assert str(m.seq) == '02:02'
|
||||
assert m._recv_buffer==b''
|
||||
assert m._send_buffer==InverterRspMsg
|
||||
assert m._forward_buffer==InverterIndMsg
|
||||
|
||||
m._send_buffer = bytearray(0) # clear send buffer for next test
|
||||
m._forward_buffer = bytearray(0) # clear send buffer for next test
|
||||
await m.send_at_cmd('AT+WEBU')
|
||||
assert m._recv_buffer==b''
|
||||
assert m._send_buffer==b''
|
||||
assert m._forward_buffer==b''
|
||||
assert str(m.seq) == '02:02'
|
||||
assert m.forward_at_cmd_resp == False
|
||||
m.close()
|
||||
|
||||
def test_AT_cmd_ind(ConfigTsunInv1, AtCommandIndMsg):
|
||||
ConfigTsunInv1
|
||||
m = MemoryStream(AtCommandIndMsg, (0,), False)
|
||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||
m.db.stat['proxy']['AT_Command'] = 0
|
||||
m.db.stat['proxy']['AT_Command_Blocked'] = 0
|
||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
@@ -1297,6 +1345,32 @@ def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg):
|
||||
assert m._forward_buffer==AtCommandIndMsg
|
||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||
assert m.db.stat['proxy']['AT_Command'] == 1
|
||||
assert m.db.stat['proxy']['AT_Command_Blocked'] == 0
|
||||
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||
m.close()
|
||||
|
||||
def test_AT_cmd_ind_block(ConfigTsunInv1, AtCommandIndMsgBlock):
|
||||
ConfigTsunInv1
|
||||
m = MemoryStream(AtCommandIndMsgBlock, (0,), False)
|
||||
m.db.stat['proxy']['Unknown_Ctrl'] = 0
|
||||
m.db.stat['proxy']['AT_Command'] = 0
|
||||
m.db.stat['proxy']['AT_Command_Blocked'] = 0
|
||||
m.db.stat['proxy']['Modbus_Command'] = 0
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
assert m.msg_count == 1
|
||||
assert m.header_len==11
|
||||
assert m.snr == 2070233889
|
||||
# assert m.unique_id == '2070233889'
|
||||
assert m.control == 0x4510
|
||||
assert str(m.seq) == '03:02'
|
||||
assert m.data_len == 23
|
||||
assert m._recv_buffer==b''
|
||||
assert m._send_buffer==b''
|
||||
assert m._forward_buffer==b''
|
||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||
assert m.db.stat['proxy']['AT_Command'] == 0
|
||||
assert m.db.stat['proxy']['AT_Command_Blocked'] == 1
|
||||
assert m.db.stat['proxy']['Modbus_Command'] == 0
|
||||
m.close()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user