diff --git a/app/src/async_ifc.py b/app/src/async_ifc.py new file mode 100644 index 0000000..b0dde65 --- /dev/null +++ b/app/src/async_ifc.py @@ -0,0 +1,11 @@ +if __name__ == "app.src.async_ifc": + from app.src.byte_fifo import ByteFifo +else: # pragma: no cover + from byte_fifo import ByteFifo + + +class AsyncIfc(): + def __init__(self): + self.read = ByteFifo() + self.write = ByteFifo() + self.forward = ByteFifo() diff --git a/app/src/byte_fifo.py b/app/src/byte_fifo.py new file mode 100644 index 0000000..90eb8ba --- /dev/null +++ b/app/src/byte_fifo.py @@ -0,0 +1,53 @@ + +if __name__ == "app.src.byte_fifo": + from app.src.messages import hex_dump_str, hex_dump_memory +else: # pragma: no cover + from messages import hex_dump_str, hex_dump_memory + + +class ByteFifo: + """ a byte FIFO buffer with trigger callback """ + def __init__(self): + self.__buf = bytearray() + self.__trigger_cb = None + + def reg_trigger(self, cb) -> None: + self.__trigger_cb = cb + + def __iadd__(self, data): + self.__buf.extend(data) + return self + + def __call__(self) -> None: + '''triggers the observer''' + if callable(self.__trigger_cb): + return self.__trigger_cb() + + def get(self, size: int = None) -> bytearray: + '''removes size numbers of byte and return them''' + if not size: + data = self.__buf + self.clear() + else: + data = self.__buf[:size] + # The fast delete syntax + self.__buf[:size] = b'' + return data + + def peek(self, size: int = None) -> bytearray: + '''returns size numbers of byte without removing them''' + if not size: + return self.__buf + return self.__buf[:size] + + def clear(self): + self.__buf = bytearray() + + def __len__(self) -> int: + return len(self.__buf) + + def __str__(self) -> str: + return hex_dump_str(self.__buf, self.__len__()) + + def logging(self, level, info): + hex_dump_memory(level, info, self.__buf, self.__len__()) diff --git a/app/tests/test_byte_fifo.py b/app/tests/test_byte_fifo.py new file mode 100644 index 0000000..1544cc0 --- /dev/null +++ b/app/tests/test_byte_fifo.py @@ -0,0 +1,43 @@ +# test_with_pytest.py + +from app.src.byte_fifo import ByteFifo + +def test_fifo(): + read = ByteFifo() + assert 0 == len(read) + read += b'12' + assert 2 == len(read) + read += bytearray("34", encoding='UTF8') + assert 4 == len(read) + assert b'12' == read.peek(2) + assert 4 == len(read) + assert b'1234' == read.peek() + assert 4 == len(read) + assert b'12' == read.get(2) + assert 2 == len(read) + assert b'34' == read.get() + assert 0 == len(read) + +def test_fifo_fmt(): + read = ByteFifo() + read += b'1234' + assert b'1234' == read.peek() + assert " 0000 | 31 32 33 34 | 1234" == f'{read}' + +def test_fifo_observer(): + read = ByteFifo() + + def _read(): + assert b'1234' == read.get(4) + + read += b'12' + assert 2 == len(read) + read() + read.reg_trigger(_read) + read += b'34' + assert 4 == len(read) + read() + assert 0 == len(read) + assert b'' == read.peek(2) + assert b'' == read.get(2) + assert 0 == len(read)