Compare commits

...

20 Commits

Author SHA1 Message Date
Stefan Allius
4c51a159af remove data logs from console 2023-10-03 20:32:46 +02:00
Stefan Allius
450012aac5 Version 0.0.6 2023-10-03 20:23:25 +02:00
Stefan Allius
00f800c17a put packet dumps only into tracer.log 2023-10-03 20:21:59 +02:00
Stefan Allius
421f7a533a dealloc async_stream instances in connection termination 2023-10-03 19:47:09 +02:00
Stefan Allius
6d9be75ce3 dealloc async_stream instances in connection termination
- improve close handler
- clarify logging on disconnection
2023-10-03 19:44:24 +02:00
Stefan Allius
0886b30032 fix control byte output in tx trace 2023-10-03 14:01:42 +02:00
Stefan Allius
d308c3a9fa Revert "fix memory leak on connection aborts"
This reverts commit f097b3350b.
2023-10-03 11:45:17 +02:00
Stefan Allius
38dacf2b97 Revert "use weakrefs to solve circular references"
This reverts commit dfe8bcb01e.
2023-10-03 11:43:08 +02:00
Stefan Allius
700b946acf dealloc async_stream instances in connection termination 2023-10-03 01:35:53 +02:00
Stefan Allius
dfe8bcb01e use weakrefs to solve circular references
- cleanup logging
2023-10-03 01:31:23 +02:00
Stefan Allius
a8449e8417 implement disc method 2023-10-03 01:30:06 +02:00
Stefan Allius
f097b3350b fix memory leak on connection aborts
- use weakrefs
- call Message.close() in the parent class
- call Message.__del__()
- cleanup logging
2023-10-03 00:48:22 +02:00
Stefan Allius
056e182f64 implement close() to release circular references 2023-10-03 00:46:45 +02:00
Stefan Allius
00f1fe01bf disable MQTT debug logs 2023-10-03 00:45:56 +02:00
Stefan Allius
108da0a97e Merge pull request #12 from s-allius/s-allius/issue5
S allius/issue5
2023-10-02 19:49:46 +02:00
Stefan Allius
e5d19ce07d Force MQTT registration
- when the home assistant has set the status to online again
2023-10-02 19:42:42 +02:00
Stefan Allius
464e542a47 clarify comment 2023-10-02 19:38:34 +02:00
Stefan Allius
414eb19ffb clarify comment 2023-10-02 19:35:59 +02:00
Stefan Allius
283bc2257b send autoconfig on HA restart
Fixes #5
2023-10-02 19:31:12 +02:00
Stefan Allius
198146b5f4 Bump aiomqtt to version 1.2.1 2023-10-01 22:41:41 +02:00
7 changed files with 58 additions and 21 deletions
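
Several commits in this range (f097b3350b, dfe8bcb01e and their reverts d308c3a9fa, 38dacf2b97) wrestle with reference cycles between the paired stream objects. Below is a minimal, hypothetical sketch of the weakref idea that was tried and then reverted in favour of an explicit close(); the class and attribute names are illustrative, not taken from the repository.

import weakref


class Stream:
    '''Illustrative only: two streams that need to reference each other.'''

    def __init__(self, name):
        self.name = name
        self._remote = None

    @property
    def remoteStream(self):
        # dereference the weak reference; returns None once the peer is gone
        return self._remote() if self._remote is not None else None

    @remoteStream.setter
    def remoteStream(self, other):
        # store only a weak reference, so the pair never forms a strong
        # reference cycle that keeps both instances alive
        self._remote = weakref.ref(other) if other is not None else None


server = Stream('inverter side')
client = Stream('cloud side')
server.remoteStream = client
client.remoteStream = server

del client                    # last strong reference gone -> freed immediately
print(server.remoteStream)    # None: the surviving peer just sees the link vanish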

View File

@@ -7,7 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
-###
+## [0.0.6] - 2023-10-03
+- Bump aiomqtt to version 1.2.1
+- Force MQTT registration when the home assistant has set the status to online again
+- fix control byte output in tx trace
+- dealloc async_stream instances in connection termination
 ## [0.0.5] - 2023-10-01

View File

@@ -1,2 +1,2 @@
-aiomqtt==1.2.0
-schema
+aiomqtt==1.2.1
+schema==0.7.5

View File

@@ -94,12 +94,21 @@ class AsyncStream(Message):
                              f"{traceback.format_exc()}")
                 self.close()
                 return
 
+    def disc(self) -> None:
+        logger.debug(f'in AsyncStream.disc() {self.addr}')
+        self.writer.close()
+
     def close(self):
-        logger.info(f'in async_stream.close() {self.addr}')
+        logger.debug(f'in AsyncStream.close() {self.addr}')
         self.writer.close()
-        self.proxy = None
-        self.remoteStream = None
+        super().close()           # call close handler in the parent class
+        self.proxy = None         # clear our refernce to the proxy, to avoid memory leaks
+        if self.remoteStream:     # if we have knowledge about a remote stream, we del the references between the two streams
+            self.remoteStream.remoteStream = None
+            self.remoteStream = None
 
     '''
@@ -136,8 +145,10 @@ class AsyncStream(Message):
         if self.server_side:
             db = self.db.db
-            if self.new_data.keys() & {'inverter', 'collector'}:
+            # check if new inverter or collector infos are available or when the home assistant has changed the status back to online
+            if (self.new_data.keys() & {'inverter', 'collector'}) or self.mqtt.home_assistant_restarted:
                 await self.register_home_assistant()
+                self.mqtt.home_assistant_restarted = False   # clear flag
 
             for key in self.new_data:
                 if self.new_data[key] and key in db:
@@ -147,6 +158,7 @@ class AsyncStream(Message):
                     self.new_data[key] = False
 
     def __del__ (self):
         logger.debug ("AsyncStream __del__")
+        super().__del__()
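
The teardown that was ultimately kept, after the weakref commits were reverted, is the explicit unlinking shown above: close() clears both directions of the remoteStream pair so plain reference counting can reclaim the instances. A small, hypothetical demonstration of why both sides need to be cleared (names are illustrative):

import gc
import weakref


class Stream:
    '''Illustrative stand-in: two streams holding strong references to each other.'''

    def __init__(self):
        self.remoteStream = None

    def close(self):
        # unlink both directions, as in the AsyncStream.close() above
        if self.remoteStream:
            self.remoteStream.remoteStream = None
            self.remoteStream = None


gc.disable()                       # rule out the cycle collector for this demo
a, b = Stream(), Stream()
a.remoteStream, b.remoteStream = b, a
probe = weakref.ref(a)

a.close()                          # breaks the a <-> b cycle
del a, b
print(probe() is None)             # True: refcounting alone reclaimed the pair
gc.enable()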

View File

@@ -25,13 +25,13 @@ qualname=conn
 [logger_data]
 level=DEBUG
-handlers=console_handler,file_handler_name1,file_handler_name2
+handlers=file_handler_name1,file_handler_name2
 propagate=0
 qualname=data
 
 [logger_mqtt]
-level=DEBUG
-handlers=console_handler,file_handler_name1,file_handler_name2
+level=INFO
+handlers=console_handler,file_handler_name1
 propagate=0
 qualname=mqtt
@@ -43,12 +43,12 @@ qualname=tracer
 [handler_console_handler]
 class=StreamHandler
-level=INFO
+level=DEBUG
 formatter=console_formatter
 
 [handler_file_handler_name1]
 class=handlers.TimedRotatingFileHandler
-level=NOTSET
+level=INFO
 formatter=file_formatter
 args=('log/proxy.log', when:='midnight')
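
For reference, the file above is a standard logging.config INI configuration: the [logger_*] sections are matched by qualname, and the handler levels decide what reaches the console versus the rotating proxy.log. A minimal usage sketch, assuming the file is saved as logging.ini (the filename is an assumption, not shown in the diff):

import logging
import logging.config

# load the INI-style configuration shown above
logging.config.fileConfig('logging.ini', disable_existing_loggers=False)

mqtt_log = logging.getLogger('mqtt')    # matches qualname=mqtt
data_log = logging.getLogger('data')    # matches qualname=data

mqtt_log.info('still reaches the console and proxy.log')
data_log.debug('no longer echoed to the console after this change')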

View File

@@ -100,6 +100,14 @@ class Message(metaclass=IterRegistry):
     '''
     Our puplic methods
     '''
+    def close(self) -> None:
+        logger.debug(f'in Message.close()')
+        # we have refernces to methods of this class in self.switch
+        # so we have to erase self.switch, otherwise this instance can't be
+        # deallocated by the garbage collector ==> we get a memory leak
+        del self.switch
+
     def read(self) -> None:
         self._read()
@@ -186,7 +194,7 @@ class Message(metaclass=IterRegistry):
         self.send_msg_ofs = len (self._send_buffer)
         self._send_buffer += struct.pack(f'!l{len(self.id_str)+1}pBB', 0, self.id_str, ctrl, self.msg_id)
         fnc = self.switch.get(self.msg_id, self.msg_unknown)
-        logger.info(self.__flow_str(self.server_side, 'tx') + f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}' )
+        logger.info(self.__flow_str(self.server_side, 'tx') + f' Ctl: {int(ctrl):#02x} Msg: {fnc.__name__!r}' )
 
     def __finish_send_msg(self) -> None:
         _len = len(self._send_buffer) - self.send_msg_ofs
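
The comment in the new Message.close() points at the underlying issue: a bound method stored in self.switch holds a reference back to its instance. A tiny, hypothetical illustration (the message id and handler name are made up; the exact counts are CPython-specific):

import sys


class Msg:
    '''Hypothetical stand-in for Message with a switch dict of handlers.'''

    def __init__(self):
        self.switch = {0x22: self.msg_contact_info}   # bound method -> holds self

    def msg_contact_info(self):
        pass

    def close(self):
        del self.switch     # break the self -> switch -> bound method -> self cycle


m = Msg()
print(sys.getrefcount(m))   # 3: local name, getrefcount argument, bound method
m.close()
print(sys.getrefcount(m))   # 2: only the local name and the argument remain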

View File

@@ -21,6 +21,7 @@ class Mqtt(metaclass=Singleton):
         logger_mqtt.debug(f'MQTT: __init__')
         loop = asyncio.get_event_loop()
         self.task = loop.create_task(self.__loop())
+        self.home_assistant_restarted = False
 
     def __del__(self):
@@ -55,7 +56,11 @@ class Mqtt(metaclass=Singleton):
                 async with self.client.messages() as messages:
                     await self.client.subscribe(f"{ha['auto_conf_prefix']}/status")
                     async for message in messages:
-                        logger_mqtt.info(f'Home-Assistant Status: {message.payload.decode("UTF-8")}')
+                        status = message.payload.decode("UTF-8")
+                        logger_mqtt.info(f'Home-Assistant Status: {status}')
+                        if status == 'online':
+                            self.home_assistant_restarted = True   # set flag to force MQTT registering
             except aiomqtt.MqttError:
                 logger_mqtt.info(f"Connection lost; Reconnecting in {interval} seconds ...")
                 await asyncio.sleep(interval)
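
Together with the AsyncStream change further up, this forms a simple flag handshake: the MQTT loop sets home_assistant_restarted when Home Assistant reports 'online', and the next data publication re-registers the auto-discovery topics and clears the flag. A library-free sketch of that handshake with hypothetical class names:

class MqttBridge:
    '''Hypothetical sketch of the restart flag introduced above.'''

    def __init__(self):
        self.home_assistant_restarted = False

    def on_status(self, payload: bytes) -> None:
        # called for every message on the <auto_conf_prefix>/status topic
        if payload.decode("UTF-8") == 'online':
            self.home_assistant_restarted = True   # force a fresh registration


class DataPublisher:
    '''Hypothetical counterpart to the AsyncStream logic shown earlier.'''

    def __init__(self, mqtt: MqttBridge):
        self.mqtt = mqtt
        self.new_data = {}

    def publish(self) -> None:
        if (self.new_data.keys() & {'inverter', 'collector'}) \
                or self.mqtt.home_assistant_restarted:
            self.register_home_assistant()
            self.mqtt.home_assistant_restarted = False   # clear flag

    def register_home_assistant(self) -> None:
        print('re-sending Home Assistant auto-discovery topics')


bridge = MqttBridge()
publisher = DataPublisher(bridge)
bridge.on_status(b'online')    # Home Assistant came back online
publisher.publish()            # -> re-sending Home Assistant auto-discovery topics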

View File

@@ -7,21 +7,28 @@ class Proxy:
         proxy.ClientStream = None
 
     async def server_loop(proxy, addr):
+        '''Loop for receiving messages from the inverter (server-side)'''
        logging.info(f'Accept connection from {addr}')
         await proxy.ServerStream.loop()
-        logging.info(f'Close server connection {addr}')
+        logging.info(f'Server loop stopped for {addr}')
+
+        # if the server connection closes, we also disconnect the connection to te TSUN cloud
         if proxy.ClientStream:
-            logging.debug ("close client connection")
-            proxy.ClientStream.close()
+            logging.debug ("disconnect client connection")
+            proxy.ClientStream.disc()
 
     async def client_loop(proxy, addr):
+        '''Loop for receiving messages from the TSUN cloud (client-side)'''
         await proxy.ClientStream.loop()
-        logging.info(f'Close client connection {addr}')
+        logging.info(f'Client loop stopped for {addr}')
+        proxy.ServerStream.remoteStream = None
+
+        # if the client connection closes, we don't touch the server connection. Instead we erase the client
+        # connection stream, thus on the next received packet from the inverter, we can establish a new connection
+        # to the TSUN cloud
         proxy.ClientStream = None
 
     async def CreateClientStream (proxy, stream, host, port):
+        '''Establish a client connection to the TSUN cloud'''
         addr = (host, port)
         try:
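
The docstrings and comments added here describe an asymmetric teardown: when the inverter (server) connection ends, the TSUN cloud (client) connection is actively dropped via disc(), while a lost cloud connection only clears the reference so the next inverter packet can trigger a fresh CreateClientStream(). A runnable, hypothetical sketch of that control flow, reduced to asyncio events:

import asyncio


class DummyStream:
    '''Hypothetical stand-in for AsyncStream, reduced to the control flow.'''

    def __init__(self, name):
        self.name = name
        self._gone = asyncio.Event()

    async def loop(self):
        await self._gone.wait()      # stand-in for the receive loop

    def disc(self):
        self._gone.set()             # drop the transport, so loop() returns


class MiniProxy:
    '''Hypothetical proxy wiring the two loops together.'''

    def __init__(self):
        self.ServerStream = DummyStream('inverter')
        self.ClientStream = DummyStream('TSUN cloud')

    async def server_loop(self):
        await self.ServerStream.loop()
        # inverter connection ended -> also drop the TSUN cloud connection
        if self.ClientStream:
            self.ClientStream.disc()

    async def client_loop(self):
        await self.ClientStream.loop()
        # cloud connection ended -> only forget the stream; the next inverter
        # packet would trigger a new CreateClientStream()
        self.ClientStream = None


async def main():
    proxy = MiniProxy()
    tasks = [asyncio.create_task(proxy.server_loop()),
             asyncio.create_task(proxy.client_loop())]
    proxy.ServerStream.disc()        # simulate the inverter disconnecting
    await asyncio.gather(*tasks)
    print(proxy.ClientStream)        # None: the cloud side was torn down as well


asyncio.run(main())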