Compare commits

...

20 Commits

Author SHA1 Message Date
Stefan Allius
4c51a159af remoce data logs from console 2023-10-03 20:32:46 +02:00
Stefan Allius
450012aac5 Version 0.0.6 2023-10-03 20:23:25 +02:00
Stefan Allius
00f800c17a put packet dumps only into tracer.log 2023-10-03 20:21:59 +02:00
Stefan Allius
421f7a533a dealloc async_stream instances in connection termination 2023-10-03 19:47:09 +02:00
Stefan Allius
6d9be75ce3 dealloc async_stream instances in connection termination
- improve close handler
- clearify logging on disconnection
2023-10-03 19:44:24 +02:00
Stefan Allius
0886b30032 fix control byte output in tx trace 2023-10-03 14:01:42 +02:00
Stefan Allius
d308c3a9fa Revert "fix memory leak on connection aborts"
This reverts commit f097b3350b.
2023-10-03 11:45:17 +02:00
Stefan Allius
38dacf2b97 Revert "use weakrefs to solve circular references"
This reverts commit dfe8bcb01e.
2023-10-03 11:43:08 +02:00
Stefan Allius
700b946acf dealloc async_stream instances in connection termination 2023-10-03 01:35:53 +02:00
Stefan Allius
dfe8bcb01e use weakrefs to solve circular references
- cleanup logging
2023-10-03 01:31:23 +02:00
Stefan Allius
a8449e8417 implement disc method 2023-10-03 01:30:06 +02:00
Stefan Allius
f097b3350b fix memory leak on connection aborts
- use weakrefs
- call Message.close() in the parent class
- call Message.__del__()
- cleanup logging
2023-10-03 00:48:22 +02:00
Stefan Allius
056e182f64 implement close() to release cercular references 2023-10-03 00:46:45 +02:00
Stefan Allius
00f1fe01bf disable MQTT debug logs 2023-10-03 00:45:56 +02:00
Stefan Allius
108da0a97e Merge pull request #12 from s-allius/s-allius/issue5
S allius/issue5
2023-10-02 19:49:46 +02:00
Stefan Allius
e5d19ce07d Force MQTT registration
- when the home assistant has set the status to online again
2023-10-02 19:42:42 +02:00
Stefan Allius
464e542a47 clearify comment 2023-10-02 19:38:34 +02:00
Stefan Allius
414eb19ffb clarify comment 2023-10-02 19:35:59 +02:00
Stefan Allius
283bc2257b send autoconfig on HA restart
Fixes #5
2023-10-02 19:31:12 +02:00
Stefan Allius
198146b5f4 Bump aiomqtt to version 1.2.1 2023-10-01 22:41:41 +02:00
7 changed files with 58 additions and 21 deletions

View File

@@ -7,7 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
###
## [0.0.6] - 2023-10-03
- Bump aiomqtt to version 1.2.1
- Force MQTT registration when the home assistant has set the status to online again
- fix control byte output in tx trace
- dealloc async_stream instances in connection termination
## [0.0.5] - 2023-10-01

View File

@@ -1,2 +1,2 @@
aiomqtt==1.2.0
schema
aiomqtt==1.2.1
schema==0.7.5

View File

