diff --git a/app/src/mqtt.py b/app/src/mqtt.py index 734d8dd..6b72939 100755 --- a/app/src/mqtt.py +++ b/app/src/mqtt.py @@ -3,6 +3,7 @@ import logging import aiomqtt import traceback import struct +import inspect from modbus import Modbus from messages import Message @@ -28,15 +29,27 @@ class Mqtt(metaclass=Singleton): loop = asyncio.get_event_loop() self.task = loop.create_task(self.__loop()) self.ha_restarts = 0 + self.topic_defs = [ + {'prefix': 'auto_conf_prefix', 'topic': '/status', + 'fnc': self._ha_status, 'args': []}, + {'prefix': 'entity_prefix', 'topic': '/+/rated_load', + 'fnc': self._modbus_cmd, + 'args': [Modbus.WRITE_SINGLE_REG, 1, 0x2008]}, + {'prefix': 'entity_prefix', 'topic': '/+/out_coeff', + 'fnc': self._out_coeff, 'args': []}, + {'prefix': 'entity_prefix', 'topic': '/+/dcu_power', + 'fnc': self._dcu_cmd, 'args': []}, + {'prefix': 'entity_prefix', 'topic': '/+/modbus_read_regs', + 'fnc': self._modbus_cmd, 'args': [Modbus.READ_REGS, 2]}, + {'prefix': 'entity_prefix', 'topic': '/+/modbus_read_inputs', + 'fnc': self._modbus_cmd, 'args': [Modbus.READ_INPUTS, 2]}, + {'prefix': 'entity_prefix', 'topic': '/+/at_cmd', + 'fnc': self._at_cmd, 'args': []}, + ] ha = Config.get('ha') - self.ha_status_topic = f"{ha['auto_conf_prefix']}/status" - self.mb_rated_topic = f"{ha['entity_prefix']}/+/rated_load" - self.mb_out_coeff_topic = f"{ha['entity_prefix']}/+/out_coeff" - self.dcu_power_topic = f"{ha['entity_prefix']}/+/dcu_power" - self.mb_reads_topic = f"{ha['entity_prefix']}/+/modbus_read_regs" - self.mb_inputs_topic = f"{ha['entity_prefix']}/+/modbus_read_inputs" - self.mb_at_cmd_topic = f"{ha['entity_prefix']}/+/at_cmd" + for entry in self.topic_defs: + entry['full_topic'] = f"{ha[entry['prefix']]}{entry['topic']}" @property def ha_restarts(self): @@ -77,20 +90,7 @@ class Mqtt(metaclass=Singleton): try: async with self.__client: logger_mqtt.info('MQTT broker connection established') - self.ctime = datetime.now() - self.published = 0 - self.received = 0 - - if self.__cb_mqtt_is_up: - await self.__cb_mqtt_is_up() - - await self.__client.subscribe(self.ha_status_topic) - await self.__client.subscribe(self.mb_rated_topic) - await self.__client.subscribe(self.mb_out_coeff_topic) - await self.__client.subscribe(self.dcu_power_topic) - await self.__client.subscribe(self.mb_reads_topic) - await self.__client.subscribe(self.mb_inputs_topic) - await self.__client.subscribe(self.mb_at_cmd_topic) + await self._init_new_conn() async for message in self.__client.messages: await self.dispatch_msg(message) @@ -120,50 +120,50 @@ class Mqtt(metaclass=Singleton): f"Exception:\n" f"{traceback.format_exc()}") + async def _init_new_conn(self): + self.ctime = datetime.now() + self.published = 0 + self.received = 0 + if self.__cb_mqtt_is_up: + await self.__cb_mqtt_is_up() + for entry in self.topic_defs: + await self.__client.subscribe(entry['full_topic']) + async def dispatch_msg(self, message): self.received += 1 - if message.topic.matches(self.ha_status_topic): - status = message.payload.decode("UTF-8") - logger_mqtt.info('Home-Assistant Status:' - f' {status}') - if status == 'online': - self.ha_restarts += 1 - await self.__cb_mqtt_is_up() + for entry in self.topic_defs: + if message.topic.matches(entry['full_topic']) \ + and 'fnc' in entry: + fnc = entry['fnc'] - if message.topic.matches(self.mb_rated_topic): - await self.modbus_cmd(message, - Modbus.WRITE_SINGLE_REG, - 1, 0x2008) + if inspect.iscoroutinefunction(fnc): + await entry['fnc'](message, *entry['args']) + elif callable(fnc): + entry['fnc'](message, *entry['args']) - if message.topic.matches(self.mb_out_coeff_topic): - payload = message.payload.decode("UTF-8") - try: - val = round(float(payload) * 1024/100) - if val < 0 or val > 1024: - logger_mqtt.error('out_coeff: value must be in' - 'the range 0..100,' - f' got: {payload}') - else: - await self.modbus_cmd(message, - Modbus.WRITE_SINGLE_REG, - 0, 0x202c, val) - except Exception: - pass + async def _ha_status(self, message): + status = message.payload.decode("UTF-8") + logger_mqtt.info('Home-Assistant Status:' + f' {status}') + if status == 'online': + self.ha_restarts += 1 + await self.__cb_mqtt_is_up() - if message.topic.matches(self.dcu_power_topic): - self.dcu_cmd(message) - - if message.topic.matches(self.mb_reads_topic): - await self.modbus_cmd(message, - Modbus.READ_REGS, 2) - - if message.topic.matches(self.mb_inputs_topic): - await self.modbus_cmd(message, - Modbus.READ_INPUTS, 2) - - if message.topic.matches(self.mb_at_cmd_topic): - await self.at_cmd(message) + async def _out_coeff(self, message): + payload = message.payload.decode("UTF-8") + try: + val = round(float(payload) * 1024/100) + if val < 0 or val > 1024: + logger_mqtt.error('out_coeff: value must be in' + 'the range 0..100,' + f' got: {payload}') + else: + await self._modbus_cmd(message, + Modbus.WRITE_SINGLE_REG, + 0, 0x202c, val) + except Exception: + pass def each_inverter(self, message, func_name: str): topic = str(message.topic) @@ -181,7 +181,7 @@ class Mqtt(metaclass=Singleton): else: logger_mqtt.warning(f'Node_id: {node_id} not found') - async def modbus_cmd(self, message, func, params=0, addr=0, val=0): + async def _modbus_cmd(self, message, func, params=0, addr=0, val=0): payload = message.payload.decode("UTF-8") for fnc in self.each_inverter(message, "send_modbus_cmd"): res = payload.split(',') @@ -196,12 +196,12 @@ class Mqtt(metaclass=Singleton): val = int(res[1]) # lenght await fnc(func, addr, val, logging.INFO) - async def at_cmd(self, message): + async def _at_cmd(self, message): payload = message.payload.decode("UTF-8") for fnc in self.each_inverter(message, "send_at_cmd"): await fnc(payload) - def dcu_cmd(self, message): + def _dcu_cmd(self, message): payload = message.payload.decode("UTF-8") val = round(float(payload) * 10) if val < 1000 or val > 8000: diff --git a/app/tests/test_mqtt.py b/app/tests/test_mqtt.py index b9ee9d3..e80d07c 100755 --- a/app/tests/test_mqtt.py +++ b/app/tests/test_mqtt.py @@ -17,7 +17,7 @@ NO_MOSQUITTO_TEST = False pytest_plugins = ('pytest_asyncio',) -@pytest.fixture(scope="module", autouse=True) +@pytest.fixture(scope="function", autouse=True) def module_init(): Singleton._instances.clear() yield