refactor server creation

This commit is contained in:
Stefan Allius
2024-09-28 22:43:29 +02:00
parent 7b6810cb46
commit 2be0ef67af
4 changed files with 24 additions and 34 deletions

View File

@@ -158,7 +158,7 @@ class AsyncStream(AsyncIfcImpl):
'''maximum default time without a received msg in sec''' '''maximum default time without a received msg in sec'''
def __init__(self, reader: StreamReader, writer: StreamWriter, def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: "StreamPtr") -> None: rstream: "StreamPtr") -> None:
AsyncIfcImpl.__init__(self) AsyncIfcImpl.__init__(self)
logger.debug('AsyncStream.__init__') logger.debug('AsyncStream.__init__')
@@ -167,9 +167,8 @@ class AsyncStream(AsyncIfcImpl):
self.tx_fifo.reg_trigger(self.__write_cb) self.tx_fifo.reg_trigger(self.__write_cb)
self._reader = reader self._reader = reader
self._writer = writer self._writer = writer
self.addr = addr self.r_addr = writer.get_extra_info('peername')
self.r_addr = '' self.l_addr = writer.get_extra_info('sockname')
self.l_addr = ''
self.proc_start = None # start processing start timestamp self.proc_start = None # start processing start timestamp
self.proc_max = 0 self.proc_max = 0
self.async_publ_mqtt = None # will be set AsyncStreamServer only self.async_publ_mqtt = None # will be set AsyncStreamServer only
@@ -184,8 +183,6 @@ class AsyncStream(AsyncIfcImpl):
async def loop(self) -> Self: async def loop(self) -> Self:
"""Async loop handler for precessing all received messages""" """Async loop handler for precessing all received messages"""
self.r_addr = self._writer.get_extra_info('peername')
self.l_addr = self._writer.get_extra_info('sockname')
self.proc_start = time.time() self.proc_start = time.time()
while True: while True:
try: try:
@@ -228,7 +225,7 @@ class AsyncStream(AsyncIfcImpl):
except Exception: except Exception:
Infos.inc_counter('SW_Exception') Infos.inc_counter('SW_Exception')
logger.error( logger.error(
f"Exception for {self.addr}:\n" f"Exception for {self.r_addr}:\n"
f"{traceback.format_exc()}") f"{traceback.format_exc()}")
await asyncio.sleep(0) # be cooperative to other task await asyncio.sleep(0) # be cooperative to other task
@@ -282,7 +279,7 @@ class AsyncStream(AsyncIfcImpl):
async def __async_write(self, headline: str = 'Transmit to ') -> None: async def __async_write(self, headline: str = 'Transmit to ') -> None:
"""Async write handler to transmit the send_buffer""" """Async write handler to transmit the send_buffer"""
if len(self.tx_fifo) > 0: if len(self.tx_fifo) > 0:
self.tx_fifo.logging(logging.INFO, f'{headline}{self.addr}:') self.tx_fifo.logging(logging.INFO, f'{headline}{self.r_addr}:')
self._writer.write(self.tx_fifo.get()) self._writer.write(self.tx_fifo.get())
await self._writer.drain() await self._writer.drain()
@@ -314,7 +311,7 @@ class AsyncStream(AsyncIfcImpl):
except Exception: except Exception:
Infos.inc_counter('SW_Exception') Infos.inc_counter('SW_Exception')
logger.error( logger.error(
f"Fwd Exception for {self.addr}:\n" f"Fwd Exception for {self.r_addr}:\n"
f"{traceback.format_exc()}") f"{traceback.format_exc()}")
def __del__(self): def __del__(self):
@@ -324,17 +321,16 @@ class AsyncStream(AsyncIfcImpl):
class AsyncStreamServer(AsyncStream): class AsyncStreamServer(AsyncStream):
def __init__(self, reader: StreamReader, writer: StreamWriter, def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, async_publ_mqtt, async_create_remote, async_publ_mqtt, async_create_remote,
rstream: "StreamPtr") -> None: rstream: "StreamPtr") -> None:
AsyncStream.__init__(self, reader, writer, addr, AsyncStream.__init__(self, reader, writer, rstream)
rstream)
self.async_create_remote = async_create_remote self.async_create_remote = async_create_remote
self.async_publ_mqtt = async_publ_mqtt self.async_publ_mqtt = async_publ_mqtt
async def server_loop(self, addr: str) -> None: async def server_loop(self) -> None:
'''Loop for receiving messages from the inverter (server-side)''' '''Loop for receiving messages from the inverter (server-side)'''
logger.info(f'[{self.node_id}:{self.conn_no}] ' logger.info(f'[{self.node_id}:{self.conn_no}] '
f'Accept connection from {addr}') f'Accept connection from {self.r_addr}')
Infos.inc_counter('Inverter_Cnt') Infos.inc_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt() await self.publish_outstanding_mqtt()
await self.loop() await self.loop()
@@ -361,7 +357,7 @@ class AsyncStreamServer(AsyncStream):
if self.remote.stream: if self.remote.stream:
self.remote.ifc.update_header_cb(self.fwd_fifo.peek()) self.remote.ifc.update_header_cb(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to ' self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.ifc.addr}:') f'{self.remote.ifc.r_addr}:')
self.remote.ifc._writer.write(self.fwd_fifo.get()) self.remote.ifc._writer.write(self.fwd_fifo.get())
await self.remote.ifc._writer.drain() await self.remote.ifc._writer.drain()
@@ -385,9 +381,8 @@ class AsyncStreamServer(AsyncStream):
class AsyncStreamClient(AsyncStream): class AsyncStreamClient(AsyncStream):
def __init__(self, reader: StreamReader, writer: StreamWriter, def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: "StreamPtr") -> None: rstream: "StreamPtr") -> None:
AsyncStream.__init__(self, reader, writer, addr, AsyncStream.__init__(self, reader, writer, rstream)
rstream)
async def client_loop(self, _: str) -> None: async def client_loop(self, _: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)''' '''Loop for receiving messages from the TSUN cloud (client-side)'''
@@ -417,6 +412,6 @@ class AsyncStreamClient(AsyncStream):
if self.remote.stream: if self.remote.stream:
self.remote.ifc.update_header_cb(self.fwd_fifo.peek()) self.remote.ifc.update_header_cb(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to ' self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.ifc.addr}:') f'{self.remote.ifc.r_addr}:')
self.remote.ifc._writer.write(self.fwd_fifo.get()) self.remote.ifc._writer.write(self.fwd_fifo.get())
await self.remote.ifc._writer.drain() await self.remote.ifc._writer.drain()

View File

@@ -38,7 +38,7 @@ class ConnectionG3Server(ConnectionG3):
server_side = True server_side = True
self.remote = StreamPtr(rstream) self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamServer(reader, writer, addr, self._ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt, self.async_publ_mqtt,
self.async_create_remote, self.async_create_remote,
self.remote) self.remote)
@@ -54,7 +54,7 @@ class ConnectionG3Client(ConnectionG3):
id_str=b'') -> None: id_str=b'') -> None:
server_side = False server_side = False
self.remote = StreamPtr(rstream) self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamClient(reader, writer, addr, self._ifc = AsyncStreamClient(reader, writer,
self.remote) self.remote)
self.conn_no = self._ifc.get_conn_no() self.conn_no = self._ifc.get_conn_no()
self.addr = addr self.addr = addr

View File

@@ -38,7 +38,7 @@ class ConnectionG3PServer(ConnectionG3P):
server_side = True server_side = True
self.remote = StreamPtr(rstream) self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamServer(reader, writer, addr, self._ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt, self.async_publ_mqtt,
self.async_create_remote, self.async_create_remote,
self.remote) self.remote)
@@ -55,7 +55,7 @@ class ConnectionG3PClient(ConnectionG3P):
server_side = False server_side = False
client_mode = False client_mode = False
self.remote = StreamPtr(rstream) self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamClient(reader, writer, addr, self.remote) self._ifc = AsyncStreamClient(reader, writer, self.remote)
self.conn_no = self._ifc.get_conn_no() self.conn_no = self._ifc.get_conn_no()
self.addr = addr self.addr = addr
SolarmanV5.__init__(self, server_side, client_mode, self._ifc) SolarmanV5.__init__(self, server_side, client_mode, self._ifc)

View File

@@ -70,18 +70,11 @@ async def webserver(addr, port):
logging.debug('HTTP cleanup done') logging.debug('HTTP cleanup done')
async def handle_client(reader: StreamReader, writer: StreamWriter): async def handle_client(reader: StreamReader, writer: StreamWriter, inv_class):
'''Handles a new incoming connection and starts an async loop''' '''Handles a new incoming connection and starts an async loop'''
addr = writer.get_extra_info('peername') addr = writer.get_extra_info('peername')
await InverterG3(reader, writer, addr)._ifc.server_loop(addr) await inv_class(reader, writer, addr)._ifc.server_loop()
async def handle_client_v2(reader: StreamReader, writer: StreamWriter):
'''Handles a new incoming connection and starts an async loop'''
addr = writer.get_extra_info('peername')
await InverterG3P(reader, writer, addr)._ifc.server_loop(addr)
async def handle_shutdown(web_task): async def handle_shutdown(web_task):
@@ -180,8 +173,10 @@ if __name__ == "__main__":
# start_server directly out of our main task, the eventloop will be blocked # start_server directly out of our main task, the eventloop will be blocked
# and we can't receive and handle the UNIX signals! # and we can't receive and handle the UNIX signals!
# #
loop.create_task(asyncio.start_server(handle_client, '0.0.0.0', 5005)) for inv_class, port in [(InverterG3, 5005), (InverterG3P, 10000)]:
loop.create_task(asyncio.start_server(handle_client_v2, '0.0.0.0', 10000)) loop.create_task(asyncio.start_server(lambda r, w, i=inv_class:
handle_client(r, w, i),
'0.0.0.0', port))
web_task = loop.create_task(webserver('0.0.0.0', 8127)) web_task = loop.create_task(webserver('0.0.0.0', 8127))
# #