add more unittests for solarman_v5.py

This commit is contained in:
Stefan Allius
2024-04-14 12:30:07 +02:00
parent 05b576b198
commit ac0bf2f8f8
3 changed files with 144 additions and 13 deletions

View File

@@ -325,6 +325,54 @@ def UnknownMsg(): # 0x5110
msg += b'\x15'
return msg
@pytest.fixture
def SyncStartIndMsg(): # 0x4310
msg = b'\xa5\x2f\x00\x10\x43\x0c\x0d' +get_sn() +b'\x81\x7a\x0b\x2e\x32'
msg += b'\x39\x00\x00\x00\x00\x00\x00\x00\x0c\x00\x41\x6c\x6c\x69\x75\x73'
msg += b'\x2d\x48\x6f\x6d\x65\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x61\x01'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def SyncStartRspMsg(): # 0x1310
msg = b'\xa5\x0a\x00\x10\x13\x0d\x0d' +get_sn() +b'\x81\x01'
msg += total()
msg += hb()
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def SyncStartFwdMsg(): # 0x4310
msg = b'\xa5\x2f\x00\x10\x43\x0e\x0d' +get_sn() +b'\x81\x7a\x0b\x2e\x32'
msg += b'\x39\x00\x00\x00\x00\x00\x00\x00\x0c\x00\x41\x6c\x6c\x69\x75\x73'
msg += b'\x2d\x48\x6f\x6d\x65\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x61\x01'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def AtCommandIndMsg(): # 0x4510
msg = b'\xa5\x27\x00\x10\x45\x02\x01' +get_sn() +b'\x01\x02\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'AT+TIME=214028,1,60,120\r'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def AtCommandRspMsg(): # 0x1510
msg = b'\xa5\x0a\x00\x10\x15\x02\x02' +get_sn() +b'\x01\x01'
msg += total()
msg += hb()
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def HeartbeatIndMsg(): # 0x4710
msg = b'\xa5\x01\x00\x10\x47\x10\x84' +get_sn()
@@ -343,17 +391,19 @@ def HeartbeatRspMsg(): # 0x1710
return msg
@pytest.fixture
def AtCommandIndMsg(): # 0x4510
msg = b'\xa5\x27\x00\x10\x45\x02\x01' +get_sn() +b'\x01\x02\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'AT+TIME=214028,1,60,120\r'
def SyncEndIndMsg(): # 0x4810
msg = b'\xa5\x3c\x00\x10\x48\x06\x07' +get_sn() +b'\x01\xa5\x3c\x2e\x32'
msg += b'\x2c\x00\x00\x00\xc1\x01\xec\x33\x01\x05\x2c\xff\xff\xff\xff\xff'
msg += b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
msg += b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
msg += b'\xff\xff\xff\xff\xff\xff\xff'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def AtCommandRspMsg(): # 0x1510
msg = b'\xa5\x0a\x00\x10\x15\x03\x01' +get_sn() +b'\x01\x01'
def SyncEndRspMsg(): # 0x1810
msg = b'\xa5\x0a\x00\x10\x18\x07\x07' +get_sn() +b'\x01\x01'
msg += total()
msg += hb()
msg += correct_checksum(msg)
@@ -733,9 +783,86 @@ def test_heartbeat_rsp(ConfigTsunInv1, HeartbeatRspMsg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_sync_start_ind(ConfigTsunInv1, SyncStartIndMsg, SyncStartRspMsg, SyncStartFwdMsg):
ConfigTsunInv1
m = MemoryStream(SyncStartIndMsg, (0,))
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.header_len==11
assert m.snr == 2070233889
# assert m.unique_id == '2070233889'
assert m.control == 0x4310
assert str(m.seq) == '0d:0d' # value after sending response
assert m.data_len == 47
assert m._recv_buffer==b''
assert m._send_buffer==SyncStartRspMsg
assert m._forward_buffer==SyncStartIndMsg
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m._update_header(m._forward_buffer)
assert str(m.seq) == '0d:0e' # value after forwarding indication
assert m._forward_buffer==SyncStartFwdMsg
m.close()
def test_sync_start_rsp(ConfigTsunInv1, SyncStartRspMsg):
ConfigTsunInv1
m = MemoryStream(SyncStartRspMsg, (0,), False)
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == '2070233889'
assert m.control == 0x1310
assert str(m.seq) == '0d:0d' # value after sending response
assert m.data_len == 0x0a
assert m._recv_buffer==b''
assert m._send_buffer==b''
assert m._forward_buffer==b'' # HeartbeatRspMsg
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_sync_end_ind(ConfigTsunInv1, SyncEndIndMsg, SyncEndRspMsg):
ConfigTsunInv1
m = MemoryStream(SyncEndIndMsg, (0,))
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.header_len==11
assert m.snr == 2070233889
# assert m.unique_id == '2070233889'
assert m.control == 0x4810
assert str(m.seq) == '07:07' # value after sending response
assert m.data_len == 60
assert m._recv_buffer==b''
assert m._send_buffer==SyncEndRspMsg
assert m._forward_buffer==SyncEndIndMsg
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_sync_end_rsp(ConfigTsunInv1, SyncEndRspMsg):
ConfigTsunInv1
m = MemoryStream(SyncEndRspMsg, (0,), False)
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == '2070233889'
assert m.control == 0x1810
assert str(m.seq) == '07:07' # value after sending response
assert m.data_len == 0x0a
assert m._recv_buffer==b''
assert m._send_buffer==b''
assert m._forward_buffer==b'' # HeartbeatRspMsg
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg, AtCommandRspMsg):
ConfigTsunInv1
m = MemoryStream(AtCommandIndMsg, (0,))
m = MemoryStream(AtCommandIndMsg, (0,), False)
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
@@ -743,7 +870,7 @@ def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg, AtCommandRspMsg):
assert m.snr == 2070233889
# assert m.unique_id == '2070233889'
assert m.control == 0x4510
assert str(m.seq) == '01:03'
assert str(m.seq) == '02:02'
assert m.data_len == 39
assert m._recv_buffer==b''
assert m._send_buffer==AtCommandRspMsg
@@ -817,15 +944,19 @@ def test_build_logger_modell(ConfigTsunAllowAll, DeviceIndMsg):
def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, AtCommandIndMsg):
ConfigTsunAllowAll
m = MemoryStream(DeviceIndMsg)
m = MemoryStream(DeviceIndMsg, (0,), True)
m.read()
assert m.control == 0x4110
assert str(m.seq) == '01:01'
assert m._recv_buffer==b''
assert m._send_buffer==DeviceRspMsg
assert m._forward_buffer==DeviceIndMsg
m._send_buffer = bytearray(0) # clear send buffer for next test
m._forward_buffer = bytearray(0) # clear send buffer for next test
m.send_at_cmd('AT+TIME=214028,1,60,120')
assert m._recv_buffer==b''
assert m._send_buffer==AtCommandIndMsg
assert m._forward_buffer==b''
assert str(m.seq) == '01:02'
m.close()