split AsyncStream in two classes

This commit is contained in:
Stefan Allius
2024-09-26 23:04:11 +02:00
parent b6761e7517
commit 73526b7dc6
10 changed files with 620 additions and 505 deletions

View File

@@ -7,10 +7,12 @@ from typing import Self
from itertools import count
if __name__ == "app.src.async_stream":
from app.src.inverter import Inverter
from app.src.byte_fifo import ByteFifo
from app.src.async_ifc import AsyncIfc
from app.src.infos import Infos
else: # pragma: no cover
from inverter import Inverter
from byte_fifo import ByteFifo
from async_ifc import AsyncIfc
from infos import Infos
@@ -156,15 +158,13 @@ class AsyncStream(AsyncIfcImpl):
'''maximum default time without a received msg in sec'''
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, async_publ_mqtt, async_create_remote,
rstream: "StreamPtr") -> None:
addr, rstream: "StreamPtr") -> None:
AsyncIfcImpl.__init__(self)
logger.debug('AsyncStream.__init__')
self.remote = rstream
self.tx_fifo.reg_trigger(self.__write_cb)
self.async_create_remote = async_create_remote
self._reader = reader
self._writer = writer
self.addr = addr
@@ -172,7 +172,7 @@ class AsyncStream(AsyncIfcImpl):
self.l_addr = ''
self.proc_start = None # start processing start timestamp
self.proc_max = 0
self.async_publ_mqtt = async_publ_mqtt
self.async_publ_mqtt = None # will be set AsyncStreamServer only
def __write_cb(self):
self._writer.write(self.tx_fifo.get())
@@ -182,56 +182,6 @@ class AsyncStream(AsyncIfcImpl):
return self.timeout_cb
return 360
async def publish_outstanding_mqtt(self):
'''Publish all outstanding MQTT topics'''
try:
if self.unique_id:
await self.async_publ_mqtt()
await self._async_publ_mqtt_proxy_stat('proxy')
except Exception:
pass
async def server_loop(self, addr: str) -> None:
'''Loop for receiving messages from the inverter (server-side)'''
logger.info(f'[{self.node_id}:{self.conn_no}] '
f'Accept connection from {addr}')
Infos.inc_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
await self.loop()
Infos.dec_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for'
f' r{self.r_addr}')
# if the server connection closes, we also have to disconnect
# the connection to te TSUN cloud
if self.remote.stream:
logger.info(f'[{self.node_id}:{self.conn_no}] disc client '
f'connection: [{self.remote.ifc.node_id}:'
f'{self.remote.ifc.conn_no}]')
await self.remote.ifc.disc()
async def client_loop(self, _: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)'''
client_stream = await self.remote.ifc.loop()
logger.info(f'[{client_stream.node_id}:{client_stream.conn_no}] '
'Client loop stopped for'
f' l{client_stream.l_addr}')
# if the client connection closes, we don't touch the server
# connection. Instead we erase the client connection stream,
# thus on the next received packet from the inverter, we can
# establish a new connection to the TSUN cloud
# erase backlink to inverter
client_stream.remote.stream = None
if self.remote.stream == client_stream:
# logging.debug(f'Client l{client_stream.l_addr} refs:'
# f' {gc.get_referrers(client_stream)}')
# than erase client connection
self.remote.stream = None
async def loop(self) -> Self:
"""Async loop handler for precessing all received messages"""
self.r_addr = self._writer.get_extra_info('peername')
@@ -247,10 +197,10 @@ class AsyncStream(AsyncIfcImpl):
await asyncio.wait_for(self.__async_read(),
dead_conn_to)
# if self.unique_id:
await self.__async_write()
await self.__async_forward()
await self.async_publ_mqtt()
if self.async_publ_mqtt:
await self.async_publ_mqtt()
except asyncio.TimeoutError:
logger.warning(f'[{self.node_id}:{self.conn_no}] Dead '
@@ -282,13 +232,6 @@ class AsyncStream(AsyncIfcImpl):
f"{traceback.format_exc()}")
await asyncio.sleep(0) # be cooperative to other task
async def __async_write(self, headline: str = 'Transmit to ') -> None:
"""Async write handler to transmit the send_buffer"""
if len(self.tx_fifo) > 0:
self.tx_fifo.logging(logging.INFO, f'{headline}{self.addr}:')
self._writer.write(self.tx_fifo.get())
await self._writer.drain()
async def disc(self) -> None:
"""Async disc handler for graceful disconnect"""
if self._writer.is_closing():
@@ -302,7 +245,6 @@ class AsyncStream(AsyncIfcImpl):
hint: must be called before releasing the connection instance
"""
self.async_create_remote = None
super().close()
self._reader.feed_eof() # abort awaited read
if self._writer.is_closing():
@@ -337,23 +279,19 @@ class AsyncStream(AsyncIfcImpl):
else:
raise RuntimeError("Peer closed.")
async def __async_write(self, headline: str = 'Transmit to ') -> None:
"""Async write handler to transmit the send_buffer"""
if len(self.tx_fifo) > 0:
self.tx_fifo.logging(logging.INFO, f'{headline}{self.addr}:')
self._writer.write(self.tx_fifo.get())
await self._writer.drain()
async def __async_forward(self) -> None:
"""forward handler transmits data over the remote connection"""
if len(self.fwd_fifo) == 0:
return
try:
if not self.remote.stream:
await self.async_create_remote()
if self.remote.stream:
if self.remote.ifc.init_new_client_conn_cb():
await self.remote.ifc.__async_write()
if self.remote.stream:
self.remote.ifc.update_header_cb(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.ifc.addr}:')
self.remote.ifc._writer.write(self.fwd_fifo.get())
await self.remote.ifc._writer.drain()
await self._async_forward()
except OSError as error:
if self.remote.stream:
@@ -382,3 +320,103 @@ class AsyncStream(AsyncIfcImpl):
def __del__(self):
logger.debug(
f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}")
class AsyncStreamServer(AsyncStream):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, async_publ_mqtt, async_create_remote,
rstream: "StreamPtr") -> None:
AsyncStream.__init__(self, reader, writer, addr,
rstream)
self.async_create_remote = async_create_remote
self.async_publ_mqtt = async_publ_mqtt
async def server_loop(self, addr: str) -> None:
'''Loop for receiving messages from the inverter (server-side)'''
logger.info(f'[{self.node_id}:{self.conn_no}] '
f'Accept connection from {addr}')
Infos.inc_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
await self.loop()
Infos.dec_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for'
f' r{self.r_addr}')
# if the server connection closes, we also have to disconnect
# the connection to te TSUN cloud
if self.remote.stream:
logger.info(f'[{self.node_id}:{self.conn_no}] disc client '
f'connection: [{self.remote.ifc.node_id}:'
f'{self.remote.ifc.conn_no}]')
await self.remote.ifc.disc()
async def _async_forward(self) -> None:
"""forward handler transmits data over the remote connection"""
if not self.remote.stream:
await self.async_create_remote()
if self.remote.stream and \
self.remote.ifc.init_new_client_conn_cb():
await self.remote.ifc._AsyncStream__async_write()
if self.remote.stream:
self.remote.ifc.update_header_cb(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.ifc.addr}:')
self.remote.ifc._writer.write(self.fwd_fifo.get())
await self.remote.ifc._writer.drain()
async def publish_outstanding_mqtt(self):
'''Publish all outstanding MQTT topics'''
try:
await self.async_publ_mqtt()
await Inverter._async_publ_mqtt_proxy_stat('proxy')
except Exception:
pass
def close(self) -> None:
"""close handler for a no waiting disconnect
hint: must be called before releasing the connection instance
"""
self.async_create_remote = None
self.async_publ_mqtt = None
super().close()
class AsyncStreamClient(AsyncStream):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: "StreamPtr") -> None:
AsyncStream.__init__(self, reader, writer, addr,
rstream)
async def client_loop(self, _: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)'''
await self.loop()
logger.info(f'[{self.node_id}:{self.conn_no}] '
'Client loop stopped for'
f' l{self.l_addr}')
server_stream = self.remote.stream
# if the client connection closes, we don't touch the server
# connection. Instead we erase the client connection stream,
# thus on the next received packet from the inverter, we can
# establish a new connection to the TSUN cloud
if server_stream.remote.ifc == self:
# logging.debug(f'Client l{client_stream.l_addr} refs:'
# f' {gc.get_referrers(client_stream)}')
# than erase client connection
server_stream.remote.stream = None # erases stream and ifc link
# erase backlink to inverter
self.remote.stream = None
async def _async_forward(self) -> None:
"""forward handler transmits data over the remote connection"""
if self.remote.stream:
self.remote.ifc.update_header_cb(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.ifc.addr}:')
self.remote.ifc._writer.write(self.fwd_fifo.get())
await self.remote.ifc._writer.drain()

View File

@@ -2,10 +2,12 @@ import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3.connection_g3":
from app.src.async_stream import AsyncStream, StreamPtr
from app.src.async_stream import AsyncStreamServer
from app.src.async_stream import AsyncStreamClient, StreamPtr
from app.src.gen3.talent import Talent
else: # pragma: no cover
from async_stream import AsyncStream, StreamPtr
from async_stream import AsyncStreamServer
from async_stream import AsyncStreamClient, StreamPtr
from gen3.talent import Talent
logger = logging.getLogger('conn')
@@ -17,10 +19,14 @@ class ConnectionG3(Talent):
addr, rstream: 'ConnectionG3', server_side: bool,
id_str=b'') -> None:
self.remote = StreamPtr(rstream)
self._ifc = AsyncStream(reader, writer, addr,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
if server_side:
self._ifc = AsyncStreamServer(reader, writer, addr,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
else:
self._ifc = AsyncStreamClient(reader, writer, addr,
self.remote)
Talent.__init__(self, server_side, self._ifc, id_str)
self.conn_no = self._ifc.get_conn_no()

View File

@@ -71,7 +71,7 @@ class InverterG3(Inverter, ConnectionG3):
logging.info(f'[{self.remote.stream.node_id}:'
f'{self.remote.stream.conn_no}] '
f'Connected to {addr}')
asyncio.create_task(self._ifc.client_loop(addr))
asyncio.create_task(self.remote.ifc.client_loop(addr))
except (ConnectionRefusedError, TimeoutError) as error:
logging.info(f'{error}')
@@ -83,6 +83,8 @@ class InverterG3(Inverter, ConnectionG3):
async def async_publ_mqtt(self) -> None:
'''publish data to MQTT broker'''
if not self.unique_id:
return
# check if new inverter or collector infos are available or when the
# home assistant has changed the status back to online
try:
@@ -97,7 +99,7 @@ class InverterG3(Inverter, ConnectionG3):
for key in self.new_data:
await self.__async_publ_mqtt_packet(key)
for key in Infos.new_stat_data:
await self._async_publ_mqtt_proxy_stat(key)
await Inverter._async_publ_mqtt_proxy_stat(key)
except MqttCodeError as error:
logging.error(f'Mqtt except: {error}')
@@ -118,8 +120,6 @@ class InverterG3(Inverter, ConnectionG3):
async def __register_home_assistant(self) -> None:
'''register all our topics at home assistant'''
if not self.unique_id:
return
for data_json, component, node_id, id in self.db.ha_confs(
self.entity_prfx, self.node_id, self.unique_id,
self.sug_area):

View File

@@ -2,10 +2,12 @@ import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3plus.connection_g3p":
from app.src.async_stream import AsyncStream, StreamPtr
from app.src.async_stream import AsyncStreamServer
from app.src.async_stream import AsyncStreamClient, StreamPtr
from app.src.gen3plus.solarman_v5 import SolarmanV5
else: # pragma: no cover
from async_stream import AsyncStream, StreamPtr
from async_stream import AsyncStreamServer
from async_stream import AsyncStreamClient, StreamPtr
from gen3plus.solarman_v5 import SolarmanV5
logger = logging.getLogger('conn')
@@ -19,10 +21,15 @@ class ConnectionG3P(SolarmanV5):
client_mode: bool) -> None:
self.remote = StreamPtr(rstream)
self._ifc = AsyncStream(reader, writer, addr,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
if server_side:
self._ifc = AsyncStreamServer(reader, writer, addr,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
else:
self._ifc = AsyncStreamClient(reader, writer, addr,
self.remote)
SolarmanV5.__init__(self, server_side, client_mode, self._ifc)
self.conn_no = self._ifc.get_conn_no()

View File

@@ -74,7 +74,7 @@ class InverterG3P(Inverter, ConnectionG3P):
logging.info(f'[{self.remote.stream.node_id}:'
f'{self.remote.stream.conn_no}] '
f'Connected to {addr}')
asyncio.create_task(self._ifc.client_loop(addr))
asyncio.create_task(self.remote.ifc.client_loop(addr))
except (ConnectionRefusedError, TimeoutError) as error:
logging.info(f'{error}')
@@ -86,6 +86,9 @@ class InverterG3P(Inverter, ConnectionG3P):
async def async_publ_mqtt(self) -> None:
'''publish data to MQTT broker'''
if not self.unique_id:
return
# check if new inverter or collector infos are available or when the
# home assistant has changed the status back to online
try:
@@ -100,7 +103,7 @@ class InverterG3P(Inverter, ConnectionG3P):
for key in self.new_data:
await self.__async_publ_mqtt_packet(key)
for key in Infos.new_stat_data:
await self._async_publ_mqtt_proxy_stat(key)
await Inverter._async_publ_mqtt_proxy_stat(key)
except MqttCodeError as error:
logging.error(f'Mqtt except: {error}')

View File

@@ -1,13 +0,0 @@
class ObjectFactory:
def __init__(self):
self._builders = {}
def register_builder(self, key, builder):
self._builders[key] = builder
def create(self, key, **kwargs):
builder = self._builders.get(key)
if not builder:
raise ValueError(key)
return builder(**kwargs)