Merge branch 'refactoring-async-stream' of https://github.com/s-allius/tsun-gen3-proxy into titan-scan

This commit is contained in:
Stefan Allius
2024-09-28 23:31:02 +02:00
5 changed files with 24 additions and 38 deletions

View File

@@ -158,7 +158,7 @@ class AsyncStream(AsyncIfcImpl):
'''maximum default time without a received msg in sec'''
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: "StreamPtr") -> None:
rstream: "StreamPtr") -> None:
AsyncIfcImpl.__init__(self)
logger.debug('AsyncStream.__init__')
@@ -167,9 +167,8 @@ class AsyncStream(AsyncIfcImpl):
self.tx_fifo.reg_trigger(self.__write_cb)
self._reader = reader
self._writer = writer
self.addr = addr
self.r_addr = ''
self.l_addr = ''
self.r_addr = writer.get_extra_info('peername')
self.l_addr = writer.get_extra_info('sockname')
self.proc_start = None # start processing start timestamp
self.proc_max = 0
self.async_publ_mqtt = None # will be set AsyncStreamServer only
@@ -184,8 +183,6 @@ class AsyncStream(AsyncIfcImpl):
async def loop(self) -> Self:
"""Async loop handler for precessing all received messages"""
self.r_addr = self._writer.get_extra_info('peername')
self.l_addr = self._writer.get_extra_info('sockname')
self.proc_start = time.time()
while True:
try:
@@ -228,7 +225,7 @@ class AsyncStream(AsyncIfcImpl):
except Exception:
Infos.inc_counter('SW_Exception')
logger.error(
f"Exception for {self.addr}:\n"
f"Exception for {self.r_addr}:\n"
f"{traceback.format_exc()}")
await asyncio.sleep(0) # be cooperative to other task
@@ -282,7 +279,7 @@ class AsyncStream(AsyncIfcImpl):
async def __async_write(self, headline: str = 'Transmit to ') -> None:
"""Async write handler to transmit the send_buffer"""
if len(self.tx_fifo) > 0:
self.tx_fifo.logging(logging.INFO, f'{headline}{self.addr}:')
self.tx_fifo.logging(logging.INFO, f'{headline}{self.r_addr}:')
self._writer.write(self.tx_fifo.get())
await self._writer.drain()
@@ -314,7 +311,7 @@ class AsyncStream(AsyncIfcImpl):
except Exception:
Infos.inc_counter('SW_Exception')
logger.error(
f"Fwd Exception for {self.addr}:\n"
f"Fwd Exception for {self.r_addr}:\n"
f"{traceback.format_exc()}")
def __del__(self):
@@ -324,17 +321,16 @@ class AsyncStream(AsyncIfcImpl):
class AsyncStreamServer(AsyncStream):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, async_publ_mqtt, async_create_remote,
async_publ_mqtt, async_create_remote,
rstream: "StreamPtr") -> None:
AsyncStream.__init__(self, reader, writer, addr,
rstream)
AsyncStream.__init__(self, reader, writer, rstream)
self.async_create_remote = async_create_remote
self.async_publ_mqtt = async_publ_mqtt
async def server_loop(self, addr: str) -> None:
async def server_loop(self) -> None:
'''Loop for receiving messages from the inverter (server-side)'''
logger.info(f'[{self.node_id}:{self.conn_no}] '
f'Accept connection from {addr}')
f'Accept connection from {self.r_addr}')
Infos.inc_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
await self.loop()
@@ -361,7 +357,7 @@ class AsyncStreamServer(AsyncStream):
if self.remote.stream:
self.remote.ifc.update_header_cb(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.ifc.addr}:')
f'{self.remote.ifc.r_addr}:')
self.remote.ifc._writer.write(self.fwd_fifo.get())
await self.remote.ifc._writer.drain()
@@ -385,9 +381,8 @@ class AsyncStreamServer(AsyncStream):
class AsyncStreamClient(AsyncStream):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: "StreamPtr") -> None:
AsyncStream.__init__(self, reader, writer, addr,
rstream)
rstream: "StreamPtr") -> None:
AsyncStream.__init__(self, reader, writer, rstream)
async def client_loop(self, _: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)'''
@@ -417,6 +412,6 @@ class AsyncStreamClient(AsyncStream):
if self.remote.stream:
self.remote.ifc.update_header_cb(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.ifc.addr}:')
f'{self.remote.ifc.r_addr}:')
self.remote.ifc._writer.write(self.fwd_fifo.get())
await self.remote.ifc._writer.drain()

View File

@@ -38,7 +38,7 @@ class ConnectionG3Server(ConnectionG3):
server_side = True
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamServer(reader, writer, addr,
self._ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
@@ -54,7 +54,7 @@ class ConnectionG3Client(ConnectionG3):
id_str=b'') -> None:
server_side = False
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamClient(reader, writer, addr,
self._ifc = AsyncStreamClient(reader, writer,
self.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr

View File

@@ -38,7 +38,7 @@ class ConnectionG3PServer(ConnectionG3P):
server_side = True
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamServer(reader, writer, addr,
self._ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
@@ -55,7 +55,7 @@ class ConnectionG3PClient(ConnectionG3P):
server_side = False
client_mode = False
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamClient(reader, writer, addr, self.remote)
self._ifc = AsyncStreamClient(reader, writer, self.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
SolarmanV5.__init__(self, server_side, client_mode, self._ifc)

View File

@@ -70,18 +70,11 @@ async def webserver(addr, port):
logging.debug('HTTP cleanup done')
async def handle_client(reader: StreamReader, writer: StreamWriter):
async def handle_client(reader: StreamReader, writer: StreamWriter, inv_class):
'''Handles a new incoming connection and starts an async loop'''
addr = writer.get_extra_info('peername')
await InverterG3(reader, writer, addr)._ifc.server_loop(addr)
async def handle_client_v2(reader: StreamReader, writer: StreamWriter):
'''Handles a new incoming connection and starts an async loop'''
addr = writer.get_extra_info('peername')
await InverterG3P(reader, writer, addr)._ifc.server_loop(addr)
await inv_class(reader, writer, addr)._ifc.server_loop()
async def handle_shutdown(web_task):
@@ -180,8 +173,10 @@ if __name__ == "__main__":
# start_server directly out of our main task, the eventloop will be blocked
# and we can't receive and handle the UNIX signals!
#
loop.create_task(asyncio.start_server(handle_client, '0.0.0.0', 5005))
loop.create_task(asyncio.start_server(handle_client_v2, '0.0.0.0', 10000))
for inv_class, port in [(InverterG3, 5005), (InverterG3P, 10000)]:
loop.create_task(asyncio.start_server(lambda r, w, i=inv_class:
handle_client(r, w, i),
'0.0.0.0', port))
web_task = loop.create_task(webserver('0.0.0.0', 8127))
#

View File

@@ -12,10 +12,6 @@ from app.src.mqtt import Mqtt
from app.src.messages import Message, State
from app.src.inverter import Inverter
from app.src.modbus_tcp import ModbusConn, ModbusTcp
from app.src.mqtt import Mqtt
from app.src.messages import Message, State
from app.src.inverter import Inverter
from app.src.modbus_tcp import ModbusConn, ModbusTcp
pytest_plugins = ('pytest_asyncio',)