Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4c51a159af | ||
|
|
450012aac5 | ||
|
|
00f800c17a | ||
|
|
421f7a533a | ||
|
|
6d9be75ce3 | ||
|
|
0886b30032 | ||
|
|
d308c3a9fa | ||
|
|
38dacf2b97 | ||
|
|
700b946acf | ||
|
|
dfe8bcb01e | ||
|
|
a8449e8417 | ||
|
|
f097b3350b | ||
|
|
056e182f64 | ||
|
|
00f1fe01bf | ||
|
|
108da0a97e | ||
|
|
e5d19ce07d | ||
|
|
464e542a47 | ||
|
|
414eb19ffb | ||
|
|
283bc2257b | ||
|
|
198146b5f4 |
@@ -7,7 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
###
|
## [0.0.6] - 2023-10-03
|
||||||
|
|
||||||
|
- Bump aiomqtt to version 1.2.1
|
||||||
|
- Force MQTT registration when the home assistant has set the status to online again
|
||||||
|
- fix control byte output in tx trace
|
||||||
|
- dealloc async_stream instances in connection termination
|
||||||
|
|
||||||
## [0.0.5] - 2023-10-01
|
## [0.0.5] - 2023-10-01
|
||||||
|
|
||||||
|
|||||||
@@ -1,2 +1,2 @@
|
|||||||
aiomqtt==1.2.0
|
aiomqtt==1.2.1
|
||||||
schema
|
schema==0.7.5
|
||||||
@@ -94,12 +94,21 @@ class AsyncStream(Message):
|
|||||||
f"{traceback.format_exc()}")
|
f"{traceback.format_exc()}")
|
||||||
self.close()
|
self.close()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def disc(self) -> None:
|
||||||
|
logger.debug(f'in AsyncStream.disc() {self.addr}')
|
||||||
|
self.writer.close()
|
||||||
|
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
logger.info(f'in async_stream.close() {self.addr}')
|
logger.debug(f'in AsyncStream.close() {self.addr}')
|
||||||
self.writer.close()
|
self.writer.close()
|
||||||
self.proxy = None
|
super().close() # call close handler in the parent class
|
||||||
self.remoteStream = None
|
self.proxy = None # clear our refernce to the proxy, to avoid memory leaks
|
||||||
|
|
||||||
|
if self.remoteStream: # if we have knowledge about a remote stream, we del the references between the two streams
|
||||||
|
self.remoteStream.remoteStream = None
|
||||||
|
self.remoteStream = None
|
||||||
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
@@ -136,8 +145,10 @@ class AsyncStream(Message):
|
|||||||
if self.server_side:
|
if self.server_side:
|
||||||
db = self.db.db
|
db = self.db.db
|
||||||
|
|
||||||
if self.new_data.keys() & {'inverter', 'collector'}:
|
# check if new inverter or collector infos are available or when the home assistant has changed the status back to online
|
||||||
|
if (self.new_data.keys() & {'inverter', 'collector'}) or self.mqtt.home_assistant_restarted:
|
||||||
await self.register_home_assistant()
|
await self.register_home_assistant()
|
||||||
|
self.mqtt.home_assistant_restarted = False # clear flag
|
||||||
|
|
||||||
for key in self.new_data:
|
for key in self.new_data:
|
||||||
if self.new_data[key] and key in db:
|
if self.new_data[key] and key in db:
|
||||||
@@ -147,6 +158,7 @@ class AsyncStream(Message):
|
|||||||
self.new_data[key] = False
|
self.new_data[key] = False
|
||||||
|
|
||||||
def __del__ (self):
|
def __del__ (self):
|
||||||
logger.debug ("AsyncStream __del__")
|
logger.debug ("AsyncStream __del__")
|
||||||
|
super().__del__()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -25,13 +25,13 @@ qualname=conn
|
|||||||
|
|
||||||
[logger_data]
|
[logger_data]
|
||||||
level=DEBUG
|
level=DEBUG
|
||||||
handlers=console_handler,file_handler_name1,file_handler_name2
|
handlers=file_handler_name1,file_handler_name2
|
||||||
propagate=0
|
propagate=0
|
||||||
qualname=data
|
qualname=data
|
||||||
|
|
||||||
[logger_mqtt]
|
[logger_mqtt]
|
||||||
level=DEBUG
|
level=INFO
|
||||||
handlers=console_handler,file_handler_name1,file_handler_name2
|
handlers=console_handler,file_handler_name1
|
||||||
propagate=0
|
propagate=0
|
||||||
qualname=mqtt
|
qualname=mqtt
|
||||||
|
|
||||||
@@ -43,12 +43,12 @@ qualname=tracer
|
|||||||
|
|
||||||
[handler_console_handler]
|
[handler_console_handler]
|
||||||
class=StreamHandler
|
class=StreamHandler
|
||||||
level=INFO
|
level=DEBUG
|
||||||
formatter=console_formatter
|
formatter=console_formatter
|
||||||
|
|
||||||
[handler_file_handler_name1]
|
[handler_file_handler_name1]
|
||||||
class=handlers.TimedRotatingFileHandler
|
class=handlers.TimedRotatingFileHandler
|
||||||
level=NOTSET
|
level=INFO
|
||||||
formatter=file_formatter
|
formatter=file_formatter
|
||||||
args=('log/proxy.log', when:='midnight')
|
args=('log/proxy.log', when:='midnight')
|
||||||
|
|
||||||
|
|||||||
@@ -100,6 +100,14 @@ class Message(metaclass=IterRegistry):
|
|||||||
'''
|
'''
|
||||||
Our puplic methods
|
Our puplic methods
|
||||||
'''
|
'''
|
||||||
|
def close(self) -> None:
|
||||||
|
logger.debug(f'in Message.close()')
|
||||||
|
# we have refernces to methods of this class in self.switch
|
||||||
|
# so we have to erase self.switch, otherwise this instance can't be
|
||||||
|
# deallocated by the garbage collector ==> we get a memory leak
|
||||||
|
del self.switch
|
||||||
|
|
||||||
|
|
||||||
def read(self) -> None:
|
def read(self) -> None:
|
||||||
self._read()
|
self._read()
|
||||||
|
|
||||||
@@ -186,7 +194,7 @@ class Message(metaclass=IterRegistry):
|
|||||||
self.send_msg_ofs = len (self._send_buffer)
|
self.send_msg_ofs = len (self._send_buffer)
|
||||||
self._send_buffer += struct.pack(f'!l{len(self.id_str)+1}pBB', 0, self.id_str, ctrl, self.msg_id)
|
self._send_buffer += struct.pack(f'!l{len(self.id_str)+1}pBB', 0, self.id_str, ctrl, self.msg_id)
|
||||||
fnc = self.switch.get(self.msg_id, self.msg_unknown)
|
fnc = self.switch.get(self.msg_id, self.msg_unknown)
|
||||||
logger.info(self.__flow_str(self.server_side, 'tx') + f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}' )
|
logger.info(self.__flow_str(self.server_side, 'tx') + f' Ctl: {int(ctrl):#02x} Msg: {fnc.__name__!r}' )
|
||||||
|
|
||||||
def __finish_send_msg(self) -> None:
|
def __finish_send_msg(self) -> None:
|
||||||
_len = len(self._send_buffer) - self.send_msg_ofs
|
_len = len(self._send_buffer) - self.send_msg_ofs
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ class Mqtt(metaclass=Singleton):
|
|||||||
logger_mqtt.debug(f'MQTT: __init__')
|
logger_mqtt.debug(f'MQTT: __init__')
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
self.task = loop.create_task(self.__loop())
|
self.task = loop.create_task(self.__loop())
|
||||||
|
self.home_assistant_restarted = False
|
||||||
|
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
@@ -55,7 +56,11 @@ class Mqtt(metaclass=Singleton):
|
|||||||
async with self.client.messages() as messages:
|
async with self.client.messages() as messages:
|
||||||
await self.client.subscribe(f"{ha['auto_conf_prefix']}/status")
|
await self.client.subscribe(f"{ha['auto_conf_prefix']}/status")
|
||||||
async for message in messages:
|
async for message in messages:
|
||||||
logger_mqtt.info(f'Home-Assistant Status: {message.payload.decode("UTF-8")}')
|
status = message.payload.decode("UTF-8")
|
||||||
|
logger_mqtt.info(f'Home-Assistant Status: {status}')
|
||||||
|
if status == 'online':
|
||||||
|
self.home_assistant_restarted = True # set flag to force MQTT registering
|
||||||
|
|
||||||
except aiomqtt.MqttError:
|
except aiomqtt.MqttError:
|
||||||
logger_mqtt.info(f"Connection lost; Reconnecting in {interval} seconds ...")
|
logger_mqtt.info(f"Connection lost; Reconnecting in {interval} seconds ...")
|
||||||
await asyncio.sleep(interval)
|
await asyncio.sleep(interval)
|
||||||
|
|||||||
@@ -7,21 +7,28 @@ class Proxy:
|
|||||||
proxy.ClientStream = None
|
proxy.ClientStream = None
|
||||||
|
|
||||||
async def server_loop(proxy, addr):
|
async def server_loop(proxy, addr):
|
||||||
logging.info(f'Accept connection from {addr}')
|
'''Loop for receiving messages from the inverter (server-side)'''
|
||||||
|
logging.info(f'Accept connection from {addr}')
|
||||||
await proxy.ServerStream.loop()
|
await proxy.ServerStream.loop()
|
||||||
logging.info(f'Close server connection {addr}')
|
logging.info(f'Server loop stopped for {addr}')
|
||||||
|
|
||||||
|
# if the server connection closes, we also disconnect the connection to te TSUN cloud
|
||||||
if proxy.ClientStream:
|
if proxy.ClientStream:
|
||||||
logging.debug ("close client connection")
|
logging.debug ("disconnect client connection")
|
||||||
proxy.ClientStream.close()
|
proxy.ClientStream.disc()
|
||||||
|
|
||||||
async def client_loop(proxy, addr):
|
async def client_loop(proxy, addr):
|
||||||
|
'''Loop for receiving messages from the TSUN cloud (client-side)'''
|
||||||
await proxy.ClientStream.loop()
|
await proxy.ClientStream.loop()
|
||||||
logging.info(f'Close client connection {addr}')
|
logging.info(f'Client loop stopped for {addr}')
|
||||||
proxy.ServerStream.remoteStream = None
|
|
||||||
|
# if the client connection closes, we don't touch the server connection. Instead we erase the client
|
||||||
|
# connection stream, thus on the next received packet from the inverter, we can establish a new connection
|
||||||
|
# to the TSUN cloud
|
||||||
proxy.ClientStream = None
|
proxy.ClientStream = None
|
||||||
|
|
||||||
async def CreateClientStream (proxy, stream, host, port):
|
async def CreateClientStream (proxy, stream, host, port):
|
||||||
|
'''Establish a client connection to the TSUN cloud'''
|
||||||
addr = (host, port)
|
addr = (host, port)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user