diff --git a/app/src/gen3/inverter_g3.py b/app/src/gen3/inverter_g3.py index f2483e7..8b74716 100644 --- a/app/src/gen3/inverter_g3.py +++ b/app/src/gen3/inverter_g3.py @@ -1,22 +1,14 @@ import logging -import traceback -import json -import asyncio from asyncio import StreamReader, StreamWriter -from aiomqtt import MqttCodeError if __name__ == "app.src.gen3.inverter_g3": - from app.src.config import Config from app.src.inverter import Inverter from app.src.gen3.connection_g3 import ConnectionG3Server from app.src.gen3.connection_g3 import ConnectionG3Client - from app.src.infos import Infos else: # pragma: no cover - from config import Config from inverter import Inverter from gen3.connection_g3 import ConnectionG3Server from gen3.connection_g3 import ConnectionG3Client - from infos import Infos logger_mqtt = logging.getLogger('mqtt') @@ -53,86 +45,15 @@ class InverterG3(Inverter, ConnectionG3Server): ''' def __init__(self, reader: StreamReader, writer: StreamWriter, addr): - super().__init__(reader, writer, addr, None) - self.__ha_restarts = -1 + Inverter.__init__(self) + ConnectionG3Server.__init__(self, reader, writer, addr, None) self.addr = addr async def async_create_remote(self) -> None: - '''Establish a client connection to the TSUN cloud''' - tsun = Config.get('tsun') - host = tsun['host'] - port = tsun['port'] - addr = (host, port) - - try: - logging.info(f'[{self.node_id}] Connect to {addr}') - connect = asyncio.open_connection(host, port) - reader, writer = await connect - self.remote.stream = ConnectionG3Client(reader, writer, addr, self, - self.id_str) - logging.info(f'[{self.remote.stream.node_id}:' - f'{self.remote.stream.conn_no}] ' - f'Connected to {addr}') - asyncio.create_task(self.remote.ifc.client_loop(addr)) - - except (ConnectionRefusedError, TimeoutError) as error: - logging.info(f'{error}') - except Exception: - self.inc_counter('SW_Exception') - logging.error( - f"Inverter: Exception for {addr}:\n" - f"{traceback.format_exc()}") - - async def async_publ_mqtt(self) -> None: - '''publish data to MQTT broker''' - if not self.unique_id: - return - # check if new inverter or collector infos are available or when the - # home assistant has changed the status back to online - try: - if (('inverter' in self.new_data and self.new_data['inverter']) - or ('collector' in self.new_data and - self.new_data['collector']) - or self.mqtt.ha_restarts != self.__ha_restarts): - await self._register_proxy_stat_home_assistant() - await self.__register_home_assistant() - self.__ha_restarts = self.mqtt.ha_restarts - - for key in self.new_data: - await self.__async_publ_mqtt_packet(key) - for key in Infos.new_stat_data: - await Inverter._async_publ_mqtt_proxy_stat(key) - - except MqttCodeError as error: - logging.error(f'Mqtt except: {error}') - except Exception: - self.inc_counter('SW_Exception') - logging.error( - f"Inverter: Exception:\n" - f"{traceback.format_exc()}") - - async def __async_publ_mqtt_packet(self, key): - db = self.db.db - if key in db and self.new_data[key]: - data_json = json.dumps(db[key]) - node_id = self.node_id - logger_mqtt.debug(f'{key}: {data_json}') - await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501 - self.new_data[key] = False - - async def __register_home_assistant(self) -> None: - '''register all our topics at home assistant''' - for data_json, component, node_id, id in self.db.ha_confs( - self.entity_prfx, self.node_id, self.unique_id, - self.sug_area): - logger_mqtt.debug(f"MQTT Register: cmp:'{component}'" - f" node_id:'{node_id}' {data_json}") - await self.mqtt.publish(f"{self.discovery_prfx}{component}" - f"/{node_id}{id}/config", data_json) - - self.db.reg_clr_at_midnight(f'{self.entity_prfx}{self.node_id}') + await Inverter.async_create_remote( + self, 'tsun', ConnectionG3Client) def close(self) -> None: logging.debug(f'InverterG3.close() {self.addr}') - super().close() # call close handler in the parent class + ConnectionG3Server.close(self) # logging.info(f'Inverter refs: {gc.get_referrers(self)}') diff --git a/app/src/gen3plus/inverter_g3p.py b/app/src/gen3plus/inverter_g3p.py index 2c9bca7..0d15d92 100644 --- a/app/src/gen3plus/inverter_g3p.py +++ b/app/src/gen3plus/inverter_g3p.py @@ -1,22 +1,14 @@ import logging -import traceback -import json -import asyncio from asyncio import StreamReader, StreamWriter -from aiomqtt import MqttCodeError if __name__ == "app.src.gen3plus.inverter_g3p": - from app.src.config import Config from app.src.inverter import Inverter from app.src.gen3plus.connection_g3p import ConnectionG3PServer from app.src.gen3plus.connection_g3p import ConnectionG3PClient - from app.src.infos import Infos else: # pragma: no cover - from config import Config from inverter import Inverter from gen3plus.connection_g3p import ConnectionG3PServer from gen3plus.connection_g3p import ConnectionG3PClient - from infos import Infos logger_mqtt = logging.getLogger('mqtt') @@ -54,88 +46,16 @@ class InverterG3P(Inverter, ConnectionG3PServer): def __init__(self, reader: StreamReader, writer: StreamWriter, addr, client_mode: bool = False): - super().__init__(reader, writer, addr, None, - client_mode=client_mode) - self.__ha_restarts = -1 + Inverter.__init__(self) + ConnectionG3PServer.__init__( + self, reader, writer, addr, None, client_mode=client_mode) self.addr = addr async def async_create_remote(self) -> None: - '''Establish a client connection to the TSUN cloud''' - tsun = Config.get('solarman') - host = tsun['host'] - port = tsun['port'] - addr = (host, port) - - try: - logging.info(f'[{self.node_id}] Connect to {addr}') - connect = asyncio.open_connection(host, port) - reader, writer = await connect - self.remote.stream = ConnectionG3PClient(reader, writer, - addr, self) - logging.info(f'[{self.remote.stream.node_id}:' - f'{self.remote.stream.conn_no}] ' - f'Connected to {addr}') - asyncio.create_task(self.remote.ifc.client_loop(addr)) - - except (ConnectionRefusedError, TimeoutError) as error: - logging.info(f'{error}') - except Exception: - self.inc_counter('SW_Exception') - logging.error( - f"Inverter: Exception for {addr}:\n" - f"{traceback.format_exc()}") - - async def async_publ_mqtt(self) -> None: - '''publish data to MQTT broker''' - if not self.unique_id: - return - - # check if new inverter or collector infos are available or when the - # home assistant has changed the status back to online - try: - if (('inverter' in self.new_data and self.new_data['inverter']) - or ('collector' in self.new_data and - self.new_data['collector']) - or self.mqtt.ha_restarts != self.__ha_restarts): - await self._register_proxy_stat_home_assistant() - await self.__register_home_assistant() - self.__ha_restarts = self.mqtt.ha_restarts - - for key in self.new_data: - await self.__async_publ_mqtt_packet(key) - for key in Infos.new_stat_data: - await Inverter._async_publ_mqtt_proxy_stat(key) - - except MqttCodeError as error: - logging.error(f'Mqtt except: {error}') - except Exception: - self.inc_counter('SW_Exception') - logging.error( - f"Inverter: Exception:\n" - f"{traceback.format_exc()}") - - async def __async_publ_mqtt_packet(self, key): - db = self.db.db - if key in db and self.new_data[key]: - data_json = json.dumps(db[key]) - node_id = self.node_id - logger_mqtt.debug(f'{key}: {data_json}') - await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501 - self.new_data[key] = False - - async def __register_home_assistant(self) -> None: - '''register all our topics at home assistant''' - for data_json, component, node_id, id in self.db.ha_confs( - self.entity_prfx, self.node_id, self.unique_id, - self.sug_area): - logger_mqtt.debug(f"MQTT Register: cmp:'{component}'" - f" node_id:'{node_id}' {data_json}") - await self.mqtt.publish(f"{self.discovery_prfx}{component}" - f"/{node_id}{id}/config", data_json) - - self.db.reg_clr_at_midnight(f'{self.entity_prfx}{self.node_id}') + await Inverter.async_create_remote( + self, 'solarman', ConnectionG3PClient) def close(self) -> None: logging.debug(f'InverterG3P.close() {self.addr}') - super().close() # call close handler in the parent class + ConnectionG3PServer.close(self) # logger.debug (f'Inverter refs: {gc.get_referrers(self)}') diff --git a/app/src/inverter.py b/app/src/inverter.py index 996fa0f..68b7996 100644 --- a/app/src/inverter.py +++ b/app/src/inverter.py @@ -1,6 +1,9 @@ import asyncio import logging +import traceback import json +from aiomqtt import MqttCodeError + if __name__ == "app.src.inverter": from app.src.config import Config from app.src.mqtt import Mqtt @@ -81,3 +84,86 @@ class Inverter(): logging.info('Close MQTT Task') loop.run_until_complete(cls.mqtt.close()) cls.mqtt = None + + def __init__(self): + self.__ha_restarts = -1 + + async def async_create_remote(self, inv_prot: str, conn_class) -> None: + '''Establish a client connection to the TSUN cloud''' + tsun = Config.get(inv_prot) + host = tsun['host'] + port = tsun['port'] + addr = (host, port) + + try: + logging.info(f'[{self.node_id}] Connect to {addr}') + connect = asyncio.open_connection(host, port) + reader, writer = await connect + if hasattr(self, 'id_str'): + self.remote.stream = conn_class( + reader, writer, addr, self, self.id_str) + else: + self.remote.stream = conn_class( + reader, writer, addr, self) + + logging.info(f'[{self.remote.stream.node_id}:' + f'{self.remote.stream.conn_no}] ' + f'Connected to {addr}') + asyncio.create_task(self.remote.ifc.client_loop(addr)) + + except (ConnectionRefusedError, TimeoutError) as error: + logging.info(f'{error}') + except Exception: + self.inc_counter('SW_Exception') + logging.error( + f"Inverter: Exception for {addr}:\n" + f"{traceback.format_exc()}") + + async def async_publ_mqtt(self) -> None: + '''publish data to MQTT broker''' + if not self.unique_id: + return + # check if new inverter or collector infos are available or when the + # home assistant has changed the status back to online + try: + if (('inverter' in self.new_data and self.new_data['inverter']) + or ('collector' in self.new_data and + self.new_data['collector']) + or self.mqtt.ha_restarts != self.__ha_restarts): + await self._register_proxy_stat_home_assistant() + await self.__register_home_assistant() + self.__ha_restarts = self.mqtt.ha_restarts + + for key in self.new_data: + await self.__async_publ_mqtt_packet(key) + for key in Infos.new_stat_data: + await Inverter._async_publ_mqtt_proxy_stat(key) + + except MqttCodeError as error: + logging.error(f'Mqtt except: {error}') + except Exception: + self.inc_counter('SW_Exception') + logging.error( + f"Inverter: Exception:\n" + f"{traceback.format_exc()}") + + async def __async_publ_mqtt_packet(self, key): + db = self.db.db + if key in db and self.new_data[key]: + data_json = json.dumps(db[key]) + node_id = self.node_id + logger_mqtt.debug(f'{key}: {data_json}') + await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501 + self.new_data[key] = False + + async def __register_home_assistant(self) -> None: + '''register all our topics at home assistant''' + for data_json, component, node_id, id in self.db.ha_confs( + self.entity_prfx, self.node_id, self.unique_id, + self.sug_area): + logger_mqtt.debug(f"MQTT Register: cmp:'{component}'" + f" node_id:'{node_id}' {data_json}") + await self.mqtt.publish(f"{self.discovery_prfx}{component}" + f"/{node_id}{id}/config", data_json) + + self.db.reg_clr_at_midnight(f'{self.entity_prfx}{self.node_id}') diff --git a/app/tests/test_inverter_g3.py b/app/tests/test_inverter_g3.py index b4bec04..f61c25d 100644 --- a/app/tests/test_inverter_g3.py +++ b/app/tests/test_inverter_g3.py @@ -115,7 +115,7 @@ def test_method_calls(patch_conn_init, patch_conn_close): inverter.r_addr = '' spy1.assert_called_once() - spy1.assert_called_once_with(reader, writer, addr, None) + spy1.assert_called_once_with(inverter, reader, writer, addr, None) inverter.close() spy2.assert_called_once() diff --git a/app/tests/test_inverter_g3p.py b/app/tests/test_inverter_g3p.py index 0a4946e..b60b50c 100644 --- a/app/tests/test_inverter_g3p.py +++ b/app/tests/test_inverter_g3p.py @@ -116,7 +116,7 @@ def test_method_calls(patch_conn_init, patch_conn_close): inverter.r_addr = '' spy1.assert_called_once() - spy1.assert_called_once_with(reader, writer, addr, None, client_mode=False) + spy1.assert_called_once_with(inverter, reader, writer, addr, None, client_mode=False) inverter.close() spy2.assert_called_once()