@@ -94,12 +94,21 @@ class AsyncStream(Message):
f"{traceback.format_exc()}")
self.close()
return
def disc(self) -> None:
logger.debug(f'in AsyncStream.disc() {self.addr}')
self.writer.close()
def close(self):
logger.info(f'in async_stream.close() {self.addr}')
logger.debug(f'in AsyncStream.close() {self.addr}')
self.writer.close()
self.proxy = None
self.remoteStream = None
super().close() # call close handler in the parent class
self.proxy = None # clear our refernce to the proxy, to avoid memory leaks
if self.remoteStream: # if we have knowledge about a remote stream, we del the references between the two streams
self.remoteStream.remoteStream = None
self.remoteStream = None
'''
@@ -136,8 +145,10 @@ class AsyncStream(Message):
if self.server_side:
db = self.db.db
if self.new_data.keys() & {'inverter', 'collector'}:
# check if new inverter or collector infos are available or when the home assistant has changed the status back to online
if (self.new_data.keys() & {'inverter', 'collector'}) or self.mqtt.home_assistant_restarted:
await self.register_home_assistant()
self.mqtt.home_assistant_restarted = False # clear flag
for key in self.new_data:
if self.new_data[key] and key in db:
@@ -147,6 +158,7 @@ class AsyncStream(Message):
self.new_data[key] = False
def __del__ (self):
logger.debug ("AsyncStream __del__")
logger.debug ("AsyncStream __del__")
super().__del__()

View File

@@ -25,13 +25,13 @@ qualname=conn
[logger_data]
level=DEBUG
handlers=console_handler,file_handler_name1,file_handler_name2
handlers=file_handler_name1,file_handler_name2
propagate=0
qualname=data
[logger_mqtt]
level=DEBUG
handlers=console_handler,file_handler_name1,file_handler_name2
level=INFO
handlers=console_handler,file_handler_name1
propagate=0
qualname=mqtt
@@ -43,12 +43,12 @@ qualname=tracer
[handler_console_handler]
class=StreamHandler
level=INFO
level=DEBUG
formatter=console_formatter
[handler_file_handler_name1]
class=handlers.TimedRotatingFileHandler
level=NOTSET
level=INFO
formatter=file_formatter
args=('log/proxy.log', when:='midnight')

View File

@@ -100,6 +100,14 @@ class Message(metaclass=IterRegistry):
'''
Our puplic methods
'''
def close(self) -> None:
logger.debug(f'in Message.close()')
# we have refernces to methods of this class in self.switch
# so we have to erase self.switch, otherwise this instance can't be
# deallocated by the garbage collector ==> we get a memory leak
del self.switch
def read(self) -> None:
self._read()
@@ -186,7 +194,7 @@ class Message(metaclass=IterRegistry):
self.send_msg_ofs = len (self._send_buffer)
self._send_buffer += struct.pack(f'!l{len(self.id_str)+1}pBB', 0, self.id_str, ctrl, self.msg_id)
fnc = self.switch.get(self.msg_id, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'tx') + f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}' )
logger.info(self.__flow_str(self.server_side, 'tx') + f' Ctl: {int(ctrl):#02x} Msg: {fnc.__name__!r}' )
def __finish_send_msg(self) -> None:
_len = len(self._send_buffer) - self.send_msg_ofs

View File

@@ -21,6 +21,7 @@ class Mqtt(metaclass=Singleton):
logger_mqtt.debug(f'MQTT: __init__')
loop = asyncio.get_event_loop()
self.task = loop.create_task(self.__loop())
self.home_assistant_restarted = False
def __del__(self):
@@ -55,7 +56,11 @@ class Mqtt(metaclass=Singleton):
async with self.client.messages() as messages:
await self.client.subscribe(f"{ha['auto_conf_prefix']}/status")
async for message in messages:
logger_mqtt.info(f'Home-Assistant Status: {message.payload.decode("UTF-8")}')
status = message.payload.decode("UTF-8")
logger_mqtt.info(f'Home-Assistant Status: {status}')
if status == 'online':
self.home_assistant_restarted = True # set flag to force MQTT registering
except aiomqtt.MqttError:
logger_mqtt.info(f"Connection lost; Reconnecting in {interval} seconds ...")
await asyncio.sleep(interval)

View File

@@ -7,21 +7,28 @@ class Proxy:
proxy.ClientStream = None
async def server_loop(proxy, addr):
logging.info(f'Accept connection from {addr}')
'''Loop for receiving messages from the inverter (server-side)'''
logging.info(f'Accept connection from {addr}')
await proxy.ServerStream.loop()
logging.info(f'Close server connection {addr}')
logging.info(f'Server loop stopped for {addr}')
# if the server connection closes, we also disconnect the connection to te TSUN cloud
if proxy.ClientStream:
logging.debug ("close client connection")
proxy.ClientStream.close()
logging.debug ("disconnect client connection")
proxy.ClientStream.disc()
async def client_loop(proxy, addr):
'''Loop for receiving messages from the TSUN cloud (client-side)'''
await proxy.ClientStream.loop()
logging.info(f'Close client connection {addr}')
proxy.ServerStream.remoteStream = None
logging.info(f'Client loop stopped for {addr}')
# if the client connection closes, we don't touch the server connection. Instead we erase the client
# connection stream, thus on the next received packet from the inverter, we can establish a new connection
# to the TSUN cloud
proxy.ClientStream = None
async def CreateClientStream (proxy, stream, host, port):
'''Establish a client connection to the TSUN cloud'''
addr = (host, port)
try: