add unit tests for AsyncStream class

This commit is contained in:
Stefan Allius
2024-10-06 17:43:40 +02:00
parent 9852f44dfa
commit e074a39f5a
8 changed files with 407 additions and 221 deletions

View File

@@ -56,21 +56,11 @@ class AsyncIfc(ABC):
''' add data to forward queue'''
pass # pragma: no cover
@abstractmethod
def fwd_flush(self):
''' send forward queue and clears it'''
pass # pragma: no cover
@abstractmethod
def fwd_log(self, level, info):
''' log the forward queue'''
pass # pragma: no cover
@abstractmethod
def fwd_clear(self):
''' clear forward queue'''
pass # pragma: no cover
#
# RX - QUEUE
#

View File

@@ -80,18 +80,10 @@ class AsyncIfcImpl(AsyncIfc):
''' add data to forward queue'''
self.fwd_fifo += data
def fwd_flush(self):
''' send forward queue and clears it'''
self.fwd_fifo()
def fwd_log(self, level, info):
''' log the forward queue'''
self.fwd_fifo.logging(level, info)
def fwd_clear(self):
''' clear forward queue'''
self.fwd_fifo.clear()
def rx_get(self, size: int = None) -> bytearray:
'''removes size numbers of bytes and return them'''
return self.rx_fifo.get(size)
@@ -131,9 +123,6 @@ class StreamPtr():
self.stream = _stream
self.ifc = _ifc
def __str__(self) -> str:
return f'ifc:{self._ifc}, stream: {self._stream}'
@property
def ifc(self):
return self._ifc
@@ -181,8 +170,8 @@ class AsyncStream(AsyncIfcImpl):
self._writer.write(self.tx_fifo.get())
def __timeout(self) -> int:
if self.timeout_cb is callable:
return self.timeout_cb
if self.timeout_cb:
return self.timeout_cb()
return 360
async def loop(self) -> Self:
@@ -273,7 +262,7 @@ class AsyncStream(AsyncIfcImpl):
self.proc_start = time.time()
self.rx_fifo += data
wait = self.rx_fifo() # call read in parent class
if wait > 0:
if wait and wait > 0:
await asyncio.sleep(wait)
else:
raise RuntimeError("Peer closed.")

View File

@@ -18,10 +18,11 @@ class ByteFifo:
self.__buf.extend(data)
return self
def __call__(self) -> None:
def __call__(self):
'''triggers the observer'''
if callable(self.__trigger_cb):
return self.__trigger_cb()
return None
def get(self, size: int = None) -> bytearray:
'''removes size numbers of byte and return them'''