From 91873d0c340fa7e70491a36a50788124d4b480ba Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Wed, 8 May 2024 23:52:31 +0200 Subject: [PATCH] await wait_closed() on disconnects --- app/src/async_stream.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/app/src/async_stream.py b/app/src/async_stream.py index 28873e8..ac6c54f 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -1,5 +1,6 @@ import logging import traceback +import asyncio from messages import hex_dump_memory logger = logging.getLogger('conn') @@ -27,7 +28,7 @@ class AsyncStream(): # the connection to te TSUN cloud if self.remoteStream: logging.debug("disconnect client connection") - self.remoteStream.disc() + await self.remoteStream.disc() try: await self._async_publ_mqtt_proxy_stat('proxy') except Exception: @@ -58,6 +59,7 @@ class AsyncStream(): while True: try: + # await asyncio.wait_for(self.__async_read(), 0.3) await self.__async_read() if self.unique_id: @@ -65,25 +67,32 @@ class AsyncStream(): await self.__async_forward() await self.async_publ_mqtt() + except asyncio.TimeoutError: + pass + except (ConnectionResetError, ConnectionAbortedError, - BrokenPipeError, - RuntimeError) as error: - logger.warning(f'In loop for l{self.l_addr} | ' - f'r{self.r_addr}: {error}') - self.close() + BrokenPipeError) as error: + logger.error(f'{error} for l{self.l_addr} | ' + f'r{self.r_addr}') + await self.disc() return self + + except RuntimeError as error: + logger.warning(f"{error} for {self.l_addr}") + await self.disc() + return self + except Exception: self.inc_counter('SW_Exception') logger.error( f"Exception for {self.addr}:\n" f"{traceback.format_exc()}") - self.close() - return self - def disc(self) -> None: + async def disc(self) -> None: logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}') self.writer.close() + await self.writer.wait_closed() def close(self): logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')