register proxy dev as soon as the MQTT connection is established
This commit is contained in:
@@ -3,6 +3,7 @@ from config import Config
|
||||
from async_stream import AsyncStream
|
||||
from mqtt import Mqtt
|
||||
from aiomqtt import MqttCodeError
|
||||
from infos import Infos
|
||||
|
||||
#import gc
|
||||
|
||||
@@ -12,16 +13,61 @@ logger_mqtt = logging.getLogger('mqtt')
|
||||
|
||||
|
||||
class Inverter(AsyncStream):
|
||||
@classmethod
|
||||
def class_init(cls):
|
||||
logging.debug('Inverter.class_init')
|
||||
# initialize the proxy statistics
|
||||
Infos.static_init()
|
||||
cls.db_stat = Infos()
|
||||
|
||||
ha = Config.get('ha')
|
||||
cls.entity_prfx = ha['entity_prefix'] + '/'
|
||||
cls.discovery_prfx = ha['discovery_prefix'] + '/'
|
||||
cls.proxy_node_id = ha['proxy_node_id'] + '/'
|
||||
cls.proxy_unique_id = ha['proxy_unique_id']
|
||||
|
||||
# call Mqtt singleton to establisch the connection to the mqtt broker
|
||||
cls.mqtt = Mqtt(cls.__cb_MqttIsUp)
|
||||
|
||||
@classmethod
|
||||
async def __cb_MqttIsUp(cls):
|
||||
logging.info('Initialize proxy device on home assistant')
|
||||
# register proxy status counters at home assistant
|
||||
await cls.__register_proxy_stat_home_assistant()
|
||||
|
||||
# send values of the proxy status counters
|
||||
await asyncio.sleep(0.5) # wait a bit, before sending first data
|
||||
cls.new_stat_data['proxy'] = True # force sending data to sync home assistant
|
||||
await cls.__async_publ_mqtt_proxy_stat('proxy')
|
||||
|
||||
@classmethod
|
||||
async def __register_proxy_stat_home_assistant(cls) -> None:
|
||||
'''register all our topics at home assistant'''
|
||||
for data_json, component, node_id, id in cls.db_stat.ha_confs(cls.entity_prfx, cls.proxy_node_id, cls.proxy_unique_id, True):
|
||||
logger_mqtt.debug(f"MQTT Register: cmp:'{component}' node_id:'{node_id}' {data_json}")
|
||||
await cls.mqtt.publish(f"{cls.discovery_prfx}{component}/{node_id}{id}/config", data_json)
|
||||
|
||||
@classmethod
|
||||
async def __async_publ_mqtt_proxy_stat(cls, key):
|
||||
stat = Infos.stat
|
||||
if key in stat and cls.new_stat_data[key]:
|
||||
data_json = json.dumps(stat[key])
|
||||
node_id = cls.proxy_node_id
|
||||
logger_mqtt.debug(f'{key}: {data_json}')
|
||||
await cls.mqtt.publish(f"{cls.entity_prfx}{node_id}{key}", data_json)
|
||||
cls.new_stat_data[key] = False
|
||||
|
||||
|
||||
@classmethod
|
||||
def class_close(cls, loop):
|
||||
logging.debug('Inverter.class_close')
|
||||
logging.info ('Close MQTT Task')
|
||||
loop.run_until_complete(cls.mqtt.close())
|
||||
cls.mqtt = None
|
||||
|
||||
def __init__ (self, reader, writer, addr):
|
||||
super().__init__(reader, writer, addr, None, True)
|
||||
self.mqtt = Mqtt()
|
||||
self.ha_restarts = -1
|
||||
ha = Config.get('ha')
|
||||
self.entity_prfx = ha['entity_prefix'] + '/'
|
||||
self.discovery_prfx = ha['discovery_prefix'] + '/'
|
||||
self.proxy_node_id = ha['proxy_node_id'] + '/'
|
||||
self.proxy_unique_id = ha['proxy_unique_id']
|
||||
|
||||
|
||||
async def server_loop(self, addr):
|
||||
@@ -37,7 +83,7 @@ class Inverter(AsyncStream):
|
||||
logging.debug ("disconnect client connection")
|
||||
self.remoteStream.disc()
|
||||
try:
|
||||
await self.__async_publ_mqtt_packet('proxy')
|
||||
await self.__async_publ_mqtt_proxy_stat('proxy')
|
||||
except: pass
|
||||
|
||||
async def client_loop(self, addr):
|
||||
@@ -81,11 +127,15 @@ class Inverter(AsyncStream):
|
||||
if (('inverter' in self.new_data and self.new_data['inverter']) or
|
||||
('collector' in self.new_data and self.new_data['collector']) or
|
||||
self.mqtt.ha_restarts != self.ha_restarts):
|
||||
await self.__register_proxy_stat_home_assistant()
|
||||
await self.__register_home_assistant()
|
||||
self.ha_restarts = self.mqtt.ha_restarts
|
||||
|
||||
for key in self.new_data:
|
||||
await self.__async_publ_mqtt_packet(key)
|
||||
for key in self.new_stat_data:
|
||||
await self.__async_publ_mqtt_proxy_stat(key)
|
||||
|
||||
except MqttCodeError as error:
|
||||
logging.error(f'Mqtt except: {error}')
|
||||
except Exception:
|
||||
@@ -96,23 +146,16 @@ class Inverter(AsyncStream):
|
||||
|
||||
async def __async_publ_mqtt_packet(self, key):
|
||||
db = self.db.db
|
||||
stat = self.db.stat
|
||||
if self.new_data[key]:
|
||||
if key in db:
|
||||
data_json = json.dumps(db[key])
|
||||
node_id = self.node_id
|
||||
elif key in stat:
|
||||
data_json = json.dumps(stat[key])
|
||||
node_id = self.proxy_node_id
|
||||
else:
|
||||
return
|
||||
if key in db and self.new_data[key]:
|
||||
data_json = json.dumps(db[key])
|
||||
node_id = self.node_id
|
||||
logger_mqtt.debug(f'{key}: {data_json}')
|
||||
await self.mqtt.publish(f"{self.entity_prfx}{node_id}{key}", data_json)
|
||||
self.new_data[key] = False
|
||||
|
||||
async def __register_home_assistant(self) -> None:
|
||||
'''register all our topics at home assistant'''
|
||||
for data_json, component, node_id, id in self.db.ha_confs(self.entity_prfx, self.node_id, self.unique_id, self.proxy_node_id, self.proxy_unique_id, self.sug_area):
|
||||
for data_json, component, node_id, id in self.db.ha_confs(self.entity_prfx, self.node_id, self.unique_id, False, self.sug_area):
|
||||
logger_mqtt.debug(f"MQTT Register: cmp:'{component}' node_id:'{node_id}' {data_json}")
|
||||
await self.mqtt.publish(f"{self.discovery_prfx}{component}/{node_id}{id}/config", data_json)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user