improve client conn disconection
- check for race cond. on closing and establishing client connections - improve connection trace
This commit is contained in:
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
- add int64 data type to info parser
|
- add int64 data type to info parser
|
||||||
- allow multiple calls to Message.close()
|
- allow multiple calls to Message.close()
|
||||||
|
- check for race cond. on closing and establishing client connections
|
||||||
|
|
||||||
## [0.5.1] - 2023-11-05
|
## [0.5.1] - 2023-11-05
|
||||||
|
|
||||||
|
|||||||
@@ -16,11 +16,15 @@ class AsyncStream(Message):
|
|||||||
self.writer = writer
|
self.writer = writer
|
||||||
self.remoteStream = remote_stream
|
self.remoteStream = remote_stream
|
||||||
self.addr = addr
|
self.addr = addr
|
||||||
|
self.r_addr = ''
|
||||||
|
self.l_addr = ''
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Our puplic methods
|
Our puplic methods
|
||||||
'''
|
'''
|
||||||
async def loop(self) -> None:
|
async def loop(self):
|
||||||
|
self.r_addr = self.writer.get_extra_info('peername')
|
||||||
|
self.l_addr = self.writer.get_extra_info('sockname')
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@@ -35,26 +39,27 @@ class AsyncStream(Message):
|
|||||||
ConnectionAbortedError,
|
ConnectionAbortedError,
|
||||||
BrokenPipeError,
|
BrokenPipeError,
|
||||||
RuntimeError) as error:
|
RuntimeError) as error:
|
||||||
logger.warning(f'In loop for {self.addr}: {error}')
|
logger.warning(f'In loop for l{self.l_addr} | '
|
||||||
|
f'r{self.r_addr}: {error}')
|
||||||
self.close()
|
self.close()
|
||||||
return
|
return self
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.error(
|
logger.error(
|
||||||
f"Exception for {self.addr}:\n"
|
f"Exception for {self.addr}:\n"
|
||||||
f"{traceback.format_exc()}")
|
f"{traceback.format_exc()}")
|
||||||
self.close()
|
self.close()
|
||||||
return
|
return self
|
||||||
|
|
||||||
def disc(self) -> None:
|
def disc(self) -> None:
|
||||||
logger.debug(f'in AsyncStream.disc() {self.addr}')
|
logger.debug(f'in AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
|
||||||
self.writer.close()
|
self.writer.close()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
logger.debug(f'in AsyncStream.close() {self.addr}')
|
logger.debug(f'in AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
|
||||||
self.writer.close()
|
self.writer.close()
|
||||||
super().close() # call close handler in the parent class
|
super().close() # call close handler in the parent class
|
||||||
|
|
||||||
# logger.info (f'AsyncStream refs: {gc.get_referrers(self)}')
|
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Our private methods
|
Our private methods
|
||||||
@@ -96,4 +101,4 @@ class AsyncStream(Message):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
logging.debug(f"AsyncStream.__del__ {self.addr}")
|
logging.debug(f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}")
|
||||||
|
|||||||
@@ -107,7 +107,7 @@ class Inverter(AsyncStream):
|
|||||||
self.inc_counter('Inverter_Cnt')
|
self.inc_counter('Inverter_Cnt')
|
||||||
await self.loop()
|
await self.loop()
|
||||||
self.dec_counter('Inverter_Cnt')
|
self.dec_counter('Inverter_Cnt')
|
||||||
logging.info(f'Server loop stopped for {addr}')
|
logging.info(f'Server loop stopped for r{self.r_addr}')
|
||||||
|
|
||||||
# if the server connection closes, we also have to disconnect
|
# if the server connection closes, we also have to disconnect
|
||||||
# the connection to te TSUN cloud
|
# the connection to te TSUN cloud
|
||||||
@@ -121,15 +121,22 @@ class Inverter(AsyncStream):
|
|||||||
|
|
||||||
async def client_loop(self, addr):
|
async def client_loop(self, addr):
|
||||||
'''Loop for receiving messages from the TSUN cloud (client-side)'''
|
'''Loop for receiving messages from the TSUN cloud (client-side)'''
|
||||||
await self.remoteStream.loop()
|
clientStream = await self.remoteStream.loop()
|
||||||
logging.info(f'Client loop stopped for {addr}')
|
logging.info(f'Client loop stopped for l{clientStream.l_addr}')
|
||||||
|
|
||||||
# if the client connection closes, we don't touch the server
|
# if the client connection closes, we don't touch the server
|
||||||
# connection. Instead we erase the client connection stream,
|
# connection. Instead we erase the client connection stream,
|
||||||
# thus on the next received packet from the inverter, we can
|
# thus on the next received packet from the inverter, we can
|
||||||
# establish a new connection to the TSUN cloud
|
# establish a new connection to the TSUN cloud
|
||||||
self.remoteStream.remoteStream = None # erase backlink to inverter
|
|
||||||
self.remoteStream = None # than erase client connection
|
# erase backlink to inverter
|
||||||
|
clientStream.remoteStream = None
|
||||||
|
|
||||||
|
if self.remoteStream == clientStream:
|
||||||
|
# logging.debug(f'Client l{clientStream.l_addr} refs:'
|
||||||
|
# f' {gc.get_referrers(clientStream)}')
|
||||||
|
# than erase client connection
|
||||||
|
self.remoteStream = None
|
||||||
|
|
||||||
async def async_create_remote(self) -> None:
|
async def async_create_remote(self) -> None:
|
||||||
'''Establish a client connection to the TSUN cloud'''
|
'''Establish a client connection to the TSUN cloud'''
|
||||||
@@ -197,7 +204,7 @@ class Inverter(AsyncStream):
|
|||||||
f"/{node_id}{id}/config", data_json)
|
f"/{node_id}{id}/config", data_json)
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
logging.debug(f'Inverter.close() {self.addr}')
|
logging.debug(f'Inverter.close() l{self.l_addr} | r{self.r_addr}')
|
||||||
super().close() # call close handler in the parent class
|
super().close() # call close handler in the parent class
|
||||||
# logger.debug (f'Inverter refs: {gc.get_referrers(self)}')
|
# logger.debug (f'Inverter refs: {gc.get_referrers(self)}')
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user