From d308c3a9fa45f12034036de5c017bd9c9b2f6304 Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Tue, 3 Oct 2023 11:45:17 +0200 Subject: [PATCH] Revert "fix memory leak on connection aborts" This reverts commit f097b3350b0f1f447eafd9067b791b99f790a6bc. --- app/src/async_stream.py | 56 +++++++++++++++++------------------------ 1 file changed, 23 insertions(+), 33 deletions(-) diff --git a/app/src/async_stream.py b/app/src/async_stream.py index a09a67e..7c7e913 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -1,24 +1,19 @@ -import logging, traceback, json -#import gc, ctypes +import logging, traceback, aiomqtt, json from config import Config from messages import Message, hex_dump_memory from mqtt import Mqtt logger = logging.getLogger('conn') - -#def ref_count(address): -# return ctypes.c_long.from_address(address).value +logger_mqtt = logging.getLogger('mqtt') class AsyncStream(Message): - def __init__(self, proxy, reader, writer, addr, weak_stream=None, server_side=True): - logger.debug (f"AsyncStream __init__ {self}") - + def __init__(self, proxy, reader, writer, addr, stream=None, server_side=True): super().__init__() self.proxy = proxy self.reader = reader self.writer = writer - self.__WkRemoteStream = weak_stream + self.remoteStream = stream self.addr = addr self.server_side = server_side self.mqtt = Mqtt() @@ -29,26 +24,26 @@ class AsyncStream(Message): Our puplic methods ''' def set_serial_no(self, serial_no : str): - logger.debug(f'SerialNo: {serial_no}') + logger_mqtt.info(f'SerialNo: {serial_no}') if self.unique_id != serial_no: inverters = Config.get('inverters') - #logger.debug(f'Inverters: {inverters}') + #logger_mqtt.debug(f'Inverters: {inverters}') if serial_no in inverters: - logger.debug(f'SerialNo {serial_no} allowed!') + logger_mqtt.debug(f'SerialNo {serial_no} allowed!') inv = inverters[serial_no] self.node_id = inv['node_id'] self.sug_area = inv['suggested_area'] else: - logger.debug(f'SerialNo {serial_no} not known!') + logger_mqtt.debug(f'SerialNo {serial_no} not known!') self.node_id = '' self.sug_area = '' if not inverters['allow_all']: self.unique_id = None - logger.error('ignore message from unknow inverter!') + logger_mqtt.error('ignore message from unknow inverter!') return self.unique_id = serial_no @@ -63,7 +58,7 @@ class AsyncStream(Message): if self.server_side: try: for data_json, component, id in self.db.ha_confs(self.entitiy_prfx + self.node_id, self.unique_id, self.sug_area): - logger.debug(f'Register MQTT: {data_json}') + logger_mqtt.debug(f'Register: {data_json}') await self.mqtt.publish(f"{self.discovery_prfx}{component}/{self.node_id}{id}/config", data_json) except Exception: @@ -105,14 +100,11 @@ class AsyncStream(Message): self.writer.close() - def close(self) -> None: - logger.debug(f'in AsyncStream.close() {self.addr}') - super().close() + def close(self): + logger.info(f'in async_stream.close() {self.addr}') self.writer.close() - del self.proxy - - #logger.info (f'refcount: {ref_count(id (self))}') - #logger.info (f'AsyncStream refs: {gc.get_referrers(self)}') + self.proxy = None + self.remoteStream = None ''' @@ -135,17 +127,15 @@ class AsyncStream(Message): async def __async_forward(self) -> None: if self._forward_buffer: - if not self.__WkRemoteStream: + if not self.remoteStream: tsun = Config.get('tsun') - self.__WkRemoteStream = await self.proxy.CreateClientStream (tsun['host'], tsun['port']) + self.remoteStream = await self.proxy.CreateClientStream (self, tsun['host'], tsun['port']) - if self.__WkRemoteStream: - remoteStream = self.__WkRemoteStream() - if remoteStream: - hex_dump_memory(logging.DEBUG, f'Forward to {remoteStream.addr}:', self._forward_buffer, len(self._forward_buffer)) - remoteStream.writer.write (self._forward_buffer) - await remoteStream.writer.drain() - self._forward_buffer = bytearray(0) + if self.remoteStream: + hex_dump_memory(logging.DEBUG, f'Forward to {self.remoteStream.addr}:', self._forward_buffer, len(self._forward_buffer)) + self.remoteStream.writer.write (self._forward_buffer) + await self.remoteStream.writer.drain() + self._forward_buffer = bytearray(0) async def __async_publ_mqtt(self) -> None: if self.server_side: @@ -159,11 +149,11 @@ class AsyncStream(Message): for key in self.new_data: if self.new_data[key] and key in db: data_json = json.dumps(db[key]) - #logger.info(f'MQTT publish {key}: {data_json}') + logger_mqtt.info(f'{key}: {data_json}') await self.mqtt.publish(f"{self.entitiy_prfx}{self.node_id}{key}", data_json) self.new_data[key] = False def __del__ (self): logger.debug ("AsyncStream __del__") - super().__del__() +