resolution of connection classes

- remove ConnectionG3Client
- remove ConnectionG3Server
- remove ConnectionG3PClient
- remove ConnectionG3PServer
This commit is contained in:
Stefan Allius
2024-09-29 20:08:04 +02:00
parent 5a0ef30ceb
commit 41aeac4168
19 changed files with 679 additions and 774 deletions

View File

@@ -1,55 +1,16 @@
import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3.connection_g3":
from app.src.async_stream import AsyncStreamServer
from app.src.async_stream import AsyncStreamClient
from app.src.inverter import Inverter
from app.src.gen3.talent import Talent
else: # pragma: no cover
from async_stream import AsyncStreamServer
from async_stream import AsyncStreamClient
from inverter import Inverter
from gen3.talent import Talent
logger = logging.getLogger('conn')
class ConnectionG3(Talent):
def healthy(self) -> bool:
logger.debug('ConnectionG3 healthy()')
return self._ifc.healthy()
def __init__(self, addr, ifc, server_side, id_str=b'') -> None:
super().__init__(addr, server_side, ifc, id_str)
def close(self):
self._ifc.close()
Talent.close(self)
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
class ConnectionG3Server(ConnectionG3):
def __init__(self, inverter: "Inverter",
reader: StreamReader, writer: StreamWriter,
addr, id_str=b'') -> None:
server_side = True
self._ifc = AsyncStreamServer(reader, writer,
inverter.async_publ_mqtt,
inverter.async_create_remote,
inverter.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
Talent.__init__(self, server_side, self._ifc, id_str)
class ConnectionG3Client(ConnectionG3):
def __init__(self, inverter: "Inverter",
reader: StreamReader, writer: StreamWriter,
addr, id_str=b'') -> None:
server_side = False
self._ifc = AsyncStreamClient(reader, writer,
inverter.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
Talent.__init__(self, server_side, self._ifc, id_str)
super().close()

View File

@@ -2,32 +2,38 @@ import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3.inverter_g3":
from app.src.inverter import Inverter
from app.src.inverter_base import InverterBase
from app.src.async_stream import StreamPtr
from app.src.gen3.connection_g3 import ConnectionG3Server
from app.src.gen3.connection_g3 import ConnectionG3Client
from app.src.async_stream import AsyncStreamServer
from app.src.gen3.connection_g3 import ConnectionG3
else: # pragma: no cover
from inverter import Inverter
from inverter_base import InverterBase
from async_stream import StreamPtr
from gen3.connection_g3 import ConnectionG3Server
from gen3.connection_g3 import ConnectionG3Client
from async_stream import AsyncStreamServer
from gen3.connection_g3 import ConnectionG3
logger_mqtt = logging.getLogger('mqtt')
class InverterG3(Inverter):
class InverterG3(InverterBase):
def __init__(self, reader: StreamReader, writer: StreamWriter, addr):
super().__init__()
self.addr = addr
self.remote = StreamPtr(None)
ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
self.remote = StreamPtr(None)
self.local = StreamPtr(
ConnectionG3Server(self, reader, writer, addr)
ConnectionG3(addr, ifc, True)
)
async def async_create_remote(self) -> None:
await Inverter.async_create_remote(
self, 'tsun', ConnectionG3Client)
await InverterBase.async_create_remote(
self, 'tsun', ConnectionG3)
def close(self) -> None:
logging.debug(f'InverterG3.close() {self.addr}')

View File

@@ -46,13 +46,15 @@ class Talent(Message):
MB_REGULAR_TIMEOUT = 60
TXT_UNKNOWN_CTRL = 'Unknown Ctrl'
def __init__(self, server_side: bool, ifc: "AsyncIfc", id_str=b''):
def __init__(self, addr, server_side: bool, ifc: "AsyncIfc", id_str=b''):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=15)
ifc.rx_set_cb(self.read)
ifc.prot_set_timeout_cb(self._timeout)
ifc.prot_set_init_new_client_conn_cb(self._init_new_client_conn)
ifc.prot_set_update_header_cb(self._update_header)
self.addr = addr
self.ifc = ifc
self.conn_no = ifc.get_conn_no()
self.await_conn_resp_cnt = 0
self.id_str = id_str
self.contact_name = b''
@@ -93,6 +95,10 @@ class Talent(Message):
'''
Our puplic methods
'''
def healthy(self) -> bool:
logger.debug('Talent healthy()')
return self.ifc.healthy()
def close(self) -> None:
logging.debug('Talent.close()')
if self.server_side:
@@ -110,6 +116,7 @@ class Talent(Message):
self.log_lvl.clear()
self.state = State.closed
self.mb_timer.close()
self.ifc.close()
self.ifc.rx_set_cb(None)
self.ifc.prot_set_timeout_cb(None)
self.ifc.prot_set_init_new_client_conn_cb(None)

View File

@@ -1,56 +1,18 @@
import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3plus.connection_g3p":
from app.src.async_stream import AsyncStreamServer
from app.src.async_stream import AsyncStreamClient
from app.src.inverter import Inverter
from app.src.gen3plus.solarman_v5 import SolarmanV5
else: # pragma: no cover
from async_stream import AsyncStreamServer
from async_stream import AsyncStreamClient
from inverter import Inverter
from gen3plus.solarman_v5 import SolarmanV5
logger = logging.getLogger('conn')
class ConnectionG3P(SolarmanV5):
def healthy(self) -> bool:
logger.debug('ConnectionG3P healthy()')
return self._ifc.healthy()
def __init__(self, addr, ifc, server_side,
client_mode: bool = False) -> None:
super().__init__(addr, server_side, client_mode, ifc)
def close(self):
self._ifc.close()
SolarmanV5.close(self)
super().close()
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
class ConnectionG3PServer(ConnectionG3P):
def __init__(self, inverter: "Inverter",
reader: StreamReader, writer: StreamWriter,
addr, client_mode: bool) -> None:
server_side = True
self._ifc = AsyncStreamServer(reader, writer,
inverter.async_publ_mqtt,
inverter.async_create_remote,
inverter.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
SolarmanV5.__init__(self, server_side, client_mode, self._ifc)
class ConnectionG3PClient(ConnectionG3P):
def __init__(self, inverter: "Inverter",
reader: StreamReader, writer: StreamWriter,
addr) -> None:
server_side = False
client_mode = False
self._ifc = AsyncStreamClient(reader, writer, inverter.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
SolarmanV5.__init__(self, server_side, client_mode, self._ifc)

View File

@@ -2,33 +2,38 @@ import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3plus.inverter_g3p":
from app.src.inverter import Inverter
from app.src.inverter_base import InverterBase
from app.src.async_stream import StreamPtr
from app.src.gen3plus.connection_g3p import ConnectionG3PServer
from app.src.gen3plus.connection_g3p import ConnectionG3PClient
from app.src.async_stream import AsyncStreamServer
from app.src.gen3plus.connection_g3p import ConnectionG3P
else: # pragma: no cover
from inverter import Inverter
from inverter_base import InverterBase
from async_stream import StreamPtr
from gen3plus.connection_g3p import ConnectionG3PServer
from gen3plus.connection_g3p import ConnectionG3PClient
from async_stream import AsyncStreamServer
from gen3plus.connection_g3p import ConnectionG3P
logger_mqtt = logging.getLogger('mqtt')
class InverterG3P(Inverter):
class InverterG3P(InverterBase):
def __init__(self, reader: StreamReader, writer: StreamWriter, addr,
client_mode: bool = False):
super().__init__()
self.addr = addr
self.remote = StreamPtr(None)
ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
self.local = StreamPtr(
ConnectionG3PServer(self, reader, writer, addr, client_mode)
ConnectionG3P(addr, ifc, True, client_mode)
)
async def async_create_remote(self) -> None:
await Inverter.async_create_remote(
self, 'solarman', ConnectionG3PClient)
await InverterBase.async_create_remote(
self, 'solarman', ConnectionG3P)
def close(self) -> None:
logging.debug(f'InverterG3P.close() {self.addr}')

View File

@@ -62,14 +62,17 @@ class SolarmanV5(Message):
HDR_FMT = '<BLLL'
'''format string for packing of the header'''
def __init__(self, server_side: bool, client_mode: bool, ifc: "AsyncIfc"):
def __init__(self, addr, server_side: bool, client_mode: bool,
ifc: "AsyncIfc"):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=8)
ifc.rx_set_cb(self.read)
ifc.prot_set_timeout_cb(self._timeout)
ifc.prot_set_init_new_client_conn_cb(self._init_new_client_conn)
ifc.prot_set_update_header_cb(self._update_header)
self.addr = addr
self.ifc = ifc
self.conn_no = ifc.get_conn_no()
self.header_len = 11 # overwrite construcor in class Message
self.control = 0
self.seq = Sequence(server_side)
@@ -150,6 +153,10 @@ class SolarmanV5(Message):
'''
Our puplic methods
'''
def healthy(self) -> bool:
logger.debug('SolarmanV5 healthy()')
return self.ifc.healthy()
def close(self) -> None:
logging.debug('Solarman.close()')
if self.server_side:
@@ -167,6 +174,7 @@ class SolarmanV5(Message):
self.log_lvl.clear()
self.state = State.closed
self.mb_timer.close()
self.ifc.close()
self.ifc.rx_set_cb(None)
self.ifc.prot_set_timeout_cb(None)
self.ifc.prot_set_init_new_client_conn_cb(None)

View File

@@ -1,8 +1,6 @@
import asyncio
import logging
import traceback
import json
from aiomqtt import MqttCodeError
if __name__ == "app.src.inverter":
from app.src.config import Config
@@ -106,88 +104,3 @@ class Inverter():
logging.info('Close MQTT Task')
loop.run_until_complete(cls.mqtt.close())
cls.mqtt = None
def __init__(self):
self.__ha_restarts = -1
async def async_create_remote(self, inv_prot: str, conn_class) -> None:
'''Establish a client connection to the TSUN cloud'''
tsun = Config.get(inv_prot)
host = tsun['host']
port = tsun['port']
addr = (host, port)
stream = self.local.stream
try:
logging.info(f'[{stream.node_id}] Connect to {addr}')
connect = asyncio.open_connection(host, port)
reader, writer = await connect
if hasattr(stream, 'id_str'):
self.remote.stream = conn_class(
self, reader, writer, addr, stream.id_str)
else:
self.remote.stream = conn_class(
self, reader, writer, addr)
logging.info(f'[{self.remote.stream.node_id}:'
f'{self.remote.stream.conn_no}] '
f'Connected to {addr}')
asyncio.create_task(self.remote.ifc.client_loop(addr))
except (ConnectionRefusedError, TimeoutError) as error:
logging.info(f'{error}')
except Exception:
Infos.inc_counter('SW_Exception')
logging.error(
f"Inverter: Exception for {addr}:\n"
f"{traceback.format_exc()}")
async def async_publ_mqtt(self) -> None:
'''publish data to MQTT broker'''
stream = self.local.stream
if not stream.unique_id:
return
# check if new inverter or collector infos are available or when the
# home assistant has changed the status back to online
try:
if (('inverter' in stream.new_data and stream.new_data['inverter'])
or ('collector' in stream.new_data and
stream.new_data['collector'])
or self.mqtt.ha_restarts != self.__ha_restarts):
await self._register_proxy_stat_home_assistant()
await self.__register_home_assistant(stream)
self.__ha_restarts = self.mqtt.ha_restarts
for key in stream.new_data:
await self.__async_publ_mqtt_packet(stream, key)
for key in Infos.new_stat_data:
await Inverter._async_publ_mqtt_proxy_stat(key)
except MqttCodeError as error:
logging.error(f'Mqtt except: {error}')
except Exception:
Infos.inc_counter('SW_Exception')
logging.error(
f"Inverter: Exception:\n"
f"{traceback.format_exc()}")
async def __async_publ_mqtt_packet(self, stream, key):
db = stream.db.db
if key in db and stream.new_data[key]:
data_json = json.dumps(db[key])
node_id = stream.node_id
logger_mqtt.debug(f'{key}: {data_json}')
await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501
stream.new_data[key] = False
async def __register_home_assistant(self, stream) -> None:
'''register all our topics at home assistant'''
for data_json, component, node_id, id in stream.db.ha_confs(
self.entity_prfx, stream.node_id, stream.unique_id,
stream.sug_area):
logger_mqtt.debug(f"MQTT Register: cmp:'{component}'"
f" node_id:'{node_id}' {data_json}")
await self.mqtt.publish(f"{self.discovery_prfx}{component}"
f"/{node_id}{id}/config", data_json)
stream.db.reg_clr_at_midnight(f'{self.entity_prfx}{stream.node_id}')

108
app/src/inverter_base.py Normal file
View File

@@ -0,0 +1,108 @@
import asyncio
import logging
import traceback
import json
from aiomqtt import MqttCodeError
if __name__ == "app.src.inverter_base":
from app.src.inverter import Inverter
from app.src.async_stream import AsyncStreamClient
from app.src.config import Config
from app.src.infos import Infos
else: # pragma: no cover
from inverter import Inverter
from async_stream import AsyncStreamClient
from config import Config
from infos import Infos
logger_mqtt = logging.getLogger('mqtt')
class InverterBase(Inverter):
def __init__(self):
self.__ha_restarts = -1
async def async_create_remote(self, inv_prot: str, conn_class) -> None:
'''Establish a client connection to the TSUN cloud'''
tsun = Config.get(inv_prot)
host = tsun['host']
port = tsun['port']
addr = (host, port)
stream = self.local.stream
try:
logging.info(f'[{stream.node_id}] Connect to {addr}')
connect = asyncio.open_connection(host, port)
reader, writer = await connect
ifc = AsyncStreamClient(reader, writer,
self.remote)
if hasattr(stream, 'id_str'):
self.remote.stream = conn_class(
addr, ifc, False, stream.id_str)
else:
self.remote.stream = conn_class(
addr, ifc, False)
logging.info(f'[{self.remote.stream.node_id}:'
f'{self.remote.stream.conn_no}] '
f'Connected to {addr}')
asyncio.create_task(self.remote.ifc.client_loop(addr))
except (ConnectionRefusedError, TimeoutError) as error:
logging.info(f'{error}')
except Exception:
Infos.inc_counter('SW_Exception')
logging.error(
f"Inverter: Exception for {addr}:\n"
f"{traceback.format_exc()}")
async def async_publ_mqtt(self) -> None:
'''publish data to MQTT broker'''
stream = self.local.stream
if not stream.unique_id:
return
# check if new inverter or collector infos are available or when the
# home assistant has changed the status back to online
try:
if (('inverter' in stream.new_data and stream.new_data['inverter'])
or ('collector' in stream.new_data and
stream.new_data['collector'])
or self.mqtt.ha_restarts != self.__ha_restarts):
await self._register_proxy_stat_home_assistant()
await self.__register_home_assistant(stream)
self.__ha_restarts = self.mqtt.ha_restarts
for key in stream.new_data:
await self.__async_publ_mqtt_packet(stream, key)
for key in Infos.new_stat_data:
await Inverter._async_publ_mqtt_proxy_stat(key)
except MqttCodeError as error:
logging.error(f'Mqtt except: {error}')
except Exception:
Infos.inc_counter('SW_Exception')
logging.error(
f"Inverter: Exception:\n"
f"{traceback.format_exc()}")
async def __async_publ_mqtt_packet(self, stream, key):
db = stream.db.db
if key in db and stream.new_data[key]:
data_json = json.dumps(db[key])
node_id = stream.node_id
logger_mqtt.debug(f'{key}: {data_json}')
await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501
stream.new_data[key] = False
async def __register_home_assistant(self, stream) -> None:
'''register all our topics at home assistant'''
for data_json, component, node_id, id in stream.db.ha_confs(
self.entity_prfx, stream.node_id, stream.unique_id,
stream.sug_area):
logger_mqtt.debug(f"MQTT Register: cmp:'{component}'"
f" node_id:'{node_id}' {data_json}")
await self.mqtt.publish(f"{self.discovery_prfx}{component}"
f"/{node_id}{id}/config", data_json)
stream.db.reg_clr_at_midnight(f'{self.entity_prfx}{stream.node_id}')

View File

@@ -31,12 +31,12 @@ class ModbusConn():
logging.info(f'[{stream.node_id}:{stream.conn_no}] '
f'Connected to {self.addr}')
Infos.inc_counter('Inverter_Cnt')
await self.inverter.local._ifc.publish_outstanding_mqtt()
await self.inverter.local.ifc.publish_outstanding_mqtt()
return self.inverter
async def __aexit__(self, exc_type, exc, tb):
Infos.dec_counter('Inverter_Cnt')
await self.inverter.local._ifc.publish_outstanding_mqtt()
await self.inverter.local.ifc.publish_outstanding_mqtt()
self.inverter.close()
@@ -65,7 +65,7 @@ class ModbusTcp():
async with ModbusConn(host, port) as inverter:
stream = inverter.local.stream
await stream.send_start_cmd(snr, host)
await stream._ifc.loop()
await stream.ifc.loop()
logger.info(f'[{stream.node_id}:{stream.conn_no}] '
f'Connection closed - Shutdown: '
f'{stream.shutdown_started}')