add unit test for solarman V5

This commit is contained in:
Stefan Allius
2024-03-31 00:59:57 +01:00
parent 643c0026d8
commit f69b02aaeb
3 changed files with 218 additions and 13 deletions

View File

@@ -1,5 +1,6 @@
import pytest, json
from app.src.gen3plus.solarman_v5 import SolarmanV5
from app.src.config import Config
from app.src.infos import Infos
# initialize the proxy statistics
@@ -15,6 +16,8 @@ class MemoryStream(SolarmanV5):
self.__chunk_idx = 0
self.msg_count = 0
self.addr = 'Test: SrvSide'
self.db.stat['proxy']['Invalid_Msg_Format'] = 0
def append_msg(self, msg):
self.__msg += msg
@@ -48,7 +51,7 @@ class MemoryStream(SolarmanV5):
def get_sn() -> bytes:
return b'\xc8\x1e\x4d\x7b'
return b'\x21\x43\x65\x7b'
def get_inv_no() -> bytes:
return b'T170000000000001'
@@ -56,9 +59,16 @@ def get_inv_no() -> bytes:
def get_invalid_sn():
return b'R170000000000002'
def correct_checksum(buf):
checksum = sum(buf[1:]) & 0xff
return checksum.to_bytes(length=1)
def incorrect_checksum(buf):
checksum = (sum(buf[1:])+1) & 0xff
return checksum.to_bytes(length=1)
@pytest.fixture
def TestMsg(): # Contact Info message
def DeviceIndMsg(): # 0x4110
msg = b'\xa5\xd4\x00\x10\x41\x00\x01' +get_sn() +b'\x02\xba\xd2\x00\x00'
msg += b'\x19\x00\x00\x00\x00\x00\x00\x00\x05\x3c\x78\x01\x64\x01\x4c\x53'
msg += b'\x57\x35\x42\x4c\x45\x5f\x31\x37\x5f\x30\x32\x42\x30\x5f\x31\x2e'
@@ -72,23 +82,215 @@ def TestMsg(): # Contact Info message
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x41\x6c\x6c\x69\x75\x73\x2d\x48\x6f'
msg += b'\x6d\x65\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x3c'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
def test_read_message(TestMsg):
m = MemoryStream(TestMsg, (0,))
@pytest.fixture
def InvalidStartByte(): # 0x4110
msg = b'\xa4\xd4\x00\x10\x41\x00\x01' +get_sn() +b'\x02\xba\xd2\x00\x00'
msg += b'\x19\x00\x00\x00\x00\x00\x00\x00\x05\x3c\x78\x01\x64\x01\x4c\x53'
msg += b'\x57\x35\x42\x4c\x45\x5f\x31\x37\x5f\x30\x32\x42\x30\x5f\x31\x2e'
msg += b'\x30\x35\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x40\x2a\x8f\x4f\x51\x54\x31\x39\x32\x2e'
msg += b'\x31\x36\x38\x2e\x38\x30\x2e\x34\x39\x00\x00\x00\x0f\x00\x01\xb0'
msg += b'\x02\x0f\x00\xff\x56\x31\x2e\x31\x2e\x30\x30\x2e\x30\x42\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xfe\xfe\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x41\x6c\x6c\x69\x75\x73\x2d\x48\x6f'
msg += b'\x6d\x65\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def InvalidStopByte(): # 0x4110
msg = b'\xa5\xd4\x00\x10\x41\x00\x01' +get_sn() +b'\x02\xba\xd2\x00\x00'
msg += b'\x19\x00\x00\x00\x00\x00\x00\x00\x05\x3c\x78\x01\x64\x01\x4c\x53'
msg += b'\x57\x35\x42\x4c\x45\x5f\x31\x37\x5f\x30\x32\x42\x30\x5f\x31\x2e'
msg += b'\x30\x35\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x40\x2a\x8f\x4f\x51\x54\x31\x39\x32\x2e'
msg += b'\x31\x36\x38\x2e\x38\x30\x2e\x34\x39\x00\x00\x00\x0f\x00\x01\xb0'
msg += b'\x02\x0f\x00\xff\x56\x31\x2e\x31\x2e\x30\x30\x2e\x30\x42\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xfe\xfe\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x41\x6c\x6c\x69\x75\x73\x2d\x48\x6f'
msg += b'\x6d\x65\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += correct_checksum(msg)
msg += b'\x14'
return msg
@pytest.fixture
def InvalidChecksum(): # 0x4110
msg = b'\xa5\xd4\x00\x10\x41\x00\x01' +get_sn() +b'\x02\xba\xd2\x00\x00'
msg += b'\x19\x00\x00\x00\x00\x00\x00\x00\x05\x3c\x78\x01\x64\x01\x4c\x53'
msg += b'\x57\x35\x42\x4c\x45\x5f\x31\x37\x5f\x30\x32\x42\x30\x5f\x31\x2e'
msg += b'\x30\x35\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x40\x2a\x8f\x4f\x51\x54\x31\x39\x32\x2e'
msg += b'\x31\x36\x38\x2e\x38\x30\x2e\x34\x39\x00\x00\x00\x0f\x00\x01\xb0'
msg += b'\x02\x0f\x00\xff\x56\x31\x2e\x31\x2e\x30\x30\x2e\x30\x42\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xfe\xfe\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x41\x6c\x6c\x69\x75\x73\x2d\x48\x6f'
msg += b'\x6d\x65\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += incorrect_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture
def ConfigTsunAllowAll():
Config.config = {'solarman':{'enabled': True}, 'inverters':{'allow_all':True}}
@pytest.fixture
def ConfigNoTsunInv1():
Config.config = {'solarman':{'enabled': False},'inverters':{'Y170000000000001':{'monitor_sn': 2070233889,'node_id':'inv1','suggested_area':'roof'}}}
@pytest.fixture
def ConfigTsunInv1():
Config.config = {'solarman':{'enabled': True},'inverters':{'R170000000000001':{'node_id':'inv1','suggested_area':'roof'}}}
def test_read_message(DeviceIndMsg):
m = MemoryStream(DeviceIndMsg, (0,))
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
# assert m.id_str == b"R170000000000001"
assert m.snr == 2068651720
# assert m.unique_id == None
# assert int(m.ctrl)==145
# assert m.msg_id==0
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == None
assert m.control == 0x4110
assert m.serial == 0x0100
assert m.data_len == 0xd4
assert m._forward_buffer==b''
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_invalid_start_byte(InvalidStartByte):
m = MemoryStream(InvalidStartByte, (0,))
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since start byte is wrong
assert m.msg_count == 0
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == 0
assert m.control == 0x4110
assert m.serial == 0x0100
assert m.data_len == 0xd4
assert m._forward_buffer==b''
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1
m.close()
def test_invalid_stop_byte(InvalidStopByte):
m = MemoryStream(InvalidStopByte, (0,))
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since start byte is wrong
assert m.msg_count == 1 # msg flush was called
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == 0
assert m.control == 0x4110
assert m.serial == 0x0100
assert m.data_len == 0xd4
assert m._forward_buffer==b''
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1
m.close()
def test_invalid_checksum(InvalidChecksum):
m = MemoryStream(InvalidChecksum, (0,))
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since start byte is wrong
assert m.msg_count == 1 # msg flush was called
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == 0
assert m.control == 0x4110
assert m.serial == 0x0100
assert m.data_len == 0xd4
assert m._forward_buffer==b''
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1
m.close()
def test_read_message_twice(ConfigNoTsunInv1, DeviceIndMsg):
ConfigNoTsunInv1
m = MemoryStream(DeviceIndMsg, (0,))
m.append_msg(DeviceIndMsg)
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == '2070233889'
assert m.control == 0x4110
assert m.serial == 0x0100
assert m.data_len == 0xd4
assert m._forward_buffer==b''
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 2
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == '2070233889'
assert m.control == 0x4110
assert m.serial == 0x0100
assert m.data_len == 0xd4
assert m._forward_buffer==b''
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_read_message_in_chunks(DeviceIndMsg):
m = MemoryStream(DeviceIndMsg, (4,11,0))
m.read() # read 4 bytes, header incomplere
assert not m.header_valid # must be invalid, since header not complete
assert m.msg_count == 0
m.read() # read missing bytes for complete header
assert m.header_valid # must be valid, since header is complete but not the msg
assert m.msg_count == 0
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == 0 # should be None ?
assert m.control == 0x4110
assert m.serial == 0x0100
assert m.data_len == 0xd4
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.read() # read rest of message
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
def test_read_message_in_chunks2(DeviceIndMsg):
m = MemoryStream(DeviceIndMsg, (4,10,0))
m.read() # read 4 bytes, header incomplere
assert not m.header_valid
assert m.msg_count == 0
m.read() # read 6 more bytes, header incomplere
assert not m.header_valid
assert m.msg_count == 0
m.read() # read rest of message
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.header_len==11
assert m.snr == 2070233889
assert m.unique_id == '2070233889'
assert m.control == 0x4110
assert m.serial == 0x0100
assert m.data_len == 0xd4
assert m.msg_count == 1
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
while m.read(): # read rest of message
pass
assert m.msg_count == 1
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()