diff --git a/app/src/async_ifc.py b/app/src/async_ifc.py index 144f270..99204a9 100644 --- a/app/src/async_ifc.py +++ b/app/src/async_ifc.py @@ -117,3 +117,7 @@ class AsyncIfc(ABC): @abstractmethod def prot_set_update_header_cb(self, callback): pass # pragma: no cover + + @abstractmethod + def prot_set_close_header_cb(self, callback): + pass # pragma: no cover diff --git a/app/src/async_stream.py b/app/src/async_stream.py index 6e77f71..e08bdb4 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -35,12 +35,14 @@ class AsyncIfcImpl(AsyncIfc): self.timeout_cb = None self.init_new_client_conn_cb = None self.update_header_cb = None + self.close_cb = None def close(self): self.timeout_cb = None self.fwd_fifo.reg_trigger(None) self.tx_fifo.reg_trigger(None) self.rx_fifo.reg_trigger(None) + self.close_cb = None def set_node_id(self, value: str): self.node_id = value @@ -124,12 +126,18 @@ class AsyncIfcImpl(AsyncIfc): def prot_set_update_header_cb(self, callback): self.update_header_cb = callback + def prot_set_close_header_cb(self, callback): + self.close_cb = callback + class StreamPtr(): '''Descr StreamPtr''' def __init__(self, _stream): self.stream = _stream + def __str__(self) -> str: + return f'ifc:{self._ifc}, stream: {self._stream}' + @property def ifc(self): return self._ifc @@ -238,15 +246,19 @@ class AsyncStream(AsyncIfcImpl): await self._writer.wait_closed() def close(self) -> None: + logging.info(f'AsyncStream.close1() l{self.l_addr} | r{self.r_addr}') """close handler for a no waiting disconnect hint: must be called before releasing the connection instance """ + close_cb = self.close_cb super().close() + if close_cb: + close_cb() self._reader.feed_eof() # abort awaited read if self._writer.is_closing(): return - logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}') + logger.info(f'AsyncStream.close2() l{self.l_addr} | r{self.r_addr}') self._writer.close() def healthy(self) -> bool: @@ -315,7 +327,7 @@ class AsyncStream(AsyncIfcImpl): f"{traceback.format_exc()}") def __del__(self): - logger.debug( + logger.info( f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}") @@ -374,6 +386,8 @@ class AsyncStreamServer(AsyncStream): hint: must be called before releasing the connection instance """ + logging.info( + f'AsyncStreamServer.close() l{self.l_addr} | r{self.r_addr}') self.async_create_remote = None self.async_publ_mqtt = None super().close() @@ -386,6 +400,7 @@ class AsyncStreamClient(AsyncStream): async def client_loop(self, _: str) -> None: '''Loop for receiving messages from the TSUN cloud (client-side)''' + logging.info(f'AsynStream.client_loop{self} rem-> {self.remote}') await self.loop() logger.info(f'[{self.node_id}:{self.conn_no}] ' 'Client loop stopped for' @@ -398,7 +413,7 @@ class AsyncStreamClient(AsyncStream): # thus on the next received packet from the inverter, we can # establish a new connection to the TSUN cloud - if server_ifc.remote.ifc == self: + if server_ifc and server_ifc.remote.ifc == self: # logging.debug(f'Client l{client_stream.l_addr} refs:' # f' {gc.get_referrers(client_stream)}') # than erase client connection