add two more callbacks

This commit is contained in:
Stefan Allius
2024-09-25 22:41:01 +02:00
parent 37c977c2fe
commit bc54944077
5 changed files with 65 additions and 18 deletions

View File

@@ -31,6 +31,16 @@ class AsyncIfcImpl(AsyncIfc):
self.conn_no = next(self._ids)
self.node_id = ''
self.timeout_cb = None
self.init_new_client_conn_cb = None
self.update_header_cb = None
def close(self):
self.timeout_cb = None
self.init_new_client_conn_cb = None
self.update_header_cb = None
self.fwd_fifo.reg_trigger(None)
self.tx_fifo.reg_trigger(None)
self.rx_fifo.reg_trigger(None)
def set_node_id(self, value: str):
self.node_id = value
@@ -108,10 +118,33 @@ class AsyncIfcImpl(AsyncIfc):
def prot_set_timeout_cb(self, callback):
self.timeout_cb = callback
def prot_set_init_new_client_conn_cb(self, callback):
self.init_new_client_conn_cb = callback
def prot_set_update_header_cb(self, callback):
self.update_header_cb = callback
class StreamPtr():
def __init__(self, stream):
self.stream = stream
'''Descr StreamPtr'''
def __init__(self, _stream):
self.stream = _stream
@property
def ifc(self):
return self._ifc
@property
def stream(self):
return self._stream
@stream.setter
def stream(self, value):
self._stream = value
if value:
self._ifc = value.ifc
else:
self._ifc = None
class AsyncStream(AsyncIfcImpl):
@@ -176,13 +209,13 @@ class AsyncStream(AsyncIfcImpl):
# the connection to te TSUN cloud
if self.remote.stream:
logger.info(f'[{self.node_id}:{self.conn_no}] disc client '
f'connection: [{self.remote.stream.node_id}:'
f'{self.remote.stream.conn_no}]')
await self.remote.stream._ifc.disc()
f'connection: [{self.remote.ifc.node_id}:'
f'{self.remote.ifc.conn_no}]')
await self.remote.ifc.disc()
async def client_loop(self, _: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)'''
client_stream = await self.remote.stream._ifc.loop()
client_stream = await self.remote.ifc.loop()
logger.info(f'[{client_stream.node_id}:{client_stream.conn_no}] '
'Client loop stopped for'
f' l{client_stream.l_addr}')
@@ -271,8 +304,8 @@ class AsyncStream(AsyncIfcImpl):
hint: must be called before releasing the connection instance
"""
self.tx_fifo.reg_trigger(None)
self.async_create_remote = None
super().close()
self._reader.feed_eof() # abort awaited read
if self._writer.is_closing():
return
@@ -314,15 +347,16 @@ class AsyncStream(AsyncIfcImpl):
if not self.remote.stream:
await self.async_create_remote()
if self.remote.stream:
if self.remote.stream._init_new_client_conn():
await self.remote.stream._ifc.__async_write()
if self.remote.ifc.init_new_client_conn_cb():
await self.remote.ifc.__async_write()
if self.remote.stream:
self.remote.stream._update_header(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.stream.addr}:')
self.remote.stream._ifc._writer.write(self.fwd_fifo.get())
await self.remote.stream._ifc._writer.drain()
if self.remote.ifc.update_header_cb is callable:
self.remote.ifc.update_header_cb(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.ifc.addr}:')
self.remote.ifc._writer.write(self.fwd_fifo.get())
await self.remote.ifc._writer.drain()
except OSError as error:
if self.remote.stream: