introduce ifc with FIFOs

This commit is contained in:
Stefan Allius
2024-09-22 10:28:36 +02:00
parent 39540678fb
commit f216c2434e
3 changed files with 107 additions and 0 deletions

11
app/src/async_ifc.py Normal file
View File

@@ -0,0 +1,11 @@
if __name__ == "app.src.async_ifc":
from app.src.byte_fifo import ByteFifo
else: # pragma: no cover
from byte_fifo import ByteFifo
class AsyncIfc():
def __init__(self):
self.read = ByteFifo()
self.write = ByteFifo()
self.forward = ByteFifo()

53
app/src/byte_fifo.py Normal file
View File

@@ -0,0 +1,53 @@
if __name__ == "app.src.byte_fifo":
from app.src.messages import hex_dump_str, hex_dump_memory
else: # pragma: no cover
from messages import hex_dump_str, hex_dump_memory
class ByteFifo:
""" a byte FIFO buffer with trigger callback """
def __init__(self):
self.__buf = bytearray()
self.__trigger_cb = None
def reg_trigger(self, cb) -> None:
self.__trigger_cb = cb
def __iadd__(self, data):
self.__buf.extend(data)
return self
def __call__(self) -> None:
'''triggers the observer'''
if callable(self.__trigger_cb):
return self.__trigger_cb()
def get(self, size: int = None) -> bytearray:
'''removes size numbers of byte and return them'''
if not size:
data = self.__buf
self.clear()
else:
data = self.__buf[:size]
# The fast delete syntax
self.__buf[:size] = b''
return data
def peek(self, size: int = None) -> bytearray:
'''returns size numbers of byte without removing them'''
if not size:
return self.__buf
return self.__buf[:size]
def clear(self):
self.__buf = bytearray()
def __len__(self) -> int:
return len(self.__buf)
def __str__(self) -> str:
return hex_dump_str(self.__buf, self.__len__())
def logging(self, level, info):
hex_dump_memory(level, info, self.__buf, self.__len__())

View File

@@ -0,0 +1,43 @@
# test_with_pytest.py
from app.src.byte_fifo import ByteFifo
def test_fifo():
read = ByteFifo()
assert 0 == len(read)
read += b'12'
assert 2 == len(read)
read += bytearray("34", encoding='UTF8')
assert 4 == len(read)
assert b'12' == read.peek(2)
assert 4 == len(read)
assert b'1234' == read.peek()
assert 4 == len(read)
assert b'12' == read.get(2)
assert 2 == len(read)
assert b'34' == read.get()
assert 0 == len(read)
def test_fifo_fmt():
read = ByteFifo()
read += b'1234'
assert b'1234' == read.peek()
assert " 0000 | 31 32 33 34 | 1234" == f'{read}'
def test_fifo_observer():
read = ByteFifo()
def _read():
assert b'1234' == read.get(4)
read += b'12'
assert 2 == len(read)
read()
read.reg_trigger(_read)
read += b'34'
assert 4 == len(read)
read()
assert 0 == len(read)
assert b'' == read.peek(2)
assert b'' == read.get(2)
assert 0 == len(read)