From 73526b7dc6f88f7709f6d671cafe4f3909159748 Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Thu, 26 Sep 2024 23:04:11 +0200 Subject: [PATCH] split AsyncStream in two classes --- app/proxy.svg | 855 ++++++++++++++++------------- app/proxy.yuml | 13 +- app/src/async_stream.py | 190 ++++--- app/src/gen3/connection_g3.py | 18 +- app/src/gen3/inverter_g3.py | 8 +- app/src/gen3plus/connection_g3p.py | 19 +- app/src/gen3plus/inverter_g3p.py | 7 +- app/src/object_factory.py | 13 - app/tests/test_inverter_g3.py | 1 + app/tests/test_inverter_g3p.py | 1 + 10 files changed, 620 insertions(+), 505 deletions(-) delete mode 100644 app/src/object_factory.py diff --git a/app/proxy.svg b/app/proxy.svg index aa7a73b..b79e92e 100644 --- a/app/proxy.svg +++ b/app/proxy.svg @@ -4,455 +4,520 @@ - - + + G - + A0 - - - -You can stick notes -on diagrams too! + + + +You can stick notes +on diagrams too! A1 - -Mqtt -<<Singleton>> - -<static>ha_restarts -<static>__client -<static>__cb_MqttIsUp - -<async>publish() -<async>close() + +Mqtt +<<Singleton>> + +<static>ha_restarts +<static>__client +<static>__cb_MqttIsUp + +<async>publish() +<async>close() A2 - -Inverter - -cls.db_stat -cls.entity_prfx -cls.discovery_prfx -cls.proxy_node_id -cls.proxy_unique_id -cls.mqtt:Mqtt - + +Inverter + +cls.db_stat +cls.entity_prfx +cls.discovery_prfx +cls.proxy_node_id +cls.proxy_unique_id +cls.mqtt:Mqtt + A1->A2 - - - + + + A3 - -InverterG3 - -__ha_restarts - -async_create_remote() -async_publ_mqtt() -close() + +InverterG3 + +__ha_restarts + +async_create_remote() +async_publ_mqtt() +close() A2->A3 - - + + A4 - -InverterG3P - -__ha_restarts - -async_create_remote( -)async_publ_mqtt() -close() + +InverterG3P + +__ha_restarts + +async_create_remote( +)async_publ_mqtt() +close() A2->A4 - - + + A5 - -IterRegistry - - -__iter__ + +IterRegistry + + +__iter__ A6 - -Message - -server_side:bool -header_valid:bool -header_len:unsigned -data_len:unsigned -unique_id -node_id -sug_area -_recv_buffer:bytearray -_send_buffer:bytearray -_forward_buffer:bytearray -db:Infos -new_data:list -state - -_read():void<abstract> -close():void -inc_counter():void -dec_counter():void + +Message + +server_side:bool +header_valid:bool +header_len:unsigned +data_len:unsigned +unique_id +node_id +sug_area +_recv_buffer:bytearray +_send_buffer:bytearray +_forward_buffer:bytearray +db:Infos +new_data:list +state + +_read():void<abstract> +close():void +inc_counter():void +dec_counter():void A5->A6 - - - - - -A15 - -Talent - -await_conn_resp_cnt -id_str -contact_name -contact_mail -db:InfosG3 -mb:Modbus -switch - -msg_contact_info() -msg_ota_update() -msg_get_time() -msg_collector_data() -msg_inverter_data() -msg_unknown() -close() - - - -A6->A15 - - - - - -A16 - -SolarmanV5 - -control -serial -snr -db:InfosG3P -mb:Modbus -switch - -msg_unknown() -close() - - - -A6->A16 - - - - - -A7 - -<<AsyncIfc>> - - -set_node_id() -get_conn_no() -tx_add() -tx_flush() -tx_get() -tx_peek() -tx_log() -tx_clear() -tx_len() -fwd_add() -fwd_flush() -fwd_log() -fwd_clear() -rx_get() -rx_peek() -rx_log() -rx_clear() -rx_len() -rx_set_cb() -prot_set_timeout_cb() - - - -A8 - -AsyncIfcImpl - -fwd_fifo:ByteFifo -tx_fifo:ByteFifo -rx_fifo:ByteFifo -conn_no:Count -node_id -timeout_cb - - - -A7->A8 - - - - - -A9 - -AsyncStream - - - -A8->A9 - - - - - -A10 - -ConnectionG3 - -remote.stream:ConnectionG3 - -healthy() -close() - - - -A10->A3 - - - - - -A10->A9 - - - -1 - - - -A10->A10 - - -0..1 -has - - - -A11 - -ConnectionG3P - -remote.stream:ConnectionG3P - -healthy() -close() - - - -A11->A4 - - - - - -A11->A9 - - - -1 - - - -A11->A11 - - -0..1 -has - - - -A12 - -Infos - -stat -new_stat_data -info_dev - -static_init() -dev_value() -inc_counter() -dec_counter() -ha_proxy_conf -ha_conf -ha_remove -update_db -set_db_def_value -get_db_value -ignore_this_device - - - -A13 - -InfosG3 - - -ha_confs() -parse() - - - -A12->A13 - - - - - -A14 - -InfosG3P - - -ha_confs() -parse() - - - -A12->A14 - - - - - -A15->A7 - - -use - - - -A15->A10 - - - - - -A15->A13 - - - - - -A16->A7 - - -use - - - -A16->A11 - - - - - -A16->A14 - - + + A17 - -Modbus - -que -snd_handler -rsp_handler -timeout -max_retires -last_xxx -err -retry_cnt -req_pend -tim - -build_msg() -recv_req() -recv_resp() -close() + +Talent + +await_conn_resp_cnt +id_str +contact_name +contact_mail +db:InfosG3 +mb:Modbus +switch + +msg_contact_info() +msg_ota_update() +msg_get_time() +msg_collector_data() +msg_inverter_data() +msg_unknown() +close() - - -A17->A15 - - -has -1 - - - -A17->A16 - - -has -1 + + +A6->A17 + + A18 - -ModbusConn - -host -port -addr -stream:InverterG3P - + +SolarmanV5 + +control +serial +snr +db:InfosG3P +mb:Modbus +switch + +msg_unknown() +close() - - -A18->A4 - - -1 -has + + +A6->A18 + + + + + +A7 + +<<AsyncIfc>> + + +set_node_id() +get_conn_no() +tx_add() +tx_flush() +tx_get() +tx_peek() +tx_log() +tx_clear() +tx_len() +fwd_add() +fwd_flush() +fwd_log() +fwd_clear() +rx_get() +rx_peek() +rx_log() +rx_clear() +rx_len() +rx_set_cb() +prot_set_timeout_cb() + + + +A8 + +AsyncIfcImpl + +fwd_fifo:ByteFifo +tx_fifo:ByteFifo +rx_fifo:ByteFifo +conn_no:Count +node_id +timeout_cb + + + +A7->A8 + + + + + +A9 + +AsyncStream + +reader +writer +addr +r_addr +l_addr + +<async>loop +disc() +close() +healthy() +__async_read() +__async_write() +__async_forward() + + + +A8->A9 + + + + + +A10 + +AsyncStreamServer + +async_create_remote + +<async>server_loop() +<async>_async_forward() +<async>publish_outstanding_mqtt() +close() + + + +A9->A10 + + + + + +A11 + +AsyncStreamClient + + +<async>client_loop() +<async>_async_forward()) + + + +A9->A11 + + + + + +A12 + +ConnectionG3 + +remote.stream:ConnectionG3 + +healthy() +close() + + + +A12->A3 + + + + + +A12->A10 + + + +0..1 + + + +A12->A11 + + + +0..1 + + + +A12->A12 + + +0..1 +has + + + +A13 + +ConnectionG3P + +remote.stream:ConnectionG3P + +healthy() +close() + + + +A13->A4 + + + + + +A13->A10 + + + +0..1 + + + +A13->A11 + + + +0..1 + + + +A13->A13 + + +0..1 +has + + + +A14 + +Infos + +stat +new_stat_data +info_dev + +static_init() +dev_value() +inc_counter() +dec_counter() +ha_proxy_conf +ha_conf +ha_remove +update_db +set_db_def_value +get_db_value +ignore_this_device + + + +A15 + +InfosG3 + + +ha_confs() +parse() + + + +A14->A15 + + + + + +A16 + +InfosG3P + + +ha_confs() +parse() + + + +A14->A16 + + + + + +A17->A7 + + +use + + + +A17->A12 + + + + + +A17->A15 + + + + + +A18->A7 + + +use + + + +A18->A13 + + + + + +A18->A16 + + + + + +A19 + +Modbus + +que +snd_handler +rsp_handler +timeout +max_retires +last_xxx +err +retry_cnt +req_pend +tim + +build_msg() +recv_req() +recv_resp() +close() + + + +A19->A17 + + +has +1 + + + +A19->A18 + + +has +1 + + + +A20 + +ModbusConn + +host +port +addr +stream:InverterG3P + + + + +A20->A4 + + +1 +has diff --git a/app/proxy.yuml b/app/proxy.yuml index 9b1b136..b9f25e4 100644 --- a/app/proxy.yuml +++ b/app/proxy.yuml @@ -14,18 +14,25 @@ [<>||set_node_id();get_conn_no();;tx_add();tx_flush();tx_get();tx_peek();tx_log();tx_clear();tx_len();;fwd_add();fwd_flush();fwd_log();fwd_clear();rx_get();rx_peek();rx_log();rx_clear();rx_len();rx_set_cb();;prot_set_timeout_cb()] [AsyncIfcImpl|fwd_fifo:ByteFifo;tx_fifo:ByteFifo;rx_fifo:ByteFifo;conn_no:Count;node_id;timeout_cb] +[AsyncStream|reader;writer;addr;r_addr;l_addr|;loop;disc();close();healthy();;__async_read();__async_write();__async_forward()] +[AsyncStreamServer|async_create_remote|server_loop();_async_forward();publish_outstanding_mqtt();close()] +[AsyncStreamClient||client_loop();_async_forward())] [<>]^-.-[AsyncIfcImpl] [AsyncIfcImpl]<-[AsyncStream] -[AsyncStream|reader;writer;addr;r_addr;l_addr|server_loop();client_loop();loop;disc();close();;__async_read();async_write();__async_forward()] +[AsyncStream]<-[AsyncStreamServer] +[AsyncStream]<-[AsyncStreamClient] + [ConnectionG3|remote.stream:ConnectionG3|healthy();close()] [ConnectionG3]^[InverterG3] [ConnectionG3]has-0..1>[ConnectionG3] -[ConnectionG3]++-1>[AsyncStream] +[ConnectionG3]++-0..1>[AsyncStreamClient] +[ConnectionG3]++-0..1>[AsyncStreamServer] [ConnectionG3P|remote.stream:ConnectionG3P|healthy();close()] [ConnectionG3P]^[InverterG3P] [ConnectionG3P]has-0..1>[ConnectionG3P] -[ConnectionG3P]++-1>[AsyncStream] +[ConnectionG3P]++-0..1>[AsyncStreamClient] +[ConnectionG3P]++-0..1>[AsyncStreamServer] [Infos|stat;new_stat_data;info_dev|static_init();dev_value();inc_counter();dec_counter();ha_proxy_conf;ha_conf;ha_remove;update_db;set_db_def_value;get_db_value;ignore_this_device] [Infos]^[InfosG3||ha_confs();parse()] diff --git a/app/src/async_stream.py b/app/src/async_stream.py index 1e1ac84..a0c8e6a 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -7,10 +7,12 @@ from typing import Self from itertools import count if __name__ == "app.src.async_stream": + from app.src.inverter import Inverter from app.src.byte_fifo import ByteFifo from app.src.async_ifc import AsyncIfc from app.src.infos import Infos else: # pragma: no cover + from inverter import Inverter from byte_fifo import ByteFifo from async_ifc import AsyncIfc from infos import Infos @@ -156,15 +158,13 @@ class AsyncStream(AsyncIfcImpl): '''maximum default time without a received msg in sec''' def __init__(self, reader: StreamReader, writer: StreamWriter, - addr, async_publ_mqtt, async_create_remote, - rstream: "StreamPtr") -> None: + addr, rstream: "StreamPtr") -> None: AsyncIfcImpl.__init__(self) logger.debug('AsyncStream.__init__') self.remote = rstream self.tx_fifo.reg_trigger(self.__write_cb) - self.async_create_remote = async_create_remote self._reader = reader self._writer = writer self.addr = addr @@ -172,7 +172,7 @@ class AsyncStream(AsyncIfcImpl): self.l_addr = '' self.proc_start = None # start processing start timestamp self.proc_max = 0 - self.async_publ_mqtt = async_publ_mqtt + self.async_publ_mqtt = None # will be set AsyncStreamServer only def __write_cb(self): self._writer.write(self.tx_fifo.get()) @@ -182,56 +182,6 @@ class AsyncStream(AsyncIfcImpl): return self.timeout_cb return 360 - async def publish_outstanding_mqtt(self): - '''Publish all outstanding MQTT topics''' - try: - if self.unique_id: - await self.async_publ_mqtt() - await self._async_publ_mqtt_proxy_stat('proxy') - except Exception: - pass - - async def server_loop(self, addr: str) -> None: - '''Loop for receiving messages from the inverter (server-side)''' - logger.info(f'[{self.node_id}:{self.conn_no}] ' - f'Accept connection from {addr}') - Infos.inc_counter('Inverter_Cnt') - await self.publish_outstanding_mqtt() - await self.loop() - Infos.dec_counter('Inverter_Cnt') - await self.publish_outstanding_mqtt() - logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for' - f' r{self.r_addr}') - - # if the server connection closes, we also have to disconnect - # the connection to te TSUN cloud - if self.remote.stream: - logger.info(f'[{self.node_id}:{self.conn_no}] disc client ' - f'connection: [{self.remote.ifc.node_id}:' - f'{self.remote.ifc.conn_no}]') - await self.remote.ifc.disc() - - async def client_loop(self, _: str) -> None: - '''Loop for receiving messages from the TSUN cloud (client-side)''' - client_stream = await self.remote.ifc.loop() - logger.info(f'[{client_stream.node_id}:{client_stream.conn_no}] ' - 'Client loop stopped for' - f' l{client_stream.l_addr}') - - # if the client connection closes, we don't touch the server - # connection. Instead we erase the client connection stream, - # thus on the next received packet from the inverter, we can - # establish a new connection to the TSUN cloud - - # erase backlink to inverter - client_stream.remote.stream = None - - if self.remote.stream == client_stream: - # logging.debug(f'Client l{client_stream.l_addr} refs:' - # f' {gc.get_referrers(client_stream)}') - # than erase client connection - self.remote.stream = None - async def loop(self) -> Self: """Async loop handler for precessing all received messages""" self.r_addr = self._writer.get_extra_info('peername') @@ -247,10 +197,10 @@ class AsyncStream(AsyncIfcImpl): await asyncio.wait_for(self.__async_read(), dead_conn_to) - # if self.unique_id: await self.__async_write() await self.__async_forward() - await self.async_publ_mqtt() + if self.async_publ_mqtt: + await self.async_publ_mqtt() except asyncio.TimeoutError: logger.warning(f'[{self.node_id}:{self.conn_no}] Dead ' @@ -282,13 +232,6 @@ class AsyncStream(AsyncIfcImpl): f"{traceback.format_exc()}") await asyncio.sleep(0) # be cooperative to other task - async def __async_write(self, headline: str = 'Transmit to ') -> None: - """Async write handler to transmit the send_buffer""" - if len(self.tx_fifo) > 0: - self.tx_fifo.logging(logging.INFO, f'{headline}{self.addr}:') - self._writer.write(self.tx_fifo.get()) - await self._writer.drain() - async def disc(self) -> None: """Async disc handler for graceful disconnect""" if self._writer.is_closing(): @@ -302,7 +245,6 @@ class AsyncStream(AsyncIfcImpl): hint: must be called before releasing the connection instance """ - self.async_create_remote = None super().close() self._reader.feed_eof() # abort awaited read if self._writer.is_closing(): @@ -337,23 +279,19 @@ class AsyncStream(AsyncIfcImpl): else: raise RuntimeError("Peer closed.") + async def __async_write(self, headline: str = 'Transmit to ') -> None: + """Async write handler to transmit the send_buffer""" + if len(self.tx_fifo) > 0: + self.tx_fifo.logging(logging.INFO, f'{headline}{self.addr}:') + self._writer.write(self.tx_fifo.get()) + await self._writer.drain() + async def __async_forward(self) -> None: """forward handler transmits data over the remote connection""" if len(self.fwd_fifo) == 0: return try: - if not self.remote.stream: - await self.async_create_remote() - if self.remote.stream: - if self.remote.ifc.init_new_client_conn_cb(): - await self.remote.ifc.__async_write() - - if self.remote.stream: - self.remote.ifc.update_header_cb(self.fwd_fifo.peek()) - self.fwd_fifo.logging(logging.INFO, 'Forward to ' - f'{self.remote.ifc.addr}:') - self.remote.ifc._writer.write(self.fwd_fifo.get()) - await self.remote.ifc._writer.drain() + await self._async_forward() except OSError as error: if self.remote.stream: @@ -382,3 +320,103 @@ class AsyncStream(AsyncIfcImpl): def __del__(self): logger.debug( f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}") + + +class AsyncStreamServer(AsyncStream): + def __init__(self, reader: StreamReader, writer: StreamWriter, + addr, async_publ_mqtt, async_create_remote, + rstream: "StreamPtr") -> None: + AsyncStream.__init__(self, reader, writer, addr, + rstream) + self.async_create_remote = async_create_remote + self.async_publ_mqtt = async_publ_mqtt + + async def server_loop(self, addr: str) -> None: + '''Loop for receiving messages from the inverter (server-side)''' + logger.info(f'[{self.node_id}:{self.conn_no}] ' + f'Accept connection from {addr}') + Infos.inc_counter('Inverter_Cnt') + await self.publish_outstanding_mqtt() + await self.loop() + Infos.dec_counter('Inverter_Cnt') + await self.publish_outstanding_mqtt() + logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for' + f' r{self.r_addr}') + + # if the server connection closes, we also have to disconnect + # the connection to te TSUN cloud + if self.remote.stream: + logger.info(f'[{self.node_id}:{self.conn_no}] disc client ' + f'connection: [{self.remote.ifc.node_id}:' + f'{self.remote.ifc.conn_no}]') + await self.remote.ifc.disc() + + async def _async_forward(self) -> None: + """forward handler transmits data over the remote connection""" + if not self.remote.stream: + await self.async_create_remote() + if self.remote.stream and \ + self.remote.ifc.init_new_client_conn_cb(): + await self.remote.ifc._AsyncStream__async_write() + if self.remote.stream: + self.remote.ifc.update_header_cb(self.fwd_fifo.peek()) + self.fwd_fifo.logging(logging.INFO, 'Forward to ' + f'{self.remote.ifc.addr}:') + self.remote.ifc._writer.write(self.fwd_fifo.get()) + await self.remote.ifc._writer.drain() + + async def publish_outstanding_mqtt(self): + '''Publish all outstanding MQTT topics''' + try: + await self.async_publ_mqtt() + await Inverter._async_publ_mqtt_proxy_stat('proxy') + except Exception: + pass + + def close(self) -> None: + """close handler for a no waiting disconnect + + hint: must be called before releasing the connection instance + """ + self.async_create_remote = None + self.async_publ_mqtt = None + super().close() + + +class AsyncStreamClient(AsyncStream): + def __init__(self, reader: StreamReader, writer: StreamWriter, + addr, rstream: "StreamPtr") -> None: + AsyncStream.__init__(self, reader, writer, addr, + rstream) + + async def client_loop(self, _: str) -> None: + '''Loop for receiving messages from the TSUN cloud (client-side)''' + await self.loop() + logger.info(f'[{self.node_id}:{self.conn_no}] ' + 'Client loop stopped for' + f' l{self.l_addr}') + + server_stream = self.remote.stream + + # if the client connection closes, we don't touch the server + # connection. Instead we erase the client connection stream, + # thus on the next received packet from the inverter, we can + # establish a new connection to the TSUN cloud + + if server_stream.remote.ifc == self: + # logging.debug(f'Client l{client_stream.l_addr} refs:' + # f' {gc.get_referrers(client_stream)}') + # than erase client connection + server_stream.remote.stream = None # erases stream and ifc link + + # erase backlink to inverter + self.remote.stream = None + + async def _async_forward(self) -> None: + """forward handler transmits data over the remote connection""" + if self.remote.stream: + self.remote.ifc.update_header_cb(self.fwd_fifo.peek()) + self.fwd_fifo.logging(logging.INFO, 'Forward to ' + f'{self.remote.ifc.addr}:') + self.remote.ifc._writer.write(self.fwd_fifo.get()) + await self.remote.ifc._writer.drain() diff --git a/app/src/gen3/connection_g3.py b/app/src/gen3/connection_g3.py index 8633cfc..933c35b 100644 --- a/app/src/gen3/connection_g3.py +++ b/app/src/gen3/connection_g3.py @@ -2,10 +2,12 @@ import logging from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3.connection_g3": - from app.src.async_stream import AsyncStream, StreamPtr + from app.src.async_stream import AsyncStreamServer + from app.src.async_stream import AsyncStreamClient, StreamPtr from app.src.gen3.talent import Talent else: # pragma: no cover - from async_stream import AsyncStream, StreamPtr + from async_stream import AsyncStreamServer + from async_stream import AsyncStreamClient, StreamPtr from gen3.talent import Talent logger = logging.getLogger('conn') @@ -17,10 +19,14 @@ class ConnectionG3(Talent): addr, rstream: 'ConnectionG3', server_side: bool, id_str=b'') -> None: self.remote = StreamPtr(rstream) - self._ifc = AsyncStream(reader, writer, addr, - self.async_publ_mqtt, - self.async_create_remote, - self.remote) + if server_side: + self._ifc = AsyncStreamServer(reader, writer, addr, + self.async_publ_mqtt, + self.async_create_remote, + self.remote) + else: + self._ifc = AsyncStreamClient(reader, writer, addr, + self.remote) Talent.__init__(self, server_side, self._ifc, id_str) self.conn_no = self._ifc.get_conn_no() diff --git a/app/src/gen3/inverter_g3.py b/app/src/gen3/inverter_g3.py index 7d21fff..48c70ad 100644 --- a/app/src/gen3/inverter_g3.py +++ b/app/src/gen3/inverter_g3.py @@ -71,7 +71,7 @@ class InverterG3(Inverter, ConnectionG3): logging.info(f'[{self.remote.stream.node_id}:' f'{self.remote.stream.conn_no}] ' f'Connected to {addr}') - asyncio.create_task(self._ifc.client_loop(addr)) + asyncio.create_task(self.remote.ifc.client_loop(addr)) except (ConnectionRefusedError, TimeoutError) as error: logging.info(f'{error}') @@ -83,6 +83,8 @@ class InverterG3(Inverter, ConnectionG3): async def async_publ_mqtt(self) -> None: '''publish data to MQTT broker''' + if not self.unique_id: + return # check if new inverter or collector infos are available or when the # home assistant has changed the status back to online try: @@ -97,7 +99,7 @@ class InverterG3(Inverter, ConnectionG3): for key in self.new_data: await self.__async_publ_mqtt_packet(key) for key in Infos.new_stat_data: - await self._async_publ_mqtt_proxy_stat(key) + await Inverter._async_publ_mqtt_proxy_stat(key) except MqttCodeError as error: logging.error(f'Mqtt except: {error}') @@ -118,8 +120,6 @@ class InverterG3(Inverter, ConnectionG3): async def __register_home_assistant(self) -> None: '''register all our topics at home assistant''' - if not self.unique_id: - return for data_json, component, node_id, id in self.db.ha_confs( self.entity_prfx, self.node_id, self.unique_id, self.sug_area): diff --git a/app/src/gen3plus/connection_g3p.py b/app/src/gen3plus/connection_g3p.py index a816c25..8e486ae 100644 --- a/app/src/gen3plus/connection_g3p.py +++ b/app/src/gen3plus/connection_g3p.py @@ -2,10 +2,12 @@ import logging from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3plus.connection_g3p": - from app.src.async_stream import AsyncStream, StreamPtr + from app.src.async_stream import AsyncStreamServer + from app.src.async_stream import AsyncStreamClient, StreamPtr from app.src.gen3plus.solarman_v5 import SolarmanV5 else: # pragma: no cover - from async_stream import AsyncStream, StreamPtr + from async_stream import AsyncStreamServer + from async_stream import AsyncStreamClient, StreamPtr from gen3plus.solarman_v5 import SolarmanV5 logger = logging.getLogger('conn') @@ -19,10 +21,15 @@ class ConnectionG3P(SolarmanV5): client_mode: bool) -> None: self.remote = StreamPtr(rstream) - self._ifc = AsyncStream(reader, writer, addr, - self.async_publ_mqtt, - self.async_create_remote, - self.remote) + if server_side: + self._ifc = AsyncStreamServer(reader, writer, addr, + self.async_publ_mqtt, + self.async_create_remote, + self.remote) + else: + self._ifc = AsyncStreamClient(reader, writer, addr, + self.remote) + SolarmanV5.__init__(self, server_side, client_mode, self._ifc) self.conn_no = self._ifc.get_conn_no() diff --git a/app/src/gen3plus/inverter_g3p.py b/app/src/gen3plus/inverter_g3p.py index 6be2f8c..be32fe1 100644 --- a/app/src/gen3plus/inverter_g3p.py +++ b/app/src/gen3plus/inverter_g3p.py @@ -74,7 +74,7 @@ class InverterG3P(Inverter, ConnectionG3P): logging.info(f'[{self.remote.stream.node_id}:' f'{self.remote.stream.conn_no}] ' f'Connected to {addr}') - asyncio.create_task(self._ifc.client_loop(addr)) + asyncio.create_task(self.remote.ifc.client_loop(addr)) except (ConnectionRefusedError, TimeoutError) as error: logging.info(f'{error}') @@ -86,6 +86,9 @@ class InverterG3P(Inverter, ConnectionG3P): async def async_publ_mqtt(self) -> None: '''publish data to MQTT broker''' + if not self.unique_id: + return + # check if new inverter or collector infos are available or when the # home assistant has changed the status back to online try: @@ -100,7 +103,7 @@ class InverterG3P(Inverter, ConnectionG3P): for key in self.new_data: await self.__async_publ_mqtt_packet(key) for key in Infos.new_stat_data: - await self._async_publ_mqtt_proxy_stat(key) + await Inverter._async_publ_mqtt_proxy_stat(key) except MqttCodeError as error: logging.error(f'Mqtt except: {error}') diff --git a/app/src/object_factory.py b/app/src/object_factory.py deleted file mode 100644 index 7dd20cb..0000000 --- a/app/src/object_factory.py +++ /dev/null @@ -1,13 +0,0 @@ - -class ObjectFactory: - def __init__(self): - self._builders = {} - - def register_builder(self, key, builder): - self._builders[key] = builder - - def create(self, key, **kwargs): - builder = self._builders.get(key) - if not builder: - raise ValueError(key) - return builder(**kwargs) diff --git a/app/tests/test_inverter_g3.py b/app/tests/test_inverter_g3.py index e87aeb9..762659a 100644 --- a/app/tests/test_inverter_g3.py +++ b/app/tests/test_inverter_g3.py @@ -171,6 +171,7 @@ async def test_mqtt_publish(config_conn, patch_open_connection, patch_conn_close Inverter.class_init() inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) + await inverter.async_publ_mqtt() # check call with invalid unique_id inverter._Talent__set_serial_no(serial_no= "123344") inverter.new_data['inverter'] = True diff --git a/app/tests/test_inverter_g3p.py b/app/tests/test_inverter_g3p.py index 1dd344c..26e50ae 100644 --- a/app/tests/test_inverter_g3p.py +++ b/app/tests/test_inverter_g3p.py @@ -172,6 +172,7 @@ async def test_mqtt_publish(config_conn, patch_open_connection, patch_conn_close Inverter.class_init() inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) + await inverter.async_publ_mqtt() # check call with invalid unique_id inverter._SolarmanV5__set_serial_no(snr= 123344) inverter.new_data['inverter'] = True