diff --git a/app/tests/test_mqtt.py b/app/tests/test_mqtt.py index eb68796..8cbe5cd 100755 --- a/app/tests/test_mqtt.py +++ b/app/tests/test_mqtt.py @@ -3,7 +3,8 @@ import pytest import asyncio import aiomqtt import logging -from aiomqtt import MqttError +from aiomqtt import MqttError, MessagesIterator +from aiomqtt import Message as AiomqttMessage from mock import patch, Mock from async_stream import AsyncIfcImpl @@ -34,6 +35,26 @@ def test_hostname(): # else: return 'test.mosquitto.org' +@pytest.fixture(scope="function") +def aiomqtt_mock(monkeypatch): + recv_que = asyncio.Queue() + + async def my_aenter(self): + return self + async def my_subscribe(self, *arg): + return + async def my_anext(self): + return await recv_que.get() + async def my_receive(self, topic: str, payload: bytes): + msg = AiomqttMessage(topic, payload,qos=0, retain=False, mid=0, properties=None) + await recv_que.put(msg) + await asyncio.sleep(0) # dispath the msg + + monkeypatch.setattr(aiomqtt.Client, "__aenter__", my_aenter) + monkeypatch.setattr(aiomqtt.Client, "subscribe", my_subscribe) + monkeypatch.setattr(MessagesIterator, "__anext__", my_anext) + monkeypatch.setattr(Mqtt, "receive", my_receive, False) + @pytest.fixture def config_mqtt_conn(test_hostname, test_port): Config.act_config = {'mqtt':{'host': test_hostname, 'port': test_port, 'user': '', 'passwd': ''}, @@ -250,92 +271,97 @@ async def test_mqtt_except_def_config(config_def_conn, monkeypatch, caplog): assert 'MQTT is unconfigured; Check your config.toml!' in caplog.text @pytest.mark.asyncio -async def test_msg_dispatch(config_mqtt_conn, spy_modbus_cmd): +async def test_mqtt_dispatch(config_mqtt_conn, aiomqtt_mock, spy_modbus_cmd): _ = config_mqtt_conn + _ = aiomqtt_mock spy = spy_modbus_cmd try: m = Mqtt(None) - msg = aiomqtt.Message(topic= 'homeassistant/status', payload= b'online', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + assert m.ha_restarts == 0 + await m.receive('homeassistant/status', b'online') # send the message assert m.ha_restarts == 1 - msg = aiomqtt.Message(topic= 'tsun/inv_1/rated_load', payload= b'2', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/rated_load', payload= b'2') spy.assert_awaited_once_with(Modbus.WRITE_SINGLE_REG, 0x2008, 2, logging.INFO) - + spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'100', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/out_coeff', payload= b'100') spy.assert_awaited_once_with(Modbus.WRITE_SINGLE_REG, 0x202c, 1024, logging.INFO) spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'50', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/out_coeff', payload= b'50') spy.assert_awaited_once_with(Modbus.WRITE_SINGLE_REG, 0x202c, 512, logging.INFO) spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_regs', payload= b'0x3000, 10', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/modbus_read_regs', payload= b'0x3000, 10') spy.assert_awaited_once_with(Modbus.READ_REGS, 0x3000, 10, logging.INFO) spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10') spy.assert_awaited_once_with(Modbus.READ_INPUTS, 0x3000, 10, logging.INFO) # test dispatching with empty mapping table m.topic_defs.clear() spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10') spy.assert_not_called() # test dispatching with incomplete mapping table - invalid fnc defined m.topic_defs.append( {'prefix': 'entity_prefix', 'topic': '/+/modbus_read_inputs', - 'full_topic': 'tsun/+/modbus_read_inputs', 'fnc': 'invalid'} + 'full_topic': 'tsun/+/modbus_read_inputs', 'fnc': 'addr'} ) spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/modbus_read_inputs', payload= b'0x3000, 10') spy.assert_not_called() + except MqttError: + assert False + except Exception: + assert False finally: await m.close() @pytest.mark.asyncio -async def test_msg_dispatch_err(config_mqtt_conn, spy_modbus_cmd): +async def test_mqtt_dispatch_err(config_mqtt_conn, aiomqtt_mock, spy_modbus_cmd, caplog): _ = config_mqtt_conn + _ = aiomqtt_mock spy = spy_modbus_cmd + + LOGGER = logging.getLogger("mqtt") + LOGGER.propagate = True + LOGGER.setLevel(logging.INFO) + try: m = Mqtt(None) + # test out of range param - msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'-1', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/out_coeff', payload= b'-1') spy.assert_not_called() # test unknown node_id - spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_2/out_coeff', payload= b'2', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_2/out_coeff', payload= b'2') spy.assert_not_called() # test invalid fload param - spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'2, 3', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/out_coeff', payload= b'2, 3') + spy.assert_not_called() + + await m.receive(topic= 'tsun/inv_1/modbus_read_regs', payload= b'0x3000, 10, 7') spy.assert_not_called() - spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/modbus_read_regs', payload= b'0x3000, 10, 7', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) - spy.assert_not_called() - - spy.reset_mock() - msg = aiomqtt.Message(topic= 'tsun/inv_1/dcu_power', payload= b'100W', qos= 0, retain = False, mid= 0, properties= None) - await m.dispatch_msg(msg) + await m.receive(topic= 'tsun/inv_1/dcu_power', payload= b'100W') spy.assert_not_called() + with caplog.at_level(logging.INFO): + msg = aiomqtt.Message(topic= 'tsun/inv_1/out_coeff', payload= b'2', qos= 0, retain = False, mid= 0, properties= None) + for _ in m.each_inverter(msg, "addr"): + pass # do nothing here + assert 'Cmd not supported by: inv_1/' in caplog.text + except MqttError: + assert False + except Exception: + assert False finally: await m.close() diff --git a/app/tests/test_solarman.py b/app/tests/test_solarman.py index 8cb36b1..92874f5 100755 --- a/app/tests/test_solarman.py +++ b/app/tests/test_solarman.py @@ -709,6 +709,19 @@ def msg_modbus_rsp(): # 0x1510 msg += b'\x15' return msg +@pytest.fixture +def msg_modbus_rsp_mb_4(): # 0x1510, MODBUS Type:4 + msg = b'\xa5\x3b\x00\x10\x15\x03\x03' +get_sn() +b'\x02\x01' + msg += total() + msg += hb() + msg += b'\x0a\xe2\xfa\x33\x01\x04\x28\x40\x10\x08\xd8' + msg += b'\x00\x00\x13\x87\x00\x31\x00\x68\x02\x58\x00\x00\x01\x53\x00\x02' + msg += b'\x00\x00\x01\x52\x00\x02\x00\x00\x01\x53\x00\x03\x00\x00\x00\x04' + msg += b'\x00\x01\x00\x00\x9e\xa4' + msg += correct_checksum(msg) + msg += b'\x15' + return msg + @pytest.fixture def msg_modbus_interim_rsp(): # 0x0510 msg = b'\xa5\x3b\x00\x10\x15\x03\x03' +get_sn() +b'\x02\x01' @@ -2241,6 +2254,61 @@ async def test_modbus_scaning(config_tsun_scan, heartbeat_ind_msg, heartbeat_rsp assert next(m.mb_timer.exp_count) == 3 m.close() +@pytest.mark.asyncio +async def test_modbus_scaning_inv_rsp(config_tsun_scan, heartbeat_ind_msg, heartbeat_rsp_msg, msg_modbus_rsp_mb_4): + _ = config_tsun_scan + assert asyncio.get_running_loop() + + m = MemoryStream(heartbeat_ind_msg, (0x15,0x56,0)) + m.append_msg(msg_modbus_rsp_mb_4) + assert m.mb_scan == False + assert asyncio.get_running_loop() == m.mb_timer.loop + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + assert m.mb_timer.tim == None + m.read() # read complete msg, and dispatch msg + assert m.mb_scan == True + assert m.mb_start_reg == 0xff80 + assert m.mb_step == 0x40 + assert m.mb_bytes == 0x14 + assert asyncio.get_running_loop() == m.mb_timer.loop + + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.snr == 2070233889 + assert m.control == 0x4710 + + assert m.msg_recvd[0]['control']==0x4710 + assert m.msg_recvd[0]['seq']=='84:11' + assert m.msg_recvd[0]['data_len']==0x1 + + assert m.ifc.tx_fifo.get()==heartbeat_rsp_msg + assert m.ifc.fwd_fifo.get()==heartbeat_ind_msg + assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 + + m.ifc.tx_clear() # clear send buffer for next test + assert isclose(m.mb_timeout, 0.5) + assert next(m.mb_timer.exp_count) == 0 + + await asyncio.sleep(0.5) + assert m.sent_pdu==b'\xa5\x17\x00\x10E\x12\x84!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00' \ + b'\x00\x00\x00\x00\x00\x00\x01\x03\xff\xc0\x00\x14\x75\xed\x33\x15' + assert m.ifc.tx_fifo.get()==b'' + + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 2 + assert m.msg_recvd[1]['control']==0x1510 + assert m.msg_recvd[1]['seq']=='03:03' + assert m.msg_recvd[1]['data_len']==0x3b + assert m.mb.last_addr == 1 + assert m.mb.last_fcode == 3 + assert m.mb.last_reg == 0xffc0 # mb_start_reg + mb_step + assert m.mb.last_len == 20 + assert m.mb.err == 3 + + assert next(m.mb_timer.exp_count) == 2 + m.close() + @pytest.mark.asyncio async def test_start_client_mode(my_loop, config_tsun_inv1, str_test_ip): _ = config_tsun_inv1