add close callback

This commit is contained in:
Stefan Allius
2024-09-30 19:17:06 +02:00
parent f2ade43410
commit a1441fb4fd
2 changed files with 22 additions and 3 deletions

View File

@@ -117,3 +117,7 @@ class AsyncIfc(ABC):
@abstractmethod @abstractmethod
def prot_set_update_header_cb(self, callback): def prot_set_update_header_cb(self, callback):
pass # pragma: no cover pass # pragma: no cover
@abstractmethod
def prot_set_close_header_cb(self, callback):
pass # pragma: no cover

View File

@@ -35,12 +35,14 @@ class AsyncIfcImpl(AsyncIfc):
self.timeout_cb = None self.timeout_cb = None
self.init_new_client_conn_cb = None self.init_new_client_conn_cb = None
self.update_header_cb = None self.update_header_cb = None
self.close_cb = None
def close(self): def close(self):
self.timeout_cb = None self.timeout_cb = None
self.fwd_fifo.reg_trigger(None) self.fwd_fifo.reg_trigger(None)
self.tx_fifo.reg_trigger(None) self.tx_fifo.reg_trigger(None)
self.rx_fifo.reg_trigger(None) self.rx_fifo.reg_trigger(None)
self.close_cb = None
def set_node_id(self, value: str): def set_node_id(self, value: str):
self.node_id = value self.node_id = value
@@ -124,12 +126,18 @@ class AsyncIfcImpl(AsyncIfc):
def prot_set_update_header_cb(self, callback): def prot_set_update_header_cb(self, callback):
self.update_header_cb = callback self.update_header_cb = callback
def prot_set_close_header_cb(self, callback):
self.close_cb = callback
class StreamPtr(): class StreamPtr():
'''Descr StreamPtr''' '''Descr StreamPtr'''
def __init__(self, _stream): def __init__(self, _stream):
self.stream = _stream self.stream = _stream
def __str__(self) -> str:
return f'ifc:{self._ifc}, stream: {self._stream}'
@property @property
def ifc(self): def ifc(self):
return self._ifc return self._ifc
@@ -238,15 +246,19 @@ class AsyncStream(AsyncIfcImpl):
await self._writer.wait_closed() await self._writer.wait_closed()
def close(self) -> None: def close(self) -> None:
logging.info(f'AsyncStream.close1() l{self.l_addr} | r{self.r_addr}')
"""close handler for a no waiting disconnect """close handler for a no waiting disconnect
hint: must be called before releasing the connection instance hint: must be called before releasing the connection instance
""" """
close_cb = self.close_cb
super().close() super().close()
if close_cb:
close_cb()
self._reader.feed_eof() # abort awaited read self._reader.feed_eof() # abort awaited read
if self._writer.is_closing(): if self._writer.is_closing():
return return
logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}') logger.info(f'AsyncStream.close2() l{self.l_addr} | r{self.r_addr}')
self._writer.close() self._writer.close()
def healthy(self) -> bool: def healthy(self) -> bool:
@@ -315,7 +327,7 @@ class AsyncStream(AsyncIfcImpl):
f"{traceback.format_exc()}") f"{traceback.format_exc()}")
def __del__(self): def __del__(self):
logger.debug( logger.info(
f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}") f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}")
@@ -374,6 +386,8 @@ class AsyncStreamServer(AsyncStream):
hint: must be called before releasing the connection instance hint: must be called before releasing the connection instance
""" """
logging.info(
f'AsyncStreamServer.close() l{self.l_addr} | r{self.r_addr}')
self.async_create_remote = None self.async_create_remote = None
self.async_publ_mqtt = None self.async_publ_mqtt = None
super().close() super().close()
@@ -386,6 +400,7 @@ class AsyncStreamClient(AsyncStream):
async def client_loop(self, _: str) -> None: async def client_loop(self, _: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)''' '''Loop for receiving messages from the TSUN cloud (client-side)'''
logging.info(f'AsynStream.client_loop{self} rem-> {self.remote}')
await self.loop() await self.loop()
logger.info(f'[{self.node_id}:{self.conn_no}] ' logger.info(f'[{self.node_id}:{self.conn_no}] '
'Client loop stopped for' 'Client loop stopped for'
@@ -398,7 +413,7 @@ class AsyncStreamClient(AsyncStream):
# thus on the next received packet from the inverter, we can # thus on the next received packet from the inverter, we can
# establish a new connection to the TSUN cloud # establish a new connection to the TSUN cloud
if server_ifc.remote.ifc == self: if server_ifc and server_ifc.remote.ifc == self:
# logging.debug(f'Client l{client_stream.l_addr} refs:' # logging.debug(f'Client l{client_stream.l_addr} refs:'
# f' {gc.get_referrers(client_stream)}') # f' {gc.get_referrers(client_stream)}')
# than erase client connection # than erase client connection