tsun-gen3-proxy/app/tests/test_byte_fifo.py
Stefan Allius 62ea2a9e6f Refactoring async stream (#194)
* GEN3: Invalid Contact Info Msg
Fixes #191

* introduce ifc with FIFOs

* add object factory

* use AsyncIfc class with FIFO

* declare more methods as classmethods

* - refactoring

- remove _forward_buffer
- make async_write private

* remove _forward_buffer

* refactoring

* avoid mqtt handling for invalid serial numbers

* add two more callbacks

* FIX update_header_cb handling

* split AsyncStream in two classes

* split ConnectionG3(P) in server and client class

* update class diagram

* refactor server creation

* remove duplicated imports

* reduce code duplication

* move StreamPtr instances into Inverter class

* resolution of connection classes

- remove ConnectionG3Client
- remove ConnectionG3Server
- remove ConnectionG3PClient
- remove ConnectionG3PServer

* fix server connections

* fix client loop closing

* don't overwrite self.remote in constructor

* update class diagram

* fixes

- fix null pointer accesses
- initialize AsyncStreamClient with a proper
  StreamPtr instance

* add close callback

* refactor close handling

* remove connection classes

* move more code into InverterBase class

* remove test_inverter_base.py

* add abstract inverter interface class

* initial commit

* fix SonarQube warnings

* rename class Inverter into Proxy

* fix typo

* move class InverterIfc into a separate file

* add more testcases

* use ProtocolIfc class

* add unit tests for AsyncStream class

* increase test coverage

* reduce cognitive complexity

* increase test coverage

* increase test coverage

* simplify heartbeat handler

* remove obsolete tx_get method

* add more unittests

* update changelog

* remove __del__ method for proper gc runs

* check releasing of ModbusConn instances

* call garbage collector to release unreachable objs

* decrease ref counter after the with block
2024-10-13 16:07:01 +02:00

44 lines
1023 B
Python

# test_with_pytest.py
from app.src.byte_fifo import ByteFifo


def test_fifo():
    read = ByteFifo()
    assert 0 == len(read)

    # appending accepts both bytes and bytearray
    read += b'12'
    assert 2 == len(read)
    read += bytearray("34", encoding='UTF8')
    assert 4 == len(read)

    # peek() returns data without consuming it
    assert b'12' == read.peek(2)
    assert 4 == len(read)
    assert b'1234' == read.peek()
    assert 4 == len(read)

    # get() returns data and removes it from the FIFO
    assert b'12' == read.get(2)
    assert 2 == len(read)
    assert b'34' == read.get()
    assert 0 == len(read)


def test_fifo_fmt():
    read = ByteFifo()
    read += b'1234'
    assert b'1234' == read.peek()
    assert " 0000 | 31 32 33 34 | 1234" == f'{read}'


def test_fifo_observer():
    read = ByteFifo()

    def _read():
        assert b'1234' == read.get(4)

    read += b'12'
    assert 2 == len(read)
    read()  # no trigger registered yet, so nothing is consumed
    read.reg_trigger(_read)
    read += b'34'
    assert 4 == len(read)
    read()  # calls _read(), which drains the FIFO
    assert 0 == len(read)

    # reading from an empty FIFO yields empty data
    assert b'' == read.peek(2)
    assert b'' == read.get(2)
    assert 0 == len(read)
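
The tests above pin down the observable behaviour of ByteFifo: `+=` appends, `len()` reports the buffered size, `peek()` reads without consuming, `get()` consumes, and calling the object fires a callback registered with `reg_trigger()`. As a rough illustration only, a class meeting those expectations could look like the sketch below. The name `SketchByteFifo` and its internals are assumptions made for this example and are not taken from `app/src/byte_fifo.py`. The hexdump formatting checked by `test_fifo_fmt` is deliberately left out.

```python
class SketchByteFifo:
    """Hypothetical stand-in for ByteFifo; NOT the project's implementation."""

    def __init__(self):
        self._buf = bytearray()
        self._trigger = None  # optional callback, set via reg_trigger()

    def reg_trigger(self, cb):
        """Register a callable that is invoked when the FIFO is called."""
        self._trigger = cb

    def __iadd__(self, data):
        # 'fifo += data' appends bytes-like data and keeps the same object
        self._buf.extend(data)
        return self

    def __len__(self):
        return len(self._buf)

    def __call__(self):
        # 'fifo()' fires the registered trigger, if there is one
        if callable(self._trigger):
            return self._trigger()

    def peek(self, size=None):
        """Return up to 'size' bytes (all if None) without consuming them."""
        if size is None:
            return bytes(self._buf)
        return bytes(self._buf[:size])

    def get(self, size=None):
        """Return up to 'size' bytes (all if None) and remove them."""
        data = self.peek(size)
        del self._buf[:len(data)]
        return data


if __name__ == "__main__":
    fifo = SketchByteFifo()
    fifo.reg_trigger(lambda: print("received:", fifo.get()))
    fifo += b'12'
    fifo += bytearray(b'34')
    fifo()  # the trigger drains and prints the buffered bytes
```

Implementing `get()` on top of `peek()` keeps the two in agreement on edge cases such as an empty buffer or a `size` larger than the buffered data. Whether the real class takes the same approach is not shown in this file.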