diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f9ef4d..932a10d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [unreleased] +- detect dead connections [#100](https://github.com/s-allius/tsun-gen3-proxy/issues/100) +- improve connection logging with a unique connection id - Add healthcheck, readiness and liveness checks [#91](https://github.com/s-allius/tsun-gen3-proxy/issues/91) - MODBUS close handler releases internal resource [#93](https://github.com/s-allius/tsun-gen3-proxy/issues/93) - add exception handling for message forwarding [#94](https://github.com/s-allius/tsun-gen3-proxy/issues/94) diff --git a/app/src/async_stream.py b/app/src/async_stream.py index 1b55ade..e829fdd 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -1,15 +1,22 @@ +import asyncio import logging import traceback import time from asyncio import StreamReader, StreamWriter from messages import hex_dump_memory, State from typing import Self +from itertools import count import gc logger = logging.getLogger('conn') class AsyncStream(): + _ids = count(0) + MAX_PROC_TIME = 2 + '''maximum processing time for a received msg in sec''' + MAX_IDLE_TIME = 400 + '''maximum time without a received msg in sec''' def __init__(self, reader: StreamReader, writer: StreamWriter, addr) -> None: @@ -19,22 +26,26 @@ class AsyncStream(): self.addr = addr self.r_addr = '' self.l_addr = '' + self.conn_no = next(self._ids) self.proc_start = None # start processing start timestamp self.proc_max = 0 async def server_loop(self, addr: str) -> None: '''Loop for receiving messages from the inverter (server-side)''' - logging.info(f'[{self.node_id}] Accept connection from {addr}') + logger.info(f'[{self.node_id}:{self.conn_no}] ' + f'Accept connection from {addr}') self.inc_counter('Inverter_Cnt') await self.loop() self.dec_counter('Inverter_Cnt') - logging.info(f'[{self.node_id}] Server loop stopped for' - 
f' r{self.r_addr}') + logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for' + f' r{self.r_addr}') # if the server connection closes, we also have to disconnect # the connection to te TSUN cloud if self.remoteStream: - logging.debug("disconnect client connection") + logger.info(f'[{self.node_id}:{self.conn_no}] disc client ' + f'connection: [{self.remoteStream.node_id}:' + f'{self.remoteStream.conn_no}]') await self.remoteStream.disc() try: await self._async_publ_mqtt_proxy_stat('proxy') @@ -44,8 +55,9 @@ class AsyncStream(): async def client_loop(self, addr: str) -> None: '''Loop for receiving messages from the TSUN cloud (client-side)''' clientStream = await self.remoteStream.loop() - logging.info(f'[{self.node_id}] Client loop stopped for' - f' l{clientStream.l_addr}') + logger.info(f'[{clientStream.node_id}:{clientStream.conn_no}] ' + 'Client loop stopped for' + f' l{clientStream.l_addr}') # if the client connection closes, we don't touch the server # connection. Instead we erase the client connection stream, @@ -73,22 +85,31 @@ class AsyncStream(): self.proc_max = proc self.proc_start = None - await self.__async_read() + await asyncio.wait_for(self.__async_read(), self.MAX_IDLE_TIME) if self.unique_id: await self.async_write() await self.__async_forward() await self.async_publ_mqtt() + except asyncio.TimeoutError: + logger.warning(f'[{self.node_id}:{self.conn_no}] Dead ' + f'connection timeout for {self.l_addr}') + await self.disc() + self.close() + return self + except OSError as error: - logger.error(f'[{self.node_id}] {error} for l{self.l_addr} | ' + logger.error(f'[{self.node_id}:{self.conn_no}] ' + f'{error} for l{self.l_addr} | ' f'r{self.r_addr}') await self.disc() self.close() return self except RuntimeError as error: - logger.info(f"[{self.node_id}] {error} for {self.l_addr}") + logger.info(f'[{self.node_id}:{self.conn_no}] ' + f'{error} for {self.l_addr}') await self.disc() self.close() return self @@ -131,8 +152,9 @@ class 
AsyncStream(): elapsed = 0 if self.proc_start is not None: elapsed = time.time() - self.proc_start - if self.state == State.closed or elapsed > 1: - logging.debug(f'[{self.node_id}:{type(self).__name__}]' + if self.state == State.closed or elapsed > self.MAX_PROC_TIME: + logging.debug(f'[{self.node_id}:{self.conn_no}:' + f'{type(self).__name__}]' f' act:{round(1000*elapsed)}ms' f' max:{round(1000*self.proc_max)}ms') logging.debug(f'Healthy()) refs: {gc.get_referrers(self)}') @@ -176,7 +198,7 @@ class AsyncStream(): if self.remoteStream: rmt = self.remoteStream self.remoteStream = None - logger.error(f'[{rmt.node_id}] Fwd: {error} for ' + logger.error(f'[{rmt.node_id}:{rmt.conn_no}] Fwd: {error} for ' f'l{rmt.l_addr} | r{rmt.r_addr}') await rmt.disc() rmt.close() @@ -185,7 +207,8 @@ class AsyncStream(): if self.remoteStream: rmt = self.remoteStream self.remoteStream = None - logger.info(f"[{rmt.node_id}] Fwd: {error} for {rmt.l_addr}") + logger.info(f'[{rmt.node_id}:{rmt.conn_no}] ' + f'Fwd: {error} for {rmt.l_addr}') await rmt.disc() rmt.close() diff --git a/app/src/gen3/inverter_g3.py b/app/src/gen3/inverter_g3.py index a98f6b5..a20dc77 100644 --- a/app/src/gen3/inverter_g3.py +++ b/app/src/gen3/inverter_g3.py @@ -57,11 +57,14 @@ class InverterG3(Inverter, ConnectionG3): addr = (host, port) try: - logging.info(f'[{self.node_id}] Connected to {addr}') + logging.info(f'[{self.node_id}] Connect to {addr}') connect = asyncio.open_connection(host, port) reader, writer = await connect self.remoteStream = ConnectionG3(reader, writer, addr, self, False, self.id_str) + logging.info(f'[{self.remoteStream.node_id}:' + f'{self.remoteStream.conn_no}] ' + f'Connected to {addr}') asyncio.create_task(self.client_loop(addr)) except (ConnectionRefusedError, TimeoutError) as error: diff --git a/app/src/gen3plus/inverter_g3p.py b/app/src/gen3plus/inverter_g3p.py index d69273c..74c9b6b 100644 --- a/app/src/gen3plus/inverter_g3p.py +++ b/app/src/gen3plus/inverter_g3p.py @@ -57,11 
+57,14 @@ class InverterG3P(Inverter, ConnectionG3P): addr = (host, port) try: - logging.info(f'[{self.node_id}] Connected to {addr}') + logging.info(f'[{self.node_id}] Connect to {addr}') connect = asyncio.open_connection(host, port) reader, writer = await connect self.remoteStream = ConnectionG3P(reader, writer, addr, self, False) + logging.info(f'[{self.remoteStream.node_id}:' + f'{self.remoteStream.conn_no}] ' + f'Connected to {addr}') asyncio.create_task(self.client_loop(addr)) except (ConnectionRefusedError, TimeoutError) as error: