Compare commits
20 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 4c51a159af | |
| | 450012aac5 | |
| | 00f800c17a | |
| | 421f7a533a | |
| | 6d9be75ce3 | |
| | 0886b30032 | |
| | d308c3a9fa | |
| | 38dacf2b97 | |
| | 700b946acf | |
| | dfe8bcb01e | |
| | a8449e8417 | |
| | f097b3350b | |
| | 056e182f64 | |
| | 00f1fe01bf | |
| | 108da0a97e | |
| | e5d19ce07d | |
| | 464e542a47 | |
| | 414eb19ffb | |
| | 283bc2257b | |
| | 198146b5f4 | |
```diff
@@ -7,7 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 
 ###
 
+## [0.0.6] - 2023-10-03
+
+- Bump aiomqtt to version 1.2.1
+- Force MQTT registration when the home assistant has set the status to online again
+- fix control byte output in tx trace
+- dealloc async_stream instances in connection termination
+
 ## [0.0.5] - 2023-10-01
```
```diff
@@ -1,2 +1,2 @@
-aiomqtt==1.2.0
-schema
+aiomqtt==1.2.1
+schema==0.7.5
```
```diff
@@ -94,12 +94,21 @@ class AsyncStream(Message):
                          f"{traceback.format_exc()}")
             self.close()
             return
 
+    def disc(self) -> None:
+        logger.debug(f'in AsyncStream.disc() {self.addr}')
+        self.writer.close()
+
     def close(self):
-        logger.info(f'in async_stream.close() {self.addr}')
+        logger.debug(f'in AsyncStream.close() {self.addr}')
         self.writer.close()
-        self.proxy = None
-        self.remoteStream = None
+        super().close()         # call close handler in the parent class
+        self.proxy = None       # clear our refernce to the proxy, to avoid memory leaks
+
+        if self.remoteStream:   # if we have knowledge about a remote stream, we del the references between the two streams
+            self.remoteStream.remoteStream = None
+            self.remoteStream = None
 
 
     '''
```
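The new `close()` tears the pairing down explicitly: each stream keeps a `remoteStream` reference to its peer, and leaving that mutual link in place after one side closes would keep both instances (and their buffers) alive. A small, self-contained sketch of the idea, where the `Stream` class is illustrative and not the proxy's `AsyncStream`:

```python
import weakref

class Stream:
    def __init__(self, name: str):
        self.name = name
        self.remoteStream = None          # peer stream, set when the two sides are paired

    def close(self) -> None:
        # Drop the mutual references so neither side keeps the other alive.
        if self.remoteStream:
            self.remoteStream.remoteStream = None
            self.remoteStream = None

# Pair two streams the way the proxy links the inverter and cloud sides.
a, b = Stream('inverter'), Stream('cloud')
a.remoteStream, b.remoteStream = b, a

ref_b = weakref.ref(b)
a.close()
del b
print(ref_b() is None)   # True: with the links cleared, 'b' is freed right away
```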
```diff
@@ -136,8 +145,10 @@ class AsyncStream(Message):
         if self.server_side:
             db = self.db.db
 
-            if self.new_data.keys() & {'inverter', 'collector'}:
+            # check if new inverter or collector infos are available or when the home assistant has changed the status back to online
+            if (self.new_data.keys() & {'inverter', 'collector'}) or self.mqtt.home_assistant_restarted:
                 await self.register_home_assistant()
+                self.mqtt.home_assistant_restarted = False    # clear flag
 
             for key in self.new_data:
                 if self.new_data[key] and key in db:
```
```diff
@@ -147,6 +158,7 @@ class AsyncStream(Message):
                     self.new_data[key] = False
 
     def __del__ (self):
         logger.debug ("AsyncStream __del__")
+        super().__del__()
```
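The one-line addition matters because `__del__` is not chained automatically: when a subclass defines its own finalizer, the base-class finalizer only runs if it is called explicitly. A minimal sketch with illustrative classes, not the proxy's:

```python
class Base:
    def __del__(self):
        print('Base cleanup')

class ChildWithoutSuper(Base):
    def __del__(self):
        print('Child cleanup only')        # Base.__del__ is silently skipped

class ChildWithSuper(Base):
    def __del__(self):
        print('Child cleanup')
        super().__del__()                  # explicitly chain to the base finalizer

ChildWithoutSuper()    # prints: Child cleanup only
ChildWithSuper()       # prints: Child cleanup, then Base cleanup
```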
```diff
@@ -25,13 +25,13 @@ qualname=conn
 
 [logger_data]
 level=DEBUG
-handlers=console_handler,file_handler_name1,file_handler_name2
+handlers=file_handler_name1,file_handler_name2
 propagate=0
 qualname=data
 
 [logger_mqtt]
-level=DEBUG
-handlers=console_handler,file_handler_name1,file_handler_name2
+level=INFO
+handlers=console_handler,file_handler_name1
 propagate=0
 qualname=mqtt
```
```diff
@@ -43,12 +43,12 @@ qualname=tracer
 
 [handler_console_handler]
 class=StreamHandler
-level=INFO
+level=DEBUG
 formatter=console_formatter
 
 [handler_file_handler_name1]
 class=handlers.TimedRotatingFileHandler
-level=NOTSET
+level=INFO
 formatter=file_formatter
 args=('log/proxy.log', when:='midnight')
```
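For reference, an ini-style configuration like the one above is normally loaded once at startup with the standard library's `logging.config.fileConfig`; a minimal sketch, where the file name `log/logging.ini` is an assumption for illustration and not taken from this diff:

```python
import logging
import logging.config

# Load the ini-style logging configuration (handlers, formatters, logger levels).
logging.config.fileConfig('log/logging.ini', disable_existing_loggers=False)

# Loggers are then looked up by their qualname; with the settings above, the
# 'mqtt' logger emits INFO and higher to the console and the first rotating file.
logger_mqtt = logging.getLogger('mqtt')
logger_mqtt.info('mqtt logger configured')
```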
```diff
@@ -100,6 +100,14 @@ class Message(metaclass=IterRegistry):
     '''
     Our puplic methods
     '''
+    def close(self) -> None:
+        logger.debug(f'in Message.close()')
+        # we have refernces to methods of this class in self.switch
+        # so we have to erase self.switch, otherwise this instance can't be
+        # deallocated by the garbage collector ==> we get a memory leak
+        del self.switch
+
+
     def read(self) -> None:
         self._read()
```
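The comment in the new `Message.close()` points at a classic CPython pitfall: a dispatch dict of bound methods references `self`, so the instance sits in a reference cycle and is only reclaimed by the cyclic garbage collector rather than by plain reference counting. A minimal, self-contained sketch of that effect, with an illustrative `Handler` class rather than the proxy's code:

```python
import gc
import weakref

class Handler:
    def __init__(self):
        # Bound methods hold a reference back to self:
        # self -> switch -> bound method -> self  (a reference cycle)
        self.switch = {1: self.on_ping}

    def on_ping(self):
        pass

    def close(self):
        # Breaking the cycle lets reference counting free the instance
        # as soon as the last external reference is dropped.
        del self.switch

h = Handler()
ref = weakref.ref(h)
h.close()
del h
print(ref() is None)    # True: freed immediately, no gc.collect() needed

h2 = Handler()          # without close(), only the cyclic collector reclaims it
ref2 = weakref.ref(h2)
del h2
print(ref2() is None)   # False until...
gc.collect()
print(ref2() is None)   # ...the garbage collector runs
```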
```diff
@@ -186,7 +194,7 @@ class Message(metaclass=IterRegistry):
         self.send_msg_ofs = len (self._send_buffer)
         self._send_buffer += struct.pack(f'!l{len(self.id_str)+1}pBB', 0, self.id_str, ctrl, self.msg_id)
         fnc = self.switch.get(self.msg_id, self.msg_unknown)
-        logger.info(self.__flow_str(self.server_side, 'tx') + f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}' )
+        logger.info(self.__flow_str(self.server_side, 'tx') + f' Ctl: {int(ctrl):#02x} Msg: {fnc.__name__!r}' )
 
     def __finish_send_msg(self) -> None:
         _len = len(self._send_buffer) - self.send_msg_ofs
```
```diff
@@ -21,6 +21,7 @@ class Mqtt(metaclass=Singleton):
         logger_mqtt.debug(f'MQTT: __init__')
         loop = asyncio.get_event_loop()
         self.task = loop.create_task(self.__loop())
+        self.home_assistant_restarted = False
 
 
     def __del__(self):
```
```diff
@@ -55,7 +56,11 @@ class Mqtt(metaclass=Singleton):
                 async with self.client.messages() as messages:
                     await self.client.subscribe(f"{ha['auto_conf_prefix']}/status")
                     async for message in messages:
-                        logger_mqtt.info(f'Home-Assistant Status: {message.payload.decode("UTF-8")}')
+                        status = message.payload.decode("UTF-8")
+                        logger_mqtt.info(f'Home-Assistant Status: {status}')
+                        if status == 'online':
+                            self.home_assistant_restarted = True   # set flag to force MQTT registering
+
             except aiomqtt.MqttError:
                 logger_mqtt.info(f"Connection lost; Reconnecting in {interval} seconds ...")
                 await asyncio.sleep(interval)
```
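Taken together, the two MQTT hunks implement a "birth message" watcher: Home Assistant publishes its status on `<auto_conf_prefix>/status`, and an `online` payload raises a flag that makes the next data packet re-publish the MQTT discovery registration. A stripped-down, standalone sketch of the same pattern, assuming the aiomqtt 1.x API used above; the broker host, topic, and `flag` dict are illustrative only:

```python
import asyncio
import aiomqtt

async def watch_ha_status(flag: dict, host: str = "localhost") -> None:
    """Set flag['restarted'] whenever Home Assistant announces it is online again."""
    while True:
        try:
            # aiomqtt 1.x style, matching the diff above
            async with aiomqtt.Client(host) as client:
                async with client.messages() as messages:
                    await client.subscribe("homeassistant/status")
                    async for message in messages:
                        status = message.payload.decode("UTF-8")
                        if status == "online":
                            flag['restarted'] = True   # consumer re-registers, then clears this
        except aiomqtt.MqttError:
            await asyncio.sleep(5)   # simple reconnect backoff

# asyncio.run(watch_ha_status({'restarted': False}))
```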
```diff
@@ -7,21 +7,28 @@ class Proxy:
         proxy.ClientStream = None
 
     async def server_loop(proxy, addr):
+        '''Loop for receiving messages from the inverter (server-side)'''
         logging.info(f'Accept connection from {addr}')
         await proxy.ServerStream.loop()
-        logging.info(f'Close server connection {addr}')
+        logging.info(f'Server loop stopped for {addr}')
 
+        # if the server connection closes, we also disconnect the connection to te TSUN cloud
         if proxy.ClientStream:
-            logging.debug ("close client connection")
-            proxy.ClientStream.close()
+            logging.debug ("disconnect client connection")
+            proxy.ClientStream.disc()
 
     async def client_loop(proxy, addr):
+        '''Loop for receiving messages from the TSUN cloud (client-side)'''
         await proxy.ClientStream.loop()
-        logging.info(f'Close client connection {addr}')
-        proxy.ServerStream.remoteStream = None
+        logging.info(f'Client loop stopped for {addr}')
+
+        # if the client connection closes, we don't touch the server connection. Instead we erase the client
+        # connection stream, thus on the next received packet from the inverter, we can establish a new connection
+        # to the TSUN cloud
+        proxy.ClientStream = None
 
     async def CreateClientStream (proxy, stream, host, port):
+        '''Establish a client connection to the TSUN cloud'''
        addr = (host, port)
 
        try:
```
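The asymmetric teardown in this hunk is the core of the reconnect behaviour: a dropped cloud link only clears `proxy.ClientStream`, so the next inverter packet can open a fresh connection, while a dropped inverter link actively disconnects the cloud side. A rough sketch of the lazy-reconnect half, using the standard `asyncio.open_connection` API; the `forward()` helper and its use of a bare `StreamWriter` are hypothetical and not the proxy's actual `CreateClientStream` code:

```python
import asyncio

async def forward(proxy, packet: bytes, host: str, port: int) -> None:
    '''Hypothetical helper: (re)connect to the cloud lazily, then relay one packet.'''
    if proxy.ClientStream is None:
        # Cloud link was dropped earlier; open a new TCP connection on demand.
        reader, writer = await asyncio.open_connection(host, port)
        proxy.ClientStream = writer          # stand-in for wrapping it in a stream object
    proxy.ClientStream.write(packet)
    await proxy.ClientStream.drain()
```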