Files
tsun-gen3-proxy/app/tests/test_messages.py
Stefan Allius a016e6c3e2 add unit tests
2023-09-24 22:47:43 +02:00

161 lines
5.6 KiB
Python

# test_with_pytest.py
import pytest
from app.src.messages import Message, Control
from app.src.config import Config
class MemoryStream(Message):
def __init__(self, msg, chunks = (0,)):
super().__init__()
self.__msg = msg
self.__msg_len = len(msg)
self.__chunks = chunks
self.__offs = 0
self.__chunk_idx = 0
self.msg_count = 0
self.server_side = False
self.addr = 'Test: SrvSide'
def _read(self) -> int:
copied_bytes = 0
try:
if (self.__offs < self.__msg_len):
len = self.__chunks[self.__chunk_idx]
self.__chunk_idx += 1
if len!=0:
self._recv_buffer += self.__msg[self.__offs:len]
copied_bytes = len - self.__offs
self.__offs = len
else:
self._recv_buffer += self.__msg[self.__offs:]
copied_bytes = self.__msg_len - self.__offs
self.__offs = self.__msg_len
except:
pass
return copied_bytes
def _Message__flush_recv_msg(self) -> None:
super()._Message__flush_recv_msg()
self.msg_count += 1
return
def __del__ (self):
super().__del__()
@pytest.fixture
def MsgContactInfo(): # Contact Info message
Config.config = {'tsun':{'enabled': True}}
return b'\x00\x00\x00\x2c\x10R170000000000001\x91\x00\x08solarhub\x0fsolarhub\x40123456'
@pytest.fixture
def MsgContactInfo_LongId(): # Contact Info message with longer ID
Config.config = {'tsun':{'enabled': True}}
return b'\x00\x00\x00\x2d\x11R1700000000000011\x91\x00\x08solarhub\x0fsolarhub\x40123456'
@pytest.fixture
def Msg2ContactInfo(): # two Contact Info messages
Config.config = {'tsun':{'enabled': True}}
return b'\x00\x00\x00\x2c\x10R170000000000001\x91\x00\x08solarhub\x0fsolarhub\x40123456\x00\x00\x00\x2c\x10R170000000000002\x91\x00\x08solarhub\x0fsolarhub\x40123456'
def test_read_message(MsgContactInfo):
m = MemoryStream(MsgContactInfo, (0,))
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.id_str == b"R170000000000001"
assert int(m.ctrl)==145
assert m.msg_id==0
assert m.header_len==23
assert m.data_len==25
def test_read_message_long_id(MsgContactInfo_LongId):
m = MemoryStream(MsgContactInfo_LongId, (23,24))
m.read() # read 23 bytes, one is missing
assert not m.header_valid # must be invalid, since header not complete
assert m.msg_count == 0
m.read() # read the missing byte
assert m.header_valid # must be valid, since header is complete but not the msg
assert m.msg_count == 0
assert m.id_str == b"R1700000000000011"
assert int(m.ctrl)==145
assert m.msg_id==0
assert m.header_len==24
assert m.data_len==25
m.read() # try to read rest of message, but there is no chunk available
assert m.header_valid # must be valid, since header is complete but not the msg
assert m.msg_count == 0
def test_read_message_in_chunks(MsgContactInfo):
m = MemoryStream(MsgContactInfo, (4,23,0))
m.read() # read 4 bytes, header incomplere
assert not m.header_valid # must be invalid, since header not complete
assert m.msg_count == 0
m.read() # read missing bytes for complete header
assert m.header_valid # must be valid, since header is complete but not the msg
assert m.msg_count == 0
assert m.id_str == b"R170000000000001"
assert int(m.ctrl)==145
assert m.msg_id==0
assert m.header_len==23
assert m.data_len==25
m.read() # read rest of message
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
def test_read_message_in_chunks2(MsgContactInfo):
m = MemoryStream(MsgContactInfo, (4,10,0))
m.read() # read 4 bytes, header incomplere
assert not m.header_valid
assert m.msg_count == 0
m.read() # read 6 more bytes, header incomplere
assert not m.header_valid
assert m.msg_count == 0
m.read() # read rest of message
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.header_len==23
assert m.data_len==25
assert m.id_str == b"R170000000000001"
assert int(m.ctrl)==145
assert m.msg_id==0
assert m.msg_count == 1
while m.read(): # read rest of message
pass
assert m.msg_count == 1
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
def test_read_two_messages(Msg2ContactInfo):
m = MemoryStream(Msg2ContactInfo, (0,))
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.id_str == b"R170000000000001"
assert int(m.ctrl)==145
assert m.msg_id==0
assert m.header_len==23
assert m.data_len==25
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 2
assert m.id_str == b"R170000000000002"
assert int(m.ctrl)==145
assert m.msg_id==0
assert m.header_len==23
assert m.data_len==25
def test_ctrl_byte():
c = Control(0x91)
assert c.is_ind()
assert not c.is_resp()
c = Control(0x99)
assert not c.is_ind()
assert c.is_resp()