From b6431f844852cdb4d96758311f529305511e94f8 Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Thu, 9 Nov 2023 20:03:09 +0100 Subject: [PATCH] improve client conn disconection - check for race cond. on closing and establishing client connections - improve connection trace --- CHANGELOG.md | 1 + app/src/async_stream.py | 21 +++++++++++++-------- app/src/inverter.py | 19 +++++++++++++------ 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b774e3a..515067b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - add int64 data type to info parser - allow multiple calls to Message.close() +- check for race cond. on closing and establishing client connections ## [0.5.1] - 2023-11-05 diff --git a/app/src/async_stream.py b/app/src/async_stream.py index 96a0ff5..98c8678 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -16,11 +16,15 @@ class AsyncStream(Message): self.writer = writer self.remoteStream = remote_stream self.addr = addr + self.r_addr = '' + self.l_addr = '' ''' Our puplic methods ''' - async def loop(self) -> None: + async def loop(self): + self.r_addr = self.writer.get_extra_info('peername') + self.l_addr = self.writer.get_extra_info('sockname') while True: try: @@ -35,26 +39,27 @@ class AsyncStream(Message): ConnectionAbortedError, BrokenPipeError, RuntimeError) as error: - logger.warning(f'In loop for {self.addr}: {error}') + logger.warning(f'In loop for l{self.l_addr} | ' + f'r{self.r_addr}: {error}') self.close() - return + return self except Exception: logger.error( f"Exception for {self.addr}:\n" f"{traceback.format_exc()}") self.close() - return + return self def disc(self) -> None: - logger.debug(f'in AsyncStream.disc() {self.addr}') + logger.debug(f'in AsyncStream.disc() l{self.l_addr} | r{self.r_addr}') self.writer.close() def close(self): - logger.debug(f'in AsyncStream.close() {self.addr}') + logger.debug(f'in AsyncStream.close() l{self.l_addr} | r{self.r_addr}') self.writer.close() super().close() # call close handler in the parent class -# logger.info (f'AsyncStream refs: {gc.get_referrers(self)}') +# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}') ''' Our private methods @@ -96,4 +101,4 @@ class AsyncStream(Message): pass def __del__(self): - logging.debug(f"AsyncStream.__del__ {self.addr}") + logging.debug(f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}") diff --git a/app/src/inverter.py b/app/src/inverter.py index d4a8d85..01b67e0 100644 --- a/app/src/inverter.py +++ b/app/src/inverter.py @@ -107,7 +107,7 @@ class Inverter(AsyncStream): self.inc_counter('Inverter_Cnt') await self.loop() self.dec_counter('Inverter_Cnt') - logging.info(f'Server loop stopped for {addr}') + logging.info(f'Server loop stopped for r{self.r_addr}') # if the server connection closes, we also have to disconnect # the connection to te TSUN cloud @@ -121,15 +121,22 @@ class Inverter(AsyncStream): async def client_loop(self, addr): '''Loop for receiving messages from the TSUN cloud (client-side)''' - await self.remoteStream.loop() - logging.info(f'Client loop stopped for {addr}') + clientStream = await self.remoteStream.loop() + logging.info(f'Client loop stopped for l{clientStream.l_addr}') # if the client connection closes, we don't touch the server # connection. Instead we erase the client connection stream, # thus on the next received packet from the inverter, we can # establish a new connection to the TSUN cloud - self.remoteStream.remoteStream = None # erase backlink to inverter - self.remoteStream = None # than erase client connection + + # erase backlink to inverter + clientStream.remoteStream = None + + if self.remoteStream == clientStream: + # logging.debug(f'Client l{clientStream.l_addr} refs:' + # f' {gc.get_referrers(clientStream)}') + # than erase client connection + self.remoteStream = None async def async_create_remote(self) -> None: '''Establish a client connection to the TSUN cloud''' @@ -197,7 +204,7 @@ class Inverter(AsyncStream): f"/{node_id}{id}/config", data_json) def close(self) -> None: - logging.debug(f'Inverter.close() {self.addr}') + logging.debug(f'Inverter.close() l{self.l_addr} | r{self.r_addr}') super().close() # call close handler in the parent class # logger.debug (f'Inverter refs: {gc.get_referrers(self)}')