diff --git a/app/src/async_stream.py b/app/src/async_stream.py index f3b0e22..fd25489 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -1,19 +1,24 @@ -import logging, traceback, aiomqtt, json +import logging, traceback, json +#import gc, ctypes from config import Config from messages import Message, hex_dump_memory from mqtt import Mqtt logger = logging.getLogger('conn') -logger_mqtt = logging.getLogger('mqtt') + +#def ref_count(address): +# return ctypes.c_long.from_address(address).value class AsyncStream(Message): - def __init__(self, proxy, reader, writer, addr, stream=None, server_side=True): + def __init__(self, proxy, reader, writer, addr, weak_stream=None, server_side=True): + logger.debug (f"AsyncStream __init__ {self}") + super().__init__() self.proxy = proxy self.reader = reader self.writer = writer - self.remoteStream = stream + self.__WkRemoteStream = weak_stream self.addr = addr self.server_side = server_side self.mqtt = Mqtt() @@ -24,26 +29,26 @@ class AsyncStream(Message): Our puplic methods ''' def set_serial_no(self, serial_no : str): - logger_mqtt.info(f'SerialNo: {serial_no}') + logger.debug(f'SerialNo: {serial_no}') if self.unique_id != serial_no: inverters = Config.get('inverters') - #logger_mqtt.debug(f'Inverters: {inverters}') + #logger.debug(f'Inverters: {inverters}') if serial_no in inverters: - logger_mqtt.debug(f'SerialNo {serial_no} allowed!') + logger.debug(f'SerialNo {serial_no} allowed!') inv = inverters[serial_no] self.node_id = inv['node_id'] self.sug_area = inv['suggested_area'] else: - logger_mqtt.debug(f'SerialNo {serial_no} not known!') + logger.debug(f'SerialNo {serial_no} not known!') self.node_id = '' self.sug_area = '' if not inverters['allow_all']: self.unique_id = None - logger_mqtt.error('ignore message from unknow inverter!') + logger.error('ignore message from unknow inverter!') return self.unique_id = serial_no @@ -58,7 +63,7 @@ class AsyncStream(Message): if self.server_side: try: for data_json, component, id in self.db.ha_confs(self.entitiy_prfx + self.node_id, self.unique_id, self.sug_area): - logger_mqtt.debug(f'Register: {data_json}') + logger.debug(f'Register MQTT: {data_json}') await self.mqtt.publish(f"{self.discovery_prfx}{component}/{self.node_id}{id}/config", data_json) except Exception: @@ -95,11 +100,14 @@ class AsyncStream(Message): self.close() return - def close(self): - logger.info(f'in async_stream.close() {self.addr}') + def close(self) -> None: + logger.debug(f'in AsyncStream.close() {self.addr}') + super().close() self.writer.close() - self.proxy = None - self.remoteStream = None + del self.proxy + + #logger.info (f'refcount: {ref_count(id (self))}') + #logger.info (f'AsyncStream refs: {gc.get_referrers(self)}') ''' @@ -122,15 +130,17 @@ class AsyncStream(Message): async def __async_forward(self) -> None: if self._forward_buffer: - if not self.remoteStream: + if not self.__WkRemoteStream: tsun = Config.get('tsun') - self.remoteStream = await self.proxy.CreateClientStream (self, tsun['host'], tsun['port']) + self.__WkRemoteStream = await self.proxy.CreateClientStream (tsun['host'], tsun['port']) - if self.remoteStream: - hex_dump_memory(logging.DEBUG, f'Forward to {self.remoteStream.addr}:', self._forward_buffer, len(self._forward_buffer)) - self.remoteStream.writer.write (self._forward_buffer) - await self.remoteStream.writer.drain() - self._forward_buffer = bytearray(0) + if self.__WkRemoteStream: + remoteStream = self.__WkRemoteStream() + if remoteStream: + hex_dump_memory(logging.DEBUG, f'Forward to {remoteStream.addr}:', self._forward_buffer, len(self._forward_buffer)) + remoteStream.writer.write (self._forward_buffer) + await remoteStream.writer.drain() + self._forward_buffer = bytearray(0) async def __async_publ_mqtt(self) -> None: if self.server_side: @@ -144,11 +154,11 @@ class AsyncStream(Message): for key in self.new_data: if self.new_data[key] and key in db: data_json = json.dumps(db[key]) - logger_mqtt.info(f'{key}: {data_json}') + #logger.info(f'MQTT publish {key}: {data_json}') await self.mqtt.publish(f"{self.entitiy_prfx}{self.node_id}{key}", data_json) self.new_data[key] = False def __del__ (self): logger.debug ("AsyncStream __del__") - + super().__del__()