remove connection classes

This commit is contained in:
Stefan Allius
2024-10-02 23:40:42 +02:00
parent 39aba31bbd
commit cfe2c9cb9d
20 changed files with 582 additions and 780 deletions

View File

@@ -127,8 +127,9 @@ class AsyncIfcImpl(AsyncIfc):
class StreamPtr():
'''Descr StreamPtr'''
def __init__(self, _stream):
def __init__(self, _stream, _ifc=None):
self.stream = _stream
self.ifc = _ifc
def __str__(self) -> str:
return f'ifc:{self._ifc}, stream: {self._stream}'
@@ -137,6 +138,10 @@ class StreamPtr():
def ifc(self):
return self._ifc
@ifc.setter
def ifc(self, value):
self._ifc = value
@property
def stream(self):
return self._stream
@@ -144,10 +149,6 @@ class StreamPtr():
@stream.setter
def stream(self, value):
self._stream = value
if value:
self._ifc = value.ifc
else:
self._ifc = None
class AsyncStream(AsyncIfcImpl):
@@ -231,6 +232,7 @@ class AsyncStream(AsyncIfcImpl):
async def disc(self) -> None:
"""Async disc handler for graceful disconnect"""
self.remote = None
if self._writer.is_closing():
return
logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
@@ -329,6 +331,12 @@ class AsyncStreamServer(AsyncStream):
self.async_create_remote = async_create_remote
self.async_publ_mqtt = async_publ_mqtt
def close(self) -> None:
logging.info('AsyncStreamServer.close()')
self.async_create_remote = None
self.async_publ_mqtt = None
super().close()
async def server_loop(self) -> None:
'''Loop for receiving messages from the inverter (server-side)'''
logger.info(f'[{self.node_id}:{self.conn_no}] '
@@ -343,7 +351,7 @@ class AsyncStreamServer(AsyncStream):
# if the server connection closes, we also have to disconnect
# the connection to te TSUN cloud
if self.remote.stream:
if self.remote and self.remote.stream:
logger.info(f'[{self.node_id}:{self.conn_no}] disc client '
f'connection: [{self.remote.ifc.node_id}:'
f'{self.remote.ifc.conn_no}]')
@@ -378,6 +386,11 @@ class AsyncStreamClient(AsyncStream):
AsyncStream.__init__(self, reader, writer, rstream)
self.close_cb = close_cb
def close(self) -> None:
logging.info('AsyncStreamClient.close()')
self.close_cb = None
super().close()
async def client_loop(self, _: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)'''
await self.loop()

View File

@@ -1,13 +0,0 @@
import logging
if __name__ == "app.src.gen3.connection_g3":
from app.src.gen3.talent import Talent
else: # pragma: no cover
from gen3.talent import Talent
logger = logging.getLogger('conn')
class ConnectionG3(Talent):
def __init__(self, addr, ifc, server_side, id_str=b'') -> None:
super().__init__(addr, server_side, ifc, id_str)

View File

@@ -5,12 +5,12 @@ if __name__ == "app.src.gen3.inverter_g3":
from app.src.inverter_base import InverterBase
from app.src.async_stream import StreamPtr
from app.src.async_stream import AsyncStreamServer
from app.src.gen3.connection_g3 import ConnectionG3
from app.src.gen3.talent import Talent
else: # pragma: no cover
from inverter_base import InverterBase
from async_stream import StreamPtr
from async_stream import AsyncStreamServer
from gen3.connection_g3 import ConnectionG3
from gen3.talent import Talent
logger_mqtt = logging.getLogger('mqtt')
@@ -27,9 +27,9 @@ class InverterG3(InverterBase):
self.remote)
self.local = StreamPtr(
ConnectionG3(addr, ifc, True)
Talent(addr, ifc, True, False), ifc
)
async def async_create_remote(self) -> None:
await InverterBase.async_create_remote(
self, 'tsun', ConnectionG3)
self, 'tsun', Talent)

View File

@@ -46,7 +46,8 @@ class Talent(Message):
MB_REGULAR_TIMEOUT = 60
TXT_UNKNOWN_CTRL = 'Unknown Ctrl'
def __init__(self, addr, server_side: bool, ifc: "AsyncIfc", id_str=b''):
def __init__(self, addr, ifc: "AsyncIfc", server_side: bool,
client_mode: bool = False, id_str=b''):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=15)
ifc.rx_set_cb(self.read)
ifc.prot_set_timeout_cb(self._timeout)
@@ -95,10 +96,6 @@ class Talent(Message):
'''
Our puplic methods
'''
def healthy(self) -> bool:
logger.debug('Talent healthy()')
return self.ifc.healthy()
def close(self) -> None:
logging.debug('Talent.close()')
if self.server_side:
@@ -116,11 +113,11 @@ class Talent(Message):
self.log_lvl.clear()
self.state = State.closed
self.mb_timer.close()
self.ifc.close()
self.ifc.rx_set_cb(None)
self.ifc.prot_set_timeout_cb(None)
self.ifc.prot_set_init_new_client_conn_cb(None)
self.ifc.prot_set_update_header_cb(None)
self.ifc = None
super().close()
def __set_serial_no(self, serial_no: str):

View File

@@ -1,14 +0,0 @@
import logging
if __name__ == "app.src.gen3plus.connection_g3p":
from app.src.gen3plus.solarman_v5 import SolarmanV5
else: # pragma: no cover
from gen3plus.solarman_v5 import SolarmanV5
logger = logging.getLogger('conn')
class ConnectionG3P(SolarmanV5):
def __init__(self, addr, ifc, server_side,
client_mode: bool = False) -> None:
super().__init__(addr, server_side, client_mode, ifc)

View File

@@ -5,12 +5,12 @@ if __name__ == "app.src.gen3plus.inverter_g3p":
from app.src.inverter_base import InverterBase
from app.src.async_stream import StreamPtr
from app.src.async_stream import AsyncStreamServer
from app.src.gen3plus.connection_g3p import ConnectionG3P
from app.src.gen3plus.solarman_v5 import SolarmanV5
else: # pragma: no cover
from inverter_base import InverterBase
from async_stream import StreamPtr
from async_stream import AsyncStreamServer
from gen3plus.connection_g3p import ConnectionG3P
from gen3plus.solarman_v5 import SolarmanV5
logger_mqtt = logging.getLogger('mqtt')
@@ -28,9 +28,9 @@ class InverterG3P(InverterBase):
self.remote)
self.local = StreamPtr(
ConnectionG3P(addr, ifc, True, client_mode)
SolarmanV5(addr, ifc, True, client_mode), ifc
)
async def async_create_remote(self) -> None:
await InverterBase.async_create_remote(
self, 'solarman', ConnectionG3P)
self, 'solarman', SolarmanV5)

View File

@@ -62,8 +62,8 @@ class SolarmanV5(Message):
HDR_FMT = '<BLLL'
'''format string for packing of the header'''
def __init__(self, addr, server_side: bool, client_mode: bool,
ifc: "AsyncIfc"):
def __init__(self, addr, ifc: "AsyncIfc",
server_side: bool, client_mode: bool):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=8)
ifc.rx_set_cb(self.read)
ifc.prot_set_timeout_cb(self._timeout)
@@ -153,10 +153,6 @@ class SolarmanV5(Message):
'''
Our puplic methods
'''
def healthy(self) -> bool:
logger.debug('SolarmanV5 healthy()')
return self.ifc.healthy()
def close(self) -> None:
logging.debug('Solarman.close()')
if self.server_side:
@@ -174,11 +170,11 @@ class SolarmanV5(Message):
self.log_lvl.clear()
self.state = State.closed
self.mb_timer.close()
self.ifc.close()
self.ifc.rx_set_cb(None)
self.ifc.prot_set_timeout_cb(None)
self.ifc.prot_set_init_new_client_conn_cb(None)
self.ifc.prot_set_update_header_cb(None)
self.ifc = None
super().close()
async def send_start_cmd(self, snr: int, host: str,

View File

@@ -1,12 +1,15 @@
import asyncio
import weakref
import logging
import json
if __name__ == "app.src.inverter":
from app.src.iter_registry import IterRegistry
from app.src.config import Config
from app.src.mqtt import Mqtt
from app.src.infos import Infos
else: # pragma: no cover
from iter_registry import IterRegistry
from config import Config
from mqtt import Mqtt
from infos import Infos
@@ -14,7 +17,7 @@ else: # pragma: no cover
logger_mqtt = logging.getLogger('mqtt')
class Inverter():
class Inverter(metaclass=IterRegistry):
'''class Inverter is a baseclass
The class has some class method for managing common resources like a
@@ -37,6 +40,8 @@ class Inverter():
async_create_remote(): Establish a client connection to the TSUN cloud
async_publ_mqtt(): Publish data to MQTT broker
'''
_registry = []
@classmethod
def class_init(cls) -> None:
logging.debug('Inverter.class_init')
@@ -104,3 +109,6 @@ class Inverter():
logging.info('Close MQTT Task')
loop.run_until_complete(cls.mqtt.close())
cls.mqtt = None
def __init__(self):
self._registry.append(weakref.ref(self))

View File

@@ -20,24 +20,23 @@ logger_mqtt = logging.getLogger('mqtt')
class InverterBase(Inverter):
def __init__(self):
super().__init__()
self.__ha_restarts = -1
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb) -> None:
logging.info(f'Inverter.__exit__() {self.addr}')
logging.debug(f'InverterBase.__exit__() {self.addr}')
self.__del_remote()
if self.local.stream:
self.local.stream.close()
self.local.stream = None
if self.local.ifc:
self.local.ifc.close()
self.local.ifc = None
self.local.stream.close()
self.local.stream = None
self.local.ifc.close()
self.local.ifc = None
def __del__(self) -> None:
logging.info(f'Inverter.__del__() {self.addr}')
logging.debug(f'InverterBase.__del__() {self.addr}')
def __del_remote(self):
if self.remote.stream:
@@ -48,6 +47,25 @@ class InverterBase(Inverter):
self.remote.ifc.close()
self.remote.ifc = None
async def disc(self, shutdown_started=False) -> None:
if self.remote.stream:
self.remote.stream.shutdown_started = shutdown_started
if self.remote.ifc:
await self.remote.ifc.disc()
if self.local.stream:
self.local.stream.shutdown_started = shutdown_started
if self.local.ifc:
await self.local.ifc.disc()
def healthy(self) -> bool:
logging.debug('Inverter healthy()')
if self.local.ifc and not self.local.ifc.healthy():
return False
if self.remote.ifc and not self.remote.ifc.healthy():
return False
return True
async def async_create_remote(self, inv_prot: str, conn_class) -> None:
'''Establish a client connection to the TSUN cloud'''
tsun = Config.get(inv_prot)
@@ -55,8 +73,6 @@ class InverterBase(Inverter):
port = tsun['port']
addr = (host, port)
stream = self.local.stream
if not stream:
return
try:
logging.info(f'[{stream.node_id}] Connect to {addr}')
@@ -65,12 +81,15 @@ class InverterBase(Inverter):
ifc = AsyncStreamClient(
reader, writer, self.local, self.__del_remote)
self.remote.ifc = ifc
if hasattr(stream, 'id_str'):
self.remote.stream = conn_class(
addr, ifc, False, stream.id_str)
addr, ifc, server_side=False,
client_mode=False, id_str=stream.id_str)
else:
self.remote.stream = conn_class(
addr, ifc, False)
addr, ifc, server_side=False,
client_mode=False)
logging.info(f'[{self.remote.stream.node_id}:'
f'{self.remote.stream.conn_no}] '

8
app/src/iter_registry.py Normal file
View File

@@ -0,0 +1,8 @@
class IterRegistry(type):
def __iter__(cls):
for ref in cls._registry:
obj = ref()
if obj is not None:
yield obj

View File

@@ -1,13 +1,15 @@
import logging
import weakref
from typing import Callable, Generator
from typing import Callable
from enum import Enum
if __name__ == "app.src.messages":
from app.src.iter_registry import IterRegistry
from app.src.infos import Infos, Register
from app.src.modbus import Modbus
else: # pragma: no cover
from iter_registry import IterRegistry
from infos import Infos, Register
from modbus import Modbus
@@ -66,14 +68,6 @@ def hex_dump_memory(level, info, data, data_len):
tracer.log(level, '\n'.join(lines))
class IterRegistry(type):
def __iter__(cls) -> Generator['Message', None, None]:
for ref in cls._registry:
obj = ref()
if obj is not None:
yield obj
class State(Enum):
'''state of the logical connection'''
init = 0

View File

@@ -5,7 +5,6 @@ import os
from asyncio import StreamReader, StreamWriter
from aiohttp import web
from logging import config # noqa F401
from messages import Message
from inverter import Inverter
from gen3.inverter_g3 import InverterG3
from gen3plus.inverter_g3p import InverterG3P
@@ -38,9 +37,9 @@ async def healthy(request):
if proxy_is_up:
# logging.info('web reqeust healthy()')
for stream in Message:
for inverter in Inverter:
try:
res = stream.healthy()
res = inverter.healthy()
if not res:
return web.Response(status=503, text="I have a problem")
except Exception as err:
@@ -88,25 +87,13 @@ async def handle_shutdown(web_task):
#
# first, disc all open TCP connections gracefully
#
for stream in Message:
stream.shutdown_started = True
try:
await asyncio.wait_for(stream.disc(), 2)
except Exception:
pass
for inverter in Inverter:
await inverter.disc(True)
logging.info('Proxy disconnecting done')
#
# second, close all open TCP connections
#
for stream in Message:
stream.close()
await asyncio.sleep(0.1) # give time for closing
logging.info('Proxy closing done')
#
# third, cancel the web server
# second, cancel the web server
#
web_task.cancel()
await web_task
@@ -167,7 +154,7 @@ if __name__ == "__main__":
logging.info(f'ConfigErr: {ConfigErr}')
Inverter.class_init()
Schedule.start()
mb_tcp = ModbusTcp(loop)
ModbusTcp(loop)
#
# Create tasks for our listening servers. These must be tasks! If we call