move StremPtr instances into Inverter class

This commit is contained in:
Stefan Allius
2024-09-29 15:31:14 +02:00
parent 0c824b4a2a
commit 5a0ef30ceb
13 changed files with 548 additions and 550 deletions

View File

@@ -3,23 +3,19 @@ from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3.connection_g3":
from app.src.async_stream import AsyncStreamServer
from app.src.async_stream import AsyncStreamClient, StreamPtr
from app.src.async_stream import AsyncStreamClient
from app.src.inverter import Inverter
from app.src.gen3.talent import Talent
else: # pragma: no cover
from async_stream import AsyncStreamServer
from async_stream import AsyncStreamClient, StreamPtr
from async_stream import AsyncStreamClient
from inverter import Inverter
from gen3.talent import Talent
logger = logging.getLogger('conn')
class ConnectionG3(Talent):
async def async_create_remote(self) -> None:
pass # virtual interface # pragma: no cover
async def async_publ_mqtt(self) -> None:
pass # virtual interface # pragma: no cover
def healthy(self) -> bool:
logger.debug('ConnectionG3 healthy()')
return self._ifc.healthy()
@@ -32,16 +28,15 @@ class ConnectionG3(Talent):
class ConnectionG3Server(ConnectionG3):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: 'ConnectionG3Client',
id_str=b'') -> None:
def __init__(self, inverter: "Inverter",
reader: StreamReader, writer: StreamWriter,
addr, id_str=b'') -> None:
server_side = True
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
inverter.async_publ_mqtt,
inverter.async_create_remote,
inverter.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
Talent.__init__(self, server_side, self._ifc, id_str)
@@ -49,13 +44,12 @@ class ConnectionG3Server(ConnectionG3):
class ConnectionG3Client(ConnectionG3):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: 'ConnectionG3Server',
id_str=b'') -> None:
def __init__(self, inverter: "Inverter",
reader: StreamReader, writer: StreamWriter,
addr, id_str=b'') -> None:
server_side = False
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamClient(reader, writer,
self.remote)
inverter.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
Talent.__init__(self, server_side, self._ifc, id_str)

View File

@@ -3,10 +3,12 @@ from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3.inverter_g3":
from app.src.inverter import Inverter
from app.src.async_stream import StreamPtr
from app.src.gen3.connection_g3 import ConnectionG3Server
from app.src.gen3.connection_g3 import ConnectionG3Client
else: # pragma: no cover
from inverter import Inverter
from async_stream import StreamPtr
from gen3.connection_g3 import ConnectionG3Server
from gen3.connection_g3 import ConnectionG3Client
@@ -14,40 +16,14 @@ else: # pragma: no cover
logger_mqtt = logging.getLogger('mqtt')
class InverterG3(Inverter, ConnectionG3Server):
'''class Inverter is a derivation of an Async_Stream
The class has some class method for managing common resources like a
connection to the MQTT broker or proxy error counter which are common
for all inverter connection
Instances of the class are connections to an inverter and can have an
optional link to an remote connection to the TSUN cloud. A remote
connection dies with the inverter connection.
class methods:
class_init(): initialize the common resources of the proxy (MQTT
broker, Proxy DB, etc). Must be called before the
first inverter instance can be created
class_close(): release the common resources of the proxy. Should not
be called before any instances of the class are
destroyed
methods:
server_loop(addr): Async loop method for receiving messages from the
inverter (server-side)
client_loop(addr): Async loop method for receiving messages from the
TSUN cloud (client-side)
async_create_remote(): Establish a client connection to the TSUN cloud
async_publ_mqtt(): Publish data to MQTT broker
close(): Release method which must be called before a instance can be
destroyed
'''
class InverterG3(Inverter):
def __init__(self, reader: StreamReader, writer: StreamWriter, addr):
Inverter.__init__(self)
ConnectionG3Server.__init__(self, reader, writer, addr, None)
super().__init__()
self.addr = addr
self.remote = StreamPtr(None)
self.local = StreamPtr(
ConnectionG3Server(self, reader, writer, addr)
)
async def async_create_remote(self) -> None:
await Inverter.async_create_remote(
@@ -55,5 +31,5 @@ class InverterG3(Inverter, ConnectionG3Server):
def close(self) -> None:
logging.debug(f'InverterG3.close() {self.addr}')
ConnectionG3Server.close(self)
self.local.stream.close()
# logging.info(f'Inverter refs: {gc.get_referrers(self)}')

View File

@@ -3,23 +3,19 @@ from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3plus.connection_g3p":
from app.src.async_stream import AsyncStreamServer
from app.src.async_stream import AsyncStreamClient, StreamPtr
from app.src.async_stream import AsyncStreamClient
from app.src.inverter import Inverter
from app.src.gen3plus.solarman_v5 import SolarmanV5
else: # pragma: no cover
from async_stream import AsyncStreamServer
from async_stream import AsyncStreamClient, StreamPtr
from async_stream import AsyncStreamClient
from inverter import Inverter
from gen3plus.solarman_v5 import SolarmanV5
logger = logging.getLogger('conn')
class ConnectionG3P(SolarmanV5):
async def async_create_remote(self) -> None:
pass # virtual interface # pragma: no cover
async def async_publ_mqtt(self) -> None:
pass # virtual interface # pragma: no cover
def healthy(self) -> bool:
logger.debug('ConnectionG3P healthy()')
return self._ifc.healthy()
@@ -32,16 +28,15 @@ class ConnectionG3P(SolarmanV5):
class ConnectionG3PServer(ConnectionG3P):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: 'ConnectionG3PClient',
client_mode: bool) -> None:
def __init__(self, inverter: "Inverter",
reader: StreamReader, writer: StreamWriter,
addr, client_mode: bool) -> None:
server_side = True
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
inverter.async_publ_mqtt,
inverter.async_create_remote,
inverter.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
SolarmanV5.__init__(self, server_side, client_mode, self._ifc)
@@ -49,13 +44,13 @@ class ConnectionG3PServer(ConnectionG3P):
class ConnectionG3PClient(ConnectionG3P):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: 'ConnectionG3PServer') -> None:
def __init__(self, inverter: "Inverter",
reader: StreamReader, writer: StreamWriter,
addr) -> None:
server_side = False
client_mode = False
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamClient(reader, writer, self.remote)
self._ifc = AsyncStreamClient(reader, writer, inverter.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
SolarmanV5.__init__(self, server_side, client_mode, self._ifc)

View File

@@ -3,10 +3,12 @@ from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3plus.inverter_g3p":
from app.src.inverter import Inverter
from app.src.async_stream import StreamPtr
from app.src.gen3plus.connection_g3p import ConnectionG3PServer
from app.src.gen3plus.connection_g3p import ConnectionG3PClient
else: # pragma: no cover
from inverter import Inverter
from async_stream import StreamPtr
from gen3plus.connection_g3p import ConnectionG3PServer
from gen3plus.connection_g3p import ConnectionG3PClient
@@ -14,42 +16,15 @@ else: # pragma: no cover
logger_mqtt = logging.getLogger('mqtt')
class InverterG3P(Inverter, ConnectionG3PServer):
'''class Inverter is a derivation of an Async_Stream
The class has some class method for managing common resources like a
connection to the MQTT broker or proxy error counter which are common
for all inverter connection
Instances of the class are connections to an inverter and can have an
optional link to an remote connection to the TSUN cloud. A remote
connection dies with the inverter connection.
class methods:
class_init(): initialize the common resources of the proxy (MQTT
broker, Proxy DB, etc). Must be called before the
first inverter instance can be created
class_close(): release the common resources of the proxy. Should not
be called before any instances of the class are
destroyed
methods:
server_loop(addr): Async loop method for receiving messages from the
inverter (server-side)
client_loop(addr): Async loop method for receiving messages from the
TSUN cloud (client-side)
async_create_remote(): Establish a client connection to the TSUN cloud
async_publ_mqtt(): Publish data to MQTT broker
close(): Release method which must be called before a instance can be
destroyed
'''
class InverterG3P(Inverter):
def __init__(self, reader: StreamReader, writer: StreamWriter, addr,
client_mode: bool = False):
Inverter.__init__(self)
ConnectionG3PServer.__init__(
self, reader, writer, addr, None, client_mode=client_mode)
super().__init__()
self.addr = addr
self.remote = StreamPtr(None)
self.local = StreamPtr(
ConnectionG3PServer(self, reader, writer, addr, client_mode)
)
async def async_create_remote(self) -> None:
await Inverter.async_create_remote(
@@ -57,5 +32,5 @@ class InverterG3P(Inverter, ConnectionG3PServer):
def close(self) -> None:
logging.debug(f'InverterG3P.close() {self.addr}')
ConnectionG3PServer.close(self)
self.local.stream.close()
# logger.debug (f'Inverter refs: {gc.get_referrers(self)}')

View File

@@ -17,6 +17,28 @@ logger_mqtt = logging.getLogger('mqtt')
class Inverter():
'''class Inverter is a baseclass
The class has some class method for managing common resources like a
connection to the MQTT broker or proxy error counter which are common
for all inverter connection
Instances of the class are connections to an inverter and can have an
optional link to an remote connection to the TSUN cloud. A remote
connection dies with the inverter connection.
class methods:
class_init(): initialize the common resources of the proxy (MQTT
broker, Proxy DB, etc). Must be called before the
first inverter instance can be created
class_close(): release the common resources of the proxy. Should not
be called before any instances of the class are
destroyed
methods:
async_create_remote(): Establish a client connection to the TSUN cloud
async_publ_mqtt(): Publish data to MQTT broker
'''
@classmethod
def class_init(cls) -> None:
logging.debug('Inverter.class_init')
@@ -94,17 +116,18 @@ class Inverter():
host = tsun['host']
port = tsun['port']
addr = (host, port)
stream = self.local.stream
try:
logging.info(f'[{self.node_id}] Connect to {addr}')
logging.info(f'[{stream.node_id}] Connect to {addr}')
connect = asyncio.open_connection(host, port)
reader, writer = await connect
if hasattr(self, 'id_str'):
if hasattr(stream, 'id_str'):
self.remote.stream = conn_class(
reader, writer, addr, self, self.id_str)
self, reader, writer, addr, stream.id_str)
else:
self.remote.stream = conn_class(
reader, writer, addr, self)
self, reader, writer, addr)
logging.info(f'[{self.remote.stream.node_id}:'
f'{self.remote.stream.conn_no}] '
@@ -114,56 +137,57 @@ class Inverter():
except (ConnectionRefusedError, TimeoutError) as error:
logging.info(f'{error}')
except Exception:
self.inc_counter('SW_Exception')
Infos.inc_counter('SW_Exception')
logging.error(
f"Inverter: Exception for {addr}:\n"
f"{traceback.format_exc()}")
async def async_publ_mqtt(self) -> None:
'''publish data to MQTT broker'''
if not self.unique_id:
stream = self.local.stream
if not stream.unique_id:
return
# check if new inverter or collector infos are available or when the
# home assistant has changed the status back to online
try:
if (('inverter' in self.new_data and self.new_data['inverter'])
or ('collector' in self.new_data and
self.new_data['collector'])
if (('inverter' in stream.new_data and stream.new_data['inverter'])
or ('collector' in stream.new_data and
stream.new_data['collector'])
or self.mqtt.ha_restarts != self.__ha_restarts):
await self._register_proxy_stat_home_assistant()
await self.__register_home_assistant()
await self.__register_home_assistant(stream)
self.__ha_restarts = self.mqtt.ha_restarts
for key in self.new_data:
await self.__async_publ_mqtt_packet(key)
for key in stream.new_data:
await self.__async_publ_mqtt_packet(stream, key)
for key in Infos.new_stat_data:
await Inverter._async_publ_mqtt_proxy_stat(key)
except MqttCodeError as error:
logging.error(f'Mqtt except: {error}')
except Exception:
self.inc_counter('SW_Exception')
Infos.inc_counter('SW_Exception')
logging.error(
f"Inverter: Exception:\n"
f"{traceback.format_exc()}")
async def __async_publ_mqtt_packet(self, key):
db = self.db.db
if key in db and self.new_data[key]:
async def __async_publ_mqtt_packet(self, stream, key):
db = stream.db.db
if key in db and stream.new_data[key]:
data_json = json.dumps(db[key])
node_id = self.node_id
node_id = stream.node_id
logger_mqtt.debug(f'{key}: {data_json}')
await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501
self.new_data[key] = False
stream.new_data[key] = False
async def __register_home_assistant(self) -> None:
async def __register_home_assistant(self, stream) -> None:
'''register all our topics at home assistant'''
for data_json, component, node_id, id in self.db.ha_confs(
self.entity_prfx, self.node_id, self.unique_id,
self.sug_area):
for data_json, component, node_id, id in stream.db.ha_confs(
self.entity_prfx, stream.node_id, stream.unique_id,
stream.sug_area):
logger_mqtt.debug(f"MQTT Register: cmp:'{component}'"
f" node_id:'{node_id}' {data_json}")
await self.mqtt.publish(f"{self.discovery_prfx}{component}"
f"/{node_id}{id}/config", data_json)
self.db.reg_clr_at_midnight(f'{self.entity_prfx}{self.node_id}')
stream.db.reg_clr_at_midnight(f'{self.entity_prfx}{stream.node_id}')

View File

@@ -19,24 +19,25 @@ class ModbusConn():
self.host = host
self.port = port
self.addr = (host, port)
self.stream = None
self.inverter = None
async def __aenter__(self) -> 'InverterG3P':
'''Establish a client connection to the TSUN cloud'''
connection = asyncio.open_connection(self.host, self.port)
reader, writer = await connection
self.stream = InverterG3P(reader, writer, self.addr,
client_mode=True)
logging.info(f'[{self.stream.node_id}:{self.stream.conn_no}] '
self.inverter = InverterG3P(reader, writer, self.addr,
client_mode=True)
stream = self.inverter.local.stream
logging.info(f'[{stream.node_id}:{stream.conn_no}] '
f'Connected to {self.addr}')
Infos.inc_counter('Inverter_Cnt')
await self.stream._ifc.publish_outstanding_mqtt()
return self.stream
await self.inverter.local._ifc.publish_outstanding_mqtt()
return self.inverter
async def __aexit__(self, exc_type, exc, tb):
Infos.dec_counter('Inverter_Cnt')
await self.stream._ifc.publish_outstanding_mqtt()
self.stream.close()
await self.inverter.local._ifc.publish_outstanding_mqtt()
self.inverter.close()
class ModbusTcp():
@@ -61,7 +62,8 @@ class ModbusTcp():
'''Loop for receiving messages from the TSUN cloud (client-side)'''
while True:
try:
async with ModbusConn(host, port) as stream:
async with ModbusConn(host, port) as inverter:
stream = inverter.local.stream
await stream.send_start_cmd(snr, host)
await stream._ifc.loop()
logger.info(f'[{stream.node_id}:{stream.conn_no}] '