diff --git a/app/proxy.svg b/app/proxy.svg index b2ef8ba..c3857fb 100644 --- a/app/proxy.svg +++ b/app/proxy.svg @@ -4,579 +4,587 @@ - - + + G - + A0 - - - -You can stick notes -on diagrams too! + + + +You can stick notes +on diagrams too! A1 - -Mqtt -<<Singleton>> - -<static>ha_restarts -<static>__client -<static>__cb_MqttIsUp - -<async>publish() -<async>close() + +Mqtt +<<Singleton>> + +<static>ha_restarts +<static>__client +<static>__cb_MqttIsUp + +<async>publish() +<async>close() A2 - -Inverter - -cls.db_stat -cls.entity_prfx -cls.discovery_prfx -cls.proxy_node_id -cls.proxy_unique_id -cls.mqtt:Mqtt - + +Inverter + +cls.db_stat +cls.entity_prfx +cls.discovery_prfx +cls.proxy_node_id +cls.proxy_unique_id +cls.mqtt:Mqtt +__ha_restarts + +async_create_remote(inv_prot, conn_class)async_publ_mqtt() A1->A2 - - - + + + A3 - -InverterG3 - -__ha_restarts - -async_create_remote() -async_publ_mqtt() -close() + +InverterG3 + +addr +remote:StreamPtr +local:StreamPtr + +async_create_remote() +close() A2->A3 - - + + A4 - -InverterG3P - -__ha_restarts - -async_create_remote( -)async_publ_mqtt() -close() + +InverterG3P + +addr +remote:StreamPtr +local:StreamPtr + +async_create_remote() +close() A2->A4 - - + + A5 - -IterRegistry - - -__iter__ + +IterRegistry + + +__iter__ A6 - -Message - -server_side:bool -header_valid:bool -header_len:unsigned -data_len:unsigned -unique_id -node_id -sug_area -_recv_buffer:bytearray -_send_buffer:bytearray -_forward_buffer:bytearray -db:Infos -new_data:list -state - -_read():void<abstract> -close():void -inc_counter():void -dec_counter():void + +Message + +server_side:bool +header_valid:bool +header_len:unsigned +data_len:unsigned +unique_id +node_id +sug_area +_recv_buffer:bytearray +_send_buffer:bytearray +_forward_buffer:bytearray +db:Infos +new_data:list +state + +_read():void<abstract> +close():void +inc_counter():void +dec_counter():void A5->A6 - - + + A21 - -Talent - -await_conn_resp_cnt -id_str -contact_name -contact_mail -db:InfosG3 -mb:Modbus -switch - -msg_contact_info() -msg_ota_update() -msg_get_time() -msg_collector_data() -msg_inverter_data() -msg_unknown() -close() + +Talent + +await_conn_resp_cnt +id_str +contact_name +contact_mail +db:InfosG3 +mb:Modbus +switch + +msg_contact_info() +msg_ota_update() +msg_get_time() +msg_collector_data() +msg_inverter_data() +msg_unknown() +close() A6->A21 - - + + A22 - -SolarmanV5 - -control -serial -snr -db:InfosG3P -mb:Modbus -switch - -msg_unknown() -close() + +SolarmanV5 + +control +serial +snr +db:InfosG3P +mb:Modbus +switch + +msg_unknown() +close() A6->A22 - - + + A7 - -<<AsyncIfc>> - - -set_node_id() -get_conn_no() -tx_add() -tx_flush() -tx_get() -tx_peek() -tx_log() -tx_clear() -tx_len() -fwd_add() -fwd_flush() -fwd_log() -fwd_clear() -rx_get() -rx_peek() -rx_log() -rx_clear() -rx_len() -rx_set_cb() -prot_set_timeout_cb() + +<<AsyncIfc>> + + +set_node_id() +get_conn_no() +tx_add() +tx_flush() +tx_get() +tx_peek() +tx_log() +tx_clear() +tx_len() +fwd_add() +fwd_flush() +fwd_log() +fwd_clear() +rx_get() +rx_peek() +rx_log() +rx_clear() +rx_len() +rx_set_cb() +prot_set_timeout_cb() A8 - -AsyncIfcImpl - -fwd_fifo:ByteFifo -tx_fifo:ByteFifo -rx_fifo:ByteFifo -conn_no:Count -node_id -timeout_cb + +AsyncIfcImpl + +fwd_fifo:ByteFifo +tx_fifo:ByteFifo +rx_fifo:ByteFifo +conn_no:Count +node_id +timeout_cb A7->A8 - - + + A9 - -AsyncStream - -reader -writer -addr -r_addr -l_addr - -<async>loop -disc() -close() -healthy() -__async_read() -__async_write() -__async_forward() + +AsyncStream + +reader +writer +addr +r_addr +l_addr + +<async>loop +disc() +close() +healthy() +__async_read() +__async_write() +__async_forward() A8->A9 - - + + A10 - -AsyncStreamServer - -async_create_remote - -<async>server_loop() -<async>_async_forward() -<async>publish_outstanding_mqtt() -close() + +AsyncStreamServer + +async_create_remote + +<async>server_loop() +<async>_async_forward() +<async>publish_outstanding_mqtt() +close() A9->A10 - - + + A11 - -AsyncStreamClient - - -<async>client_loop() -<async>_async_forward()) + +AsyncStreamClient + + +<async>client_loop() +<async>_async_forward()) A9->A11 - - + + A12 - -ConnectionG3 - -remote.stream:ConnectionG3 - -healthy() + +ConnectionG3 + +remote.stream:ConnectionG3 + +healthy() A13 - -ConnectionG3Client - -remote.stream:ConnectionG3 - -close() + +ConnectionG3Client + +_ifc:AsyncStreamClient +conn_no +addr + +close() A12->A13 - - + + A14 - -ConnectionG3Server - -remote.stream:ConnectionG3 - -close() + +ConnectionG3Server + +_ifc:AsyncStreamServer +conn_no +addr + +close() A12->A14 - - + + + + + +A13->A3 + A13->A11 - - - -1 - - - -A13->A14 - - + + + +1 A14->A3 - - + A14->A10 - - - -1 + + + +1 A15 - -ConnectionG3P - -remote.stream:ConnectionG3P - -healthy() -close() + +ConnectionG3P + +remote.stream:ConnectionG3P + +healthy() +close() A16 - -ConnectionG3PClient - -remote.stream:ConnectionG3P - -close() + +ConnectionG3PClient + +_ifc:AsyncStreamClient +conn_no +addr + +close() A15->A16 - - + + A17 - -ConnectionG3PServer - -remote.stream:ConnectionG3P - -close() + +ConnectionG3PServer + +_ifc:AsyncStreamServer +conn_no +addr + +close() A15->A17 - - + + + + + +A16->A4 + A16->A11 - - - -1 - - - -A16->A17 - - + + + +1 A17->A4 - - + A17->A10 - - - -1 + + + +1 A18 - -Infos - -stat -new_stat_data -info_dev - -static_init() -dev_value() -inc_counter() -dec_counter() -ha_proxy_conf -ha_conf -ha_remove -update_db -set_db_def_value -get_db_value -ignore_this_device + +Infos + +stat +new_stat_data +info_dev + +static_init() +dev_value() +inc_counter() +dec_counter() +ha_proxy_conf +ha_conf +ha_remove +update_db +set_db_def_value +get_db_value +ignore_this_device A19 - -InfosG3 - - -ha_confs() -parse() + +InfosG3 + + +ha_confs() +parse() A18->A19 - - + + A20 - -InfosG3P - - -ha_confs() -parse() + +InfosG3P + + +ha_confs() +parse() A18->A20 - - + + A21->A7 - - -use + + +use A21->A12 - - + + A21->A19 - - + + A22->A7 - - -use + + +use A22->A15 - - + + A22->A20 - - + + A23 - -Modbus - -que -snd_handler -rsp_handler -timeout -max_retires -last_xxx -err -retry_cnt -req_pend -tim - -build_msg() -recv_req() -recv_resp() -close() + +Modbus + +que +snd_handler +rsp_handler +timeout +max_retires +last_xxx +err +retry_cnt +req_pend +tim + +build_msg() +recv_req() +recv_resp() +close() A23->A21 - - -has -1 + + +has +1 A23->A22 - - -has -1 + + +has +1 A24 - -ModbusConn - -host -port -addr -stream:InverterG3P - + +ModbusConn + +host +port +addr +stream:InverterG3P + A24->A4 - - -1 -has + + +1 +has diff --git a/app/proxy.yuml b/app/proxy.yuml index c750340..c19f5c6 100644 --- a/app/proxy.yuml +++ b/app/proxy.yuml @@ -5,9 +5,9 @@ [note: You can stick notes on diagrams too!{bg:cornsilk}] [Mqtt;<>|ha_restarts;__client;__cb_MqttIsUp|publish();close()] -[Inverter|cls.db_stat;cls.entity_prfx;cls.discovery_prfx;cls.proxy_node_id;cls.proxy_unique_id;cls.mqtt:Mqtt|] -[Inverter]^[InverterG3|__ha_restarts|async_create_remote();async_publ_mqtt();;close()] -[Inverter]^[InverterG3P|__ha_restarts|async_create_remote(;)async_publ_mqtt();close()] +[Inverter|cls.db_stat;cls.entity_prfx;cls.discovery_prfx;cls.proxy_node_id;cls.proxy_unique_id;cls.mqtt:Mqtt;;__ha_restarts|async_create_remote(inv_prot, conn_class)async_publ_mqtt()] +[Inverter]^[InverterG3|addr;remote:StreamPtr;local:StreamPtr|async_create_remote();;close()] +[Inverter]^[InverterG3P|addr;remote:StreamPtr;local:StreamPtr|async_create_remote();;close()] [Mqtt;<>]<-++[Inverter] [IterRegistry||__iter__]^[Message|server_side:bool;header_valid:bool;header_len:unsigned;data_len:unsigned;unique_id;node_id;sug_area;_recv_buffer:bytearray;_send_buffer:bytearray;_forward_buffer:bytearray;db:Infos;new_data:list;state|_read():void;close():void;inc_counter():void;dec_counter():void] @@ -24,23 +24,23 @@ [ConnectionG3|remote.stream:ConnectionG3|healthy()] -[ConnectionG3Client|remote.stream:ConnectionG3|close()] -[ConnectionG3Server|remote.stream:ConnectionG3|;close()] +[ConnectionG3Client|_ifc:AsyncStreamClient;conn_no;addr|close()] +[ConnectionG3Server|_ifc:AsyncStreamServer;conn_no;addr|;close()] [ConnectionG3]^[ConnectionG3Client] [ConnectionG3]^[ConnectionG3Server] -[ConnectionG3Client]<-[ConnectionG3Server] +[ConnectionG3Client]-[InverterG3] [ConnectionG3Client]++-1>[AsyncStreamClient] -[ConnectionG3Server]^[InverterG3] +[ConnectionG3Server]-[InverterG3] [ConnectionG3Server]++-1>[AsyncStreamServer] [ConnectionG3P|remote.stream:ConnectionG3P|healthy();close()] -[ConnectionG3PClient|remote.stream:ConnectionG3P|close()] -[ConnectionG3PServer|remote.stream:ConnectionG3P|;close()] +[ConnectionG3PClient|_ifc:AsyncStreamClient;conn_no;addr|close()] +[ConnectionG3PServer|_ifc:AsyncStreamServer;conn_no;addr|;close()] [ConnectionG3P]^[ConnectionG3PClient] [ConnectionG3P]^[ConnectionG3PServer] -[ConnectionG3PClient]<-[ConnectionG3PServer] +[ConnectionG3PClient]-[InverterG3P] [ConnectionG3PClient]++-1>[AsyncStreamClient] -[ConnectionG3PServer]^[InverterG3P] +[ConnectionG3PServer]-[InverterG3P] [ConnectionG3PServer]++-1>[AsyncStreamServer] [Infos|stat;new_stat_data;info_dev|static_init();dev_value();inc_counter();dec_counter();ha_proxy_conf;ha_conf;ha_remove;update_db;set_db_def_value;get_db_value;ignore_this_device] diff --git a/app/src/gen3/connection_g3.py b/app/src/gen3/connection_g3.py index 8e213b1..8229e3b 100644 --- a/app/src/gen3/connection_g3.py +++ b/app/src/gen3/connection_g3.py @@ -3,23 +3,19 @@ from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3.connection_g3": from app.src.async_stream import AsyncStreamServer - from app.src.async_stream import AsyncStreamClient, StreamPtr + from app.src.async_stream import AsyncStreamClient + from app.src.inverter import Inverter from app.src.gen3.talent import Talent else: # pragma: no cover from async_stream import AsyncStreamServer - from async_stream import AsyncStreamClient, StreamPtr + from async_stream import AsyncStreamClient + from inverter import Inverter from gen3.talent import Talent logger = logging.getLogger('conn') class ConnectionG3(Talent): - async def async_create_remote(self) -> None: - pass # virtual interface # pragma: no cover - - async def async_publ_mqtt(self) -> None: - pass # virtual interface # pragma: no cover - def healthy(self) -> bool: logger.debug('ConnectionG3 healthy()') return self._ifc.healthy() @@ -32,16 +28,15 @@ class ConnectionG3(Talent): class ConnectionG3Server(ConnectionG3): - def __init__(self, reader: StreamReader, writer: StreamWriter, - addr, rstream: 'ConnectionG3Client', - id_str=b'') -> None: + def __init__(self, inverter: "Inverter", + reader: StreamReader, writer: StreamWriter, + addr, id_str=b'') -> None: server_side = True - self.remote = StreamPtr(rstream) self._ifc = AsyncStreamServer(reader, writer, - self.async_publ_mqtt, - self.async_create_remote, - self.remote) + inverter.async_publ_mqtt, + inverter.async_create_remote, + inverter.remote) self.conn_no = self._ifc.get_conn_no() self.addr = addr Talent.__init__(self, server_side, self._ifc, id_str) @@ -49,13 +44,12 @@ class ConnectionG3Server(ConnectionG3): class ConnectionG3Client(ConnectionG3): - def __init__(self, reader: StreamReader, writer: StreamWriter, - addr, rstream: 'ConnectionG3Server', - id_str=b'') -> None: + def __init__(self, inverter: "Inverter", + reader: StreamReader, writer: StreamWriter, + addr, id_str=b'') -> None: server_side = False - self.remote = StreamPtr(rstream) self._ifc = AsyncStreamClient(reader, writer, - self.remote) + inverter.remote) self.conn_no = self._ifc.get_conn_no() self.addr = addr Talent.__init__(self, server_side, self._ifc, id_str) diff --git a/app/src/gen3/inverter_g3.py b/app/src/gen3/inverter_g3.py index 8b74716..0a4ae04 100644 --- a/app/src/gen3/inverter_g3.py +++ b/app/src/gen3/inverter_g3.py @@ -3,10 +3,12 @@ from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3.inverter_g3": from app.src.inverter import Inverter + from app.src.async_stream import StreamPtr from app.src.gen3.connection_g3 import ConnectionG3Server from app.src.gen3.connection_g3 import ConnectionG3Client else: # pragma: no cover from inverter import Inverter + from async_stream import StreamPtr from gen3.connection_g3 import ConnectionG3Server from gen3.connection_g3 import ConnectionG3Client @@ -14,40 +16,14 @@ else: # pragma: no cover logger_mqtt = logging.getLogger('mqtt') -class InverterG3(Inverter, ConnectionG3Server): - '''class Inverter is a derivation of an Async_Stream - - The class has some class method for managing common resources like a - connection to the MQTT broker or proxy error counter which are common - for all inverter connection - - Instances of the class are connections to an inverter and can have an - optional link to an remote connection to the TSUN cloud. A remote - connection dies with the inverter connection. - - class methods: - class_init(): initialize the common resources of the proxy (MQTT - broker, Proxy DB, etc). Must be called before the - first inverter instance can be created - class_close(): release the common resources of the proxy. Should not - be called before any instances of the class are - destroyed - - methods: - server_loop(addr): Async loop method for receiving messages from the - inverter (server-side) - client_loop(addr): Async loop method for receiving messages from the - TSUN cloud (client-side) - async_create_remote(): Establish a client connection to the TSUN cloud - async_publ_mqtt(): Publish data to MQTT broker - close(): Release method which must be called before a instance can be - destroyed - ''' - +class InverterG3(Inverter): def __init__(self, reader: StreamReader, writer: StreamWriter, addr): - Inverter.__init__(self) - ConnectionG3Server.__init__(self, reader, writer, addr, None) + super().__init__() self.addr = addr + self.remote = StreamPtr(None) + self.local = StreamPtr( + ConnectionG3Server(self, reader, writer, addr) + ) async def async_create_remote(self) -> None: await Inverter.async_create_remote( @@ -55,5 +31,5 @@ class InverterG3(Inverter, ConnectionG3Server): def close(self) -> None: logging.debug(f'InverterG3.close() {self.addr}') - ConnectionG3Server.close(self) + self.local.stream.close() # logging.info(f'Inverter refs: {gc.get_referrers(self)}') diff --git a/app/src/gen3plus/connection_g3p.py b/app/src/gen3plus/connection_g3p.py index d1b99f3..163bd89 100644 --- a/app/src/gen3plus/connection_g3p.py +++ b/app/src/gen3plus/connection_g3p.py @@ -3,23 +3,19 @@ from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3plus.connection_g3p": from app.src.async_stream import AsyncStreamServer - from app.src.async_stream import AsyncStreamClient, StreamPtr + from app.src.async_stream import AsyncStreamClient + from app.src.inverter import Inverter from app.src.gen3plus.solarman_v5 import SolarmanV5 else: # pragma: no cover from async_stream import AsyncStreamServer - from async_stream import AsyncStreamClient, StreamPtr + from async_stream import AsyncStreamClient + from inverter import Inverter from gen3plus.solarman_v5 import SolarmanV5 logger = logging.getLogger('conn') class ConnectionG3P(SolarmanV5): - async def async_create_remote(self) -> None: - pass # virtual interface # pragma: no cover - - async def async_publ_mqtt(self) -> None: - pass # virtual interface # pragma: no cover - def healthy(self) -> bool: logger.debug('ConnectionG3P healthy()') return self._ifc.healthy() @@ -32,16 +28,15 @@ class ConnectionG3P(SolarmanV5): class ConnectionG3PServer(ConnectionG3P): - def __init__(self, reader: StreamReader, writer: StreamWriter, - addr, rstream: 'ConnectionG3PClient', - client_mode: bool) -> None: + def __init__(self, inverter: "Inverter", + reader: StreamReader, writer: StreamWriter, + addr, client_mode: bool) -> None: server_side = True - self.remote = StreamPtr(rstream) self._ifc = AsyncStreamServer(reader, writer, - self.async_publ_mqtt, - self.async_create_remote, - self.remote) + inverter.async_publ_mqtt, + inverter.async_create_remote, + inverter.remote) self.conn_no = self._ifc.get_conn_no() self.addr = addr SolarmanV5.__init__(self, server_side, client_mode, self._ifc) @@ -49,13 +44,13 @@ class ConnectionG3PServer(ConnectionG3P): class ConnectionG3PClient(ConnectionG3P): - def __init__(self, reader: StreamReader, writer: StreamWriter, - addr, rstream: 'ConnectionG3PServer') -> None: + def __init__(self, inverter: "Inverter", + reader: StreamReader, writer: StreamWriter, + addr) -> None: server_side = False client_mode = False - self.remote = StreamPtr(rstream) - self._ifc = AsyncStreamClient(reader, writer, self.remote) + self._ifc = AsyncStreamClient(reader, writer, inverter.remote) self.conn_no = self._ifc.get_conn_no() self.addr = addr SolarmanV5.__init__(self, server_side, client_mode, self._ifc) diff --git a/app/src/gen3plus/inverter_g3p.py b/app/src/gen3plus/inverter_g3p.py index 0d15d92..2c6cb6d 100644 --- a/app/src/gen3plus/inverter_g3p.py +++ b/app/src/gen3plus/inverter_g3p.py @@ -3,10 +3,12 @@ from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3plus.inverter_g3p": from app.src.inverter import Inverter + from app.src.async_stream import StreamPtr from app.src.gen3plus.connection_g3p import ConnectionG3PServer from app.src.gen3plus.connection_g3p import ConnectionG3PClient else: # pragma: no cover from inverter import Inverter + from async_stream import StreamPtr from gen3plus.connection_g3p import ConnectionG3PServer from gen3plus.connection_g3p import ConnectionG3PClient @@ -14,42 +16,15 @@ else: # pragma: no cover logger_mqtt = logging.getLogger('mqtt') -class InverterG3P(Inverter, ConnectionG3PServer): - '''class Inverter is a derivation of an Async_Stream - - The class has some class method for managing common resources like a - connection to the MQTT broker or proxy error counter which are common - for all inverter connection - - Instances of the class are connections to an inverter and can have an - optional link to an remote connection to the TSUN cloud. A remote - connection dies with the inverter connection. - - class methods: - class_init(): initialize the common resources of the proxy (MQTT - broker, Proxy DB, etc). Must be called before the - first inverter instance can be created - class_close(): release the common resources of the proxy. Should not - be called before any instances of the class are - destroyed - - methods: - server_loop(addr): Async loop method for receiving messages from the - inverter (server-side) - client_loop(addr): Async loop method for receiving messages from the - TSUN cloud (client-side) - async_create_remote(): Establish a client connection to the TSUN cloud - async_publ_mqtt(): Publish data to MQTT broker - close(): Release method which must be called before a instance can be - destroyed - ''' - +class InverterG3P(Inverter): def __init__(self, reader: StreamReader, writer: StreamWriter, addr, client_mode: bool = False): - Inverter.__init__(self) - ConnectionG3PServer.__init__( - self, reader, writer, addr, None, client_mode=client_mode) + super().__init__() self.addr = addr + self.remote = StreamPtr(None) + self.local = StreamPtr( + ConnectionG3PServer(self, reader, writer, addr, client_mode) + ) async def async_create_remote(self) -> None: await Inverter.async_create_remote( @@ -57,5 +32,5 @@ class InverterG3P(Inverter, ConnectionG3PServer): def close(self) -> None: logging.debug(f'InverterG3P.close() {self.addr}') - ConnectionG3PServer.close(self) + self.local.stream.close() # logger.debug (f'Inverter refs: {gc.get_referrers(self)}') diff --git a/app/src/inverter.py b/app/src/inverter.py index 68b7996..2f53ef8 100644 --- a/app/src/inverter.py +++ b/app/src/inverter.py @@ -17,6 +17,28 @@ logger_mqtt = logging.getLogger('mqtt') class Inverter(): + '''class Inverter is a baseclass + + The class has some class method for managing common resources like a + connection to the MQTT broker or proxy error counter which are common + for all inverter connection + + Instances of the class are connections to an inverter and can have an + optional link to an remote connection to the TSUN cloud. A remote + connection dies with the inverter connection. + + class methods: + class_init(): initialize the common resources of the proxy (MQTT + broker, Proxy DB, etc). Must be called before the + first inverter instance can be created + class_close(): release the common resources of the proxy. Should not + be called before any instances of the class are + destroyed + + methods: + async_create_remote(): Establish a client connection to the TSUN cloud + async_publ_mqtt(): Publish data to MQTT broker + ''' @classmethod def class_init(cls) -> None: logging.debug('Inverter.class_init') @@ -94,17 +116,18 @@ class Inverter(): host = tsun['host'] port = tsun['port'] addr = (host, port) + stream = self.local.stream try: - logging.info(f'[{self.node_id}] Connect to {addr}') + logging.info(f'[{stream.node_id}] Connect to {addr}') connect = asyncio.open_connection(host, port) reader, writer = await connect - if hasattr(self, 'id_str'): + if hasattr(stream, 'id_str'): self.remote.stream = conn_class( - reader, writer, addr, self, self.id_str) + self, reader, writer, addr, stream.id_str) else: self.remote.stream = conn_class( - reader, writer, addr, self) + self, reader, writer, addr) logging.info(f'[{self.remote.stream.node_id}:' f'{self.remote.stream.conn_no}] ' @@ -114,56 +137,57 @@ class Inverter(): except (ConnectionRefusedError, TimeoutError) as error: logging.info(f'{error}') except Exception: - self.inc_counter('SW_Exception') + Infos.inc_counter('SW_Exception') logging.error( f"Inverter: Exception for {addr}:\n" f"{traceback.format_exc()}") async def async_publ_mqtt(self) -> None: '''publish data to MQTT broker''' - if not self.unique_id: + stream = self.local.stream + if not stream.unique_id: return # check if new inverter or collector infos are available or when the # home assistant has changed the status back to online try: - if (('inverter' in self.new_data and self.new_data['inverter']) - or ('collector' in self.new_data and - self.new_data['collector']) + if (('inverter' in stream.new_data and stream.new_data['inverter']) + or ('collector' in stream.new_data and + stream.new_data['collector']) or self.mqtt.ha_restarts != self.__ha_restarts): await self._register_proxy_stat_home_assistant() - await self.__register_home_assistant() + await self.__register_home_assistant(stream) self.__ha_restarts = self.mqtt.ha_restarts - for key in self.new_data: - await self.__async_publ_mqtt_packet(key) + for key in stream.new_data: + await self.__async_publ_mqtt_packet(stream, key) for key in Infos.new_stat_data: await Inverter._async_publ_mqtt_proxy_stat(key) except MqttCodeError as error: logging.error(f'Mqtt except: {error}') except Exception: - self.inc_counter('SW_Exception') + Infos.inc_counter('SW_Exception') logging.error( f"Inverter: Exception:\n" f"{traceback.format_exc()}") - async def __async_publ_mqtt_packet(self, key): - db = self.db.db - if key in db and self.new_data[key]: + async def __async_publ_mqtt_packet(self, stream, key): + db = stream.db.db + if key in db and stream.new_data[key]: data_json = json.dumps(db[key]) - node_id = self.node_id + node_id = stream.node_id logger_mqtt.debug(f'{key}: {data_json}') await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501 - self.new_data[key] = False + stream.new_data[key] = False - async def __register_home_assistant(self) -> None: + async def __register_home_assistant(self, stream) -> None: '''register all our topics at home assistant''' - for data_json, component, node_id, id in self.db.ha_confs( - self.entity_prfx, self.node_id, self.unique_id, - self.sug_area): + for data_json, component, node_id, id in stream.db.ha_confs( + self.entity_prfx, stream.node_id, stream.unique_id, + stream.sug_area): logger_mqtt.debug(f"MQTT Register: cmp:'{component}'" f" node_id:'{node_id}' {data_json}") await self.mqtt.publish(f"{self.discovery_prfx}{component}" f"/{node_id}{id}/config", data_json) - self.db.reg_clr_at_midnight(f'{self.entity_prfx}{self.node_id}') + stream.db.reg_clr_at_midnight(f'{self.entity_prfx}{stream.node_id}') diff --git a/app/src/modbus_tcp.py b/app/src/modbus_tcp.py index ab7057f..da4bd03 100644 --- a/app/src/modbus_tcp.py +++ b/app/src/modbus_tcp.py @@ -19,24 +19,25 @@ class ModbusConn(): self.host = host self.port = port self.addr = (host, port) - self.stream = None + self.inverter = None async def __aenter__(self) -> 'InverterG3P': '''Establish a client connection to the TSUN cloud''' connection = asyncio.open_connection(self.host, self.port) reader, writer = await connection - self.stream = InverterG3P(reader, writer, self.addr, - client_mode=True) - logging.info(f'[{self.stream.node_id}:{self.stream.conn_no}] ' + self.inverter = InverterG3P(reader, writer, self.addr, + client_mode=True) + stream = self.inverter.local.stream + logging.info(f'[{stream.node_id}:{stream.conn_no}] ' f'Connected to {self.addr}') Infos.inc_counter('Inverter_Cnt') - await self.stream._ifc.publish_outstanding_mqtt() - return self.stream + await self.inverter.local._ifc.publish_outstanding_mqtt() + return self.inverter async def __aexit__(self, exc_type, exc, tb): Infos.dec_counter('Inverter_Cnt') - await self.stream._ifc.publish_outstanding_mqtt() - self.stream.close() + await self.inverter.local._ifc.publish_outstanding_mqtt() + self.inverter.close() class ModbusTcp(): @@ -61,7 +62,8 @@ class ModbusTcp(): '''Loop for receiving messages from the TSUN cloud (client-side)''' while True: try: - async with ModbusConn(host, port) as stream: + async with ModbusConn(host, port) as inverter: + stream = inverter.local.stream await stream.send_start_cmd(snr, host) await stream._ifc.loop() logger.info(f'[{stream.node_id}:{stream.conn_no}] ' diff --git a/app/tests/test_connection_g3.py b/app/tests/test_connection_g3.py index e182560..b215f18 100644 --- a/app/tests/test_connection_g3.py +++ b/app/tests/test_connection_g3.py @@ -4,10 +4,24 @@ import asyncio from itertools import count from mock import patch +from app.src.async_stream import StreamPtr from app.src.async_stream import AsyncStream, AsyncIfcImpl from app.src.gen3.connection_g3 import ConnectionG3Server from app.src.gen3.talent import Talent + +class FakeInverter(): + async def async_publ_mqtt(self) -> None: + pass # dummy funcion + + async def async_create_remote(self, inv_prot: str, conn_class) -> None: + pass # dummy function + + def __init__ (self): + self.remote = StreamPtr(None) + self.local = StreamPtr(None) + + @pytest.fixture def patch_async_init(): with patch.object(AsyncStream, '__init__') as conn: @@ -71,8 +85,8 @@ def test_method_calls(patch_talent_init, patch_healthy, patch_async_close, patch writer = FakeWriter() id_str = "id_string" addr = ('proxy.local', 10000) - conn = ConnectionG3Server(reader, writer, addr, - rstream= None, id_str=id_str) + conn = ConnectionG3Server(FakeInverter(), reader, writer, addr, + id_str=id_str) assert 5 == conn._ifc.get_conn_no() spy2.assert_called_once_with(conn, True, conn._ifc, id_str) conn.healthy() diff --git a/app/tests/test_connection_g3p.py b/app/tests/test_connection_g3p.py index 85ed802..d8a8769 100644 --- a/app/tests/test_connection_g3p.py +++ b/app/tests/test_connection_g3p.py @@ -5,10 +5,23 @@ import asyncio from itertools import count from mock import patch from app.src.singleton import Singleton -from app.src.async_stream import AsyncStream, AsyncIfcImpl +from app.src.async_stream import AsyncStream, AsyncIfcImpl, StreamPtr from app.src.gen3plus.connection_g3p import ConnectionG3PServer from app.src.gen3plus.solarman_v5 import SolarmanV5 + +class FakeInverter(): + async def async_publ_mqtt(self) -> None: + pass # dummy funcion + + async def async_create_remote(self, inv_prot: str, conn_class) -> None: + pass # dummy function + + def __init__ (self): + self.remote = StreamPtr(None) + self.local = StreamPtr(None) + + @pytest.fixture def patch_async_init(): with patch.object(AsyncStream, '__init__', return_value= None) as conn: @@ -76,8 +89,8 @@ def test_method_calls(patch_solarman_init, patch_healthy, patch_async_close, pat reader = FakeReader() writer = FakeWriter() addr = ('proxy.local', 10000) - conn = ConnectionG3PServer(reader, writer, addr, - rstream= None, client_mode=False) + conn = ConnectionG3PServer(FakeInverter(), reader, writer, addr, + client_mode=False) assert 5 == conn._ifc.get_conn_no() spy2.assert_called_once_with(conn, True, False, conn._ifc) conn.healthy() diff --git a/app/tests/test_inverter_g3.py b/app/tests/test_inverter_g3.py index f61c25d..9bbce5b 100644 --- a/app/tests/test_inverter_g3.py +++ b/app/tests/test_inverter_g3.py @@ -104,18 +104,14 @@ def patch_open_connection(): yield conn -def test_method_calls(patch_conn_init, patch_conn_close): - spy1 = patch_conn_init +def test_method_calls(patch_conn_close): spy2 = patch_conn_close reader = FakeReader() writer = FakeWriter() addr = ('proxy.local', 10000) inverter = InverterG3(reader, writer, addr) - inverter.l_addr = '' - inverter.r_addr = '' - - spy1.assert_called_once() - spy1.assert_called_once_with(inverter, reader, writer, addr, None) + assert inverter.local.stream + assert inverter.local.ifc inverter.close() spy2.assert_called_once() @@ -171,18 +167,19 @@ async def test_mqtt_publish(config_conn, patch_open_connection, patch_conn_close Inverter.class_init() inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) + stream = inverter.local.stream await inverter.async_publ_mqtt() # check call with invalid unique_id - inverter._Talent__set_serial_no(serial_no= "123344") + stream._Talent__set_serial_no(serial_no= "123344") - inverter.new_data['inverter'] = True - inverter.db.db['inverter'] = {} + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['inverter'] == False + assert stream.new_data['inverter'] == False - inverter.new_data['env'] = True - inverter.db.db['env'] = {} + stream.new_data['env'] = True + stream.db.db['env'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['env'] == False + assert stream.new_data['env'] == False Infos.new_stat_data['proxy'] = True await inverter.async_publ_mqtt() @@ -203,12 +200,12 @@ async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err, patc Inverter.class_init() inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) - inverter._Talent__set_serial_no(serial_no= "123344") - - inverter.new_data['inverter'] = True - inverter.db.db['inverter'] = {} + stream = inverter.local.stream + stream._Talent__set_serial_no(serial_no= "123344") + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['inverter'] == True + assert stream.new_data['inverter'] == True inverter.close() spy1.assert_called_once() @@ -225,12 +222,13 @@ async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except Inverter.class_init() inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) - inverter._Talent__set_serial_no(serial_no= "123344") + stream = inverter.local.stream + stream._Talent__set_serial_no(serial_no= "123344") - inverter.new_data['inverter'] = True - inverter.db.db['inverter'] = {} + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['inverter'] == True + assert stream.new_data['inverter'] == True inverter.close() spy1.assert_called_once() diff --git a/app/tests/test_inverter_g3p.py b/app/tests/test_inverter_g3p.py index b60b50c..5785031 100644 --- a/app/tests/test_inverter_g3p.py +++ b/app/tests/test_inverter_g3p.py @@ -105,18 +105,14 @@ def patch_open_connection(): yield conn -def test_method_calls(patch_conn_init, patch_conn_close): - spy1 = patch_conn_init +def test_method_calls(patch_conn_close): spy2 = patch_conn_close reader = FakeReader() writer = FakeWriter() addr = ('proxy.local', 10000) inverter = InverterG3P(reader, writer, addr, client_mode=False) - inverter.l_addr = '' - inverter.r_addr = '' - - spy1.assert_called_once() - spy1.assert_called_once_with(inverter, reader, writer, addr, None, client_mode=False) + assert inverter.local.stream + assert inverter.local.ifc inverter.close() spy2.assert_called_once() @@ -172,18 +168,19 @@ async def test_mqtt_publish(config_conn, patch_open_connection, patch_conn_close Inverter.class_init() inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) + stream = inverter.local.stream await inverter.async_publ_mqtt() # check call with invalid unique_id - inverter._SolarmanV5__set_serial_no(snr= 123344) + stream._SolarmanV5__set_serial_no(snr= 123344) - inverter.new_data['inverter'] = True - inverter.db.db['inverter'] = {} + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['inverter'] == False + assert stream.new_data['inverter'] == False - inverter.new_data['env'] = True - inverter.db.db['env'] = {} + stream.new_data['env'] = True + stream.db.db['env'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['env'] == False + assert stream.new_data['env'] == False Infos.new_stat_data['proxy'] = True await inverter.async_publ_mqtt() @@ -204,12 +201,12 @@ async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err, patc Inverter.class_init() inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) - inverter._SolarmanV5__set_serial_no(snr= 123344) - - inverter.new_data['inverter'] = True - inverter.db.db['inverter'] = {} + stream = inverter.local.stream + stream._SolarmanV5__set_serial_no(snr= 123344) + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['inverter'] == True + assert stream.new_data['inverter'] == True inverter.close() spy1.assert_called_once() @@ -226,12 +223,13 @@ async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except Inverter.class_init() inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) - inverter._SolarmanV5__set_serial_no(snr= 123344) + stream = inverter.local.stream + stream._SolarmanV5__set_serial_no(snr= 123344) - inverter.new_data['inverter'] = True - inverter.db.db['inverter'] = {} + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['inverter'] == True + assert stream.new_data['inverter'] == True inverter.close() spy1.assert_called_once() diff --git a/app/tests/test_modbus_tcp.py b/app/tests/test_modbus_tcp.py index 353a488..582e57c 100644 --- a/app/tests/test_modbus_tcp.py +++ b/app/tests/test_modbus_tcp.py @@ -150,7 +150,8 @@ async def test_modbus_conn(patch_open): _ = patch_open assert Infos.stat['proxy']['Inverter_Cnt'] == 0 - async with ModbusConn('test.local', 1234) as stream: + async with ModbusConn('test.local', 1234) as inverter: + stream = inverter.local.stream assert stream.node_id == 'G3P' assert stream.addr == ('test.local', 1234) assert type(stream._ifc._reader) is FakeReader