From 766774224bbff23c0e7e36c9f252af6a66de4b1c Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Sat, 18 May 2024 21:46:28 +0200 Subject: [PATCH] adapt unit tests --- app/tests/test_modbus.py | 177 ++++++++++++++++++++++++++++++------- app/tests/test_solarman.py | 17 +++- app/tests/test_talent.py | 20 ++++- 3 files changed, 177 insertions(+), 37 deletions(-) diff --git a/app/tests/test_modbus.py b/app/tests/test_modbus.py index ed6a68a..8ea4637 100644 --- a/app/tests/test_modbus.py +++ b/app/tests/test_modbus.py @@ -1,15 +1,24 @@ # test_with_pytest.py -# import pytest, logging +import pytest +import asyncio from app.src.modbus import Modbus from app.src.infos import Infos +pytest_plugins = ('pytest_asyncio',) +pytestmark = pytest.mark.asyncio(scope="module") + class TestHelper(Modbus): def __init__(self): - super().__init__() + super().__init__(self.send_cb) self.db = Infos() + self.pdu = None + self.send_calls = 0 + def send_cb(self, pdu: bytearray): + self.pdu = pdu + self.send_calls += 1 def test_modbus_crc(): - mb = Modbus() + mb = Modbus(None) assert 0x0b02 == mb._Modbus__calc_crc(b'\x01\x06\x20\x08\x00\x04') assert 0 == mb._Modbus__calc_crc(b'\x01\x06\x20\x08\x00\x04\x02\x0b') assert mb.check_crc(b'\x01\x06\x20\x08\x00\x04\x02\x0b') @@ -20,33 +29,34 @@ def test_modbus_crc(): assert 0x5c75 == mb._Modbus__calc_crc(b'\x01\x03\x08\x01\x2c\x00\x2c\x02\x2c\x2c\x46') def test_build_modbus_pdu(): - mb = Modbus() - pdu = mb.build_msg(1,6,0x2000,0x12) - assert pdu == b'\x01\x06\x20\x00\x00\x12\x02\x07' - assert mb.check_crc(pdu) + mb = TestHelper() + mb.build_msg(1,6,0x2000,0x12) + mb.get_next_req() + assert mb.pdu == b'\x01\x06\x20\x00\x00\x12\x02\x07' + assert mb.check_crc(mb.pdu) def test_recv_req_crc(): - mb = Modbus() - res = mb.recv_req(b'\x01\x06\x20\x00\x00\x12\x02\x08') - assert not res + mb = TestHelper() + mb.recv_req(b'\x01\x06\x20\x00\x00\x12\x02\x08') + mb.get_next_req() assert mb.last_fcode == 0 assert mb.last_reg == 0 assert mb.last_len == 0 assert mb.err == 1 def test_recv_req_addr(): - mb = Modbus() - res = mb.recv_req(b'\x02\x06\x20\x00\x00\x12\x02\x34') - assert not res - assert mb.last_fcode == 0 - assert mb.last_reg == 0 - assert mb.last_len == 0 - assert mb.err == 2 + mb = TestHelper() + mb.recv_req(b'\x02\x06\x20\x00\x00\x12\x02\x34') + mb.get_next_req() + assert mb.last_addr == 2 + assert mb.last_fcode == 6 + assert mb.last_reg == 0x2000 + assert mb.last_len == 18 def test_recv_req(): - mb = Modbus() - res = mb.recv_req(b'\x01\x06\x20\x00\x00\x12\x02\x07') - assert res + mb = TestHelper() + mb.recv_req(b'\x01\x06\x20\x00\x00\x12\x02\x07') + mb.get_next_req() assert mb.last_fcode == 6 assert mb.last_reg == 0x2000 assert mb.last_len == 0x12 @@ -54,6 +64,7 @@ def test_recv_req(): def test_recv_recv_crc(): mb = TestHelper() + mb.req_pend = True mb.last_fcode = 3 mb.last_reg == 0x300e mb.last_len == 2 @@ -66,6 +77,7 @@ def test_recv_recv_crc(): def test_recv_recv_addr(): mb = TestHelper() + mb.req_pend = True mb.last_fcode = 3 mb.last_reg == 0x300e mb.last_len == 2 @@ -75,36 +87,48 @@ def test_recv_recv_addr(): call += 1 assert mb.err == 2 assert 0 == call + assert mb.que.qsize() == 0 + mb.stop_timer() + assert not mb.req_pend def test_recv_recv_fcode(): mb = TestHelper() - mb.last_fcode = 4 - mb.last_reg == 0x300e - mb.last_len == 2 - + mb.build_msg(1,4,0x300e,2) + assert mb.que.qsize() == 0 + assert mb.req_pend + call = 0 for key, update, val in mb.recv_resp(mb.db, b'\x01\x03\x04\x01\x2c\x00\x46\xbb\xf4', 'test'): call += 1 + assert mb.err == 3 assert 0 == call + assert mb.que.qsize() == 0 + mb.stop_timer() + assert not mb.req_pend def test_recv_recv_len(): mb = TestHelper() - mb.last_fcode = 3 - mb.last_reg == 0x300e - mb.last_len == 2 - + mb.build_msg(1,3,0x300e,3) + assert mb.que.qsize() == 0 + assert mb.req_pend + assert mb.last_len == 3 call = 0 for key, update, _ in mb.recv_resp(mb.db, b'\x01\x03\x04\x01\x2c\x00\x46\xbb\xf4', 'test'): call += 1 + assert mb.err == 4 assert 0 == call + assert mb.que.qsize() == 0 + mb.stop_timer() + assert not mb.req_pend def test_build_recv(): mb = TestHelper() - pdu = mb.build_msg(1,3,0x3007,6) - assert mb.check_crc(pdu) - assert mb.err == 0 + mb.build_msg(1,3,0x3007,6) + assert mb.que.qsize() == 0 + assert mb.req_pend + call = 0 exp_result = ['V0.0.212', 4.4, 0.7, 0.7, 30] for key, update, val in mb.recv_resp(mb.db, b'\x01\x03\x0c\x01\x2c\x00\x2c\x00\x2c\x00\x46\x00\x46\x00\x46\x32\xc8', 'test'): @@ -121,6 +145,7 @@ def test_build_recv(): assert 0 == mb.err assert 5 == call + mb.req_pend = True call = 0 for key, update, val in mb.recv_resp(mb.db, b'\x01\x03\x0c\x01\x2c\x00\x2c\x00\x2c\x00\x46\x00\x46\x00\x46\x32\xc8', 'test'): if key == 'grid': @@ -133,13 +158,20 @@ def test_build_recv(): assert False assert exp_result[call] == val call += 1 + assert 0 == mb.err assert 5 == call + assert mb.que.qsize() == 0 + mb.stop_timer() + assert not mb.req_pend def test_build_long(): mb = TestHelper() - pdu = mb.build_msg(1,3,0x3022,4) - assert mb.check_crc(pdu) + mb.build_msg(1,3,0x3022,4) + assert mb.que.qsize() == 0 + assert mb.req_pend + assert mb.last_addr == 1 + assert mb.last_fcode == 3 assert mb.err == 0 call = 0 exp_result = [3.0, 28841.4, 113.34] @@ -150,5 +182,84 @@ def test_build_long(): else: assert False call += 1 + assert 0 == mb.err assert 3 == call + assert mb.que.qsize() == 0 + mb.stop_timer() + assert not mb.req_pend + +def test_queue(): + mb = TestHelper() + mb.build_msg(1,3,0x3022,4) + assert mb.que.qsize() == 0 + assert mb.req_pend + + assert mb.send_calls == 1 + assert mb.pdu == b'\x01\x030"\x00\x04\xeb\x03' + mb.pdu = None + mb.get_next_req() + assert mb.send_calls == 1 + assert mb.pdu == None + + assert mb.que.qsize() == 0 + mb.stop_timer() + assert not mb.req_pend + +def test_queue2(): + mb = TestHelper() + mb.build_msg(1,3,0x3007,6) + mb.build_msg(1,6,0x2008,4) + assert mb.que.qsize() == 1 + assert mb.req_pend + + assert mb.send_calls == 1 + assert mb.pdu == b'\x01\x030\x07\x00\x06{\t' + mb.get_next_req() + assert mb.send_calls == 1 + call = 0 + exp_result = ['V0.0.212', 4.4, 0.7, 0.7, 30] + for key, update, val in mb.recv_resp(mb.db, b'\x01\x03\x0c\x01\x2c\x00\x2c\x00\x2c\x00\x46\x00\x46\x00\x46\x32\xc8', 'test'): + if key == 'grid': + assert update == True + elif key == 'inverter': + assert update == True + elif key == 'env': + assert update == True + else: + assert False + assert exp_result[call] == val + call += 1 + assert 0 == mb.err + assert 5 == call + + assert mb.send_calls == 2 + assert mb.pdu == b'\x01\x06\x20\x08\x00\x04\x02\x0b' + + for key, update, val in mb.recv_resp(mb.db, b'\x01\x06\x20\x08\x00\x04\x02\x0b', 'test'): + pass + + assert mb.que.qsize() == 0 + assert not mb.req_pend + + +@pytest.mark.asyncio +async def test_timeout(): + assert asyncio.get_running_loop() + mb = TestHelper() + assert asyncio.get_running_loop() == mb.loop + mb.build_msg(1,3,0x3007,6) + mb.build_msg(1,6,0x2008,4) + assert mb.que.qsize() == 1 + assert mb.req_pend + + assert mb.send_calls == 1 + assert mb.pdu == b'\x01\x030\x07\x00\x06{\t' + await asyncio.sleep(1.1) # wait for first timeout and next pdu + assert mb.req_pend + assert mb.send_calls == 2 + assert mb.pdu == b'\x01\x06\x20\x08\x00\x04\x02\x0b' + await asyncio.sleep(1.1) # wait for second timout + + assert not mb.req_pend + assert mb.que.qsize() == 0 diff --git a/app/tests/test_solarman.py b/app/tests/test_solarman.py index 46df675..19132d7 100644 --- a/app/tests/test_solarman.py +++ b/app/tests/test_solarman.py @@ -16,9 +16,16 @@ Infos.static_init() timestamp = int(time.time()) # 1712861197 heartbeat = 60 +class Writer(): + def write(self, pdu: bytearray): + pass + class MemoryStream(SolarmanV5): def __init__(self, msg, chunks = (0,), server_side: bool = True): super().__init__(server_side) + if server_side: + self.mb.timeout = 1 # overwrite for faster testing + self.writer = Writer() self.__msg = msg self.__msg_len = len(msg) self.__chunks = chunks @@ -35,7 +42,6 @@ class MemoryStream(SolarmanV5): def _heartbeat(self) -> int: return heartbeat - def append_msg(self, msg): self.__msg += msg @@ -1446,9 +1452,12 @@ def test_msg_modbus_rsp3(ConfigTsunInv1, MsgModbusRsp): m.append_msg(MsgModbusRsp) m.forward_modbus_resp = True + m.mb.last_addr = 1 m.mb.last_fcode = 3 m.mb.last_len = 20 m.mb.last_reg = 0x3008 + m.mb.req_pend = True + m.mb.err = 0 # assert m.db.db == {'inverter': {'Manufacturer': 'TSUN', 'Equipment_Model': 'TSOL-MSxx00'}} m.new_data['inverter'] = False @@ -1465,7 +1474,7 @@ def test_msg_modbus_rsp3(ConfigTsunInv1, MsgModbusRsp): m.read() # read complete msg, and dispatch msg assert not m.header_valid # must be invalid, since msg was handled and buffer flushed - assert m.mb.err == 0 + assert m.mb.err == 5 assert m.msg_count == 2 assert m._forward_buffer==MsgModbusRsp assert m._send_buffer==b'' @@ -1515,9 +1524,13 @@ def test_msg_modbus_fragment(ConfigTsunInv1, MsgModbusRsp): m.db.stat['proxy']['Unknown_Ctrl'] = 0 m.db.stat['proxy']['Modbus_Command'] = 0 m.forward_modbus_resp = True + m.mb.last_addr = 1 m.mb.last_fcode = 3 m.mb.last_len = 20 m.mb.last_reg = 0x3008 + m.mb.req_pend = True + m.mb.err = 0 + m.read() # read complete msg, and dispatch msg assert not m.header_valid # must be invalid, since msg was handled and buffer flushed assert m.msg_count == 1 diff --git a/app/tests/test_talent.py b/app/tests/test_talent.py index ed39111..c0b8e1e 100644 --- a/app/tests/test_talent.py +++ b/app/tests/test_talent.py @@ -12,10 +12,18 @@ pytest_plugins = ('pytest_asyncio',) Infos.static_init() tracer = logging.getLogger('tracer') - + + +class Writer(): + def write(self, pdu: bytearray): + pass + class MemoryStream(Talent): def __init__(self, msg, chunks = (0,), server_side: bool = True): super().__init__(server_side) + if server_side: + self.mb.timeout = 1 # overwrite for faster testing + self.writer = Writer() self.__msg = msg self.__msg_len = len(msg) self.__chunks = chunks @@ -896,9 +904,13 @@ def test_msg_modbus_rsp3(ConfigTsunInv1, MsgModbusResp20): m.append_msg(MsgModbusResp20) m.forward_modbus_resp = True + m.mb.last_addr = 1 m.mb.last_fcode = 3 m.mb.last_len = 20 m.mb.last_reg = 0x3008 + m.mb.req_pend = True + m.mb.err = 0 + assert m.db.db == {} m.new_data['inverter'] = False @@ -915,7 +927,7 @@ def test_msg_modbus_rsp3(ConfigTsunInv1, MsgModbusResp20): m.read() # read complete msg, and dispatch msg assert not m.header_valid # must be invalid, since msg was handled and buffer flushed - assert m.mb.err == 0 + assert m.mb.err == 5 assert m.msg_count == 2 assert m._forward_buffer==MsgModbusResp20 assert m._send_buffer==b'' @@ -952,9 +964,13 @@ def test_msg_modbus_fragment(ConfigTsunInv1, MsgModbusResp20): m.db.stat['proxy']['Unknown_Ctrl'] = 0 m.db.stat['proxy']['Modbus_Command'] = 0 m.forward_modbus_resp = True + m.mb.last_addr = 1 m.mb.last_fcode = 3 m.mb.last_len = 20 m.mb.last_reg = 0x3008 + m.mb.req_pend = True + m.mb.err = 0 + m.read() # read complete msg, and dispatch msg assert not m.header_valid # must be invalid, since msg was handled and buffer flushed assert m.msg_count == 1