From bc54944077c3a31a145e3edbef4108f1d5ebb9a3 Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Wed, 25 Sep 2024 22:41:01 +0200 Subject: [PATCH] add two more callbacks --- app/src/async_ifc.py | 8 +++++ app/src/async_stream.py | 62 +++++++++++++++++++++++++-------- app/src/gen3/talent.py | 4 +++ app/src/gen3plus/solarman_v5.py | 5 +++ app/src/messages.py | 4 --- 5 files changed, 65 insertions(+), 18 deletions(-) diff --git a/app/src/async_ifc.py b/app/src/async_ifc.py index 333fb06..144f270 100644 --- a/app/src/async_ifc.py +++ b/app/src/async_ifc.py @@ -109,3 +109,11 @@ class AsyncIfc(ABC): @abstractmethod def prot_set_timeout_cb(self, callback): pass # pragma: no cover + + @abstractmethod + def prot_set_init_new_client_conn_cb(self, callback): + pass # pragma: no cover + + @abstractmethod + def prot_set_update_header_cb(self, callback): + pass # pragma: no cover diff --git a/app/src/async_stream.py b/app/src/async_stream.py index 83e3426..0cc449c 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -31,6 +31,16 @@ class AsyncIfcImpl(AsyncIfc): self.conn_no = next(self._ids) self.node_id = '' self.timeout_cb = None + self.init_new_client_conn_cb = None + self.update_header_cb = None + + def close(self): + self.timeout_cb = None + self.init_new_client_conn_cb = None + self.update_header_cb = None + self.fwd_fifo.reg_trigger(None) + self.tx_fifo.reg_trigger(None) + self.rx_fifo.reg_trigger(None) def set_node_id(self, value: str): self.node_id = value @@ -108,10 +118,33 @@ class AsyncIfcImpl(AsyncIfc): def prot_set_timeout_cb(self, callback): self.timeout_cb = callback + def prot_set_init_new_client_conn_cb(self, callback): + self.init_new_client_conn_cb = callback + + def prot_set_update_header_cb(self, callback): + self.update_header_cb = callback + class StreamPtr(): - def __init__(self, stream): - self.stream = stream + '''Descr StreamPtr''' + def __init__(self, _stream): + self.stream = _stream + + @property + def ifc(self): + return self._ifc + + @property + def stream(self): + return self._stream + + @stream.setter + def stream(self, value): + self._stream = value + if value: + self._ifc = value.ifc + else: + self._ifc = None class AsyncStream(AsyncIfcImpl): @@ -176,13 +209,13 @@ class AsyncStream(AsyncIfcImpl): # the connection to te TSUN cloud if self.remote.stream: logger.info(f'[{self.node_id}:{self.conn_no}] disc client ' - f'connection: [{self.remote.stream.node_id}:' - f'{self.remote.stream.conn_no}]') - await self.remote.stream._ifc.disc() + f'connection: [{self.remote.ifc.node_id}:' + f'{self.remote.ifc.conn_no}]') + await self.remote.ifc.disc() async def client_loop(self, _: str) -> None: '''Loop for receiving messages from the TSUN cloud (client-side)''' - client_stream = await self.remote.stream._ifc.loop() + client_stream = await self.remote.ifc.loop() logger.info(f'[{client_stream.node_id}:{client_stream.conn_no}] ' 'Client loop stopped for' f' l{client_stream.l_addr}') @@ -271,8 +304,8 @@ class AsyncStream(AsyncIfcImpl): hint: must be called before releasing the connection instance """ - self.tx_fifo.reg_trigger(None) self.async_create_remote = None + super().close() self._reader.feed_eof() # abort awaited read if self._writer.is_closing(): return @@ -314,15 +347,16 @@ class AsyncStream(AsyncIfcImpl): if not self.remote.stream: await self.async_create_remote() if self.remote.stream: - if self.remote.stream._init_new_client_conn(): - await self.remote.stream._ifc.__async_write() + if self.remote.ifc.init_new_client_conn_cb(): + await self.remote.ifc.__async_write() if self.remote.stream: - self.remote.stream._update_header(self.fwd_fifo.peek()) - self.fwd_fifo.logging(logging.INFO, 'Forward to ' - f'{self.remote.stream.addr}:') - self.remote.stream._ifc._writer.write(self.fwd_fifo.get()) - await self.remote.stream._ifc._writer.drain() + if self.remote.ifc.update_header_cb is callable: + self.remote.ifc.update_header_cb(self.fwd_fifo.peek()) + self.fwd_fifo.logging(logging.INFO, 'Forward to ' + f'{self.remote.ifc.addr}:') + self.remote.ifc._writer.write(self.fwd_fifo.get()) + await self.remote.ifc._writer.drain() except OSError as error: if self.remote.stream: diff --git a/app/src/gen3/talent.py b/app/src/gen3/talent.py index 4fd24a3..b662576 100644 --- a/app/src/gen3/talent.py +++ b/app/src/gen3/talent.py @@ -50,6 +50,8 @@ class Talent(Message): super().__init__(server_side, self.send_modbus_cb, mb_timeout=15) ifc.rx_set_cb(self.read) ifc.prot_set_timeout_cb(self._timeout) + ifc.prot_set_init_new_client_conn_cb(self._init_new_client_conn) + ifc.prot_set_update_header_cb(self._update_header) self.ifc = ifc self.await_conn_resp_cnt = 0 self.id_str = id_str @@ -110,6 +112,8 @@ class Talent(Message): self.mb_timer.close() self.ifc.rx_set_cb(None) self.ifc.prot_set_timeout_cb(None) + self.ifc.prot_set_init_new_client_conn_cb(None) + self.ifc.prot_set_update_header_cb(None) super().close() def __set_serial_no(self, serial_no: str): diff --git a/app/src/gen3plus/solarman_v5.py b/app/src/gen3plus/solarman_v5.py index 7f0caf2..3b0de54 100644 --- a/app/src/gen3plus/solarman_v5.py +++ b/app/src/gen3plus/solarman_v5.py @@ -66,6 +66,9 @@ class SolarmanV5(Message): super().__init__(server_side, self.send_modbus_cb, mb_timeout=8) ifc.rx_set_cb(self.read) ifc.prot_set_timeout_cb(self._timeout) + ifc.prot_set_init_new_client_conn_cb(self._init_new_client_conn) + ifc.prot_set_update_header_cb(self._update_header) + self.ifc = ifc self.header_len = 11 # overwrite construcor in class Message self.control = 0 @@ -166,6 +169,8 @@ class SolarmanV5(Message): self.mb_timer.close() self.ifc.rx_set_cb(None) self.ifc.prot_set_timeout_cb(None) + self.ifc.prot_set_init_new_client_conn_cb(None) + self.ifc.prot_set_update_header_cb(None) super().close() async def send_start_cmd(self, snr: int, host: str, diff --git a/app/src/messages.py b/app/src/messages.py index 3a6f0e2..0fe66b9 100644 --- a/app/src/messages.py +++ b/app/src/messages.py @@ -134,10 +134,6 @@ class Message(metaclass=IterRegistry): # to our _recv_buffer return # pragma: no cover - def _update_header(self, _forward_buffer): - '''callback for updating the header of the forward buffer''' - pass # pragma: no cover - def _set_mqtt_timestamp(self, key, ts: float | None): if key not in self.new_data or \ not self.new_data[key]: