* replace constructor call with a literal
  https://sonarcloud.io/project/issues?open=AZeMhhlEyR1Wrs09sNyb&id=s-allius_tsun-gen3-proxy
* re-raise cancel error after cleanup
  https://sonarcloud.io/project/issues?open=AZeMhhltyR1Wrs09sNyc&id=s-allius_tsun-gen3-proxy
* remove duplicated line
* change send_modbus_cmd into a synchronous function
* make send_start_cmd synchronous
  https://sonarcloud.io/project/issues?open=AZeMhhhyyR1Wrs09sNya&id=s-allius_tsun-gen3-proxy
* make more functions synchronous
* update changelog
219 lines
7.7 KiB
Python
Executable File
219 lines
7.7 KiB
Python
Executable File
import asyncio
|
|
import logging
|
|
import aiomqtt
|
|
import traceback
|
|
import struct
|
|
import inspect
|
|
|
|
from modbus import Modbus
|
|
from messages import Message
|
|
from cnf.config import Config
|
|
from singleton import Singleton
|
|
from datetime import datetime
|
|
|
|
|
|
# Module-wide logger registered under the name 'mqtt'.
logger_mqtt = logging.getLogger('mqtt')
|
|
|
|
|
|
class Mqtt(metaclass=Singleton):
    """MQTT client that forwards broker messages to the inverters.

    The constructor schedules a background task which connects to the
    broker described by ``Config.get('mqtt')``, subscribes to every topic
    listed in ``topic_defs`` and hands each received message to the
    handler registered for the matching topic.
    """

    __client: aiomqtt.Client = None
    __cb_mqtt_is_up = None
    # Timestamp of the current broker connection; None while disconnected.
    ctime = None
    # Counters, both reset whenever a new broker connection is set up.
    published: int = 0
    received: int = 0

    def __init__(self, cb_mqtt_is_up):
        """Schedule the background task and build the topic table.

        Args:
            cb_mqtt_is_up: optional coroutine function; it is awaited
                after every new broker connection and whenever
                Home-Assistant reports the status ``online``.
        """
        logger_mqtt.debug('MQTT: __init__')
        if cb_mqtt_is_up:
            self.__cb_mqtt_is_up = cb_mqtt_is_up
        loop = asyncio.get_event_loop()
        self.task = loop.create_task(self.__loop())
        self.ha_restarts = 0
        # Each entry maps a topic (below a configurable prefix) to its
        # handler; 'args' are passed to the handler after the message.
        self.topic_defs = [
            {'prefix': 'auto_conf_prefix', 'topic': '/status',
             'fnc': self._ha_status, 'args': []},
            {'prefix': 'entity_prefix', 'topic': '/+/rated_load',
             'fnc': self._modbus_cmd,
             'args': [Modbus.WRITE_SINGLE_REG, 1, 0x2008]},
            {'prefix': 'entity_prefix', 'topic': '/+/out_coeff',
             'fnc': self._out_coeff, 'args': []},
            {'prefix': 'entity_prefix', 'topic': '/+/dcu_power',
             'fnc': self._dcu_cmd, 'args': []},
            {'prefix': 'entity_prefix', 'topic': '/+/modbus_read_regs',
             'fnc': self._modbus_cmd, 'args': [Modbus.READ_REGS, 2]},
            {'prefix': 'entity_prefix', 'topic': '/+/modbus_read_inputs',
             'fnc': self._modbus_cmd, 'args': [Modbus.READ_INPUTS, 2]},
            {'prefix': 'entity_prefix', 'topic': '/+/at_cmd',
             'fnc': self._at_cmd, 'args': []},
        ]

        # Resolve the configured prefixes once, so that subscribing and
        # dispatching can use the complete topic filter directly.
        ha = Config.get('ha')
        for entry in self.topic_defs:
            entry['full_topic'] = f"{ha[entry['prefix']]}{entry['topic']}"

    @property
    def ha_restarts(self):
        """Number of ``online`` status messages seen from Home-Assistant."""
        return self._ha_restarts

    @ha_restarts.setter
    def ha_restarts(self, value):
        self._ha_restarts = value

    async def close(self) -> None:
        """Cancel the background task and wait until it has finished.

        Any exception raised by the task, including the expected
        ``asyncio.CancelledError``, is only logged at debug level.
        """
        logger_mqtt.debug('MQTT: close')
        self.task.cancel()
        try:
            await self.task

        except (asyncio.CancelledError, Exception) as e:
            logger_mqtt.debug(f"Mqtt.close: exception: {e} ...")

    async def publish(self, topic: str, payload: str | bytes | bytearray
                      | int | float | None = None) -> None:
        """Publish ``payload`` on ``topic`` if a client object exists.

        The ``published`` counter is incremented for every message that
        was handed to the client.
        """
        if self.__client:
            await self.__client.publish(topic, payload)
            self.published += 1

    async def __loop(self) -> None:
        """Keep the broker connection alive and dispatch all messages.

        Runs until the task is cancelled. After an ``aiomqtt.MqttError``
        the loop sleeps before it reconnects (30 seconds while the MQTT
        configuration is still the default, 5 seconds otherwise).
        """
        mqtt = Config.get('mqtt')
        logger_mqtt.info(f'start MQTT: host:{mqtt["host"]} port:'
                         f'{mqtt["port"]} '
                         f'user:{mqtt["user"]}')
        self.__client = aiomqtt.Client(hostname=mqtt['host'],
                                       port=mqtt['port'],
                                       username=mqtt['user'],
                                       password=mqtt['passwd'])

        while True:
            try:
                async with self.__client:
                    logger_mqtt.info('MQTT broker connection established')
                    await self._init_new_conn()

                    async for message in self.__client.messages:
                        await self.dispatch_msg(message)

            except aiomqtt.MqttError:
                self.ctime = None

                if Config.is_default('mqtt'):
                    logger_mqtt.info(
                        "MQTT is unconfigured; Check your config.toml!")
                    interval = 30
                else:
                    interval = 5  # Seconds
                    logger_mqtt.info(
                        f"Connection lost; Reconnecting in {interval}"
                        " seconds ...")

                await asyncio.sleep(interval)
            except asyncio.CancelledError:
                logger_mqtt.debug("MQTT task cancelled")
                self.__client = None
                # re-raise, so that the awaiting side sees the cancellation
                raise
            except Exception:
                # TODO: self.inc_counter('SW_Exception')
                # NOTE(review): this branch reconnects without any delay.
                self.ctime = None
                logger_mqtt.error(
                    f"Exception:\n"
                    f"{traceback.format_exc()}")

    async def _init_new_conn(self):
        """Reset the statistics, notify the callback and subscribe."""
        self.ctime = datetime.now()
        self.published = 0
        self.received = 0
        if self.__cb_mqtt_is_up:
            await self.__cb_mqtt_is_up()
        for entry in self.topic_defs:
            await self.__client.subscribe(entry['full_topic'])

    async def dispatch_msg(self, message):
        """Call the handler of every topic definition matching ``message``.

        Coroutine handlers are awaited, plain callables are called
        directly; in both cases the entry's ``args`` follow the message.
        """
        self.received += 1

        for entry in self.topic_defs:
            if not message.topic.matches(entry['full_topic']):
                continue
            fnc = entry.get('fnc')

            if inspect.iscoroutinefunction(fnc):
                await fnc(message, *entry['args'])
            elif callable(fnc):
                fnc(message, *entry['args'])

    async def _ha_status(self, message):
        """Handle a Home-Assistant status message.

        On ``online`` the restart counter is incremented and the
        ``cb_mqtt_is_up`` callback (if any) is awaited.
        """
        status = message.payload.decode("UTF-8")
        logger_mqtt.info('Home-Assistant Status:'
                         f' {status}')
        if status == 'online':
            self.ha_restarts += 1
            if self.__cb_mqtt_is_up:
                await self.__cb_mqtt_is_up()

    def _out_coeff(self, message):
        """Write the output coefficient given as percentage (0..100).

        The percentage is scaled to 0..1024 and written to register
        0x202c. Invalid payloads and failures while sending are logged
        and otherwise ignored.
        """
        payload = message.payload.decode("UTF-8")
        try:
            val = round(float(payload) * 1024/100)
        except (ValueError, OverflowError):
            # not a number, or 'nan'/'inf' which cannot be rounded
            logger_mqtt.error(f'out_coeff: invalid payload: {payload!r}')
            return

        if not 0 <= val <= 1024:
            logger_mqtt.error('out_coeff: value must be in '
                              'the range 0..100,'
                              f' got: {payload}')
            return

        try:
            self._modbus_cmd(message,
                             Modbus.WRITE_SINGLE_REG,
                             0, 0x202c, val)
        except Exception:
            # best effort: a failing command must not stop the MQTT loop
            logger_mqtt.error('out_coeff: command failed:\n'
                              f'{traceback.format_exc()}')

    def each_inverter(self, message, func_name: str):
        """Yield the method ``func_name`` of the addressed inverter.

        The node id is the second level of the message topic followed by
        a slash. The first server-side entry of ``Message`` with this
        node id ends the search; its method is yielded if it is callable.
        A warning is logged when the method or the node is missing.
        """
        topic = str(message.topic)
        node_id = topic.split('/')[1] + '/'
        for m in Message:
            if m.server_side and (m.node_id == node_id):
                logger_mqtt.debug(f'Found: {node_id}')
                fnc = getattr(m, func_name, None)
                if callable(fnc):
                    yield fnc
                else:
                    logger_mqtt.warning(f'Cmd not supported by: {node_id}')
                break

        else:
            logger_mqtt.warning(f'Node_id: {node_id} not found')

    def _modbus_cmd(self, message, func, params=0, addr=0, val=0):
        """Send a Modbus command to the inverter addressed by the topic.

        Args:
            message: received MQTT message.
            func: Modbus function code handed to ``send_modbus_cmd``.
            params: number of comma separated payload values. With 0 the
                payload is not parsed, with 1 it is a decimal value and
                with 2 it is ``<hex address>,<length>``.
            addr: register address, used unless ``params`` is 2.
            val: register value, used when ``params`` is 0.

        A payload with the wrong number of values, or with values that
        are not numbers, is logged and no command is sent.
        """
        payload = message.payload.decode("UTF-8")
        for fnc in self.each_inverter(message, "send_modbus_cmd"):
            res = payload.split(',')
            if params > 0 and params != len(res):
                logger_mqtt.error(f'Parameter expected: {params}, '
                                  f'got: {len(res)}')
                return
            try:
                if params == 1:
                    val = int(payload)
                elif params == 2:
                    addr = int(res[0], base=16)
                    val = int(res[1])  # length
            except ValueError:
                # A malformed payload must not escape into the receive
                # loop, where it would end the broker connection.
                logger_mqtt.error(f'Invalid parameter: {payload!r}')
                return
            fnc(func, addr, val, logging.INFO)

    async def _at_cmd(self, message):
        """Forward the payload as AT command to the addressed inverter."""
        payload = message.payload.decode("UTF-8")
        for fnc in self.each_inverter(message, "send_at_cmd"):
            await fnc(payload)

    def _dcu_cmd(self, message):
        """Send the DCU power given in the payload (range 100..800).

        The value is multiplied by ten and packed big-endian into the
        PDU handed to ``send_dcu_cmd``. Invalid payloads and failures
        while sending are logged and otherwise ignored.
        """
        payload = message.payload.decode("UTF-8")
        try:
            val = round(float(payload) * 10)
        except (ValueError, OverflowError):
            # not a number, or 'nan'/'inf' which cannot be rounded
            logger_mqtt.error(f'dcu_power: invalid payload: {payload!r}')
            return

        if not 1000 <= val <= 8000:
            logger_mqtt.error('dcu_power: value must be in '
                              'the range 100..800,'
                              f' got: {payload}')
            return

        try:
            pdu = struct.pack('>BBBBBBH', 1, 1, 6, 1, 0, 1, val)
            for fnc in self.each_inverter(message, "send_dcu_cmd"):
                fnc(pdu)
        except Exception:
            # best effort: a failing command must not stop the MQTT loop
            logger_mqtt.error('dcu_power: command failed:\n'
                              f'{traceback.format_exc()}')
|