experimental AT cmd handler and tests
This commit is contained in:
@@ -81,9 +81,9 @@ class SolarmanV5(Message):
|
|||||||
0x1810: self.msg_response,
|
0x1810: self.msg_response,
|
||||||
|
|
||||||
#
|
#
|
||||||
# AT cmd
|
# MODbus or AT cmd
|
||||||
0x4510: self.at_command_ind, # from server
|
0x4510: self.msg_command_req, # from server
|
||||||
0x1510: self.msg_response, # from inverter
|
0x1510: self.msg_response, # from inverter
|
||||||
}
|
}
|
||||||
|
|
||||||
'''
|
'''
|
||||||
@@ -292,6 +292,13 @@ class SolarmanV5(Message):
|
|||||||
self._heartbeat())
|
self._heartbeat())
|
||||||
self.__finish_send_msg()
|
self.__finish_send_msg()
|
||||||
|
|
||||||
|
def send_at_cmd(self, AT_cmd: str) -> None:
|
||||||
|
self.__build_header(0x4510)
|
||||||
|
self._send_buffer += struct.pack(f'<BHLLL{len(AT_cmd)}sc', 1, 2,
|
||||||
|
0, 0, 0, AT_cmd.encode('utf-8'),
|
||||||
|
b'\r')
|
||||||
|
self.__finish_send_msg()
|
||||||
|
|
||||||
def __forward_msg(self):
|
def __forward_msg(self):
|
||||||
self.forward(self._recv_buffer, self.header_len+self.data_len+2)
|
self.forward(self._recv_buffer, self.header_len+self.data_len+2)
|
||||||
|
|
||||||
@@ -381,7 +388,7 @@ class SolarmanV5(Message):
|
|||||||
self.__forward_msg()
|
self.__forward_msg()
|
||||||
self.__send_ack_rsp(0x1310, ftype)
|
self.__send_ack_rsp(0x1310, ftype)
|
||||||
|
|
||||||
def at_command_ind(self):
|
def msg_command_req(self):
|
||||||
data = self._recv_buffer[self.header_len:]
|
data = self._recv_buffer[self.header_len:]
|
||||||
result = struct.unpack_from('<B', data, 0)
|
result = struct.unpack_from('<B', data, 0)
|
||||||
ftype = result[0]
|
ftype = result[0]
|
||||||
|
|||||||
@@ -344,15 +344,16 @@ def HeartbeatRspMsg(): # 0x1710
|
|||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def AtCommandIndMsg(): # 0x4510
|
def AtCommandIndMsg(): # 0x4510
|
||||||
msg = b'\xa5\x01\x00\x10\x45\x10\x84' +get_sn()
|
msg = b'\xa5\x27\x00\x10\x45\x02\x01' +get_sn() +b'\x01\x02\x00'
|
||||||
msg += b'\x00'
|
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
msg += b'AT+TIME=214028,1,60,120\r'
|
||||||
msg += correct_checksum(msg)
|
msg += correct_checksum(msg)
|
||||||
msg += b'\x15'
|
msg += b'\x15'
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def AtCommandRspMsg(): # 0x1510
|
def AtCommandRspMsg(): # 0x1510
|
||||||
msg = b'\xa5\x0a\x00\x10\x15\x11\x84' +get_sn() +b'\x00\x01'
|
msg = b'\xa5\x0a\x00\x10\x15\x03\x01' +get_sn() +b'\x01\x01'
|
||||||
msg += total()
|
msg += total()
|
||||||
msg += hb()
|
msg += hb()
|
||||||
msg += correct_checksum(msg)
|
msg += correct_checksum(msg)
|
||||||
@@ -742,8 +743,8 @@ def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg, AtCommandRspMsg):
|
|||||||
assert m.snr == 2070233889
|
assert m.snr == 2070233889
|
||||||
# assert m.unique_id == '2070233889'
|
# assert m.unique_id == '2070233889'
|
||||||
assert m.control == 0x4510
|
assert m.control == 0x4510
|
||||||
assert str(m.seq) == '84:11'
|
assert str(m.seq) == '01:03'
|
||||||
assert m.data_len == 0x01
|
assert m.data_len == 39
|
||||||
assert m._recv_buffer==b''
|
assert m._recv_buffer==b''
|
||||||
assert m._send_buffer==AtCommandRspMsg
|
assert m._send_buffer==AtCommandRspMsg
|
||||||
assert m._forward_buffer==AtCommandIndMsg
|
assert m._forward_buffer==AtCommandIndMsg
|
||||||
@@ -813,3 +814,18 @@ def test_build_logger_modell(ConfigTsunAllowAll, DeviceIndMsg):
|
|||||||
assert 'LSW5BLE_17_02B0_1.05' == m.db.get_db_value(Register.COLLECTOR_FW_VERSION, 0).rstrip('\00')
|
assert 'LSW5BLE_17_02B0_1.05' == m.db.get_db_value(Register.COLLECTOR_FW_VERSION, 0).rstrip('\00')
|
||||||
assert 'LSW5BLE' == m.db.get_db_value(Register.CHIP_MODEL, 0)
|
assert 'LSW5BLE' == m.db.get_db_value(Register.CHIP_MODEL, 0)
|
||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
|
def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, AtCommandIndMsg):
|
||||||
|
ConfigTsunAllowAll
|
||||||
|
m = MemoryStream(DeviceIndMsg)
|
||||||
|
m.read()
|
||||||
|
assert m._recv_buffer==b''
|
||||||
|
assert m._send_buffer==DeviceRspMsg
|
||||||
|
assert m._forward_buffer==DeviceIndMsg
|
||||||
|
m._send_buffer = bytearray(0) # clear send buffer for next test
|
||||||
|
m._forward_buffer = bytearray(0) # clear send buffer for next test
|
||||||
|
m.send_at_cmd('AT+TIME=214028,1,60,120')
|
||||||
|
assert m._recv_buffer==b''
|
||||||
|
assert m._send_buffer==AtCommandIndMsg
|
||||||
|
assert m._forward_buffer==b''
|
||||||
|
m.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user