Sonar qube 4 (#169)

* add unit test for inverter.py

* fix SonarQube warning
This commit is contained in:
Stefan Allius
2024-08-24 22:21:55 +02:00
committed by GitHub
parent 27045cac6e
commit f9c1b83ccd
7 changed files with 116 additions and 16 deletions

View File

@@ -172,7 +172,7 @@ class SolarmanV5(Message):
self.db.set_db_def_value(Register.POLLING_INTERVAL, self.db.set_db_def_value(Register.POLLING_INTERVAL,
self.mb_timeout) self.mb_timeout)
self.db.set_db_def_value(Register.HEARTBEAT_INTERVAL, self.db.set_db_def_value(Register.HEARTBEAT_INTERVAL,
120) # fixme 120)
self.new_data['controller'] = True self.new_data['controller'] = True
self.state = State.up self.state = State.up

View File

@@ -1,11 +1,15 @@
import asyncio import asyncio
import logging import logging
import json import json
if __name__ == "app.src.inverter":
from app.src.config import Config
from app.src.mqtt import Mqtt
from app.src.infos import Infos
else: # pragma: no cover
from config import Config from config import Config
from mqtt import Mqtt from mqtt import Mqtt
from infos import Infos from infos import Infos
# logger = logging.getLogger('conn')
logger_mqtt = logging.getLogger('mqtt') logger_mqtt = logging.getLogger('mqtt')
@@ -24,6 +28,7 @@ class Inverter():
cls.proxy_unique_id = ha['proxy_unique_id'] cls.proxy_unique_id = ha['proxy_unique_id']
# call Mqtt singleton to establisch the connection to the mqtt broker # call Mqtt singleton to establisch the connection to the mqtt broker
print('call Mqtt.init')
cls.mqtt = Mqtt(cls._cb_mqtt_is_up) cls.mqtt = Mqtt(cls._cb_mqtt_is_up)
# register all counters which should be reset at midnight. # register all counters which should be reset at midnight.
@@ -72,7 +77,7 @@ class Inverter():
Infos.new_stat_data[key] = False Infos.new_stat_data[key] = False
@classmethod @classmethod
def class_close(cls, loop) -> None: def class_close(cls, loop) -> None: # pragma: no cover
logging.debug('Inverter.class_close') logging.debug('Inverter.class_close')
logging.info('Close MQTT Task') logging.info('Close MQTT Task')
loop.run_until_complete(cls.mqtt.close()) loop.run_until_complete(cls.mqtt.close())

View File

@@ -0,0 +1,90 @@
# test_with_pytest.py
import pytest
import asyncio
import aiomqtt
import logging
from mock import patch, Mock
from app.src.singleton import Singleton
from app.src.inverter import Inverter
from app.src.mqtt import Mqtt
from app.src.gen3plus.solarman_v5 import SolarmanV5
from app.src.config import Config
pytest_plugins = ('pytest_asyncio',)
@pytest.fixture(scope="module", autouse=True)
def module_init():
def new_init(cls, cb_mqtt_is_up):
cb_mqtt_is_up()
Singleton._instances.clear()
with patch.object(Mqtt, '__init__', new_init):
yield
@pytest.fixture(scope="module")
def test_port():
return 1883
@pytest.fixture(scope="module")
def test_hostname():
# if getenv("GITHUB_ACTIONS") == "true":
# return 'mqtt'
# else:
return 'test.mosquitto.org'
@pytest.fixture
def config_conn(test_hostname, test_port):
Config.act_config = {
'mqtt':{
'host': test_hostname,
'port': test_port,
'user': '',
'passwd': ''
},
'ha':{
'auto_conf_prefix': 'homeassistant',
'discovery_prefix': 'homeassistant',
'entity_prefix': 'tsun',
'proxy_node_id': 'test_1',
'proxy_unique_id': ''
},
'inverters': {
'allow_all': True,
"R170000000000001":{
'node_id': 'inv_1'
}
}
}
@pytest.mark.asyncio
async def test_inverter_cb(config_conn):
_ = config_conn
with patch.object(Inverter, '_cb_mqtt_is_up', wraps=Inverter._cb_mqtt_is_up) as spy:
print('call Inverter.class_init')
Inverter.class_init()
assert 'homeassistant/' == Inverter.discovery_prfx
assert 'tsun/' == Inverter.entity_prfx
assert 'test_1/' == Inverter.proxy_node_id
spy.assert_called_once()
@pytest.mark.asyncio
async def test_mqtt_is_up(config_conn):
_ = config_conn
with patch.object(Mqtt, 'publish') as spy:
Inverter.class_init()
await Inverter._cb_mqtt_is_up()
spy.assert_called()
@pytest.mark.asyncio
async def test_mqtt_proxy_statt_invalid(config_conn):
_ = config_conn
with patch.object(Mqtt, 'publish') as spy:
Inverter.class_init()
await Inverter._async_publ_mqtt_proxy_stat('InValId_kEy')
spy.assert_not_called()

View File

@@ -5,6 +5,7 @@ import aiomqtt
import logging import logging
from mock import patch, Mock from mock import patch, Mock
from app.src.singleton import Singleton
from app.src.mqtt import Mqtt from app.src.mqtt import Mqtt
from app.src.modbus import Modbus from app.src.modbus import Modbus
from app.src.gen3plus.solarman_v5 import SolarmanV5 from app.src.gen3plus.solarman_v5 import SolarmanV5
@@ -13,7 +14,10 @@ from app.src.config import Config
pytest_plugins = ('pytest_asyncio',) pytest_plugins = ('pytest_asyncio',)
@pytest.fixture(scope="module", autouse=True)
def module_init():
Singleton._instances.clear()
yield
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def test_port(): def test_port():

View File

@@ -7,6 +7,7 @@ class Test(metaclass=Singleton):
pass # is a dummy test class pass # is a dummy test class
def test_singleton_metaclass(): def test_singleton_metaclass():
Singleton._instances.clear()
a = Test() a = Test()
assert 1 == len(Singleton._instances) assert 1 == len(Singleton._instances)
b = Test() b = Test()

View File

@@ -1150,7 +1150,7 @@ def test_msg_ota_invalid(config_tsun_inv1, msg_ota_invalid):
m.close() m.close()
def test_msg_unknown(config_tsun_inv1, msg_unknown): def test_msg_unknown(config_tsun_inv1, msg_unknown):
config_tsun_inv1 _ = config_tsun_inv1
m = MemoryStream(msg_unknown, (0,), False) m = MemoryStream(msg_unknown, (0,), False)
m.db.stat['proxy']['Unknown_Msg'] = 0 m.db.stat['proxy']['Unknown_Msg'] = 0
m.read() # read complete msg, and dispatch msg m.read() # read complete msg, and dispatch msg

View File

@@ -138,9 +138,9 @@ def tempClientConnection():
def test_open_close(): def test_open_close():
try: try:
for s in tempClientConnection(): for _ in tempClientConnection():
pass pass
except: except Exception:
assert False assert False
def test_send_contact_info1(ClientConnection, MsgContactInfo, MsgContactResp): def test_send_contact_info1(ClientConnection, MsgContactInfo, MsgContactResp):
@@ -199,14 +199,14 @@ def test_send_ctrl_data(ClientConnection, MsgTimeStampReq, MsgTimeStampResp, Msg
s = ClientConnection s = ClientConnection
try: try:
s.sendall(MsgTimeStampReq) s.sendall(MsgTimeStampReq)
data = s.recv(1024) _ = s.recv(1024)
except TimeoutError: except TimeoutError:
pass pass
# time.sleep(2.5) # time.sleep(2.5)
# assert data == MsgTimeStampResp # assert data == MsgTimeStampResp
try: try:
s.sendall(MsgContollerInd) s.sendall(MsgContollerInd)
data = s.recv(1024) _ = s.recv(1024)
except TimeoutError: except TimeoutError:
pass pass
@@ -214,16 +214,16 @@ def test_send_inv_data(ClientConnection, MsgTimeStampReq, MsgTimeStampResp, MsgI
s = ClientConnection s = ClientConnection
try: try:
s.sendall(MsgTimeStampReq) s.sendall(MsgTimeStampReq)
data = s.recv(1024) _ = s.recv(1024)
except TimeoutError: except TimeoutError:
pass pass
# time.sleep(32.5) # time.sleep(32.5)
# assert data == MsgTimeStampResp # assert data == MsgTimeStampResp
try: try:
s.sendall(MsgInvData) s.sendall(MsgInvData)
data = s.recv(1024) _ = s.recv(1024)
s.sendall(MsgInverterInd) s.sendall(MsgInverterInd)
data = s.recv(1024) _ = s.recv(1024)
except TimeoutError: except TimeoutError:
pass pass
@@ -231,6 +231,6 @@ def test_ota_req(ClientConnection, MsgOtaUpdateReq):
s = ClientConnection s = ClientConnection
try: try:
s.sendall(MsgOtaUpdateReq) s.sendall(MsgOtaUpdateReq)
data = s.recv(1024) _ = s.recv(1024)
except TimeoutError: except TimeoutError:
pass pass