Sonar qube 3 (#178)

* add more unit tests
This commit is contained in:
Stefan Allius
2024-08-29 23:47:30 +02:00
committed by GitHub
parent 6d9addc7d5
commit d2b88ab838
2 changed files with 179 additions and 13 deletions

View File

@@ -38,7 +38,9 @@ class ModbusConn():
class ModbusTcp(): class ModbusTcp():
def __init__(self, loop) -> None: def __init__(self, loop, tim_restart=10) -> None:
self.tim_restart = tim_restart
inverters = Config.get('inverters') inverters = Config.get('inverters')
# logging.info(f'Inverters: {inverters}') # logging.info(f'Inverters: {inverters}')
@@ -79,4 +81,4 @@ class ModbusTcp():
f"ModbusTcpCreate: Exception for {(host, port)}:\n" f"ModbusTcpCreate: Exception for {(host, port)}:\n"
f"{traceback.format_exc()}") f"{traceback.format_exc()}")
await asyncio.sleep(10) await asyncio.sleep(self.tim_restart)

View File

@@ -3,10 +3,14 @@ import pytest
import asyncio import asyncio
from mock import patch from mock import patch
from enum import Enum
from app.src.singleton import Singleton from app.src.singleton import Singleton
from app.src.config import Config from app.src.config import Config
from app.src.infos import Infos from app.src.infos import Infos
from app.src.modbus_tcp import ModbusConn from app.src.mqtt import Mqtt
from app.src.messages import Message, State
from app.src.inverter import Inverter
from app.src.modbus_tcp import ModbusConn, ModbusTcp
pytest_plugins = ('pytest_asyncio',) pytest_plugins = ('pytest_asyncio',)
@@ -31,23 +35,76 @@ def test_hostname():
return 'test.mosquitto.org' return 'test.mosquitto.org'
@pytest.fixture @pytest.fixture
def config_mqtt_conn(test_hostname, test_port): def config_conn(test_hostname, test_port):
Config.act_config = {'mqtt':{'host': test_hostname, 'port': test_port, 'user': '', 'passwd': ''}, Config.act_config = {
'ha':{'auto_conf_prefix': 'homeassistant','discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun'} 'mqtt':{
'host': test_hostname,
'port': test_port,
'user': '',
'passwd': ''
},
'ha':{
'auto_conf_prefix': 'homeassistant',
'discovery_prefix': 'homeassistant',
'entity_prefix': 'tsun',
'proxy_node_id': 'test_1',
'proxy_unique_id': ''
},
'inverters':{
'allow_all': True,
"R170000000000001":{
'node_id': 'inv_1'
},
"Y170000000000001":{
'node_id': 'inv_2',
'monitor_sn': 2000000000,
'modbus_polling': True,
'suggested_area': "",
'sensor_list': 0x2b0,
'client_mode':{
'host': '192.168.0.1',
'port': 8899
}
}
} }
}
@pytest.fixture
def config_no_conn(test_port): class TestType(Enum):
Config.act_config = {'mqtt':{'host': "", 'port': test_port, 'user': '', 'passwd': ''}, RD_TEST_0_BYTES = 1
'ha':{'auto_conf_prefix': 'homeassistant','discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun'} RD_TEST_TIMEOUT = 2
}
test = TestType.RD_TEST_0_BYTES
class FakeReader(): class FakeReader():
pass def __init__(self):
self.on_recv = asyncio.Event()
async def read(self, max_len: int):
await self.on_recv.wait()
if test == TestType.RD_TEST_0_BYTES:
return b''
elif test == TestType.RD_TEST_TIMEOUT:
raise TimeoutError
def feed_eof(self):
return
class FakeWriter(): class FakeWriter():
pass def write(self, buf: bytes):
return
def get_extra_info(self, sel: str):
if sel == 'peername':
return 'remote.intern'
elif sel == 'sockname':
return 'sock:1234'
assert False
def is_closing(self):
return False
def close(self):
return
async def wait_closed(self):
return
@pytest.fixture @pytest.fixture
@@ -57,11 +114,19 @@ def patch_open():
return FakeReader(), FakeWriter() return FakeReader(), FakeWriter()
def new_open(host: str, port: int): def new_open(host: str, port: int):
global test
if test == TestType.RD_TEST_TIMEOUT:
raise TimeoutError
return new_conn(None) return new_conn(None)
with patch.object(asyncio, 'open_connection', new_open) as conn: with patch.object(asyncio, 'open_connection', new_open) as conn:
yield conn yield conn
@pytest.fixture
def patch_no_mqtt():
with patch.object(Mqtt, 'publish') as conn:
yield conn
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_modbus_conn(patch_open): async def test_modbus_conn(patch_open):
@@ -76,3 +141,102 @@ async def test_modbus_conn(patch_open):
assert Infos.stat['proxy']['Inverter_Cnt'] == 1 assert Infos.stat['proxy']['Inverter_Cnt'] == 1
assert Infos.stat['proxy']['Inverter_Cnt'] == 0 assert Infos.stat['proxy']['Inverter_Cnt'] == 0
@pytest.mark.asyncio
async def test_modbus_no_cnf():
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
loop = asyncio.get_event_loop()
ModbusTcp(loop)
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
@pytest.mark.asyncio
async def test_modbus_cnf1(config_conn, patch_open):
_ = config_conn
_ = patch_open
global test
assert asyncio.get_running_loop()
Inverter.class_init()
test = TestType.RD_TEST_TIMEOUT
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
loop = asyncio.get_event_loop()
ModbusTcp(loop)
await asyncio.sleep(0.1)
for m in Message:
if (m.node_id == 'inv_2'):
assert False
await asyncio.sleep(0.1)
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
@pytest.mark.asyncio
async def test_modbus_cnf2(config_conn, patch_no_mqtt, patch_open):
_ = config_conn
_ = patch_open
_ = patch_no_mqtt
global test
assert asyncio.get_running_loop()
Inverter.class_init()
test = TestType.RD_TEST_0_BYTES
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
ModbusTcp(asyncio.get_event_loop())
await asyncio.sleep(0.1)
test = 0
for m in Message:
if (m.node_id == 'inv_2'):
test += 1
assert Infos.stat['proxy']['Inverter_Cnt'] == 1
m.shutdown_started = True
m.reader.on_recv.set()
del m
assert 1 == test
await asyncio.sleep(0.1)
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
# check that the connection is released
for m in Message:
if (m.node_id == 'inv_2'):
assert False
@pytest.mark.asyncio
async def test_modbus_cnf3(config_conn, patch_no_mqtt, patch_open):
_ = config_conn
_ = patch_open
_ = patch_no_mqtt
global test
assert asyncio.get_running_loop()
Inverter.class_init()
test = TestType.RD_TEST_0_BYTES
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
ModbusTcp(asyncio.get_event_loop(), tim_restart= 0.1)
await asyncio.sleep(0.1)
test = 0
for m in Message:
if (m.node_id == 'inv_2'):
assert Infos.stat['proxy']['Inverter_Cnt'] == 1
m.shutdown_started = False
m.reader.on_recv.set()
test += 1
await asyncio.sleep(0.1)
assert m.state == State.closed
assert 1 == test
await asyncio.sleep(0.1)
assert Infos.stat['proxy']['Inverter_Cnt'] == 1
# check that the connection is released
for m in Message:
if (m.node_id == 'inv_2'):
test += 1
m.shutdown_started = True
m.reader.on_recv.set()
del m
assert 3 == test
await asyncio.sleep(0.1)
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
for m in Message:
if (m.node_id == 'inv_2'):
assert False