* Sonar qube 6 (#174)

* test class ModbusConn

* Sonar qube 3 (#178)

* add more unit tests

* GEN3: don't crash on overwritten msg in the receive buffer

* improve test coverage und reduce test delays

* reduce cognitive complexity
This commit is contained in:
Stefan Allius
2024-09-03 18:58:24 +02:00
committed by GitHub
parent 627ca97360
commit a9dc7e6847
9 changed files with 334 additions and 122 deletions

View File

@@ -4,6 +4,7 @@ import asyncio
from mock import patch
from enum import Enum
from enum import Enum
from app.src.singleton import Singleton
from app.src.config import Config
from app.src.infos import Infos
@@ -11,6 +12,10 @@ from app.src.mqtt import Mqtt
from app.src.messages import Message, State
from app.src.inverter import Inverter
from app.src.modbus_tcp import ModbusConn, ModbusTcp
from app.src.mqtt import Mqtt
from app.src.messages import Message, State
from app.src.inverter import Inverter
from app.src.modbus_tcp import ModbusConn, ModbusTcp
pytest_plugins = ('pytest_asyncio',)
@@ -77,6 +82,7 @@ class TestType(Enum):
test = TestType.RD_TEST_0_BYTES
class FakeReader():
def __init__(self):
self.on_recv = asyncio.Event()
@@ -110,7 +116,7 @@ class FakeWriter():
@pytest.fixture
def patch_open():
async def new_conn(conn):
await asyncio.sleep(0.01)
await asyncio.sleep(0)
return FakeReader(), FakeWriter()
def new_open(host: str, port: int):
@@ -127,6 +133,11 @@ def patch_no_mqtt():
with patch.object(Mqtt, 'publish') as conn:
yield conn
@pytest.fixture
def patch_no_mqtt():
with patch.object(Mqtt, 'publish') as conn:
yield conn
@pytest.mark.asyncio
async def test_modbus_conn(patch_open):
@@ -161,12 +172,12 @@ async def test_modbus_cnf1(config_conn, patch_open):
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
loop = asyncio.get_event_loop()
ModbusTcp(loop)
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
for m in Message:
if (m.node_id == 'inv_2'):
assert False
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
@pytest.mark.asyncio
@@ -181,7 +192,7 @@ async def test_modbus_cnf2(config_conn, patch_no_mqtt, patch_open):
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
ModbusTcp(asyncio.get_event_loop())
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
test = 0
for m in Message:
if (m.node_id == 'inv_2'):
@@ -192,14 +203,13 @@ async def test_modbus_cnf2(config_conn, patch_no_mqtt, patch_open):
del m
assert 1 == test
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
# check that the connection is released
for m in Message:
if (m.node_id == 'inv_2'):
assert False
@pytest.mark.asyncio
async def test_modbus_cnf3(config_conn, patch_no_mqtt, patch_open):
_ = config_conn
@@ -211,32 +221,24 @@ async def test_modbus_cnf3(config_conn, patch_no_mqtt, patch_open):
test = TestType.RD_TEST_0_BYTES
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
ModbusTcp(asyncio.get_event_loop(), tim_restart= 0.1)
await asyncio.sleep(0.1)
ModbusTcp(asyncio.get_event_loop(), tim_restart= 0)
await asyncio.sleep(0.01)
test = 0
for m in Message:
if (m.node_id == 'inv_2'):
assert Infos.stat['proxy']['Inverter_Cnt'] == 1
m.shutdown_started = False
m.reader.on_recv.set()
test += 1
await asyncio.sleep(0.1)
assert m.state == State.closed
assert 1 == test
await asyncio.sleep(0.1)
assert Infos.stat['proxy']['Inverter_Cnt'] == 1
# check that the connection is released
for m in Message:
if (m.node_id == 'inv_2'):
test += 1
m.shutdown_started = True
m.reader.on_recv.set()
del m
if test == 1:
m.shutdown_started = False
m.reader.on_recv.set()
await asyncio.sleep(0.1)
assert m.state == State.closed
await asyncio.sleep(0.1)
else:
m.shutdown_started = True
m.reader.on_recv.set()
del m
assert 3 == test
await asyncio.sleep(0.1)
assert 2 == test
await asyncio.sleep(0.01)
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
for m in Message:
if (m.node_id == 'inv_2'):
assert False