mock the aiomqtt library and increse coverage (#428)

* mock the aiomqtt library and increse coverage

* test inv response for a mb scan request
This commit is contained in:
Stefan Allius
2025-05-25 01:34:22 +02:00
committed by GitHub
parent 321c66838d
commit f69f9c6d63
2 changed files with 131 additions and 37 deletions

View File

@@ -3,7 +3,8 @@ import pytest
import asyncio import asyncio
import aiomqtt import aiomqtt
import logging import logging
from aiomqtt import MqttError from aiomqtt import MqttError, MessagesIterator
from aiomqtt import Message as AiomqttMessage
from mock import patch, Mock from mock import patch, Mock
from async_stream import AsyncIfcImpl from async_stream import AsyncIfcImpl
@@ -34,6 +35,26 @@ def test_hostname():
# else: # else:
return 'test.mosquitto.org' return 'test.mosquitto.org'
@pytest.fixture(scope="function")
def aiomqtt_mock(monkeypatch):
recv_que = asyncio.Queue()
async def my_aenter(self):
return self
async def my_subscribe(self, *arg):
return
async def my_anext(self):
return await recv_que.get()
async def my_receive(self, topic: str, payload: bytes):
msg = AiomqttMessage(topic, payload,qos=0, retain=False, mid=0, properties=None)
await recv_que.put(msg)
await asyncio.sleep(0) # dispath the msg
monkeypatch.setattr(aiomqtt.Client, "__aenter__", my_aenter)
monkeypatch.setattr(aiomqtt.Client, "subscribe", my_subscribe)
monkeypatch.setattr(MessagesIterator, "__anext__", my_anext)
monkeypatch.setattr(Mqtt, "receive", my_receive, False)
@pytest.fixture @pytest.fixture
def config_mqtt_conn(test_hostname, test_port): def config_mqtt_conn(test_hostname, test_port):
Config.act_config = {'mqtt':{'host': test_hostname, 'port': test_port, 'user': '', 'passwd': ''}, Config.act_config = {'mqtt':{'host': test_hostname, 'port': test_port, 'user': '', 'passwd': ''},
@@ -250,92 +271,97 @@ async def test_mqtt_except_def_config(config_def_conn, monkeypatch, caplog):
assert 'MQTT is unconfigured; Check your config.toml!' in caplog.text assert 'MQTT is unconfigured; Check your config.toml!' in caplog.text
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_msg_dispatch(config_mqtt_conn, spy_modbus_cmd): async def test_mqtt_dispatch(config_mqtt_conn, aiomqtt_mock, spy_modbus_cmd):
_ = config_mqtt_conn _ = config_mqtt_conn
_ = aiomqtt_mock
spy = spy_modbus_cmd spy = spy_modbus_cmd
try: try:
m = Mqtt(None) m = Mqtt(None)
msg = aiomqtt.Message(topic= 'homeassistant/status', payload= b'online', qos= 0, retain = False, mid= 0, properties= None) assert m.ha_restarts == 0
await m.dispatch_msg(msg) await m.receive('homeassistant/status', b'online') # send the message
assert m.ha_restarts == 1 assert m.ha_restarts == 1
msg = aiomqtt.Message(topic= 'tsun/inv_1/rated_load', payload= b'2', qos= 0, retain = False, mid= 0, properties= None) await m.receive(topic= 'tsun/inv_1/rated_load', payload= b'2')
await m.dispatch_msg(msg)
spy.assert_awaited_once_with(Modbus.WRITE_SINGLE_REG, 0x2008, 2, logging.INFO) spy.assert_awaited_once_with(Modbus.WRITE_SINGLE_REG, 0x2008, 2, logging.INFO)
spy.reset_mock() spy.reset_mock()
msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'100', qos= 0, retain = False, mid= 0, properties= None) await m.receive(topic= 'tsun/inv_1/out_coeff', payload= b'100')
await m.dispatch_msg(msg)
spy.assert_awaited_once_with(Modbus.WRITE_SINGLE_REG, 0x202c, 1024, logging.INFO) spy.assert_awaited_once_with(Modbus.WRITE_SINGLE_REG, 0x202c, 1024, logging.INFO)
spy.reset_mock() spy.reset_mock()
msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'50', qos= 0, retain = False, mid= 0, properties= None) await m.receive(topic= 'tsun/inv_1/out_coeff', payload= b'50')
await m.dispatch_msg(msg)
spy.assert_awaited_once_with(Modbus.WRITE_SINGLE_REG, 0x202c, 512, logging.INFO) spy.assert_awaited_once_with(Modbus.WRITE_SINGLE_REG, 0x202c, 512, logging.INFO)
spy.reset_mock() spy.reset_mock()
msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_regs', payload= b'0x3000, 10', qos= 0, retain = False, mid= 0, properties= None) await m.receive(topic= 'tsun/inv_1/modbus_read_regs', payload= b'0x3000, 10')
await m.dispatch_msg(msg)
spy.assert_awaited_once_with(Modbus.READ_REGS, 0x3000, 10, logging.INFO) spy.assert_awaited_once_with(Modbus.READ_REGS, 0x3000, 10, logging.INFO)
spy.reset_mock() spy.reset_mock()
msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10', qos= 0, retain = False, mid= 0, properties= None) await m.receive(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10')
await m.dispatch_msg(msg)
spy.assert_awaited_once_with(Modbus.READ_INPUTS, 0x3000, 10, logging.INFO) spy.assert_awaited_once_with(Modbus.READ_INPUTS, 0x3000, 10, logging.INFO)
# test dispatching with empty mapping table # test dispatching with empty mapping table
m.topic_defs.clear() m.topic_defs.clear()
spy.reset_mock() spy.reset_mock()
msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10', qos= 0, retain = False, mid= 0, properties= None) await m.receive(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10')
await m.dispatch_msg(msg)
spy.assert_not_called() spy.assert_not_called()
# test dispatching with incomplete mapping table - invalid fnc defined # test dispatching with incomplete mapping table - invalid fnc defined
m.topic_defs.append( m.topic_defs.append(
{'prefix': 'entity_prefix', 'topic': '/+/modbus_read_inputs', {'prefix': 'entity_prefix', 'topic': '/+/modbus_read_inputs',
'full_topic': 'tsun/+/modbus_read_inputs', 'fnc': 'invalid'} 'full_topic': 'tsun/+/modbus_read_inputs', 'fnc': 'addr'}
) )
spy.reset_mock() spy.reset_mock()
msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10', qos= 0, retain = False, mid= 0, properties= None) await m.receive(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10')
await m.dispatch_msg(msg)
spy.assert_not_called() spy.assert_not_called()
except MqttError:
assert False
except Exception:
assert False
finally: finally:
await m.close() await m.close()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_msg_dispatch_err(config_mqtt_conn, spy_modbus_cmd): async def test_mqtt_dispatch_err(config_mqtt_conn, aiomqtt_mock, spy_modbus_cmd, caplog):
_ = config_mqtt_conn _ = config_mqtt_conn
_ = aiomqtt_mock
spy = spy_modbus_cmd spy = spy_modbus_cmd
LOGGER = logging.getLogger("mqtt")
LOGGER.propagate = True
LOGGER.setLevel(logging.INFO)
try: try:
m = Mqtt(None) m = Mqtt(None)
# test out of range param # test out of range param
msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'-1', qos= 0, retain = False, mid= 0, properties= None) await m.receive(topic= 'tsun/inv_1/out_coeff', payload= b'-1')
await m.dispatch_msg(msg)
spy.assert_not_called() spy.assert_not_called()
# test unknown node_id # test unknown node_id
spy.reset_mock() await m.receive(topic= 'tsun/inv_2/out_coeff', payload= b'2')
msg = aiomqtt.Message(topic= 'tsun/inv_2/out_coeff', payload= b'2', qos= 0, retain = False, mid= 0, properties= None)
await m.dispatch_msg(msg)
spy.assert_not_called() spy.assert_not_called()
# test invalid fload param # test invalid fload param
spy.reset_mock() await m.receive(topic= 'tsun/inv_1/out_coeff', payload= b'2, 3')
msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'2, 3', qos= 0, retain = False, mid= 0, properties= None) spy.assert_not_called()
await m.dispatch_msg(msg)
await m.receive(topic= 'tsun/inv_1/modbus_read_regs', payload= b'0x3000, 10, 7')
spy.assert_not_called() spy.assert_not_called()
spy.reset_mock() await m.receive(topic= 'tsun/inv_1/dcu_power', payload= b'100W')
msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_regs', payload= b'0x3000, 10, 7', qos= 0, retain = False, mid= 0, properties= None)
await m.dispatch_msg(msg)
spy.assert_not_called()
spy.reset_mock()
msg = aiomqtt.Message(topic= 'tsun/inv_1/dcu_power', payload= b'100W', qos= 0, retain = False, mid= 0, properties= None)
await m.dispatch_msg(msg)
spy.assert_not_called() spy.assert_not_called()
with caplog.at_level(logging.INFO):
msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'2', qos= 0, retain = False, mid= 0, properties= None)
for _ in m.each_inverter(msg, "addr"):
pass # do nothing here
assert 'Cmd not supported by: inv_1/' in caplog.text
except MqttError:
assert False
except Exception:
assert False
finally: finally:
await m.close() await m.close()

