cleanup MQTT topic handling

This commit is contained in:
Stefan Allius
2025-05-18 19:48:16 +02:00
parent bfd7dbe032
commit fb4fe6b34d
2 changed files with 63 additions and 63 deletions

View File

@@ -3,6 +3,7 @@ import logging
import aiomqtt import aiomqtt
import traceback import traceback
import struct import struct
import inspect
from modbus import Modbus from modbus import Modbus
from messages import Message from messages import Message
@@ -28,15 +29,27 @@ class Mqtt(metaclass=Singleton):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
self.task = loop.create_task(self.__loop()) self.task = loop.create_task(self.__loop())
self.ha_restarts = 0 self.ha_restarts = 0
self.topic_defs = [
{'prefix': 'auto_conf_prefix', 'topic': '/status',
'fnc': self._ha_status, 'args': []},
{'prefix': 'entity_prefix', 'topic': '/+/rated_load',
'fnc': self._modbus_cmd,
'args': [Modbus.WRITE_SINGLE_REG, 1, 0x2008]},
{'prefix': 'entity_prefix', 'topic': '/+/out_coeff',
'fnc': self._out_coeff, 'args': []},
{'prefix': 'entity_prefix', 'topic': '/+/dcu_power',
'fnc': self._dcu_cmd, 'args': []},
{'prefix': 'entity_prefix', 'topic': '/+/modbus_read_regs',
'fnc': self._modbus_cmd, 'args': [Modbus.READ_REGS, 2]},
{'prefix': 'entity_prefix', 'topic': '/+/modbus_read_inputs',
'fnc': self._modbus_cmd, 'args': [Modbus.READ_INPUTS, 2]},
{'prefix': 'entity_prefix', 'topic': '/+/at_cmd',
'fnc': self._at_cmd, 'args': []},
]
ha = Config.get('ha') ha = Config.get('ha')
self.ha_status_topic = f"{ha['auto_conf_prefix']}/status" for entry in self.topic_defs:
self.mb_rated_topic = f"{ha['entity_prefix']}/+/rated_load" entry['full_topic'] = f"{ha[entry['prefix']]}{entry['topic']}"
self.mb_out_coeff_topic = f"{ha['entity_prefix']}/+/out_coeff"
self.dcu_power_topic = f"{ha['entity_prefix']}/+/dcu_power"
self.mb_reads_topic = f"{ha['entity_prefix']}/+/modbus_read_regs"
self.mb_inputs_topic = f"{ha['entity_prefix']}/+/modbus_read_inputs"
self.mb_at_cmd_topic = f"{ha['entity_prefix']}/+/at_cmd"
@property @property
def ha_restarts(self): def ha_restarts(self):
@@ -77,20 +90,7 @@ class Mqtt(metaclass=Singleton):
try: try:
async with self.__client: async with self.__client:
logger_mqtt.info('MQTT broker connection established') logger_mqtt.info('MQTT broker connection established')
self.ctime = datetime.now() await self._init_new_conn()
self.published = 0
self.received = 0
if self.__cb_mqtt_is_up:
await self.__cb_mqtt_is_up()
await self.__client.subscribe(self.ha_status_topic)
await self.__client.subscribe(self.mb_rated_topic)
await self.__client.subscribe(self.mb_out_coeff_topic)
await self.__client.subscribe(self.dcu_power_topic)
await self.__client.subscribe(self.mb_reads_topic)
await self.__client.subscribe(self.mb_inputs_topic)
await self.__client.subscribe(self.mb_at_cmd_topic)
async for message in self.__client.messages: async for message in self.__client.messages:
await self.dispatch_msg(message) await self.dispatch_msg(message)
@@ -120,50 +120,50 @@ class Mqtt(metaclass=Singleton):
f"Exception:\n" f"Exception:\n"
f"{traceback.format_exc()}") f"{traceback.format_exc()}")
async def _init_new_conn(self):
self.ctime = datetime.now()
self.published = 0
self.received = 0
if self.__cb_mqtt_is_up:
await self.__cb_mqtt_is_up()
for entry in self.topic_defs:
await self.__client.subscribe(entry['full_topic'])
async def dispatch_msg(self, message): async def dispatch_msg(self, message):
self.received += 1 self.received += 1
if message.topic.matches(self.ha_status_topic): for entry in self.topic_defs:
status = message.payload.decode("UTF-8") if message.topic.matches(entry['full_topic']) \
logger_mqtt.info('Home-Assistant Status:' and 'fnc' in entry:
f' {status}') fnc = entry['fnc']
if status == 'online':
self.ha_restarts += 1
await self.__cb_mqtt_is_up()
if message.topic.matches(self.mb_rated_topic): if inspect.iscoroutinefunction(fnc):
await self.modbus_cmd(message, await entry['fnc'](message, *entry['args'])
Modbus.WRITE_SINGLE_REG, elif callable(fnc):
1, 0x2008) entry['fnc'](message, *entry['args'])
if message.topic.matches(self.mb_out_coeff_topic): async def _ha_status(self, message):
payload = message.payload.decode("UTF-8") status = message.payload.decode("UTF-8")
try: logger_mqtt.info('Home-Assistant Status:'
val = round(float(payload) * 1024/100) f' {status}')
if val < 0 or val > 1024: if status == 'online':
logger_mqtt.error('out_coeff: value must be in' self.ha_restarts += 1
'the range 0..100,' await self.__cb_mqtt_is_up()
f' got: {payload}')
else:
await self.modbus_cmd(message,
Modbus.WRITE_SINGLE_REG,
0, 0x202c, val)
except Exception:
pass
if message.topic.matches(self.dcu_power_topic): async def _out_coeff(self, message):
self.dcu_cmd(message) payload = message.payload.decode("UTF-8")
try:
if message.topic.matches(self.mb_reads_topic): val = round(float(payload) * 1024/100)
await self.modbus_cmd(message, if val < 0 or val > 1024:
Modbus.READ_REGS, 2) logger_mqtt.error('out_coeff: value must be in'
'the range 0..100,'
if message.topic.matches(self.mb_inputs_topic): f' got: {payload}')
await self.modbus_cmd(message, else:
Modbus.READ_INPUTS, 2) await self._modbus_cmd(message,
Modbus.WRITE_SINGLE_REG,
if message.topic.matches(self.mb_at_cmd_topic): 0, 0x202c, val)
await self.at_cmd(message) except Exception:
pass
def each_inverter(self, message, func_name: str): def each_inverter(self, message, func_name: str):
topic = str(message.topic) topic = str(message.topic)
@@ -181,7 +181,7 @@ class Mqtt(metaclass=Singleton):
else: else:
logger_mqtt.warning(f'Node_id: {node_id} not found') logger_mqtt.warning(f'Node_id: {node_id} not found')
async def modbus_cmd(self, message, func, params=0, addr=0, val=0): async def _modbus_cmd(self, message, func, params=0, addr=0, val=0):
payload = message.payload.decode("UTF-8") payload = message.payload.decode("UTF-8")
for fnc in self.each_inverter(message, "send_modbus_cmd"): for fnc in self.each_inverter(message, "send_modbus_cmd"):
res = payload.split(',') res = payload.split(',')
@@ -196,12 +196,12 @@ class Mqtt(metaclass=Singleton):
val = int(res[1]) # lenght val = int(res[1]) # lenght
await fnc(func, addr, val, logging.INFO) await fnc(func, addr, val, logging.INFO)
async def at_cmd(self, message): async def _at_cmd(self, message):
payload = message.payload.decode("UTF-8") payload = message.payload.decode("UTF-8")
for fnc in self.each_inverter(message, "send_at_cmd"): for fnc in self.each_inverter(message, "send_at_cmd"):
await fnc(payload) await fnc(payload)
def dcu_cmd(self, message): def _dcu_cmd(self, message):
payload = message.payload.decode("UTF-8") payload = message.payload.decode("UTF-8")
val = round(float(payload) * 10) val = round(float(payload) * 10)
if val < 1000 or val > 8000: if val < 1000 or val > 8000:

View File

@@ -17,7 +17,7 @@ NO_MOSQUITTO_TEST = False
pytest_plugins = ('pytest_asyncio',) pytest_plugins = ('pytest_asyncio',)
@pytest.fixture(scope="module", autouse=True) @pytest.fixture(scope="function", autouse=True)
def module_init(): def module_init():
Singleton._instances.clear() Singleton._instances.clear()
yield yield