S allius/issue100 (#101)

* detect dead connections

- disconnect connection on Msg receive timeout
- improve connection trace (add connection id)

* update changelog
This commit is contained in:
Stefan Allius
2024-06-17 23:10:54 +02:00
committed by GitHub
parent 9ff1453922
commit 2d4679a361
4 changed files with 46 additions and 15 deletions

View File

@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [unreleased] ## [unreleased]
- detect dead connections [#100](https://github.com/s-allius/tsun-gen3-proxy/issues/100)
- improve connection logging wirt a unique connection id
- Add healthcheck, readiness and liveness checks [#91](https://github.com/s-allius/tsun-gen3-proxy/issues/91) - Add healthcheck, readiness and liveness checks [#91](https://github.com/s-allius/tsun-gen3-proxy/issues/91)
- MODBUS close handler releases internal resource [#93](https://github.com/s-allius/tsun-gen3-proxy/issues/93) - MODBUS close handler releases internal resource [#93](https://github.com/s-allius/tsun-gen3-proxy/issues/93)
- add exception handling for message forwarding [#94](https://github.com/s-allius/tsun-gen3-proxy/issues/94) - add exception handling for message forwarding [#94](https://github.com/s-allius/tsun-gen3-proxy/issues/94)

View File

@@ -1,15 +1,22 @@
import asyncio
import logging import logging
import traceback import traceback
import time import time
from asyncio import StreamReader, StreamWriter from asyncio import StreamReader, StreamWriter
from messages import hex_dump_memory, State from messages import hex_dump_memory, State
from typing import Self from typing import Self
from itertools import count
import gc import gc
logger = logging.getLogger('conn') logger = logging.getLogger('conn')
class AsyncStream(): class AsyncStream():
_ids = count(0)
MAX_PROC_TIME = 2
'''maximum processing time for a received msg in sec'''
MAX_IDLE_TIME = 400
'''maximum time without a received msg in sec'''
def __init__(self, reader: StreamReader, writer: StreamWriter, def __init__(self, reader: StreamReader, writer: StreamWriter,
addr) -> None: addr) -> None:
@@ -19,22 +26,26 @@ class AsyncStream():
self.addr = addr self.addr = addr
self.r_addr = '' self.r_addr = ''
self.l_addr = '' self.l_addr = ''
self.conn_no = next(self._ids)
self.proc_start = None # start processing start timestamp self.proc_start = None # start processing start timestamp
self.proc_max = 0 self.proc_max = 0
async def server_loop(self, addr: str) -> None: async def server_loop(self, addr: str) -> None:
'''Loop for receiving messages from the inverter (server-side)''' '''Loop for receiving messages from the inverter (server-side)'''
logging.info(f'[{self.node_id}] Accept connection from {addr}') logger.info(f'[{self.node_id}:{self.conn_no}] '
f'Accept connection from {addr}')
self.inc_counter('Inverter_Cnt') self.inc_counter('Inverter_Cnt')
await self.loop() await self.loop()
self.dec_counter('Inverter_Cnt') self.dec_counter('Inverter_Cnt')
logging.info(f'[{self.node_id}] Server loop stopped for' logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for'
f' r{self.r_addr}') f' r{self.r_addr}')
# if the server connection closes, we also have to disconnect # if the server connection closes, we also have to disconnect
# the connection to te TSUN cloud # the connection to te TSUN cloud
if self.remoteStream: if self.remoteStream:
logging.debug("disconnect client connection") logger.info(f'[{self.node_id}:{self.conn_no}] disc client '
f'connection: [{self.remoteStream.node_id}:'
f'{self.remoteStream.conn_no}]')
await self.remoteStream.disc() await self.remoteStream.disc()
try: try:
await self._async_publ_mqtt_proxy_stat('proxy') await self._async_publ_mqtt_proxy_stat('proxy')
@@ -44,8 +55,9 @@ class AsyncStream():
async def client_loop(self, addr: str) -> None: async def client_loop(self, addr: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)''' '''Loop for receiving messages from the TSUN cloud (client-side)'''
clientStream = await self.remoteStream.loop() clientStream = await self.remoteStream.loop()
logging.info(f'[{self.node_id}] Client loop stopped for' logger.info(f'[{clientStream.node_id}:{clientStream.conn_no}] '
f' l{clientStream.l_addr}') 'Client loop stopped for'
f' l{clientStream.l_addr}')
# if the client connection closes, we don't touch the server # if the client connection closes, we don't touch the server
# connection. Instead we erase the client connection stream, # connection. Instead we erase the client connection stream,
@@ -73,22 +85,31 @@ class AsyncStream():
self.proc_max = proc self.proc_max = proc
self.proc_start = None self.proc_start = None
await self.__async_read() await asyncio.wait_for(self.__async_read(), self.MAX_IDLE_TIME)
if self.unique_id: if self.unique_id:
await self.async_write() await self.async_write()
await self.__async_forward() await self.__async_forward()
await self.async_publ_mqtt() await self.async_publ_mqtt()
except asyncio.TimeoutError:
logger.warning(f'[{self.node_id}:{self.conn_no}] Dead '
f'connection timeout for {self.l_addr}')
await self.disc()
self.close()
return self
except OSError as error: except OSError as error:
logger.error(f'[{self.node_id}] {error} for l{self.l_addr} | ' logger.error(f'[{self.node_id}:{self.conn_no}] '
f'{error} for l{self.l_addr} | '
f'r{self.r_addr}') f'r{self.r_addr}')
await self.disc() await self.disc()
self.close() self.close()
return self return self
except RuntimeError as error: except RuntimeError as error:
logger.info(f"[{self.node_id}] {error} for {self.l_addr}") logger.info(f'[{self.node_id}:{self.conn_no}] '
f'{error} for {self.l_addr}')
await self.disc() await self.disc()
self.close() self.close()
return self return self
@@ -131,8 +152,9 @@ class AsyncStream():
elapsed = 0 elapsed = 0
if self.proc_start is not None: if self.proc_start is not None:
elapsed = time.time() - self.proc_start elapsed = time.time() - self.proc_start
if self.state == State.closed or elapsed > 1: if self.state == State.closed or elapsed > self.MAX_PROC_TIME:
logging.debug(f'[{self.node_id}:{type(self).__name__}]' logging.debug(f'[{self.node_id}:{self.conn_no}:'
f'{type(self).__name__}]'
f' act:{round(1000*elapsed)}ms' f' act:{round(1000*elapsed)}ms'
f' max:{round(1000*self.proc_max)}ms') f' max:{round(1000*self.proc_max)}ms')
logging.debug(f'Healthy()) refs: {gc.get_referrers(self)}') logging.debug(f'Healthy()) refs: {gc.get_referrers(self)}')
@@ -176,7 +198,7 @@ class AsyncStream():
if self.remoteStream: if self.remoteStream:
rmt = self.remoteStream rmt = self.remoteStream
self.remoteStream = None self.remoteStream = None
logger.error(f'[{rmt.node_id}] Fwd: {error} for ' logger.error(f'[{rmt.node_id}:{rmt.conn_no}] Fwd: {error} for '
f'l{rmt.l_addr} | r{rmt.r_addr}') f'l{rmt.l_addr} | r{rmt.r_addr}')
await rmt.disc() await rmt.disc()
rmt.close() rmt.close()
@@ -185,7 +207,8 @@ class AsyncStream():
if self.remoteStream: if self.remoteStream:
rmt = self.remoteStream rmt = self.remoteStream
self.remoteStream = None self.remoteStream = None
logger.info(f"[{rmt.node_id}] Fwd: {error} for {rmt.l_addr}") logger.info(f'[{rmt.node_id}:{rmt.conn_no}] '
f'Fwd: {error} for {rmt.l_addr}')
await rmt.disc() await rmt.disc()
rmt.close() rmt.close()

View File

@@ -57,11 +57,14 @@ class InverterG3(Inverter, ConnectionG3):
addr = (host, port) addr = (host, port)
try: try:
logging.info(f'[{self.node_id}] Connected to {addr}') logging.info(f'[{self.node_id}] Connect to {addr}')
connect = asyncio.open_connection(host, port) connect = asyncio.open_connection(host, port)
reader, writer = await connect reader, writer = await connect
self.remoteStream = ConnectionG3(reader, writer, addr, self, self.remoteStream = ConnectionG3(reader, writer, addr, self,
False, self.id_str) False, self.id_str)
logging.info(f'[{self.remoteStream.node_id}:'
f'{self.remoteStream.conn_no}] '
f'Connected to {addr}')
asyncio.create_task(self.client_loop(addr)) asyncio.create_task(self.client_loop(addr))
except (ConnectionRefusedError, TimeoutError) as error: except (ConnectionRefusedError, TimeoutError) as error:

View File

@@ -57,11 +57,14 @@ class InverterG3P(Inverter, ConnectionG3P):
addr = (host, port) addr = (host, port)
try: try:
logging.info(f'[{self.node_id}] Connected to {addr}') logging.info(f'[{self.node_id}] Connect to {addr}')
connect = asyncio.open_connection(host, port) connect = asyncio.open_connection(host, port)
reader, writer = await connect reader, writer = await connect
self.remoteStream = ConnectionG3P(reader, writer, addr, self, self.remoteStream = ConnectionG3P(reader, writer, addr, self,
False) False)
logging.info(f'[{self.remoteStream.node_id}:'
f'{self.remoteStream.conn_no}] '
f'Connected to {addr}')
asyncio.create_task(self.client_loop(addr)) asyncio.create_task(self.client_loop(addr))
except (ConnectionRefusedError, TimeoutError) as error: except (ConnectionRefusedError, TimeoutError) as error: