From 2be0ef67af75dc594199821ca29ff79c9aa4a6c2 Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Sat, 28 Sep 2024 22:43:29 +0200 Subject: [PATCH] refactor server creation --- app/src/async_stream.py | 33 +++++++++++++----------------- app/src/gen3/connection_g3.py | 4 ++-- app/src/gen3plus/connection_g3p.py | 4 ++-- app/src/server.py | 17 ++++++--------- 4 files changed, 24 insertions(+), 34 deletions(-) diff --git a/app/src/async_stream.py b/app/src/async_stream.py index a0c8e6a..9b189b4 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -158,7 +158,7 @@ class AsyncStream(AsyncIfcImpl): '''maximum default time without a received msg in sec''' def __init__(self, reader: StreamReader, writer: StreamWriter, - addr, rstream: "StreamPtr") -> None: + rstream: "StreamPtr") -> None: AsyncIfcImpl.__init__(self) logger.debug('AsyncStream.__init__') @@ -167,9 +167,8 @@ class AsyncStream(AsyncIfcImpl): self.tx_fifo.reg_trigger(self.__write_cb) self._reader = reader self._writer = writer - self.addr = addr - self.r_addr = '' - self.l_addr = '' + self.r_addr = writer.get_extra_info('peername') + self.l_addr = writer.get_extra_info('sockname') self.proc_start = None # start processing start timestamp self.proc_max = 0 self.async_publ_mqtt = None # will be set AsyncStreamServer only @@ -184,8 +183,6 @@ class AsyncStream(AsyncIfcImpl): async def loop(self) -> Self: """Async loop handler for precessing all received messages""" - self.r_addr = self._writer.get_extra_info('peername') - self.l_addr = self._writer.get_extra_info('sockname') self.proc_start = time.time() while True: try: @@ -228,7 +225,7 @@ class AsyncStream(AsyncIfcImpl): except Exception: Infos.inc_counter('SW_Exception') logger.error( - f"Exception for {self.addr}:\n" + f"Exception for {self.r_addr}:\n" f"{traceback.format_exc()}") await asyncio.sleep(0) # be cooperative to other task @@ -282,7 +279,7 @@ class AsyncStream(AsyncIfcImpl): async def __async_write(self, headline: str = 'Transmit to ') -> None: """Async write handler to transmit the send_buffer""" if len(self.tx_fifo) > 0: - self.tx_fifo.logging(logging.INFO, f'{headline}{self.addr}:') + self.tx_fifo.logging(logging.INFO, f'{headline}{self.r_addr}:') self._writer.write(self.tx_fifo.get()) await self._writer.drain() @@ -314,7 +311,7 @@ class AsyncStream(AsyncIfcImpl): except Exception: Infos.inc_counter('SW_Exception') logger.error( - f"Fwd Exception for {self.addr}:\n" + f"Fwd Exception for {self.r_addr}:\n" f"{traceback.format_exc()}") def __del__(self): @@ -324,17 +321,16 @@ class AsyncStream(AsyncIfcImpl): class AsyncStreamServer(AsyncStream): def __init__(self, reader: StreamReader, writer: StreamWriter, - addr, async_publ_mqtt, async_create_remote, + async_publ_mqtt, async_create_remote, rstream: "StreamPtr") -> None: - AsyncStream.__init__(self, reader, writer, addr, - rstream) + AsyncStream.__init__(self, reader, writer, rstream) self.async_create_remote = async_create_remote self.async_publ_mqtt = async_publ_mqtt - async def server_loop(self, addr: str) -> None: + async def server_loop(self) -> None: '''Loop for receiving messages from the inverter (server-side)''' logger.info(f'[{self.node_id}:{self.conn_no}] ' - f'Accept connection from {addr}') + f'Accept connection from {self.r_addr}') Infos.inc_counter('Inverter_Cnt') await self.publish_outstanding_mqtt() await self.loop() @@ -361,7 +357,7 @@ class AsyncStreamServer(AsyncStream): if self.remote.stream: self.remote.ifc.update_header_cb(self.fwd_fifo.peek()) self.fwd_fifo.logging(logging.INFO, 'Forward to ' - f'{self.remote.ifc.addr}:') + f'{self.remote.ifc.r_addr}:') self.remote.ifc._writer.write(self.fwd_fifo.get()) await self.remote.ifc._writer.drain() @@ -385,9 +381,8 @@ class AsyncStreamServer(AsyncStream): class AsyncStreamClient(AsyncStream): def __init__(self, reader: StreamReader, writer: StreamWriter, - addr, rstream: "StreamPtr") -> None: - AsyncStream.__init__(self, reader, writer, addr, - rstream) + rstream: "StreamPtr") -> None: + AsyncStream.__init__(self, reader, writer, rstream) async def client_loop(self, _: str) -> None: '''Loop for receiving messages from the TSUN cloud (client-side)''' @@ -417,6 +412,6 @@ class AsyncStreamClient(AsyncStream): if self.remote.stream: self.remote.ifc.update_header_cb(self.fwd_fifo.peek()) self.fwd_fifo.logging(logging.INFO, 'Forward to ' - f'{self.remote.ifc.addr}:') + f'{self.remote.ifc.r_addr}:') self.remote.ifc._writer.write(self.fwd_fifo.get()) await self.remote.ifc._writer.drain() diff --git a/app/src/gen3/connection_g3.py b/app/src/gen3/connection_g3.py index 8e431c4..8e213b1 100644 --- a/app/src/gen3/connection_g3.py +++ b/app/src/gen3/connection_g3.py @@ -38,7 +38,7 @@ class ConnectionG3Server(ConnectionG3): server_side = True self.remote = StreamPtr(rstream) - self._ifc = AsyncStreamServer(reader, writer, addr, + self._ifc = AsyncStreamServer(reader, writer, self.async_publ_mqtt, self.async_create_remote, self.remote) @@ -54,7 +54,7 @@ class ConnectionG3Client(ConnectionG3): id_str=b'') -> None: server_side = False self.remote = StreamPtr(rstream) - self._ifc = AsyncStreamClient(reader, writer, addr, + self._ifc = AsyncStreamClient(reader, writer, self.remote) self.conn_no = self._ifc.get_conn_no() self.addr = addr diff --git a/app/src/gen3plus/connection_g3p.py b/app/src/gen3plus/connection_g3p.py index 2fcf1bb..d1b99f3 100644 --- a/app/src/gen3plus/connection_g3p.py +++ b/app/src/gen3plus/connection_g3p.py @@ -38,7 +38,7 @@ class ConnectionG3PServer(ConnectionG3P): server_side = True self.remote = StreamPtr(rstream) - self._ifc = AsyncStreamServer(reader, writer, addr, + self._ifc = AsyncStreamServer(reader, writer, self.async_publ_mqtt, self.async_create_remote, self.remote) @@ -55,7 +55,7 @@ class ConnectionG3PClient(ConnectionG3P): server_side = False client_mode = False self.remote = StreamPtr(rstream) - self._ifc = AsyncStreamClient(reader, writer, addr, self.remote) + self._ifc = AsyncStreamClient(reader, writer, self.remote) self.conn_no = self._ifc.get_conn_no() self.addr = addr SolarmanV5.__init__(self, server_side, client_mode, self._ifc) diff --git a/app/src/server.py b/app/src/server.py index f1648ba..f39e263 100644 --- a/app/src/server.py +++ b/app/src/server.py @@ -70,18 +70,11 @@ async def webserver(addr, port): logging.debug('HTTP cleanup done') -async def handle_client(reader: StreamReader, writer: StreamWriter): +async def handle_client(reader: StreamReader, writer: StreamWriter, inv_class): '''Handles a new incoming connection and starts an async loop''' addr = writer.get_extra_info('peername') - await InverterG3(reader, writer, addr)._ifc.server_loop(addr) - - -async def handle_client_v2(reader: StreamReader, writer: StreamWriter): - '''Handles a new incoming connection and starts an async loop''' - - addr = writer.get_extra_info('peername') - await InverterG3P(reader, writer, addr)._ifc.server_loop(addr) + await inv_class(reader, writer, addr)._ifc.server_loop() async def handle_shutdown(web_task): @@ -180,8 +173,10 @@ if __name__ == "__main__": # start_server directly out of our main task, the eventloop will be blocked # and we can't receive and handle the UNIX signals! # - loop.create_task(asyncio.start_server(handle_client, '0.0.0.0', 5005)) - loop.create_task(asyncio.start_server(handle_client_v2, '0.0.0.0', 10000)) + for inv_class, port in [(InverterG3, 5005), (InverterG3P, 10000)]: + loop.create_task(asyncio.start_server(lambda r, w, i=inv_class: + handle_client(r, w, i), + '0.0.0.0', port)) web_task = loop.create_task(webserver('0.0.0.0', 8127)) #