From 94a96702181ef9ff84d03f62a7fdffe81bc2f008 Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Sat, 24 May 2025 23:25:47 +0200 Subject: [PATCH] mock the aiomqtt library and increse coverage --- app/tests/test_mqtt.py | 100 ++++++++++++++++++++++++++--------------- 1 file changed, 63 insertions(+), 37 deletions(-) diff --git a/app/tests/test_mqtt.py b/app/tests/test_mqtt.py index eb68796..8cbe5cd 100755 --- a/app/tests/test_mqtt.py +++ b/app/tests/test_mqtt.py @@ -3,7 +3,8 @@ import pytest import asyncio import aiomqtt import logging -from aiomqtt import MqttError +from aiomqtt import MqttError, MessagesIterator +from aiomqtt import Message as AiomqttMessage from mock import patch, Mock from async_stream import AsyncIfcImpl @@ -34,6 +35,26 @@ def test_hostname(): # else: return 'test.mosquitto.org' +@pytest.fixture(scope="function") +def aiomqtt_mock(monkeypatch): + recv_que = asyncio.Queue() + + async def my_aenter(self): + return self + async def my_subscribe(self, *arg): + return + async def my_anext(self): + return await recv_que.get() + async def my_receive(self, topic: str, payload: bytes): + msg = AiomqttMessage(topic, payload,qos=0, retain=False, mid=0, properties=None) + await recv_que.put(msg) + await asyncio.sleep(0) # dispath the msg + + monkeypatch.setattr(aiomqtt.Client, "__aenter__", my_aenter) + monkeypatch.setattr(aiomqtt.Client, "subscribe", my_subscribe) + monkeypatch.setattr(MessagesIterator, "__anext__", my_anext) + monkeypatch.setattr(Mqtt, "receive", my_receive, False) + @pytest.fixture def config_mqtt_conn(test_hostname, test_port): Config.act_config = {'mqtt':{'host': test_hostname, 'port': test_port, 'user': '', 'passwd': ''}, @@ -250,92 +271,97 @@ async def test_mqtt_except_def_config(config_def_conn, monkeypatch, caplog): assert 'MQTT is unconfigured; Check your config.toml!' in caplog.text @pytest.mark.asyncio -async def test_msg_dispatch(config_mqtt_conn, spy_modbus_cmd): +async def test_mqtt_dispatch(config_mqtt_conn, aiomqtt_mock, spy_modbus_cmd): _ = config_mqtt_conn + _ = aiomqtt_mock spy = spy_modbus_cmd try: m = Mqtt(None) - msg = aiomqtt.Message(topic= 'homeassistant/status', payload= b'online', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + assert m.ha_restarts == 0 + await m.receive('homeassistant/status', b'online') # send the message assert m.ha_restarts == 1 - msg = aiomqtt.Message(topic= 'tsun/inv_1/rated_load', payload= b'2', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/rated_load', payload= b'2') spy.assert_awaited_once_with(Modbus.WRITE_SINGLE_REG, 0x2008, 2, logging.INFO) - + spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'100', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/out_coeff', payload= b'100') spy.assert_awaited_once_with(Modbus.WRITE_SINGLE_REG, 0x202c, 1024, logging.INFO) spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'50', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/out_coeff', payload= b'50') spy.assert_awaited_once_with(Modbus.WRITE_SINGLE_REG, 0x202c, 512, logging.INFO) spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_regs', payload= b'0x3000, 10', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/modbus_read_regs', payload= b'0x3000, 10') spy.assert_awaited_once_with(Modbus.READ_REGS, 0x3000, 10, logging.INFO) spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10') spy.assert_awaited_once_with(Modbus.READ_INPUTS, 0x3000, 10, logging.INFO) # test dispatching with empty mapping table m.topic_defs.clear() spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10') spy.assert_not_called() # test dispatching with incomplete mapping table - invalid fnc defined m.topic_defs.append( {'prefix': 'entity_prefix', 'topic': '/+/modbus_read_inputs', - 'full_topic': 'tsun/+/modbus_read_inputs', 'fnc': 'invalid'} + 'full_topic': 'tsun/+/modbus_read_inputs', 'fnc': 'addr'} ) spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10') spy.assert_not_called() + except MqttError: + assert False + except Exception: + assert False finally: await m.close() @pytest.mark.asyncio -async def test_msg_dispatch_err(config_mqtt_conn, spy_modbus_cmd): +async def test_mqtt_dispatch_err(config_mqtt_conn, aiomqtt_mock, spy_modbus_cmd, caplog): _ = config_mqtt_conn + _ = aiomqtt_mock spy = spy_modbus_cmd + + LOGGER = logging.getLogger("mqtt") + LOGGER.propagate = True + LOGGER.setLevel(logging.INFO) + try: m = Mqtt(None) + # test out of range param - msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'-1', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/out_coeff', payload= b'-1') spy.assert_not_called() # test unknown node_id - spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_2/out_coeff', payload= b'2', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_2/out_coeff', payload= b'2') spy.assert_not_called() # test invalid fload param - spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'2, 3', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/out_coeff', payload= b'2, 3') + spy.assert_not_called() + + await m.receive(topic= 'tsun/inv_1/modbus_read_regs', payload= b'0x3000, 10, 7') spy.assert_not_called() - spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_regs', payload= b'0x3000, 10, 7', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) - spy.assert_not_called() - - spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/dcu_power', payload= b'100W', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/dcu_power', payload= b'100W') spy.assert_not_called() + with caplog.at_level(logging.INFO): + msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'2', qos= 0, retain = False, mid= 0, properties= None) + for _ in m.each_inverter(msg, "addr"): + pass # do nothing here + assert 'Cmd not supported by: inv_1/' in caplog.text + except MqttError: + assert False + except Exception: + assert False finally: await m.close()