From 5a0ef30cebaa2a4471203a19be7bf4179cc7bc92 Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Sun, 29 Sep 2024 15:31:14 +0200 Subject: [PATCH 1/2] move StremPtr instances into Inverter class --- app/proxy.svg | 706 +++++++++++++++-------------- app/proxy.yuml | 22 +- app/src/gen3/connection_g3.py | 34 +- app/src/gen3/inverter_g3.py | 42 +- app/src/gen3plus/connection_g3p.py | 33 +- app/src/gen3plus/inverter_g3p.py | 43 +- app/src/inverter.py | 70 ++- app/src/modbus_tcp.py | 20 +- app/tests/test_connection_g3.py | 18 +- app/tests/test_connection_g3p.py | 19 +- app/tests/test_inverter_g3.py | 44 +- app/tests/test_inverter_g3p.py | 44 +- app/tests/test_modbus_tcp.py | 3 +- 13 files changed, 548 insertions(+), 550 deletions(-) diff --git a/app/proxy.svg b/app/proxy.svg index b2ef8ba..c3857fb 100644 --- a/app/proxy.svg +++ b/app/proxy.svg @@ -4,579 +4,587 @@ - - + + G - + A0 - - - -You can stick notes -on diagrams too! + + + +You can stick notes +on diagrams too! A1 - -Mqtt -<<Singleton>> - -<static>ha_restarts -<static>__client -<static>__cb_MqttIsUp - -<async>publish() -<async>close() + +Mqtt +<<Singleton>> + +<static>ha_restarts +<static>__client +<static>__cb_MqttIsUp + +<async>publish() +<async>close() A2 - -Inverter - -cls.db_stat -cls.entity_prfx -cls.discovery_prfx -cls.proxy_node_id -cls.proxy_unique_id -cls.mqtt:Mqtt - + +Inverter + +cls.db_stat +cls.entity_prfx +cls.discovery_prfx +cls.proxy_node_id +cls.proxy_unique_id +cls.mqtt:Mqtt +__ha_restarts + +async_create_remote(inv_prot, conn_class)async_publ_mqtt() A1->A2 - - - + + + A3 - -InverterG3 - -__ha_restarts - -async_create_remote() -async_publ_mqtt() -close() + +InverterG3 + +addr +remote:StreamPtr +local:StreamPtr + +async_create_remote() +close() A2->A3 - - + + A4 - -InverterG3P - -__ha_restarts - -async_create_remote( -)async_publ_mqtt() -close() + +InverterG3P + +addr +remote:StreamPtr +local:StreamPtr + +async_create_remote() +close() A2->A4 - - + + A5 - -IterRegistry - - -__iter__ + +IterRegistry + + +__iter__ A6 - -Message - -server_side:bool -header_valid:bool -header_len:unsigned -data_len:unsigned -unique_id -node_id -sug_area -_recv_buffer:bytearray -_send_buffer:bytearray -_forward_buffer:bytearray -db:Infos -new_data:list -state - -_read():void<abstract> -close():void -inc_counter():void -dec_counter():void + +Message + +server_side:bool +header_valid:bool +header_len:unsigned +data_len:unsigned +unique_id +node_id +sug_area +_recv_buffer:bytearray +_send_buffer:bytearray +_forward_buffer:bytearray +db:Infos +new_data:list +state + +_read():void<abstract> +close():void +inc_counter():void +dec_counter():void A5->A6 - - + + A21 - -Talent - -await_conn_resp_cnt -id_str -contact_name -contact_mail -db:InfosG3 -mb:Modbus -switch - -msg_contact_info() -msg_ota_update() -msg_get_time() -msg_collector_data() -msg_inverter_data() -msg_unknown() -close() + +Talent + +await_conn_resp_cnt +id_str +contact_name +contact_mail +db:InfosG3 +mb:Modbus +switch + +msg_contact_info() +msg_ota_update() +msg_get_time() +msg_collector_data() +msg_inverter_data() +msg_unknown() +close() A6->A21 - - + + A22 - -SolarmanV5 - -control -serial -snr -db:InfosG3P -mb:Modbus -switch - -msg_unknown() -close() + +SolarmanV5 + +control +serial +snr +db:InfosG3P +mb:Modbus +switch + +msg_unknown() +close() A6->A22 - - + + A7 - -<<AsyncIfc>> - - -set_node_id() -get_conn_no() -tx_add() -tx_flush() -tx_get() -tx_peek() -tx_log() -tx_clear() -tx_len() -fwd_add() -fwd_flush() -fwd_log() -fwd_clear() -rx_get() -rx_peek() -rx_log() -rx_clear() -rx_len() -rx_set_cb() -prot_set_timeout_cb() + +<<AsyncIfc>> + + +set_node_id() +get_conn_no() +tx_add() +tx_flush() +tx_get() +tx_peek() +tx_log() +tx_clear() +tx_len() +fwd_add() +fwd_flush() +fwd_log() +fwd_clear() +rx_get() +rx_peek() +rx_log() +rx_clear() +rx_len() +rx_set_cb() +prot_set_timeout_cb() A8 - -AsyncIfcImpl - -fwd_fifo:ByteFifo -tx_fifo:ByteFifo -rx_fifo:ByteFifo -conn_no:Count -node_id -timeout_cb + +AsyncIfcImpl + +fwd_fifo:ByteFifo +tx_fifo:ByteFifo +rx_fifo:ByteFifo +conn_no:Count +node_id +timeout_cb A7->A8 - - + + A9 - -AsyncStream - -reader -writer -addr -r_addr -l_addr - -<async>loop -disc() -close() -healthy() -__async_read() -__async_write() -__async_forward() + +AsyncStream + +reader +writer +addr +r_addr +l_addr + +<async>loop +disc() +close() +healthy() +__async_read() +__async_write() +__async_forward() A8->A9 - - + + A10 - -AsyncStreamServer - -async_create_remote - -<async>server_loop() -<async>_async_forward() -<async>publish_outstanding_mqtt() -close() + +AsyncStreamServer + +async_create_remote + +<async>server_loop() +<async>_async_forward() +<async>publish_outstanding_mqtt() +close() A9->A10 - - + + A11 - -AsyncStreamClient - - -<async>client_loop() -<async>_async_forward()) + +AsyncStreamClient + + +<async>client_loop() +<async>_async_forward()) A9->A11 - - + + A12 - -ConnectionG3 - -remote.stream:ConnectionG3 - -healthy() + +ConnectionG3 + +remote.stream:ConnectionG3 + +healthy() A13 - -ConnectionG3Client - -remote.stream:ConnectionG3 - -close() + +ConnectionG3Client + +_ifc:AsyncStreamClient +conn_no +addr + +close() A12->A13 - - + + A14 - -ConnectionG3Server - -remote.stream:ConnectionG3 - -close() + +ConnectionG3Server + +_ifc:AsyncStreamServer +conn_no +addr + +close() A12->A14 - - + + + + + +A13->A3 + A13->A11 - - - -1 - - - -A13->A14 - - + + + +1 A14->A3 - - + A14->A10 - - - -1 + + + +1 A15 - -ConnectionG3P - -remote.stream:ConnectionG3P - -healthy() -close() + +ConnectionG3P + +remote.stream:ConnectionG3P + +healthy() +close() A16 - -ConnectionG3PClient - -remote.stream:ConnectionG3P - -close() + +ConnectionG3PClient + +_ifc:AsyncStreamClient +conn_no +addr + +close() A15->A16 - - + + A17 - -ConnectionG3PServer - -remote.stream:ConnectionG3P - -close() + +ConnectionG3PServer + +_ifc:AsyncStreamServer +conn_no +addr + +close() A15->A17 - - + + + + + +A16->A4 + A16->A11 - - - -1 - - - -A16->A17 - - + + + +1 A17->A4 - - + A17->A10 - - - -1 + + + +1 A18 - -Infos - -stat -new_stat_data -info_dev - -static_init() -dev_value() -inc_counter() -dec_counter() -ha_proxy_conf -ha_conf -ha_remove -update_db -set_db_def_value -get_db_value -ignore_this_device + +Infos + +stat +new_stat_data +info_dev + +static_init() +dev_value() +inc_counter() +dec_counter() +ha_proxy_conf +ha_conf +ha_remove +update_db +set_db_def_value +get_db_value +ignore_this_device A19 - -InfosG3 - - -ha_confs() -parse() + +InfosG3 + + +ha_confs() +parse() A18->A19 - - + + A20 - -InfosG3P - - -ha_confs() -parse() + +InfosG3P + + +ha_confs() +parse() A18->A20 - - + + A21->A7 - - -use + + +use A21->A12 - - + + A21->A19 - - + + A22->A7 - - -use + + +use A22->A15 - - + + A22->A20 - - + + A23 - -Modbus - -que -snd_handler -rsp_handler -timeout -max_retires -last_xxx -err -retry_cnt -req_pend -tim - -build_msg() -recv_req() -recv_resp() -close() + +Modbus + +que +snd_handler +rsp_handler +timeout +max_retires +last_xxx +err +retry_cnt +req_pend +tim + +build_msg() +recv_req() +recv_resp() +close() A23->A21 - - -has -1 + + +has +1 A23->A22 - - -has -1 + + +has +1 A24 - -ModbusConn - -host -port -addr -stream:InverterG3P - + +ModbusConn + +host +port +addr +stream:InverterG3P + A24->A4 - - -1 -has + + +1 +has diff --git a/app/proxy.yuml b/app/proxy.yuml index c750340..c19f5c6 100644 --- a/app/proxy.yuml +++ b/app/proxy.yuml @@ -5,9 +5,9 @@ [note: You can stick notes on diagrams too!{bg:cornsilk}] [Mqtt;<>|ha_restarts;__client;__cb_MqttIsUp|publish();close()] -[Inverter|cls.db_stat;cls.entity_prfx;cls.discovery_prfx;cls.proxy_node_id;cls.proxy_unique_id;cls.mqtt:Mqtt|] -[Inverter]^[InverterG3|__ha_restarts|async_create_remote();async_publ_mqtt();;close()] -[Inverter]^[InverterG3P|__ha_restarts|async_create_remote(;)async_publ_mqtt();close()] +[Inverter|cls.db_stat;cls.entity_prfx;cls.discovery_prfx;cls.proxy_node_id;cls.proxy_unique_id;cls.mqtt:Mqtt;;__ha_restarts|async_create_remote(inv_prot, conn_class)async_publ_mqtt()] +[Inverter]^[InverterG3|addr;remote:StreamPtr;local:StreamPtr|async_create_remote();;close()] +[Inverter]^[InverterG3P|addr;remote:StreamPtr;local:StreamPtr|async_create_remote();;close()] [Mqtt;<>]<-++[Inverter] [IterRegistry||__iter__]^[Message|server_side:bool;header_valid:bool;header_len:unsigned;data_len:unsigned;unique_id;node_id;sug_area;_recv_buffer:bytearray;_send_buffer:bytearray;_forward_buffer:bytearray;db:Infos;new_data:list;state|_read():void;close():void;inc_counter():void;dec_counter():void] @@ -24,23 +24,23 @@ [ConnectionG3|remote.stream:ConnectionG3|healthy()] -[ConnectionG3Client|remote.stream:ConnectionG3|close()] -[ConnectionG3Server|remote.stream:ConnectionG3|;close()] +[ConnectionG3Client|_ifc:AsyncStreamClient;conn_no;addr|close()] +[ConnectionG3Server|_ifc:AsyncStreamServer;conn_no;addr|;close()] [ConnectionG3]^[ConnectionG3Client] [ConnectionG3]^[ConnectionG3Server] -[ConnectionG3Client]<-[ConnectionG3Server] +[ConnectionG3Client]-[InverterG3] [ConnectionG3Client]++-1>[AsyncStreamClient] -[ConnectionG3Server]^[InverterG3] +[ConnectionG3Server]-[InverterG3] [ConnectionG3Server]++-1>[AsyncStreamServer] [ConnectionG3P|remote.stream:ConnectionG3P|healthy();close()] -[ConnectionG3PClient|remote.stream:ConnectionG3P|close()] -[ConnectionG3PServer|remote.stream:ConnectionG3P|;close()] +[ConnectionG3PClient|_ifc:AsyncStreamClient;conn_no;addr|close()] +[ConnectionG3PServer|_ifc:AsyncStreamServer;conn_no;addr|;close()] [ConnectionG3P]^[ConnectionG3PClient] [ConnectionG3P]^[ConnectionG3PServer] -[ConnectionG3PClient]<-[ConnectionG3PServer] +[ConnectionG3PClient]-[InverterG3P] [ConnectionG3PClient]++-1>[AsyncStreamClient] -[ConnectionG3PServer]^[InverterG3P] +[ConnectionG3PServer]-[InverterG3P] [ConnectionG3PServer]++-1>[AsyncStreamServer] [Infos|stat;new_stat_data;info_dev|static_init();dev_value();inc_counter();dec_counter();ha_proxy_conf;ha_conf;ha_remove;update_db;set_db_def_value;get_db_value;ignore_this_device] diff --git a/app/src/gen3/connection_g3.py b/app/src/gen3/connection_g3.py index 8e213b1..8229e3b 100644 --- a/app/src/gen3/connection_g3.py +++ b/app/src/gen3/connection_g3.py @@ -3,23 +3,19 @@ from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3.connection_g3": from app.src.async_stream import AsyncStreamServer - from app.src.async_stream import AsyncStreamClient, StreamPtr + from app.src.async_stream import AsyncStreamClient + from app.src.inverter import Inverter from app.src.gen3.talent import Talent else: # pragma: no cover from async_stream import AsyncStreamServer - from async_stream import AsyncStreamClient, StreamPtr + from async_stream import AsyncStreamClient + from inverter import Inverter from gen3.talent import Talent logger = logging.getLogger('conn') class ConnectionG3(Talent): - async def async_create_remote(self) -> None: - pass # virtual interface # pragma: no cover - - async def async_publ_mqtt(self) -> None: - pass # virtual interface # pragma: no cover - def healthy(self) -> bool: logger.debug('ConnectionG3 healthy()') return self._ifc.healthy() @@ -32,16 +28,15 @@ class ConnectionG3(Talent): class ConnectionG3Server(ConnectionG3): - def __init__(self, reader: StreamReader, writer: StreamWriter, - addr, rstream: 'ConnectionG3Client', - id_str=b'') -> None: + def __init__(self, inverter: "Inverter", + reader: StreamReader, writer: StreamWriter, + addr, id_str=b'') -> None: server_side = True - self.remote = StreamPtr(rstream) self._ifc = AsyncStreamServer(reader, writer, - self.async_publ_mqtt, - self.async_create_remote, - self.remote) + inverter.async_publ_mqtt, + inverter.async_create_remote, + inverter.remote) self.conn_no = self._ifc.get_conn_no() self.addr = addr Talent.__init__(self, server_side, self._ifc, id_str) @@ -49,13 +44,12 @@ class ConnectionG3Server(ConnectionG3): class ConnectionG3Client(ConnectionG3): - def __init__(self, reader: StreamReader, writer: StreamWriter, - addr, rstream: 'ConnectionG3Server', - id_str=b'') -> None: + def __init__(self, inverter: "Inverter", + reader: StreamReader, writer: StreamWriter, + addr, id_str=b'') -> None: server_side = False - self.remote = StreamPtr(rstream) self._ifc = AsyncStreamClient(reader, writer, - self.remote) + inverter.remote) self.conn_no = self._ifc.get_conn_no() self.addr = addr Talent.__init__(self, server_side, self._ifc, id_str) diff --git a/app/src/gen3/inverter_g3.py b/app/src/gen3/inverter_g3.py index 8b74716..0a4ae04 100644 --- a/app/src/gen3/inverter_g3.py +++ b/app/src/gen3/inverter_g3.py @@ -3,10 +3,12 @@ from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3.inverter_g3": from app.src.inverter import Inverter + from app.src.async_stream import StreamPtr from app.src.gen3.connection_g3 import ConnectionG3Server from app.src.gen3.connection_g3 import ConnectionG3Client else: # pragma: no cover from inverter import Inverter + from async_stream import StreamPtr from gen3.connection_g3 import ConnectionG3Server from gen3.connection_g3 import ConnectionG3Client @@ -14,40 +16,14 @@ else: # pragma: no cover logger_mqtt = logging.getLogger('mqtt') -class InverterG3(Inverter, ConnectionG3Server): - '''class Inverter is a derivation of an Async_Stream - - The class has some class method for managing common resources like a - connection to the MQTT broker or proxy error counter which are common - for all inverter connection - - Instances of the class are connections to an inverter and can have an - optional link to an remote connection to the TSUN cloud. A remote - connection dies with the inverter connection. - - class methods: - class_init(): initialize the common resources of the proxy (MQTT - broker, Proxy DB, etc). Must be called before the - first inverter instance can be created - class_close(): release the common resources of the proxy. Should not - be called before any instances of the class are - destroyed - - methods: - server_loop(addr): Async loop method for receiving messages from the - inverter (server-side) - client_loop(addr): Async loop method for receiving messages from the - TSUN cloud (client-side) - async_create_remote(): Establish a client connection to the TSUN cloud - async_publ_mqtt(): Publish data to MQTT broker - close(): Release method which must be called before a instance can be - destroyed - ''' - +class InverterG3(Inverter): def __init__(self, reader: StreamReader, writer: StreamWriter, addr): - Inverter.__init__(self) - ConnectionG3Server.__init__(self, reader, writer, addr, None) + super().__init__() self.addr = addr + self.remote = StreamPtr(None) + self.local = StreamPtr( + ConnectionG3Server(self, reader, writer, addr) + ) async def async_create_remote(self) -> None: await Inverter.async_create_remote( @@ -55,5 +31,5 @@ class InverterG3(Inverter, ConnectionG3Server): def close(self) -> None: logging.debug(f'InverterG3.close() {self.addr}') - ConnectionG3Server.close(self) + self.local.stream.close() # logging.info(f'Inverter refs: {gc.get_referrers(self)}') diff --git a/app/src/gen3plus/connection_g3p.py b/app/src/gen3plus/connection_g3p.py index d1b99f3..163bd89 100644 --- a/app/src/gen3plus/connection_g3p.py +++ b/app/src/gen3plus/connection_g3p.py @@ -3,23 +3,19 @@ from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3plus.connection_g3p": from app.src.async_stream import AsyncStreamServer - from app.src.async_stream import AsyncStreamClient, StreamPtr + from app.src.async_stream import AsyncStreamClient + from app.src.inverter import Inverter from app.src.gen3plus.solarman_v5 import SolarmanV5 else: # pragma: no cover from async_stream import AsyncStreamServer - from async_stream import AsyncStreamClient, StreamPtr + from async_stream import AsyncStreamClient + from inverter import Inverter from gen3plus.solarman_v5 import SolarmanV5 logger = logging.getLogger('conn') class ConnectionG3P(SolarmanV5): - async def async_create_remote(self) -> None: - pass # virtual interface # pragma: no cover - - async def async_publ_mqtt(self) -> None: - pass # virtual interface # pragma: no cover - def healthy(self) -> bool: logger.debug('ConnectionG3P healthy()') return self._ifc.healthy() @@ -32,16 +28,15 @@ class ConnectionG3P(SolarmanV5): class ConnectionG3PServer(ConnectionG3P): - def __init__(self, reader: StreamReader, writer: StreamWriter, - addr, rstream: 'ConnectionG3PClient', - client_mode: bool) -> None: + def __init__(self, inverter: "Inverter", + reader: StreamReader, writer: StreamWriter, + addr, client_mode: bool) -> None: server_side = True - self.remote = StreamPtr(rstream) self._ifc = AsyncStreamServer(reader, writer, - self.async_publ_mqtt, - self.async_create_remote, - self.remote) + inverter.async_publ_mqtt, + inverter.async_create_remote, + inverter.remote) self.conn_no = self._ifc.get_conn_no() self.addr = addr SolarmanV5.__init__(self, server_side, client_mode, self._ifc) @@ -49,13 +44,13 @@ class ConnectionG3PServer(ConnectionG3P): class ConnectionG3PClient(ConnectionG3P): - def __init__(self, reader: StreamReader, writer: StreamWriter, - addr, rstream: 'ConnectionG3PServer') -> None: + def __init__(self, inverter: "Inverter", + reader: StreamReader, writer: StreamWriter, + addr) -> None: server_side = False client_mode = False - self.remote = StreamPtr(rstream) - self._ifc = AsyncStreamClient(reader, writer, self.remote) + self._ifc = AsyncStreamClient(reader, writer, inverter.remote) self.conn_no = self._ifc.get_conn_no() self.addr = addr SolarmanV5.__init__(self, server_side, client_mode, self._ifc) diff --git a/app/src/gen3plus/inverter_g3p.py b/app/src/gen3plus/inverter_g3p.py index 0d15d92..2c6cb6d 100644 --- a/app/src/gen3plus/inverter_g3p.py +++ b/app/src/gen3plus/inverter_g3p.py @@ -3,10 +3,12 @@ from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3plus.inverter_g3p": from app.src.inverter import Inverter + from app.src.async_stream import StreamPtr from app.src.gen3plus.connection_g3p import ConnectionG3PServer from app.src.gen3plus.connection_g3p import ConnectionG3PClient else: # pragma: no cover from inverter import Inverter + from async_stream import StreamPtr from gen3plus.connection_g3p import ConnectionG3PServer from gen3plus.connection_g3p import ConnectionG3PClient @@ -14,42 +16,15 @@ else: # pragma: no cover logger_mqtt = logging.getLogger('mqtt') -class InverterG3P(Inverter, ConnectionG3PServer): - '''class Inverter is a derivation of an Async_Stream - - The class has some class method for managing common resources like a - connection to the MQTT broker or proxy error counter which are common - for all inverter connection - - Instances of the class are connections to an inverter and can have an - optional link to an remote connection to the TSUN cloud. A remote - connection dies with the inverter connection. - - class methods: - class_init(): initialize the common resources of the proxy (MQTT - broker, Proxy DB, etc). Must be called before the - first inverter instance can be created - class_close(): release the common resources of the proxy. Should not - be called before any instances of the class are - destroyed - - methods: - server_loop(addr): Async loop method for receiving messages from the - inverter (server-side) - client_loop(addr): Async loop method for receiving messages from the - TSUN cloud (client-side) - async_create_remote(): Establish a client connection to the TSUN cloud - async_publ_mqtt(): Publish data to MQTT broker - close(): Release method which must be called before a instance can be - destroyed - ''' - +class InverterG3P(Inverter): def __init__(self, reader: StreamReader, writer: StreamWriter, addr, client_mode: bool = False): - Inverter.__init__(self) - ConnectionG3PServer.__init__( - self, reader, writer, addr, None, client_mode=client_mode) + super().__init__() self.addr = addr + self.remote = StreamPtr(None) + self.local = StreamPtr( + ConnectionG3PServer(self, reader, writer, addr, client_mode) + ) async def async_create_remote(self) -> None: await Inverter.async_create_remote( @@ -57,5 +32,5 @@ class InverterG3P(Inverter, ConnectionG3PServer): def close(self) -> None: logging.debug(f'InverterG3P.close() {self.addr}') - ConnectionG3PServer.close(self) + self.local.stream.close() # logger.debug (f'Inverter refs: {gc.get_referrers(self)}') diff --git a/app/src/inverter.py b/app/src/inverter.py index 68b7996..2f53ef8 100644 --- a/app/src/inverter.py +++ b/app/src/inverter.py @@ -17,6 +17,28 @@ logger_mqtt = logging.getLogger('mqtt') class Inverter(): + '''class Inverter is a baseclass + + The class has some class method for managing common resources like a + connection to the MQTT broker or proxy error counter which are common + for all inverter connection + + Instances of the class are connections to an inverter and can have an + optional link to an remote connection to the TSUN cloud. A remote + connection dies with the inverter connection. + + class methods: + class_init(): initialize the common resources of the proxy (MQTT + broker, Proxy DB, etc). Must be called before the + first inverter instance can be created + class_close(): release the common resources of the proxy. Should not + be called before any instances of the class are + destroyed + + methods: + async_create_remote(): Establish a client connection to the TSUN cloud + async_publ_mqtt(): Publish data to MQTT broker + ''' @classmethod def class_init(cls) -> None: logging.debug('Inverter.class_init') @@ -94,17 +116,18 @@ class Inverter(): host = tsun['host'] port = tsun['port'] addr = (host, port) + stream = self.local.stream try: - logging.info(f'[{self.node_id}] Connect to {addr}') + logging.info(f'[{stream.node_id}] Connect to {addr}') connect = asyncio.open_connection(host, port) reader, writer = await connect - if hasattr(self, 'id_str'): + if hasattr(stream, 'id_str'): self.remote.stream = conn_class( - reader, writer, addr, self, self.id_str) + self, reader, writer, addr, stream.id_str) else: self.remote.stream = conn_class( - reader, writer, addr, self) + self, reader, writer, addr) logging.info(f'[{self.remote.stream.node_id}:' f'{self.remote.stream.conn_no}] ' @@ -114,56 +137,57 @@ class Inverter(): except (ConnectionRefusedError, TimeoutError) as error: logging.info(f'{error}') except Exception: - self.inc_counter('SW_Exception') + Infos.inc_counter('SW_Exception') logging.error( f"Inverter: Exception for {addr}:\n" f"{traceback.format_exc()}") async def async_publ_mqtt(self) -> None: '''publish data to MQTT broker''' - if not self.unique_id: + stream = self.local.stream + if not stream.unique_id: return # check if new inverter or collector infos are available or when the # home assistant has changed the status back to online try: - if (('inverter' in self.new_data and self.new_data['inverter']) - or ('collector' in self.new_data and - self.new_data['collector']) + if (('inverter' in stream.new_data and stream.new_data['inverter']) + or ('collector' in stream.new_data and + stream.new_data['collector']) or self.mqtt.ha_restarts != self.__ha_restarts): await self._register_proxy_stat_home_assistant() - await self.__register_home_assistant() + await self.__register_home_assistant(stream) self.__ha_restarts = self.mqtt.ha_restarts - for key in self.new_data: - await self.__async_publ_mqtt_packet(key) + for key in stream.new_data: + await self.__async_publ_mqtt_packet(stream, key) for key in Infos.new_stat_data: await Inverter._async_publ_mqtt_proxy_stat(key) except MqttCodeError as error: logging.error(f'Mqtt except: {error}') except Exception: - self.inc_counter('SW_Exception') + Infos.inc_counter('SW_Exception') logging.error( f"Inverter: Exception:\n" f"{traceback.format_exc()}") - async def __async_publ_mqtt_packet(self, key): - db = self.db.db - if key in db and self.new_data[key]: + async def __async_publ_mqtt_packet(self, stream, key): + db = stream.db.db + if key in db and stream.new_data[key]: data_json = json.dumps(db[key]) - node_id = self.node_id + node_id = stream.node_id logger_mqtt.debug(f'{key}: {data_json}') await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501 - self.new_data[key] = False + stream.new_data[key] = False - async def __register_home_assistant(self) -> None: + async def __register_home_assistant(self, stream) -> None: '''register all our topics at home assistant''' - for data_json, component, node_id, id in self.db.ha_confs( - self.entity_prfx, self.node_id, self.unique_id, - self.sug_area): + for data_json, component, node_id, id in stream.db.ha_confs( + self.entity_prfx, stream.node_id, stream.unique_id, + stream.sug_area): logger_mqtt.debug(f"MQTT Register: cmp:'{component}'" f" node_id:'{node_id}' {data_json}") await self.mqtt.publish(f"{self.discovery_prfx}{component}" f"/{node_id}{id}/config", data_json) - self.db.reg_clr_at_midnight(f'{self.entity_prfx}{self.node_id}') + stream.db.reg_clr_at_midnight(f'{self.entity_prfx}{stream.node_id}') diff --git a/app/src/modbus_tcp.py b/app/src/modbus_tcp.py index ab7057f..da4bd03 100644 --- a/app/src/modbus_tcp.py +++ b/app/src/modbus_tcp.py @@ -19,24 +19,25 @@ class ModbusConn(): self.host = host self.port = port self.addr = (host, port) - self.stream = None + self.inverter = None async def __aenter__(self) -> 'InverterG3P': '''Establish a client connection to the TSUN cloud''' connection = asyncio.open_connection(self.host, self.port) reader, writer = await connection - self.stream = InverterG3P(reader, writer, self.addr, - client_mode=True) - logging.info(f'[{self.stream.node_id}:{self.stream.conn_no}] ' + self.inverter = InverterG3P(reader, writer, self.addr, + client_mode=True) + stream = self.inverter.local.stream + logging.info(f'[{stream.node_id}:{stream.conn_no}] ' f'Connected to {self.addr}') Infos.inc_counter('Inverter_Cnt') - await self.stream._ifc.publish_outstanding_mqtt() - return self.stream + await self.inverter.local._ifc.publish_outstanding_mqtt() + return self.inverter async def __aexit__(self, exc_type, exc, tb): Infos.dec_counter('Inverter_Cnt') - await self.stream._ifc.publish_outstanding_mqtt() - self.stream.close() + await self.inverter.local._ifc.publish_outstanding_mqtt() + self.inverter.close() class ModbusTcp(): @@ -61,7 +62,8 @@ class ModbusTcp(): '''Loop for receiving messages from the TSUN cloud (client-side)''' while True: try: - async with ModbusConn(host, port) as stream: + async with ModbusConn(host, port) as inverter: + stream = inverter.local.stream await stream.send_start_cmd(snr, host) await stream._ifc.loop() logger.info(f'[{stream.node_id}:{stream.conn_no}] ' diff --git a/app/tests/test_connection_g3.py b/app/tests/test_connection_g3.py index e182560..b215f18 100644 --- a/app/tests/test_connection_g3.py +++ b/app/tests/test_connection_g3.py @@ -4,10 +4,24 @@ import asyncio from itertools import count from mock import patch +from app.src.async_stream import StreamPtr from app.src.async_stream import AsyncStream, AsyncIfcImpl from app.src.gen3.connection_g3 import ConnectionG3Server from app.src.gen3.talent import Talent + +class FakeInverter(): + async def async_publ_mqtt(self) -> None: + pass # dummy funcion + + async def async_create_remote(self, inv_prot: str, conn_class) -> None: + pass # dummy function + + def __init__ (self): + self.remote = StreamPtr(None) + self.local = StreamPtr(None) + + @pytest.fixture def patch_async_init(): with patch.object(AsyncStream, '__init__') as conn: @@ -71,8 +85,8 @@ def test_method_calls(patch_talent_init, patch_healthy, patch_async_close, patch writer = FakeWriter() id_str = "id_string" addr = ('proxy.local', 10000) - conn = ConnectionG3Server(reader, writer, addr, - rstream= None, id_str=id_str) + conn = ConnectionG3Server(FakeInverter(), reader, writer, addr, + id_str=id_str) assert 5 == conn._ifc.get_conn_no() spy2.assert_called_once_with(conn, True, conn._ifc, id_str) conn.healthy() diff --git a/app/tests/test_connection_g3p.py b/app/tests/test_connection_g3p.py index 85ed802..d8a8769 100644 --- a/app/tests/test_connection_g3p.py +++ b/app/tests/test_connection_g3p.py @@ -5,10 +5,23 @@ import asyncio from itertools import count from mock import patch from app.src.singleton import Singleton -from app.src.async_stream import AsyncStream, AsyncIfcImpl +from app.src.async_stream import AsyncStream, AsyncIfcImpl, StreamPtr from app.src.gen3plus.connection_g3p import ConnectionG3PServer from app.src.gen3plus.solarman_v5 import SolarmanV5 + +class FakeInverter(): + async def async_publ_mqtt(self) -> None: + pass # dummy funcion + + async def async_create_remote(self, inv_prot: str, conn_class) -> None: + pass # dummy function + + def __init__ (self): + self.remote = StreamPtr(None) + self.local = StreamPtr(None) + + @pytest.fixture def patch_async_init(): with patch.object(AsyncStream, '__init__', return_value= None) as conn: @@ -76,8 +89,8 @@ def test_method_calls(patch_solarman_init, patch_healthy, patch_async_close, pat reader = FakeReader() writer = FakeWriter() addr = ('proxy.local', 10000) - conn = ConnectionG3PServer(reader, writer, addr, - rstream= None, client_mode=False) + conn = ConnectionG3PServer(FakeInverter(), reader, writer, addr, + client_mode=False) assert 5 == conn._ifc.get_conn_no() spy2.assert_called_once_with(conn, True, False, conn._ifc) conn.healthy() diff --git a/app/tests/test_inverter_g3.py b/app/tests/test_inverter_g3.py index f61c25d..9bbce5b 100644 --- a/app/tests/test_inverter_g3.py +++ b/app/tests/test_inverter_g3.py @@ -104,18 +104,14 @@ def patch_open_connection(): yield conn -def test_method_calls(patch_conn_init, patch_conn_close): - spy1 = patch_conn_init +def test_method_calls(patch_conn_close): spy2 = patch_conn_close reader = FakeReader() writer = FakeWriter() addr = ('proxy.local', 10000) inverter = InverterG3(reader, writer, addr) - inverter.l_addr = '' - inverter.r_addr = '' - - spy1.assert_called_once() - spy1.assert_called_once_with(inverter, reader, writer, addr, None) + assert inverter.local.stream + assert inverter.local.ifc inverter.close() spy2.assert_called_once() @@ -171,18 +167,19 @@ async def test_mqtt_publish(config_conn, patch_open_connection, patch_conn_close Inverter.class_init() inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) + stream = inverter.local.stream await inverter.async_publ_mqtt() # check call with invalid unique_id - inverter._Talent__set_serial_no(serial_no= "123344") + stream._Talent__set_serial_no(serial_no= "123344") - inverter.new_data['inverter'] = True - inverter.db.db['inverter'] = {} + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['inverter'] == False + assert stream.new_data['inverter'] == False - inverter.new_data['env'] = True - inverter.db.db['env'] = {} + stream.new_data['env'] = True + stream.db.db['env'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['env'] == False + assert stream.new_data['env'] == False Infos.new_stat_data['proxy'] = True await inverter.async_publ_mqtt() @@ -203,12 +200,12 @@ async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err, patc Inverter.class_init() inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) - inverter._Talent__set_serial_no(serial_no= "123344") - - inverter.new_data['inverter'] = True - inverter.db.db['inverter'] = {} + stream = inverter.local.stream + stream._Talent__set_serial_no(serial_no= "123344") + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['inverter'] == True + assert stream.new_data['inverter'] == True inverter.close() spy1.assert_called_once() @@ -225,12 +222,13 @@ async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except Inverter.class_init() inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) - inverter._Talent__set_serial_no(serial_no= "123344") + stream = inverter.local.stream + stream._Talent__set_serial_no(serial_no= "123344") - inverter.new_data['inverter'] = True - inverter.db.db['inverter'] = {} + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['inverter'] == True + assert stream.new_data['inverter'] == True inverter.close() spy1.assert_called_once() diff --git a/app/tests/test_inverter_g3p.py b/app/tests/test_inverter_g3p.py index b60b50c..5785031 100644 --- a/app/tests/test_inverter_g3p.py +++ b/app/tests/test_inverter_g3p.py @@ -105,18 +105,14 @@ def patch_open_connection(): yield conn -def test_method_calls(patch_conn_init, patch_conn_close): - spy1 = patch_conn_init +def test_method_calls(patch_conn_close): spy2 = patch_conn_close reader = FakeReader() writer = FakeWriter() addr = ('proxy.local', 10000) inverter = InverterG3P(reader, writer, addr, client_mode=False) - inverter.l_addr = '' - inverter.r_addr = '' - - spy1.assert_called_once() - spy1.assert_called_once_with(inverter, reader, writer, addr, None, client_mode=False) + assert inverter.local.stream + assert inverter.local.ifc inverter.close() spy2.assert_called_once() @@ -172,18 +168,19 @@ async def test_mqtt_publish(config_conn, patch_open_connection, patch_conn_close Inverter.class_init() inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) + stream = inverter.local.stream await inverter.async_publ_mqtt() # check call with invalid unique_id - inverter._SolarmanV5__set_serial_no(snr= 123344) + stream._SolarmanV5__set_serial_no(snr= 123344) - inverter.new_data['inverter'] = True - inverter.db.db['inverter'] = {} + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['inverter'] == False + assert stream.new_data['inverter'] == False - inverter.new_data['env'] = True - inverter.db.db['env'] = {} + stream.new_data['env'] = True + stream.db.db['env'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['env'] == False + assert stream.new_data['env'] == False Infos.new_stat_data['proxy'] = True await inverter.async_publ_mqtt() @@ -204,12 +201,12 @@ async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err, patc Inverter.class_init() inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) - inverter._SolarmanV5__set_serial_no(snr= 123344) - - inverter.new_data['inverter'] = True - inverter.db.db['inverter'] = {} + stream = inverter.local.stream + stream._SolarmanV5__set_serial_no(snr= 123344) + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['inverter'] == True + assert stream.new_data['inverter'] == True inverter.close() spy1.assert_called_once() @@ -226,12 +223,13 @@ async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except Inverter.class_init() inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) - inverter._SolarmanV5__set_serial_no(snr= 123344) + stream = inverter.local.stream + stream._SolarmanV5__set_serial_no(snr= 123344) - inverter.new_data['inverter'] = True - inverter.db.db['inverter'] = {} + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} await inverter.async_publ_mqtt() - assert inverter.new_data['inverter'] == True + assert stream.new_data['inverter'] == True inverter.close() spy1.assert_called_once() diff --git a/app/tests/test_modbus_tcp.py b/app/tests/test_modbus_tcp.py index 353a488..582e57c 100644 --- a/app/tests/test_modbus_tcp.py +++ b/app/tests/test_modbus_tcp.py @@ -150,7 +150,8 @@ async def test_modbus_conn(patch_open): _ = patch_open assert Infos.stat['proxy']['Inverter_Cnt'] == 0 - async with ModbusConn('test.local', 1234) as stream: + async with ModbusConn('test.local', 1234) as inverter: + stream = inverter.local.stream assert stream.node_id == 'G3P' assert stream.addr == ('test.local', 1234) assert type(stream._ifc._reader) is FakeReader From 41aeac41688bdf69e82cb54a93bd8363dd3afbe0 Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Sun, 29 Sep 2024 20:08:04 +0200 Subject: [PATCH 2/2] resolution of connection classes - remove ConnectionG3Client - remove ConnectionG3Server - remove ConnectionG3PClient - remove ConnectionG3PServer --- app/proxy.svg | 965 ++++++++++++++--------------- app/proxy.yuml | 32 +- app/src/gen3/connection_g3.py | 45 +- app/src/gen3/inverter_g3.py | 26 +- app/src/gen3/talent.py | 9 +- app/src/gen3plus/connection_g3p.py | 46 +- app/src/gen3plus/inverter_g3p.py | 25 +- app/src/gen3plus/solarman_v5.py | 10 +- app/src/inverter.py | 87 --- app/src/inverter_base.py | 108 ++++ app/src/modbus_tcp.py | 6 +- app/tests/test_connection_g3.py | 22 +- app/tests/test_connection_g3p.py | 22 +- app/tests/test_inverter_g3.py | 6 +- app/tests/test_inverter_g3p.py | 6 +- app/tests/test_modbus_tcp.py | 14 +- app/tests/test_mqtt.py | 6 +- app/tests/test_solarman.py | 10 +- app/tests/test_talent.py | 8 +- 19 files changed, 679 insertions(+), 774 deletions(-) create mode 100644 app/src/inverter_base.py diff --git a/app/proxy.svg b/app/proxy.svg index c3857fb..dea173f 100644 --- a/app/proxy.svg +++ b/app/proxy.svg @@ -4,587 +4,526 @@ - - + + G - + A0 - - - -You can stick notes -on diagrams too! + + + +You can stick notes +on diagrams too! A1 - -Mqtt -<<Singleton>> - -<static>ha_restarts -<static>__client -<static>__cb_MqttIsUp - -<async>publish() -<async>close() + +Mqtt +<<Singleton>> + +<static>ha_restarts +<static>__client +<static>__cb_MqttIsUp + +<async>publish() +<async>close() A2 - -Inverter - -cls.db_stat -cls.entity_prfx -cls.discovery_prfx -cls.proxy_node_id -cls.proxy_unique_id -cls.mqtt:Mqtt -__ha_restarts - -async_create_remote(inv_prot, conn_class)async_publ_mqtt() + +Inverter + +cls.db_stat +cls.entity_prfx +cls.discovery_prfx +cls.proxy_node_id +cls.proxy_unique_id +cls.mqtt:Mqtt +__ha_restarts + +async_create_remote(inv_prot, conn_class)async_publ_mqtt() A1->A2 - - - + + + A3 - -InverterG3 - -addr -remote:StreamPtr -local:StreamPtr - -async_create_remote() -close() + +InverterG3 + +addr +remote:StreamPtr +local:StreamPtr + +async_create_remote() +close() A2->A3 - - + + A4 - -InverterG3P - -addr -remote:StreamPtr -local:StreamPtr - -async_create_remote() -close() + +InverterG3P + +addr +remote:StreamPtr +local:StreamPtr + +async_create_remote() +close() A2->A4 - - - - - -A5 - -IterRegistry - - -__iter__ - - - -A6 - -Message - -server_side:bool -header_valid:bool -header_len:unsigned -data_len:unsigned -unique_id -node_id -sug_area -_recv_buffer:bytearray -_send_buffer:bytearray -_forward_buffer:bytearray -db:Infos -new_data:list -state - -_read():void<abstract> -close():void -inc_counter():void -dec_counter():void - - - -A5->A6 - - - - - -A21 - -Talent - -await_conn_resp_cnt -id_str -contact_name -contact_mail -db:InfosG3 -mb:Modbus -switch - -msg_contact_info() -msg_ota_update() -msg_get_time() -msg_collector_data() -msg_inverter_data() -msg_unknown() -close() - - - -A6->A21 - - - - - -A22 - -SolarmanV5 - -control -serial -snr -db:InfosG3P -mb:Modbus -switch - -msg_unknown() -close() - - - -A6->A22 - - - - - -A7 - -<<AsyncIfc>> - - -set_node_id() -get_conn_no() -tx_add() -tx_flush() -tx_get() -tx_peek() -tx_log() -tx_clear() -tx_len() -fwd_add() -fwd_flush() -fwd_log() -fwd_clear() -rx_get() -rx_peek() -rx_log() -rx_clear() -rx_len() -rx_set_cb() -prot_set_timeout_cb() - - - -A8 - -AsyncIfcImpl - -fwd_fifo:ByteFifo -tx_fifo:ByteFifo -rx_fifo:ByteFifo -conn_no:Count -node_id -timeout_cb - - - -A7->A8 - - - - - -A9 - -AsyncStream - -reader -writer -addr -r_addr -l_addr - -<async>loop -disc() -close() -healthy() -__async_read() -__async_write() -__async_forward() - - - -A8->A9 - - + + A10 - -AsyncStreamServer - -async_create_remote - -<async>server_loop() -<async>_async_forward() -<async>publish_outstanding_mqtt() -close() + +AsyncStreamServer + +async_create_remote + +<async>server_loop() +<async>_async_forward() +<async>publish_outstanding_mqtt() +close() - - -A9->A10 - - + + +A3->A10 + + + +local A11 - -AsyncStreamClient - - -<async>client_loop() -<async>_async_forward()) + +AsyncStreamClient + + +<async>client_loop() +<async>_async_forward()) - - -A9->A11 - - - - - -A12 - -ConnectionG3 - -remote.stream:ConnectionG3 - -healthy() - - - -A13 - -ConnectionG3Client - -_ifc:AsyncStreamClient -conn_no -addr - -close() - - - -A12->A13 - - - - - -A14 - -ConnectionG3Server - -_ifc:AsyncStreamServer -conn_no -addr - -close() - - + -A12->A14 - - +A3->A11 + + +remote - - -A13->A3 - + + +A4->A10 + + + +local - - -A13->A11 - - - -1 - - - -A14->A3 - - - + -A14->A10 - - - -1 +A4->A11 + + +remote - - -A15 - -ConnectionG3P - -remote.stream:ConnectionG3P - -healthy() -close() + + +A5 + +IterRegistry + + +__iter__ - - -A16 - -ConnectionG3PClient - -_ifc:AsyncStreamClient -conn_no -addr - -close() + + +A6 + +Message + +server_side:bool +header_valid:bool +header_len:unsigned +data_len:unsigned +unique_id +node_id +sug_area +_recv_buffer:bytearray +_send_buffer:bytearray +_forward_buffer:bytearray +db:Infos +new_data:list +state + +_read():void<abstract> +close():void +inc_counter():void +dec_counter():void - - -A15->A16 - - + + +A5->A6 + + A17 - -ConnectionG3PServer - -_ifc:AsyncStreamServer -conn_no -addr - -close() + +Talent + +ifc:AsyncIfc +conn_no +addr +await_conn_resp_cnt +id_str +contact_name +contact_mail +db:InfosG3 +mb:Modbus +switch + +msg_contact_info() +msg_ota_update() +msg_get_time() +msg_collector_data() +msg_inverter_data() +msg_unknown() +healthy() +close() - - -A15->A17 - - - - - -A16->A4 - - - - -A16->A11 - - - -1 - - - -A17->A4 - - - - -A17->A10 - - - -1 + + +A6->A17 + + A18 - -Infos - -stat -new_stat_data -info_dev - -static_init() -dev_value() -inc_counter() -dec_counter() -ha_proxy_conf -ha_conf -ha_remove -update_db -set_db_def_value -get_db_value -ignore_this_device + +SolarmanV5 + +ifc:AsyncIfc +conn_no +addr +control +serial +snr +db:InfosG3P +mb:Modbus +switch + +msg_unknown() +healthy() +close() + + + +A6->A18 + + + + + +A7 + +<<AsyncIfc>> + + +set_node_id() +get_conn_no() +tx_add() +tx_flush() +tx_get() +tx_peek() +tx_log() +tx_clear() +tx_len() +fwd_add() +fwd_flush() +fwd_log() +fwd_clear() +rx_get() +rx_peek() +rx_log() +rx_clear() +rx_len() +rx_set_cb() +prot_set_timeout_cb() + + + +A8 + +AsyncIfcImpl + +fwd_fifo:ByteFifo +tx_fifo:ByteFifo +rx_fifo:ByteFifo +conn_no:Count +node_id +timeout_cb + + + +A7->A8 + + + + + +A9 + +AsyncStream + +reader +writer +addr +r_addr +l_addr + +<async>loop +disc() +close() +healthy() +__async_read() +__async_write() +__async_forward() + + + +A8->A9 + + + + + +A9->A10 + + + + + +A9->A11 + + + + + +A12 + +ConnectionG3 + + + + + +A12->A3 + + +remote + + + +A12->A3 + + + +local + + + +A13 + +ConnectionG3P + + + + + +A13->A4 + + +remote + + + +A13->A4 + + + +local + + + +A14 + +Infos + +stat +new_stat_data +info_dev + +static_init() +dev_value() +inc_counter() +dec_counter() +ha_proxy_conf +ha_conf +ha_remove +update_db +set_db_def_value +get_db_value +ignore_this_device + + + +A15 + +InfosG3 + + +ha_confs() +parse() + + + +A14->A15 + + + + + +A16 + +InfosG3P + + +ha_confs() +parse() + + + +A14->A16 + + + + + +A17->A7 + + +use + + + +A17->A12 + + + + + +A17->A15 + + + + + +A18->A7 + + +use + + + +A18->A13 + + + + + +A18->A16 + + A19 - -InfosG3 - - -ha_confs() -parse() + +Modbus + +que +snd_handler +rsp_handler +timeout +max_retires +last_xxx +err +retry_cnt +req_pend +tim + +build_msg() +recv_req() +recv_resp() +close() - - -A18->A19 - - + + +A19->A17 + + +has +1 + + + +A19->A18 + + +has +1 A20 - -InfosG3P - - -ha_confs() -parse() + +ModbusConn + +host +port +addr +stream:InverterG3P + - - -A18->A20 - - - - - -A21->A7 - - -use - - - -A21->A12 - - - - - -A21->A19 - - - - - -A22->A7 - - -use - - - -A22->A15 - - - - - -A22->A20 - - - - - -A23 - -Modbus - -que -snd_handler -rsp_handler -timeout -max_retires -last_xxx -err -retry_cnt -req_pend -tim - -build_msg() -recv_req() -recv_resp() -close() - - - -A23->A21 - - -has -1 - - - -A23->A22 - - -has -1 - - - -A24 - -ModbusConn - -host -port -addr -stream:InverterG3P - - - - -A24->A4 - - -1 -has + + +A20->A4 + + +1 +has diff --git a/app/proxy.yuml b/app/proxy.yuml index c19f5c6..d463419 100644 --- a/app/proxy.yuml +++ b/app/proxy.yuml @@ -23,35 +23,27 @@ [AsyncStream]<-[AsyncStreamClient] -[ConnectionG3|remote.stream:ConnectionG3|healthy()] -[ConnectionG3Client|_ifc:AsyncStreamClient;conn_no;addr|close()] -[ConnectionG3Server|_ifc:AsyncStreamServer;conn_no;addr|;close()] -[ConnectionG3]^[ConnectionG3Client] -[ConnectionG3]^[ConnectionG3Server] -[ConnectionG3Client]-[InverterG3] -[ConnectionG3Client]++-1>[AsyncStreamClient] -[ConnectionG3Server]-[InverterG3] -[ConnectionG3Server]++-1>[AsyncStreamServer] +[ConnectionG3||] +[ConnectionG3][AsyncStreamClient] +[ConnectionG3]<-local++[InverterG3] +[InverterG3]++local->[AsyncStreamServer] -[ConnectionG3P|remote.stream:ConnectionG3P|healthy();close()] -[ConnectionG3PClient|_ifc:AsyncStreamClient;conn_no;addr|close()] -[ConnectionG3PServer|_ifc:AsyncStreamServer;conn_no;addr|;close()] -[ConnectionG3P]^[ConnectionG3PClient] -[ConnectionG3P]^[ConnectionG3PServer] -[ConnectionG3PClient]-[InverterG3P] -[ConnectionG3PClient]++-1>[AsyncStreamClient] -[ConnectionG3PServer]-[InverterG3P] -[ConnectionG3PServer]++-1>[AsyncStreamServer] +[ConnectionG3P||] +[ConnectionG3P][AsyncStreamClient] +[ConnectionG3P]<-local++[InverterG3P] +[InverterG3P]++local->[AsyncStreamServer] [Infos|stat;new_stat_data;info_dev|static_init();dev_value();inc_counter();dec_counter();ha_proxy_conf;ha_conf;ha_remove;update_db;set_db_def_value;get_db_value;ignore_this_device] [Infos]^[InfosG3||ha_confs();parse()] [Infos]^[InfosG3P||ha_confs();parse()] -[Talent|await_conn_resp_cnt;id_str;contact_name;contact_mail;db:InfosG3;mb:Modbus;switch|msg_contact_info();msg_ota_update();msg_get_time();msg_collector_data();msg_inverter_data();msg_unknown();;close()] +[Talent|ifc:AsyncIfc;conn_no;addr;;await_conn_resp_cnt;id_str;contact_name;contact_mail;db:InfosG3;mb:Modbus;switch|msg_contact_info();msg_ota_update();msg_get_time();msg_collector_data();msg_inverter_data();msg_unknown();;healthy();close()] [Talent]^[ConnectionG3] [Talent]use->[<>] [Talent]->[InfosG3] -[SolarmanV5|control;serial;snr;db:InfosG3P;mb:Modbus;switch|msg_unknown();;close()] +[SolarmanV5|ifc:AsyncIfc;conn_no;addr;;control;serial;snr;db:InfosG3P;mb:Modbus;switch|msg_unknown();;healthy();close()] [SolarmanV5]^[ConnectionG3P] [SolarmanV5]use->[<>] [SolarmanV5]->[InfosG3P] diff --git a/app/src/gen3/connection_g3.py b/app/src/gen3/connection_g3.py index 8229e3b..6123693 100644 --- a/app/src/gen3/connection_g3.py +++ b/app/src/gen3/connection_g3.py @@ -1,55 +1,16 @@ import logging -from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3.connection_g3": - from app.src.async_stream import AsyncStreamServer - from app.src.async_stream import AsyncStreamClient - from app.src.inverter import Inverter from app.src.gen3.talent import Talent else: # pragma: no cover - from async_stream import AsyncStreamServer - from async_stream import AsyncStreamClient - from inverter import Inverter from gen3.talent import Talent logger = logging.getLogger('conn') class ConnectionG3(Talent): - def healthy(self) -> bool: - logger.debug('ConnectionG3 healthy()') - return self._ifc.healthy() + def __init__(self, addr, ifc, server_side, id_str=b'') -> None: + super().__init__(addr, server_side, ifc, id_str) def close(self): - self._ifc.close() - Talent.close(self) - # logger.info(f'AsyncStream refs: {gc.get_referrers(self)}') - - -class ConnectionG3Server(ConnectionG3): - - def __init__(self, inverter: "Inverter", - reader: StreamReader, writer: StreamWriter, - addr, id_str=b'') -> None: - - server_side = True - self._ifc = AsyncStreamServer(reader, writer, - inverter.async_publ_mqtt, - inverter.async_create_remote, - inverter.remote) - self.conn_no = self._ifc.get_conn_no() - self.addr = addr - Talent.__init__(self, server_side, self._ifc, id_str) - - -class ConnectionG3Client(ConnectionG3): - - def __init__(self, inverter: "Inverter", - reader: StreamReader, writer: StreamWriter, - addr, id_str=b'') -> None: - server_side = False - self._ifc = AsyncStreamClient(reader, writer, - inverter.remote) - self.conn_no = self._ifc.get_conn_no() - self.addr = addr - Talent.__init__(self, server_side, self._ifc, id_str) + super().close() diff --git a/app/src/gen3/inverter_g3.py b/app/src/gen3/inverter_g3.py index 0a4ae04..5b12147 100644 --- a/app/src/gen3/inverter_g3.py +++ b/app/src/gen3/inverter_g3.py @@ -2,32 +2,38 @@ import logging from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3.inverter_g3": - from app.src.inverter import Inverter + from app.src.inverter_base import InverterBase from app.src.async_stream import StreamPtr - from app.src.gen3.connection_g3 import ConnectionG3Server - from app.src.gen3.connection_g3 import ConnectionG3Client + from app.src.async_stream import AsyncStreamServer + from app.src.gen3.connection_g3 import ConnectionG3 else: # pragma: no cover - from inverter import Inverter + from inverter_base import InverterBase from async_stream import StreamPtr - from gen3.connection_g3 import ConnectionG3Server - from gen3.connection_g3 import ConnectionG3Client + from async_stream import AsyncStreamServer + from gen3.connection_g3 import ConnectionG3 logger_mqtt = logging.getLogger('mqtt') -class InverterG3(Inverter): +class InverterG3(InverterBase): def __init__(self, reader: StreamReader, writer: StreamWriter, addr): super().__init__() self.addr = addr + self.remote = StreamPtr(None) + ifc = AsyncStreamServer(reader, writer, + self.async_publ_mqtt, + self.async_create_remote, + self.remote) + self.remote = StreamPtr(None) self.local = StreamPtr( - ConnectionG3Server(self, reader, writer, addr) + ConnectionG3(addr, ifc, True) ) async def async_create_remote(self) -> None: - await Inverter.async_create_remote( - self, 'tsun', ConnectionG3Client) + await InverterBase.async_create_remote( + self, 'tsun', ConnectionG3) def close(self) -> None: logging.debug(f'InverterG3.close() {self.addr}') diff --git a/app/src/gen3/talent.py b/app/src/gen3/talent.py index b662576..4f9b309 100644 --- a/app/src/gen3/talent.py +++ b/app/src/gen3/talent.py @@ -46,13 +46,15 @@ class Talent(Message): MB_REGULAR_TIMEOUT = 60 TXT_UNKNOWN_CTRL = 'Unknown Ctrl' - def __init__(self, server_side: bool, ifc: "AsyncIfc", id_str=b''): + def __init__(self, addr, server_side: bool, ifc: "AsyncIfc", id_str=b''): super().__init__(server_side, self.send_modbus_cb, mb_timeout=15) ifc.rx_set_cb(self.read) ifc.prot_set_timeout_cb(self._timeout) ifc.prot_set_init_new_client_conn_cb(self._init_new_client_conn) ifc.prot_set_update_header_cb(self._update_header) + self.addr = addr self.ifc = ifc + self.conn_no = ifc.get_conn_no() self.await_conn_resp_cnt = 0 self.id_str = id_str self.contact_name = b'' @@ -93,6 +95,10 @@ class Talent(Message): ''' Our puplic methods ''' + def healthy(self) -> bool: + logger.debug('Talent healthy()') + return self.ifc.healthy() + def close(self) -> None: logging.debug('Talent.close()') if self.server_side: @@ -110,6 +116,7 @@ class Talent(Message): self.log_lvl.clear() self.state = State.closed self.mb_timer.close() + self.ifc.close() self.ifc.rx_set_cb(None) self.ifc.prot_set_timeout_cb(None) self.ifc.prot_set_init_new_client_conn_cb(None) diff --git a/app/src/gen3plus/connection_g3p.py b/app/src/gen3plus/connection_g3p.py index 163bd89..b86592e 100644 --- a/app/src/gen3plus/connection_g3p.py +++ b/app/src/gen3plus/connection_g3p.py @@ -1,56 +1,18 @@ import logging -from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3plus.connection_g3p": - from app.src.async_stream import AsyncStreamServer - from app.src.async_stream import AsyncStreamClient - from app.src.inverter import Inverter from app.src.gen3plus.solarman_v5 import SolarmanV5 else: # pragma: no cover - from async_stream import AsyncStreamServer - from async_stream import AsyncStreamClient - from inverter import Inverter from gen3plus.solarman_v5 import SolarmanV5 logger = logging.getLogger('conn') class ConnectionG3P(SolarmanV5): - def healthy(self) -> bool: - logger.debug('ConnectionG3P healthy()') - return self._ifc.healthy() + def __init__(self, addr, ifc, server_side, + client_mode: bool = False) -> None: + super().__init__(addr, server_side, client_mode, ifc) def close(self): - self._ifc.close() - SolarmanV5.close(self) + super().close() # logger.info(f'AsyncStream refs: {gc.get_referrers(self)}') - - -class ConnectionG3PServer(ConnectionG3P): - - def __init__(self, inverter: "Inverter", - reader: StreamReader, writer: StreamWriter, - addr, client_mode: bool) -> None: - - server_side = True - self._ifc = AsyncStreamServer(reader, writer, - inverter.async_publ_mqtt, - inverter.async_create_remote, - inverter.remote) - self.conn_no = self._ifc.get_conn_no() - self.addr = addr - SolarmanV5.__init__(self, server_side, client_mode, self._ifc) - - -class ConnectionG3PClient(ConnectionG3P): - - def __init__(self, inverter: "Inverter", - reader: StreamReader, writer: StreamWriter, - addr) -> None: - - server_side = False - client_mode = False - self._ifc = AsyncStreamClient(reader, writer, inverter.remote) - self.conn_no = self._ifc.get_conn_no() - self.addr = addr - SolarmanV5.__init__(self, server_side, client_mode, self._ifc) diff --git a/app/src/gen3plus/inverter_g3p.py b/app/src/gen3plus/inverter_g3p.py index 2c6cb6d..f8fbfc7 100644 --- a/app/src/gen3plus/inverter_g3p.py +++ b/app/src/gen3plus/inverter_g3p.py @@ -2,33 +2,38 @@ import logging from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3plus.inverter_g3p": - from app.src.inverter import Inverter + from app.src.inverter_base import InverterBase from app.src.async_stream import StreamPtr - from app.src.gen3plus.connection_g3p import ConnectionG3PServer - from app.src.gen3plus.connection_g3p import ConnectionG3PClient + from app.src.async_stream import AsyncStreamServer + from app.src.gen3plus.connection_g3p import ConnectionG3P else: # pragma: no cover - from inverter import Inverter + from inverter_base import InverterBase from async_stream import StreamPtr - from gen3plus.connection_g3p import ConnectionG3PServer - from gen3plus.connection_g3p import ConnectionG3PClient + from async_stream import AsyncStreamServer + from gen3plus.connection_g3p import ConnectionG3P logger_mqtt = logging.getLogger('mqtt') -class InverterG3P(Inverter): +class InverterG3P(InverterBase): def __init__(self, reader: StreamReader, writer: StreamWriter, addr, client_mode: bool = False): super().__init__() self.addr = addr self.remote = StreamPtr(None) + ifc = AsyncStreamServer(reader, writer, + self.async_publ_mqtt, + self.async_create_remote, + self.remote) + self.local = StreamPtr( - ConnectionG3PServer(self, reader, writer, addr, client_mode) + ConnectionG3P(addr, ifc, True, client_mode) ) async def async_create_remote(self) -> None: - await Inverter.async_create_remote( - self, 'solarman', ConnectionG3PClient) + await InverterBase.async_create_remote( + self, 'solarman', ConnectionG3P) def close(self) -> None: logging.debug(f'InverterG3P.close() {self.addr}') diff --git a/app/src/gen3plus/solarman_v5.py b/app/src/gen3plus/solarman_v5.py index 3b0de54..a9dfb79 100644 --- a/app/src/gen3plus/solarman_v5.py +++ b/app/src/gen3plus/solarman_v5.py @@ -62,14 +62,17 @@ class SolarmanV5(Message): HDR_FMT = ' bool: + logger.debug('SolarmanV5 healthy()') + return self.ifc.healthy() + def close(self) -> None: logging.debug('Solarman.close()') if self.server_side: @@ -167,6 +174,7 @@ class SolarmanV5(Message): self.log_lvl.clear() self.state = State.closed self.mb_timer.close() + self.ifc.close() self.ifc.rx_set_cb(None) self.ifc.prot_set_timeout_cb(None) self.ifc.prot_set_init_new_client_conn_cb(None) diff --git a/app/src/inverter.py b/app/src/inverter.py index 2f53ef8..dc224ee 100644 --- a/app/src/inverter.py +++ b/app/src/inverter.py @@ -1,8 +1,6 @@ import asyncio import logging -import traceback import json -from aiomqtt import MqttCodeError if __name__ == "app.src.inverter": from app.src.config import Config @@ -106,88 +104,3 @@ class Inverter(): logging.info('Close MQTT Task') loop.run_until_complete(cls.mqtt.close()) cls.mqtt = None - - def __init__(self): - self.__ha_restarts = -1 - - async def async_create_remote(self, inv_prot: str, conn_class) -> None: - '''Establish a client connection to the TSUN cloud''' - tsun = Config.get(inv_prot) - host = tsun['host'] - port = tsun['port'] - addr = (host, port) - stream = self.local.stream - - try: - logging.info(f'[{stream.node_id}] Connect to {addr}') - connect = asyncio.open_connection(host, port) - reader, writer = await connect - if hasattr(stream, 'id_str'): - self.remote.stream = conn_class( - self, reader, writer, addr, stream.id_str) - else: - self.remote.stream = conn_class( - self, reader, writer, addr) - - logging.info(f'[{self.remote.stream.node_id}:' - f'{self.remote.stream.conn_no}] ' - f'Connected to {addr}') - asyncio.create_task(self.remote.ifc.client_loop(addr)) - - except (ConnectionRefusedError, TimeoutError) as error: - logging.info(f'{error}') - except Exception: - Infos.inc_counter('SW_Exception') - logging.error( - f"Inverter: Exception for {addr}:\n" - f"{traceback.format_exc()}") - - async def async_publ_mqtt(self) -> None: - '''publish data to MQTT broker''' - stream = self.local.stream - if not stream.unique_id: - return - # check if new inverter or collector infos are available or when the - # home assistant has changed the status back to online - try: - if (('inverter' in stream.new_data and stream.new_data['inverter']) - or ('collector' in stream.new_data and - stream.new_data['collector']) - or self.mqtt.ha_restarts != self.__ha_restarts): - await self._register_proxy_stat_home_assistant() - await self.__register_home_assistant(stream) - self.__ha_restarts = self.mqtt.ha_restarts - - for key in stream.new_data: - await self.__async_publ_mqtt_packet(stream, key) - for key in Infos.new_stat_data: - await Inverter._async_publ_mqtt_proxy_stat(key) - - except MqttCodeError as error: - logging.error(f'Mqtt except: {error}') - except Exception: - Infos.inc_counter('SW_Exception') - logging.error( - f"Inverter: Exception:\n" - f"{traceback.format_exc()}") - - async def __async_publ_mqtt_packet(self, stream, key): - db = stream.db.db - if key in db and stream.new_data[key]: - data_json = json.dumps(db[key]) - node_id = stream.node_id - logger_mqtt.debug(f'{key}: {data_json}') - await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501 - stream.new_data[key] = False - - async def __register_home_assistant(self, stream) -> None: - '''register all our topics at home assistant''' - for data_json, component, node_id, id in stream.db.ha_confs( - self.entity_prfx, stream.node_id, stream.unique_id, - stream.sug_area): - logger_mqtt.debug(f"MQTT Register: cmp:'{component}'" - f" node_id:'{node_id}' {data_json}") - await self.mqtt.publish(f"{self.discovery_prfx}{component}" - f"/{node_id}{id}/config", data_json) - - stream.db.reg_clr_at_midnight(f'{self.entity_prfx}{stream.node_id}') diff --git a/app/src/inverter_base.py b/app/src/inverter_base.py new file mode 100644 index 0000000..5784a2b --- /dev/null +++ b/app/src/inverter_base.py @@ -0,0 +1,108 @@ +import asyncio +import logging +import traceback +import json +from aiomqtt import MqttCodeError + +if __name__ == "app.src.inverter_base": + from app.src.inverter import Inverter + from app.src.async_stream import AsyncStreamClient + from app.src.config import Config + from app.src.infos import Infos +else: # pragma: no cover + from inverter import Inverter + from async_stream import AsyncStreamClient + from config import Config + from infos import Infos + +logger_mqtt = logging.getLogger('mqtt') + + +class InverterBase(Inverter): + def __init__(self): + self.__ha_restarts = -1 + + async def async_create_remote(self, inv_prot: str, conn_class) -> None: + '''Establish a client connection to the TSUN cloud''' + tsun = Config.get(inv_prot) + host = tsun['host'] + port = tsun['port'] + addr = (host, port) + stream = self.local.stream + + try: + logging.info(f'[{stream.node_id}] Connect to {addr}') + connect = asyncio.open_connection(host, port) + reader, writer = await connect + ifc = AsyncStreamClient(reader, writer, + self.remote) + + if hasattr(stream, 'id_str'): + self.remote.stream = conn_class( + addr, ifc, False, stream.id_str) + else: + self.remote.stream = conn_class( + addr, ifc, False) + + logging.info(f'[{self.remote.stream.node_id}:' + f'{self.remote.stream.conn_no}] ' + f'Connected to {addr}') + asyncio.create_task(self.remote.ifc.client_loop(addr)) + + except (ConnectionRefusedError, TimeoutError) as error: + logging.info(f'{error}') + except Exception: + Infos.inc_counter('SW_Exception') + logging.error( + f"Inverter: Exception for {addr}:\n" + f"{traceback.format_exc()}") + + async def async_publ_mqtt(self) -> None: + '''publish data to MQTT broker''' + stream = self.local.stream + if not stream.unique_id: + return + # check if new inverter or collector infos are available or when the + # home assistant has changed the status back to online + try: + if (('inverter' in stream.new_data and stream.new_data['inverter']) + or ('collector' in stream.new_data and + stream.new_data['collector']) + or self.mqtt.ha_restarts != self.__ha_restarts): + await self._register_proxy_stat_home_assistant() + await self.__register_home_assistant(stream) + self.__ha_restarts = self.mqtt.ha_restarts + + for key in stream.new_data: + await self.__async_publ_mqtt_packet(stream, key) + for key in Infos.new_stat_data: + await Inverter._async_publ_mqtt_proxy_stat(key) + + except MqttCodeError as error: + logging.error(f'Mqtt except: {error}') + except Exception: + Infos.inc_counter('SW_Exception') + logging.error( + f"Inverter: Exception:\n" + f"{traceback.format_exc()}") + + async def __async_publ_mqtt_packet(self, stream, key): + db = stream.db.db + if key in db and stream.new_data[key]: + data_json = json.dumps(db[key]) + node_id = stream.node_id + logger_mqtt.debug(f'{key}: {data_json}') + await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501 + stream.new_data[key] = False + + async def __register_home_assistant(self, stream) -> None: + '''register all our topics at home assistant''' + for data_json, component, node_id, id in stream.db.ha_confs( + self.entity_prfx, stream.node_id, stream.unique_id, + stream.sug_area): + logger_mqtt.debug(f"MQTT Register: cmp:'{component}'" + f" node_id:'{node_id}' {data_json}") + await self.mqtt.publish(f"{self.discovery_prfx}{component}" + f"/{node_id}{id}/config", data_json) + + stream.db.reg_clr_at_midnight(f'{self.entity_prfx}{stream.node_id}') diff --git a/app/src/modbus_tcp.py b/app/src/modbus_tcp.py index da4bd03..bac6243 100644 --- a/app/src/modbus_tcp.py +++ b/app/src/modbus_tcp.py @@ -31,12 +31,12 @@ class ModbusConn(): logging.info(f'[{stream.node_id}:{stream.conn_no}] ' f'Connected to {self.addr}') Infos.inc_counter('Inverter_Cnt') - await self.inverter.local._ifc.publish_outstanding_mqtt() + await self.inverter.local.ifc.publish_outstanding_mqtt() return self.inverter async def __aexit__(self, exc_type, exc, tb): Infos.dec_counter('Inverter_Cnt') - await self.inverter.local._ifc.publish_outstanding_mqtt() + await self.inverter.local.ifc.publish_outstanding_mqtt() self.inverter.close() @@ -65,7 +65,7 @@ class ModbusTcp(): async with ModbusConn(host, port) as inverter: stream = inverter.local.stream await stream.send_start_cmd(snr, host) - await stream._ifc.loop() + await stream.ifc.loop() logger.info(f'[{stream.node_id}:{stream.conn_no}] ' f'Connection closed - Shutdown: ' f'{stream.shutdown_started}') diff --git a/app/tests/test_connection_g3.py b/app/tests/test_connection_g3.py index b215f18..167a655 100644 --- a/app/tests/test_connection_g3.py +++ b/app/tests/test_connection_g3.py @@ -5,8 +5,8 @@ import asyncio from itertools import count from mock import patch from app.src.async_stream import StreamPtr -from app.src.async_stream import AsyncStream, AsyncIfcImpl -from app.src.gen3.connection_g3 import ConnectionG3Server +from app.src.async_stream import AsyncStream, AsyncStreamServer, AsyncIfcImpl +from app.src.gen3.connection_g3 import ConnectionG3 from app.src.gen3.talent import Talent @@ -75,24 +75,26 @@ class FakeWriter(): -def test_method_calls(patch_talent_init, patch_healthy, patch_async_close, patch_talent_close): +def test_method_calls(patch_healthy, patch_async_close): AsyncIfcImpl._ids = count(5) - spy2 = patch_talent_init spy3 = patch_healthy spy4 = patch_async_close - spy5 = patch_talent_close reader = FakeReader() writer = FakeWriter() id_str = "id_string" addr = ('proxy.local', 10000) - conn = ConnectionG3Server(FakeInverter(), reader, writer, addr, - id_str=id_str) - assert 5 == conn._ifc.get_conn_no() - spy2.assert_called_once_with(conn, True, conn._ifc, id_str) + inv = FakeInverter() + ifc = AsyncStreamServer(reader, writer, + inv.async_publ_mqtt, + inv.async_create_remote, + inv.remote) + + conn = ConnectionG3(addr, ifc, server_side=True, id_str=id_str) + assert 5 == conn.conn_no + assert 5 == conn.ifc.get_conn_no() conn.healthy() spy3.assert_called_once() conn.close() spy4.assert_called_once() - spy5.assert_called_once() diff --git a/app/tests/test_connection_g3p.py b/app/tests/test_connection_g3p.py index d8a8769..7ab1b43 100644 --- a/app/tests/test_connection_g3p.py +++ b/app/tests/test_connection_g3p.py @@ -5,8 +5,9 @@ import asyncio from itertools import count from mock import patch from app.src.singleton import Singleton -from app.src.async_stream import AsyncStream, AsyncIfcImpl, StreamPtr -from app.src.gen3plus.connection_g3p import ConnectionG3PServer +from app.src.async_stream import StreamPtr +from app.src.async_stream import AsyncStream, AsyncStreamServer, AsyncIfcImpl +from app.src.gen3plus.connection_g3p import ConnectionG3P from app.src.gen3plus.solarman_v5 import SolarmanV5 @@ -80,24 +81,25 @@ class FakeWriter(): -def test_method_calls(patch_solarman_init, patch_healthy, patch_async_close, patch_solarman_close): +def test_method_calls(patch_healthy, patch_async_close): AsyncIfcImpl._ids = count(5) - spy2 = patch_solarman_init spy3 = patch_healthy spy4 = patch_async_close - spy5 = patch_solarman_close reader = FakeReader() writer = FakeWriter() addr = ('proxy.local', 10000) - conn = ConnectionG3PServer(FakeInverter(), reader, writer, addr, - client_mode=False) - assert 5 == conn._ifc.get_conn_no() - spy2.assert_called_once_with(conn, True, False, conn._ifc) + inv = FakeInverter() + ifc = AsyncStreamServer(reader, writer, + inv.async_publ_mqtt, + inv.async_create_remote, + inv.remote) + conn = ConnectionG3P(addr, ifc, server_side=True, client_mode=False) + assert 5 == conn.conn_no + assert 5 == conn.ifc.get_conn_no() conn.healthy() spy3.assert_called_once() conn.close() spy4.assert_called_once() - spy5.assert_called_once() diff --git a/app/tests/test_inverter_g3.py b/app/tests/test_inverter_g3.py index 9bbce5b..e30f472 100644 --- a/app/tests/test_inverter_g3.py +++ b/app/tests/test_inverter_g3.py @@ -8,7 +8,7 @@ from app.src.infos import Infos from app.src.config import Config from app.src.inverter import Inverter from app.src.singleton import Singleton -from app.src.gen3.connection_g3 import ConnectionG3Server +from app.src.gen3.connection_g3 import ConnectionG3 from app.src.gen3.inverter_g3 import InverterG3 from app.tests.test_modbus_tcp import patch_mqtt_err, patch_mqtt_except, test_port, test_hostname @@ -44,12 +44,12 @@ def module_init(): @pytest.fixture def patch_conn_init(): - with patch.object(ConnectionG3Server, '__init__', return_value= None) as conn: + with patch.object(ConnectionG3, '__init__', return_value= None) as conn: yield conn @pytest.fixture def patch_conn_close(): - with patch.object(ConnectionG3Server, 'close') as conn: + with patch.object(ConnectionG3, 'close') as conn: yield conn class FakeReader(): diff --git a/app/tests/test_inverter_g3p.py b/app/tests/test_inverter_g3p.py index 5785031..5a4e5a4 100644 --- a/app/tests/test_inverter_g3p.py +++ b/app/tests/test_inverter_g3p.py @@ -8,7 +8,7 @@ from app.src.infos import Infos from app.src.config import Config from app.src.inverter import Inverter from app.src.singleton import Singleton -from app.src.gen3plus.connection_g3p import ConnectionG3PServer +from app.src.gen3plus.connection_g3p import ConnectionG3P from app.src.gen3plus.inverter_g3p import InverterG3P from app.tests.test_modbus_tcp import patch_mqtt_err, patch_mqtt_except, test_port, test_hostname @@ -45,12 +45,12 @@ def module_init(): @pytest.fixture def patch_conn_init(): - with patch.object(ConnectionG3PServer, '__init__', return_value= None) as conn: + with patch.object(ConnectionG3P, '__init__', return_value= None) as conn: yield conn @pytest.fixture def patch_conn_close(): - with patch.object(ConnectionG3PServer, 'close') as conn: + with patch.object(ConnectionG3P, 'close') as conn: yield conn class FakeReader(): diff --git a/app/tests/test_modbus_tcp.py b/app/tests/test_modbus_tcp.py index 582e57c..22887e8 100644 --- a/app/tests/test_modbus_tcp.py +++ b/app/tests/test_modbus_tcp.py @@ -154,8 +154,8 @@ async def test_modbus_conn(patch_open): stream = inverter.local.stream assert stream.node_id == 'G3P' assert stream.addr == ('test.local', 1234) - assert type(stream._ifc._reader) is FakeReader - assert type(stream._ifc._writer) is FakeWriter + assert type(stream.ifc._reader) is FakeReader + assert type(stream.ifc._writer) is FakeWriter assert Infos.stat['proxy']['Inverter_Cnt'] == 1 assert Infos.stat['proxy']['Inverter_Cnt'] == 0 @@ -206,7 +206,7 @@ async def test_modbus_cnf2(config_conn, patch_no_mqtt, patch_open): test += 1 assert Infos.stat['proxy']['Inverter_Cnt'] == 1 m.shutdown_started = True - m._ifc._reader.on_recv.set() + m.ifc._reader.on_recv.set() del m assert 1 == test @@ -266,14 +266,14 @@ async def test_mqtt_err(config_conn, patch_mqtt_err, patch_open): test += 1 if test == 1: m.shutdown_started = False - m._ifc._reader.on_recv.set() + m.ifc._reader.on_recv.set() await asyncio.sleep(0.1) assert m.state == State.closed await asyncio.sleep(0.1) await asyncio.sleep(0.1) else: m.shutdown_started = True - m._ifc._reader.on_recv.set() + m.ifc._reader.on_recv.set() del m await asyncio.sleep(0.01) @@ -299,13 +299,13 @@ async def test_mqtt_except(config_conn, patch_mqtt_except, patch_open): test += 1 if test == 1: m.shutdown_started = False - m._ifc._reader.on_recv.set() + m.ifc._reader.on_recv.set() await asyncio.sleep(0.1) assert m.state == State.closed await asyncio.sleep(0.1) else: m.shutdown_started = True - m._ifc._reader.on_recv.set() + m.ifc._reader.on_recv.set() del m await asyncio.sleep(0.01) diff --git a/app/tests/test_mqtt.py b/app/tests/test_mqtt.py index 5ce2ea5..9fb857e 100644 --- a/app/tests/test_mqtt.py +++ b/app/tests/test_mqtt.py @@ -45,7 +45,7 @@ def config_no_conn(test_port): @pytest.fixture def spy_at_cmd(): - conn = SolarmanV5(server_side=True, client_mode= False, ifc=AsyncIfcImpl()) + conn = SolarmanV5(('test.local', 1234), server_side=True, client_mode= False, ifc=AsyncIfcImpl()) conn.node_id = 'inv_2/' with patch.object(conn, 'send_at_cmd', wraps=conn.send_at_cmd) as wrapped_conn: yield wrapped_conn @@ -53,7 +53,7 @@ def spy_at_cmd(): @pytest.fixture def spy_modbus_cmd(): - conn = SolarmanV5(server_side=True, client_mode= False, ifc=AsyncIfcImpl()) + conn = SolarmanV5(('test.local', 1234), server_side=True, client_mode= False, ifc=AsyncIfcImpl()) conn.node_id = 'inv_1/' with patch.object(conn, 'send_modbus_cmd', wraps=conn.send_modbus_cmd) as wrapped_conn: yield wrapped_conn @@ -61,7 +61,7 @@ def spy_modbus_cmd(): @pytest.fixture def spy_modbus_cmd_client(): - conn = SolarmanV5(server_side=False, client_mode= False, ifc=AsyncIfcImpl()) + conn = SolarmanV5(('test.local', 1234), server_side=False, client_mode= False, ifc=AsyncIfcImpl()) conn.node_id = 'inv_1/' with patch.object(conn, 'send_modbus_cmd', wraps=conn.send_modbus_cmd) as wrapped_conn: yield wrapped_conn diff --git a/app/tests/test_solarman.py b/app/tests/test_solarman.py index 41a8101..6da7d23 100644 --- a/app/tests/test_solarman.py +++ b/app/tests/test_solarman.py @@ -35,7 +35,7 @@ class Mqtt(): class MemoryStream(SolarmanV5): def __init__(self, msg, chunks = (0,), server_side: bool = True): _ifc = AsyncIfcImpl() - super().__init__(server_side, client_mode=False, ifc=_ifc) + super().__init__(('test.local', 1234), server_side, client_mode=False, ifc=_ifc) if server_side: self.mb.timeout = 0.4 # overwrite for faster testing self.remote = StreamPtr(None) @@ -1236,9 +1236,9 @@ def test_build_logger_modell(config_tsun_allow_all, device_ind_msg): def test_msg_iterator(): Message._registry.clear() - m1 = SolarmanV5(server_side=True, client_mode=False, ifc=AsyncIfcImpl()) - m2 = SolarmanV5(server_side=True, client_mode=False, ifc=AsyncIfcImpl()) - m3 = SolarmanV5(server_side=True, client_mode=False, ifc=AsyncIfcImpl()) + m1 = SolarmanV5(('test1.local', 1234), server_side=True, client_mode=False, ifc=AsyncIfcImpl()) + m2 = SolarmanV5(('test2.local', 1234), server_side=True, client_mode=False, ifc=AsyncIfcImpl()) + m3 = SolarmanV5(('test3.local', 1234), server_side=True, client_mode=False, ifc=AsyncIfcImpl()) m3.close() del m3 test1 = 0 @@ -1256,7 +1256,7 @@ def test_msg_iterator(): assert test2 == 1 def test_proxy_counter(): - m = SolarmanV5(server_side=True, client_mode=False, ifc=AsyncIfcImpl()) + m = SolarmanV5(('test.local', 1234), server_side=True, client_mode=False, ifc=AsyncIfcImpl()) assert m.new_data == {} m.db.stat['proxy']['Unknown_Msg'] = 0 Infos.new_stat_data['proxy'] = False diff --git a/app/tests/test_talent.py b/app/tests/test_talent.py index 7ef01b1..38b0edd 100644 --- a/app/tests/test_talent.py +++ b/app/tests/test_talent.py @@ -20,7 +20,7 @@ tracer = logging.getLogger('tracer') class MemoryStream(Talent): def __init__(self, msg, chunks = (0,), server_side: bool = True): self.ifc = AsyncIfcImpl() - super().__init__(server_side, self.ifc) + super().__init__(('test.local', 1234), server_side, self.ifc) if server_side: self.mb.timeout = 0.4 # overwrite for faster testing self.remote = StreamPtr(None) @@ -1639,9 +1639,9 @@ def test_ctrl_byte(): def test_msg_iterator(): - m1 = Talent(server_side=True, ifc=AsyncIfcImpl()) - m2 = Talent(server_side=True, ifc=AsyncIfcImpl()) - m3 = Talent(server_side=True, ifc=AsyncIfcImpl()) + m1 = Talent(('test1.local', 1234), server_side=True, ifc=AsyncIfcImpl()) + m2 = Talent(('test2.local', 1234), server_side=True, ifc=AsyncIfcImpl()) + m3 = Talent(('test3.local', 1234), server_side=True, ifc=AsyncIfcImpl()) m3.close() del m3 test1 = 0