* Code Cleanup (#158)


* print coverage report

* create sonar-project property file

* install all py dependencies in one step

* code cleanup

* reduce cognitive complexity

* do not build on *.yml changes

* optimise versionstring handling (#159)

- Reading the version string from the image updates
  it even if the image is re-pulled without re-deployment

* fix linter warning

* exclude *.pyi filese

* ignore some rules for tests

* cleanup (#160)

* Sonar qube 3 (#163)

fix SonarQube warnings in modbus.py

* Sonar qube 3 (#164)


* fix SonarQube warnings

* Sonar qube 3 (#165)

* cleanup

* Add support for TSUN Titan inverter
Fixes #161


* fix SonarQube warnings

* fix error

* rename field "config"

* SonarQube reads flake8 output

* don't stop on flake8 errors

* flake8 scan only app/src for SonarQube

* update flake8 run

* ignore flake8 C901

* cleanup

* fix linter warnings

* ignore changed *.yml files

* read sensor list solarman data packets

* catch 'No route to' error and log only in debug mode

* fix unit tests

* add sensor_list configuration

* adapt unit tests

* fix SonarQube warnings

* Sonar qube 3 (#166)

* add unittests for mqtt.py

* add mock

* move test requirements into a file

* fix unit tests

* fix formating

* initial version

* fix SonarQube warning

* Sonar qube 4 (#169)

* add unit test for inverter.py

* fix SonarQube warning

* Sonar qube 5 (#170)

* fix SonarLints warnings

* use random IP adresses for unit tests

* Docker: The description ist missing (#171)

Fixes #167

* S allius/issue167 (#172)

* cleanup

* Sonar qube 6 (#174)

* test class ModbusConn

* Sonar qube 3 (#178)

* add more unit tests

* GEN3: don't crash on overwritten msg in the receive buffer

* improve test coverage und reduce test delays

* reduce cognitive complexity

* fix merge

* fix merge conflikt

* fix merge conflict

* S allius/issue182 (#183)

* GEN3: After inverter firmware update the 'Unknown Msg Type' increases continuously
Fixes #182

* add support for Controller serial no and MAC

* test hardening

* GEN3: add support for new messages of version 3 firmwares

* bump libraries to latest versions

- bump aiomqtt to version 2.3.0
- bump aiohttp to version 3.10.5

* improve test coverage

* reduce cognective complexity

* fix target preview

* remove dubbled fixtures

* increase test coverage

* Update README.md (#185)

update badges

* S allius/issue186 (#187)

* Parse more values in Server Mode
Fixes #186

* read OUTPUT_COEFFICIENT and MAC_ADDR in SrvMode

* fix unit test

* increase test coverage

* S allius/issue186 (#188)

* increase test coverage

* update changelog

* add dokumentation

* change default config

* Update README.md (#189)

Config file is now foldable

* GEN3: Invalid Contact Info Msg (#192)

Fixes #191

* Refactoring async stream (#194)

* GEN3: Invalid Contact Info Msg
Fixes #191

* introduce ifc with FIFOs

* add object factory

* use AsyncIfc class with FIFO

* declare more methods as classmethods

* - refactoring

- remove _forward_buffer
- make async_write private

* remove _forward_buffer

* refactoring

* avoid mqtt handling for invalid serial numbers

* add two more callbacks

* FIX update_header_cb handling

* split AsyncStream in two classes

* split ConnectionG3(P) in server and client class

* update class diagramm

* refactor server creation

* remove duplicated imports

* reduce code duplication

* move StremPtr instances into Inverter class

* resolution of connection classes

- remove ConnectionG3Client
- remove ConnectionG3Server
- remove ConnectionG3PClient
- remove ConnectionG3PServer

* fix server connections

* fix client loop closing

* don't overwrite self.remote in constructor

* update class diagramm

* fixes

- fixes null pointer accesses
- initalize AsyncStreamClient with proper
  StreamPtr instance

* add close callback

* refactor close handling

* remove connection classes

* move more code into InverterBase class

* remove test_inverter_base.py

* add abstract inverter interface class

* initial commit

* fix sonar qube warnings

* rename class Inverter into Proxy

* fix typo

* move class InverterIfc into a separate file

* add more testcases

* use ProtocolIfc class

* add unit tests for AsyncStream class

* icrease test coverage

* reduce cognitive complexity

* increase test coverage

* increase tes coverage

* simplify heartbeat handler

* remove obsolete tx_get method

* add more unittests

* update changelog

* remove __del__ method for proper gc runs

* check releasing of ModbusConn instances

* call garbage collector to release unreachable objs

* decrease ref counter after the with block

* S allius/issue196 (#198)

* fix healthcheck

- on infrastructure with IPv6 support localhost
  might be resolved to an IPv6 adress. Since the
  proxy only support IPv4 for now, we replace
  localhost by 127.0.0.1, to fix this

* merge from main
This commit is contained in:
Stefan Allius
2024-10-13 18:12:10 +02:00
committed by GitHub
parent bfea38d9da
commit c956c13d13
39 changed files with 3299 additions and 1888 deletions

View File

@@ -7,17 +7,136 @@ from typing import Self
from itertools import count
if __name__ == "app.src.async_stream":
from app.src.messages import hex_dump_memory, State
from app.src.proxy import Proxy
from app.src.byte_fifo import ByteFifo
from app.src.async_ifc import AsyncIfc
from app.src.infos import Infos
else: # pragma: no cover
from messages import hex_dump_memory, State
from proxy import Proxy
from byte_fifo import ByteFifo
from async_ifc import AsyncIfc
from infos import Infos
import gc
logger = logging.getLogger('conn')
class AsyncStream():
class AsyncIfcImpl(AsyncIfc):
_ids = count(0)
def __init__(self) -> None:
logger.debug('AsyncIfcImpl.__init__')
self.fwd_fifo = ByteFifo()
self.tx_fifo = ByteFifo()
self.rx_fifo = ByteFifo()
self.conn_no = next(self._ids)
self.node_id = ''
self.timeout_cb = None
self.init_new_client_conn_cb = None
self.update_header_cb = None
def close(self):
self.timeout_cb = None
self.fwd_fifo.reg_trigger(None)
self.tx_fifo.reg_trigger(None)
self.rx_fifo.reg_trigger(None)
def set_node_id(self, value: str):
self.node_id = value
def get_conn_no(self):
return self.conn_no
def tx_add(self, data: bytearray):
''' add data to transmit queue'''
self.tx_fifo += data
def tx_flush(self):
''' send transmit queue and clears it'''
self.tx_fifo()
def tx_peek(self, size: int = None) -> bytearray:
'''returns size numbers of byte without removing them'''
return self.tx_fifo.peek(size)
def tx_log(self, level, info):
''' log the transmit queue'''
self.tx_fifo.logging(level, info)
def tx_clear(self):
''' clear transmit queue'''
self.tx_fifo.clear()
def tx_len(self):
''' get numner of bytes in the transmit queue'''
return len(self.tx_fifo)
def fwd_add(self, data: bytearray):
''' add data to forward queue'''
self.fwd_fifo += data
def fwd_log(self, level, info):
''' log the forward queue'''
self.fwd_fifo.logging(level, info)
def rx_get(self, size: int = None) -> bytearray:
'''removes size numbers of bytes and return them'''
return self.rx_fifo.get(size)
def rx_peek(self, size: int = None) -> bytearray:
'''returns size numbers of byte without removing them'''
return self.rx_fifo.peek(size)
def rx_log(self, level, info):
''' logs the receive queue'''
self.rx_fifo.logging(level, info)
def rx_clear(self):
''' clear receive queue'''
self.rx_fifo.clear()
def rx_len(self):
''' get numner of bytes in the receive queue'''
return len(self.rx_fifo)
def rx_set_cb(self, callback):
self.rx_fifo.reg_trigger(callback)
def prot_set_timeout_cb(self, callback):
self.timeout_cb = callback
def prot_set_init_new_client_conn_cb(self, callback):
self.init_new_client_conn_cb = callback
def prot_set_update_header_cb(self, callback):
self.update_header_cb = callback
class StreamPtr():
'''Descr StreamPtr'''
def __init__(self, _stream, _ifc=None):
self.stream = _stream
self.ifc = _ifc
@property
def ifc(self):
return self._ifc
@ifc.setter
def ifc(self, value):
self._ifc = value
@property
def stream(self):
return self._stream
@stream.setter
def stream(self, value):
self._stream = value
class AsyncStream(AsyncIfcImpl):
MAX_PROC_TIME = 2
'''maximum processing time for a received msg in sec'''
MAX_START_TIME = 400
@@ -28,95 +147,42 @@ class AsyncStream():
'''maximum default time without a received msg in sec'''
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr) -> None:
rstream: "StreamPtr") -> None:
AsyncIfcImpl.__init__(self)
logger.debug('AsyncStream.__init__')
self.reader = reader
self.writer = writer
self.addr = addr
self.r_addr = ''
self.l_addr = ''
self.conn_no = next(self._ids)
self.remote = rstream
self.tx_fifo.reg_trigger(self.__write_cb)
self._reader = reader
self._writer = writer
self.r_addr = writer.get_extra_info('peername')
self.l_addr = writer.get_extra_info('sockname')
self.proc_start = None # start processing start timestamp
self.proc_max = 0
self.async_publ_mqtt = None # will be set AsyncStreamServer only
def __write_cb(self):
self._writer.write(self.tx_fifo.get())
def __timeout(self) -> int:
if self.state == State.init or self.state == State.received:
to = self.MAX_START_TIME
elif self.state == State.up and \
self.server_side and self.modbus_polling:
to = self.MAX_INV_IDLE_TIME
else:
to = self.MAX_DEF_IDLE_TIME
return to
async def publish_outstanding_mqtt(self):
'''Publish all outstanding MQTT topics'''
try:
if self.unique_id:
await self.async_publ_mqtt()
await self._async_publ_mqtt_proxy_stat('proxy')
except Exception:
pass
async def server_loop(self, addr: str) -> None:
'''Loop for receiving messages from the inverter (server-side)'''
logger.info(f'[{self.node_id}:{self.conn_no}] '
f'Accept connection from {addr}')
self.inc_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
await self.loop()
self.dec_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for'
f' r{self.r_addr}')
# if the server connection closes, we also have to disconnect
# the connection to te TSUN cloud
if self.remote_stream:
logger.info(f'[{self.node_id}:{self.conn_no}] disc client '
f'connection: [{self.remote_stream.node_id}:'
f'{self.remote_stream.conn_no}]')
await self.remote_stream.disc()
async def client_loop(self, _: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)'''
client_stream = await self.remote_stream.loop()
logger.info(f'[{client_stream.node_id}:{client_stream.conn_no}] '
'Client loop stopped for'
f' l{client_stream.l_addr}')
# if the client connection closes, we don't touch the server
# connection. Instead we erase the client connection stream,
# thus on the next received packet from the inverter, we can
# establish a new connection to the TSUN cloud
# erase backlink to inverter
client_stream.remote_stream = None
if self.remote_stream == client_stream:
# logging.debug(f'Client l{client_stream.l_addr} refs:'
# f' {gc.get_referrers(client_stream)}')
# than erase client connection
self.remote_stream = None
if self.timeout_cb:
return self.timeout_cb()
return 360
async def loop(self) -> Self:
"""Async loop handler for precessing all received messages"""
self.r_addr = self.writer.get_extra_info('peername')
self.l_addr = self.writer.get_extra_info('sockname')
self.proc_start = time.time()
while True:
try:
proc = time.time() - self.proc_start
if proc > self.proc_max:
self.proc_max = proc
self.proc_start = None
self.__calc_proc_time()
dead_conn_to = self.__timeout()
await asyncio.wait_for(self.__async_read(),
dead_conn_to)
if self.unique_id:
await self.async_write()
await self.__async_forward()
await self.__async_write()
await self.__async_forward()
if self.async_publ_mqtt:
await self.async_publ_mqtt()
except asyncio.TimeoutError:
@@ -124,7 +190,6 @@ class AsyncStream():
f'connection timeout ({dead_conn_to}s) '
f'for {self.l_addr}')
await self.disc()
self.close()
return self
except OSError as error:
@@ -132,56 +197,54 @@ class AsyncStream():
f'{error} for l{self.l_addr} | '
f'r{self.r_addr}')
await self.disc()
self.close()
return self
except RuntimeError as error:
logger.info(f'[{self.node_id}:{self.conn_no}] '
f'{error} for {self.l_addr}')
await self.disc()
self.close()
return self
except Exception:
self.inc_counter('SW_Exception')
Infos.inc_counter('SW_Exception')
logger.error(
f"Exception for {self.addr}:\n"
f"Exception for {self.r_addr}:\n"
f"{traceback.format_exc()}")
await asyncio.sleep(0) # be cooperative to other task
async def async_write(self, headline: str = 'Transmit to ') -> None:
"""Async write handler to transmit the send_buffer"""
if self._send_buffer:
hex_dump_memory(logging.INFO, f'{headline}{self.addr}:',
self._send_buffer, len(self._send_buffer))
self.writer.write(self._send_buffer)
await self.writer.drain()
self._send_buffer = bytearray(0) # self._send_buffer[sent:]
def __calc_proc_time(self):
if self.proc_start:
proc = time.time() - self.proc_start
if proc > self.proc_max:
self.proc_max = proc
self.proc_start = None
async def disc(self) -> None:
"""Async disc handler for graceful disconnect"""
if self.writer.is_closing():
self.remote = None
if self._writer.is_closing():
return
logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
self.writer.close()
await self.writer.wait_closed()
self._writer.close()
await self._writer.wait_closed()
def close(self) -> None:
logging.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
"""close handler for a no waiting disconnect
hint: must be called before releasing the connection instance
"""
self.reader.feed_eof() # abort awaited read
if self.writer.is_closing():
super().close()
self._reader.feed_eof() # abort awaited read
if self._writer.is_closing():
return
logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
self.writer.close()
self._writer.close()
def healthy(self) -> bool:
elapsed = 0
if self.proc_start is not None:
elapsed = time.time() - self.proc_start
if self.state == State.closed or elapsed > self.MAX_PROC_TIME:
if elapsed > self.MAX_PROC_TIME:
logging.debug(f'[{self.node_id}:{self.conn_no}:'
f'{type(self).__name__}]'
f' act:{round(1000*elapsed)}ms'
@@ -194,61 +257,139 @@ class AsyncStream():
'''
async def __async_read(self) -> None:
"""Async read handler to read received data from TCP stream"""
data = await self.reader.read(4096)
data = await self._reader.read(4096)
if data:
self.proc_start = time.time()
self._recv_buffer += data
wait = self.read() # call read in parent class
if wait > 0:
self.rx_fifo += data
wait = self.rx_fifo() # call read in parent class
if wait and wait > 0:
await asyncio.sleep(wait)
else:
raise RuntimeError("Peer closed.")
async def __async_write(self, headline: str = 'Transmit to ') -> None:
"""Async write handler to transmit the send_buffer"""
if len(self.tx_fifo) > 0:
self.tx_fifo.logging(logging.INFO, f'{headline}{self.r_addr}:')
self._writer.write(self.tx_fifo.get())
await self._writer.drain()
async def __async_forward(self) -> None:
"""forward handler transmits data over the remote connection"""
if not self._forward_buffer:
if len(self.fwd_fifo) == 0:
return
try:
if not self.remote_stream:
await self.async_create_remote()
if self.remote_stream:
if self.remote_stream._init_new_client_conn():
await self.remote_stream.async_write()
if self.remote_stream:
self.remote_stream._update_header(self._forward_buffer)
hex_dump_memory(logging.INFO,
f'Forward to {self.remote_stream.addr}:',
self._forward_buffer,
len(self._forward_buffer))
self.remote_stream.writer.write(self._forward_buffer)
await self.remote_stream.writer.drain()
self._forward_buffer = bytearray(0)
await self._async_forward()
except OSError as error:
if self.remote_stream:
rmt = self.remote_stream
self.remote_stream = None
logger.error(f'[{rmt.node_id}:{rmt.conn_no}] Fwd: {error} for '
f'l{rmt.l_addr} | r{rmt.r_addr}')
await rmt.disc()
rmt.close()
if self.remote.stream:
rmt = self.remote
logger.error(f'[{rmt.stream.node_id}:{rmt.stream.conn_no}] '
f'Fwd: {error} for '
f'l{rmt.ifc.l_addr} | r{rmt.ifc.r_addr}')
await rmt.ifc.disc()
if rmt.ifc.close_cb:
rmt.ifc.close_cb()
except RuntimeError as error:
if self.remote_stream:
rmt = self.remote_stream
self.remote_stream = None
logger.info(f'[{rmt.node_id}:{rmt.conn_no}] '
f'Fwd: {error} for {rmt.l_addr}')
await rmt.disc()
rmt.close()
if self.remote.stream:
rmt = self.remote
logger.info(f'[{rmt.stream.node_id}:{rmt.stream.conn_no}] '
f'Fwd: {error} for {rmt.ifc.l_addr}')
await rmt.ifc.disc()
if rmt.ifc.close_cb:
rmt.ifc.close_cb()
except Exception:
self.inc_counter('SW_Exception')
Infos.inc_counter('SW_Exception')
logger.error(
f"Fwd Exception for {self.addr}:\n"
f"Fwd Exception for {self.r_addr}:\n"
f"{traceback.format_exc()}")
def __del__(self):
logger.debug(
f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}")
class AsyncStreamServer(AsyncStream):
def __init__(self, reader: StreamReader, writer: StreamWriter,
async_publ_mqtt, create_remote,
rstream: "StreamPtr") -> None:
AsyncStream.__init__(self, reader, writer, rstream)
self.create_remote = create_remote
self.async_publ_mqtt = async_publ_mqtt
def close(self) -> None:
logging.debug('AsyncStreamServer.close()')
self.create_remote = None
self.async_publ_mqtt = None
super().close()
async def server_loop(self) -> None:
'''Loop for receiving messages from the inverter (server-side)'''
logger.info(f'[{self.node_id}:{self.conn_no}] '
f'Accept connection from {self.r_addr}')
Infos.inc_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
await self.loop()
Infos.dec_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for'
f' r{self.r_addr}')
# if the server connection closes, we also have to disconnect
# the connection to te TSUN cloud
if self.remote and self.remote.stream:
logger.info(f'[{self.node_id}:{self.conn_no}] disc client '
f'connection: [{self.remote.ifc.node_id}:'
f'{self.remote.ifc.conn_no}]')
await self.remote.ifc.disc()
async def _async_forward(self) -> None:
"""forward handler transmits data over the remote connection"""
if not self.remote.stream:
await self.create_remote()
if self.remote.stream and \
self.remote.ifc.init_new_client_conn_cb():
await self.remote.ifc._AsyncStream__async_write()
if self.remote.stream:
self.remote.ifc.update_header_cb(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.ifc.r_addr}:')
self.remote.ifc._writer.write(self.fwd_fifo.get())
await self.remote.ifc._writer.drain()
async def publish_outstanding_mqtt(self):
'''Publish all outstanding MQTT topics'''
try:
await self.async_publ_mqtt()
await Proxy._async_publ_mqtt_proxy_stat('proxy')
except Exception:
pass
class AsyncStreamClient(AsyncStream):
def __init__(self, reader: StreamReader, writer: StreamWriter,
rstream: "StreamPtr", close_cb) -> None:
AsyncStream.__init__(self, reader, writer, rstream)
self.close_cb = close_cb
def close(self) -> None:
logging.debug('AsyncStreamClient.close()')
self.close_cb = None
super().close()
async def client_loop(self, _: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)'''
await self.loop()
logger.info(f'[{self.node_id}:{self.conn_no}] '
'Client loop stopped for'
f' l{self.l_addr}')
if self.close_cb:
self.close_cb()
async def _async_forward(self) -> None:
"""forward handler transmits data over the remote connection"""
if self.remote.stream:
self.remote.ifc.update_header_cb(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.ifc.r_addr}:')
self.remote.ifc._writer.write(self.fwd_fifo.get())
await self.remote.ifc._writer.drain()