await wait_closed() on disconnects
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
import logging
|
import logging
|
||||||
import traceback
|
import traceback
|
||||||
|
import asyncio
|
||||||
from messages import hex_dump_memory
|
from messages import hex_dump_memory
|
||||||
|
|
||||||
logger = logging.getLogger('conn')
|
logger = logging.getLogger('conn')
|
||||||
@@ -27,7 +28,7 @@ class AsyncStream():
|
|||||||
# the connection to te TSUN cloud
|
# the connection to te TSUN cloud
|
||||||
if self.remoteStream:
|
if self.remoteStream:
|
||||||
logging.debug("disconnect client connection")
|
logging.debug("disconnect client connection")
|
||||||
self.remoteStream.disc()
|
await self.remoteStream.disc()
|
||||||
try:
|
try:
|
||||||
await self._async_publ_mqtt_proxy_stat('proxy')
|
await self._async_publ_mqtt_proxy_stat('proxy')
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -58,6 +59,7 @@ class AsyncStream():
|
|||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
# await asyncio.wait_for(self.__async_read(), 0.3)
|
||||||
await self.__async_read()
|
await self.__async_read()
|
||||||
|
|
||||||
if self.unique_id:
|
if self.unique_id:
|
||||||
@@ -65,25 +67,32 @@ class AsyncStream():
|
|||||||
await self.__async_forward()
|
await self.__async_forward()
|
||||||
await self.async_publ_mqtt()
|
await self.async_publ_mqtt()
|
||||||
|
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
|
|
||||||
except (ConnectionResetError,
|
except (ConnectionResetError,
|
||||||
ConnectionAbortedError,
|
ConnectionAbortedError,
|
||||||
BrokenPipeError,
|
BrokenPipeError) as error:
|
||||||
RuntimeError) as error:
|
logger.error(f'{error} for l{self.l_addr} | '
|
||||||
logger.warning(f'In loop for l{self.l_addr} | '
|
f'r{self.r_addr}')
|
||||||
f'r{self.r_addr}: {error}')
|
await self.disc()
|
||||||
self.close()
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
except RuntimeError as error:
|
||||||
|
logger.warning(f"{error} for {self.l_addr}")
|
||||||
|
await self.disc()
|
||||||
|
return self
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
self.inc_counter('SW_Exception')
|
self.inc_counter('SW_Exception')
|
||||||
logger.error(
|
logger.error(
|
||||||
f"Exception for {self.addr}:\n"
|
f"Exception for {self.addr}:\n"
|
||||||
f"{traceback.format_exc()}")
|
f"{traceback.format_exc()}")
|
||||||
self.close()
|
|
||||||
return self
|
|
||||||
|
|
||||||
def disc(self) -> None:
|
async def disc(self) -> None:
|
||||||
logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
|
logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
|
||||||
self.writer.close()
|
self.writer.close()
|
||||||
|
await self.writer.wait_closed()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
|
logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
|
||||||
|
|||||||
Reference in New Issue
Block a user