add more unit tests

This commit is contained in:
Stefan Allius
2024-03-31 23:26:14 +02:00
parent 884d4c04e6
commit e4b7ef7a0c
2 changed files with 170 additions and 21 deletions

View File

@@ -87,6 +87,14 @@ def DeviceIndMsg(): # 0x4110
msg += b'\x15'
return msg
@pytest.fixture
def DeviceRspMsg(): # 0x1110
msg = b'\xa5\x0a\x00\x10\x11\x10\x84' +get_sn() +b'\x01\x01\x69\x6f\x09'
msg += b'\x66\x78\x00\x00\x00'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def InvalidStartByte(): # 0x4110
msg = b'\xa4\xd4\x00\x10\x41\x00\x01' +get_sn() +b'\x02\xba\xd2\x00\x00'
@@ -180,6 +188,38 @@ def InverterIndMsg(): # 0x4210
msg += b'\x15'
return msg
@pytest.fixture
def InverterRspMsg(): # 0x1210
msg = b'\xa5\x0a\x00\x10\x12\x10\x84' +get_sn() +b'\x01\x01\x69\x6f\x09'
msg += b'\x66\x78\x00\x00\x00'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def UnknownMsg(): # 0x5110
msg = b'\xa5\x0a\x00\x10\x51\x10\x84' +get_sn() +b'\x01\x01\x69\x6f\x09'
msg += b'\x66\x78\x00\x00\x00'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def HeartbeatIndMsg(): # 0x4710
msg = b'\xa5\x01\x00\x10\x47\x10\x84' +get_sn()
msg += b'\x00'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def HeartbeatRspMsg(): # 0x1710
msg = b'\xa5\x0a\x00\x10\x17\x10\x84' +get_sn() +b'\x00\x01\x22\x71\x09'
msg += b'\x66\x78\x00\x00\x00'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def ConfigTsunAllowAll():
Config.config = {'solarman':{'enabled': True}, 'inverters':{'allow_all':True}}
@@ -190,7 +230,7 @@ def ConfigNoTsunInv1():
@pytest.fixture
def ConfigTsunInv1():
Config.config = {'solarman':{'enabled': True},'inverters':{'R170000000000001':{'node_id':'inv1','suggested_area':'roof'}}}
Config.config = {'solarman':{'enabled': True},'inverters':{'Y170000000000001':{'monitor_sn': 2070233889,'node_id':'inv1','suggested_area':'roof'}}}
def test_read_message(DeviceIndMsg):
m = MemoryStream(DeviceIndMsg, (0,))
@@ -389,7 +429,8 @@ def test_read_message_in_chunks(DeviceIndMsg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_read_message_in_chunks2(DeviceIndMsg):
def test_read_message_in_chunks2(ConfigTsunInv1, DeviceIndMsg):
ConfigTsunInv1
m = MemoryStream(DeviceIndMsg, (4,10,0))
m.read() # read 4 bytes, header incomplere
assert not m.header_valid
@@ -433,9 +474,9 @@ def test_read_two_messages(ConfigTsunAllowAll, DeviceIndMsg, InverterIndMsg):
assert m._send_buffer==b''
# assert m._send_buffer==MsgContactResp
# m._send_buffer = bytearray(0) # clear send buffer for next test
# m._init_new_client_conn()
# assert m._send_buffer==b'\x00\x00\x00,\x10R170000000000001\x91\x00\x08solarhub\x0fsolarhub@123456'
m._send_buffer = bytearray(0) # clear send buffer for next test
m._init_new_client_conn()
assert m._send_buffer==b''
assert m._recv_buffer==InverterIndMsg
m._send_buffer = bytearray(0) # clear send buffer for next test
@@ -454,7 +495,97 @@ def test_read_two_messages(ConfigTsunAllowAll, DeviceIndMsg, InverterIndMsg):
assert m._forward_buffer==InverterIndMsg
assert m._send_buffer==b''
# m._send_buffer = bytearray(0) # clear send buffer for next test
# m._init_new_client_conn()
# assert m._send_buffer==b'\x00\x00\x00,\x10R170000000000002\x91\x00\x08solarhub\x0fsolarhub@123456'
m._send_buffer = bytearray(0) # clear send buffer for next test
m._init_new_client_conn()
assert m._send_buffer==b''
m.close()
def test_unkown_message(ConfigTsunInv1, UnknownMsg):
ConfigTsunInv1
m = MemoryStream(UnknownMsg, (0,))
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == '2070233889'
assert m.control == 0x5110
assert m.serial == 0x8410
assert m.data_len == 0x0a
assert m._recv_buffer==b''
assert m._send_buffer==b''
assert m._forward_buffer==UnknownMsg
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_device_rsp(ConfigTsunInv1, DeviceRspMsg):
ConfigTsunInv1
m = MemoryStream(DeviceRspMsg, (0,), False)
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == '2070233889'
assert m.control == 0x1110
assert m.serial == 0x8410
assert m.data_len == 0x0a
assert m._recv_buffer==b''
assert m._send_buffer==b''
assert m._forward_buffer==DeviceRspMsg
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_inverter_rsp(ConfigTsunInv1, InverterRspMsg):
ConfigTsunInv1
m = MemoryStream(InverterRspMsg, (0,), False)
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == '2070233889'
assert m.control == 0x1210
assert m.serial == 0x8410
assert m.data_len == 0x0a
assert m._recv_buffer==b''
assert m._send_buffer==b''
assert m._forward_buffer==InverterRspMsg
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_heartbeat_ind(ConfigTsunInv1, HeartbeatIndMsg):
ConfigTsunInv1
m = MemoryStream(HeartbeatIndMsg, (0,))
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.header_len==11
assert m.snr == 2070233889
# assert m.unique_id == '2070233889'
assert m.control == 0x4710
assert m.serial == 0x8410
assert m.data_len == 0x01
assert m._recv_buffer==b''
assert m._send_buffer==b''
assert m._forward_buffer==HeartbeatIndMsg
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_heartbeat_rsp(ConfigTsunInv1, HeartbeatRspMsg):
ConfigTsunInv1
m = MemoryStream(HeartbeatRspMsg, (0,), False)
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == '2070233889'
assert m.control == 0x1710
assert m.serial == 0x8410
assert m.data_len == 0x0a
assert m._recv_buffer==b''
assert m._send_buffer==b''
assert m._forward_buffer==HeartbeatRspMsg
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()