View File

@@ -709,6 +709,19 @@ def msg_modbus_rsp(): # 0x1510
msg += b'\x15' msg += b'\x15'
return msg return msg
@pytest.fixture
def msg_modbus_rsp_mb_4(): # 0x1510, MODBUS Type:4
msg = b'\xa5\x3b\x00\x10\x15\x03\x03' +get_sn() +b'\x02\x01'
msg += total()
msg += hb()
msg += b'\x0a\xe2\xfa\x33\x01\x04\x28\x40\x10\x08\xd8'
msg += b'\x00\x00\x13\x87\x00\x31\x00\x68\x02\x58\x00\x00\x01\x53\x00\x02'
msg += b'\x00\x00\x01\x52\x00\x02\x00\x00\x01\x53\x00\x03\x00\x00\x00\x04'
msg += b'\x00\x01\x00\x00\x9e\xa4'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
@pytest.fixture @pytest.fixture
def msg_modbus_interim_rsp(): # 0x0510 def msg_modbus_interim_rsp(): # 0x0510
msg = b'\xa5\x3b\x00\x10\x15\x03\x03' +get_sn() +b'\x02\x01' msg = b'\xa5\x3b\x00\x10\x15\x03\x03' +get_sn() +b'\x02\x01'
@@ -2241,6 +2254,61 @@ async def test_modbus_scaning(config_tsun_scan, heartbeat_ind_msg, heartbeat_rsp
assert next(m.mb_timer.exp_count) == 3 assert next(m.mb_timer.exp_count) == 3
m.close() m.close()
@pytest.mark.asyncio
async def test_modbus_scaning_inv_rsp(config_tsun_scan, heartbeat_ind_msg, heartbeat_rsp_msg, msg_modbus_rsp_mb_4):
_ = config_tsun_scan
assert asyncio.get_running_loop()
m = MemoryStream(heartbeat_ind_msg, (0x15,0x56,0))
m.append_msg(msg_modbus_rsp_mb_4)
assert m.mb_scan == False
assert asyncio.get_running_loop() == m.mb_timer.loop
m.db.stat['proxy']['Unknown_Ctrl'] = 0
assert m.mb_timer.tim == None
m.read() # read complete msg, and dispatch msg
assert m.mb_scan == True
assert m.mb_start_reg == 0xff80
assert m.mb_step == 0x40
assert m.mb_bytes == 0x14
assert asyncio.get_running_loop() == m.mb_timer.loop
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.snr == 2070233889
assert m.control == 0x4710
assert m.msg_recvd[0]['control']==0x4710
assert m.msg_recvd[0]['seq']=='84:11'
assert m.msg_recvd[0]['data_len']==0x1
assert m.ifc.tx_fifo.get()==heartbeat_rsp_msg
assert m.ifc.fwd_fifo.get()==heartbeat_ind_msg
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
m.ifc.tx_clear() # clear send buffer for next test
assert isclose(m.mb_timeout, 0.5)
assert next(m.mb_timer.exp_count) == 0
await asyncio.sleep(0.5)
assert m.sent_pdu==b'\xa5\x17\x00\x10E\x12\x84!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00' \
b'\x00\x00\x00\x00\x00\x00\x01\x03\xff\xc0\x00\x14\x75\xed\x33\x15'
assert m.ifc.tx_fifo.get()==b''
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 2
assert m.msg_recvd[1]['control']==0x1510
assert m.msg_recvd[1]['seq']=='03:03'
assert m.msg_recvd[1]['data_len']==0x3b
assert m.mb.last_addr == 1
assert m.mb.last_fcode == 3
assert m.mb.last_reg == 0xffc0 # mb_start_reg + mb_step
assert m.mb.last_len == 20
assert m.mb.err == 3
assert next(m.mb_timer.exp_count) == 2
m.close()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_start_client_mode(my_loop, config_tsun_inv1, str_test_ip): async def test_start_client_mode(my_loop, config_tsun_inv1, str_test_ip):
_ = config_tsun_inv1 _ = config_tsun_inv1