Merge branch 'refactoring-async-stream' of https://github.com/s-allius/tsun-gen3-proxy into titan-scan

This commit is contained in:
Stefan Allius
2024-10-06 21:28:12 +02:00
31 changed files with 1763 additions and 1222 deletions

View File

@@ -56,21 +56,11 @@ class AsyncIfc(ABC):
''' add data to forward queue'''
pass # pragma: no cover
@abstractmethod
def fwd_flush(self):
''' send forward queue and clears it'''
pass # pragma: no cover
@abstractmethod
def fwd_log(self, level, info):
''' log the forward queue'''
pass # pragma: no cover
@abstractmethod
def fwd_clear(self):
''' clear forward queue'''
pass # pragma: no cover
#
# RX - QUEUE
#

View File

@@ -7,12 +7,12 @@ from typing import Self
from itertools import count
if __name__ == "app.src.async_stream":
from app.src.inverter import Inverter
from app.src.proxy import Proxy
from app.src.byte_fifo import ByteFifo
from app.src.async_ifc import AsyncIfc
from app.src.infos import Infos
else: # pragma: no cover
from inverter import Inverter
from proxy import Proxy
from byte_fifo import ByteFifo
from async_ifc import AsyncIfc
from infos import Infos
@@ -80,18 +80,10 @@ class AsyncIfcImpl(AsyncIfc):
''' add data to forward queue'''
self.fwd_fifo += data
def fwd_flush(self):
''' send forward queue and clears it'''
self.fwd_fifo()
def fwd_log(self, level, info):
''' log the forward queue'''
self.fwd_fifo.logging(level, info)
def fwd_clear(self):
''' clear forward queue'''
self.fwd_fifo.clear()
def rx_get(self, size: int = None) -> bytearray:
'''removes size numbers of bytes and return them'''
return self.rx_fifo.get(size)
@@ -127,13 +119,18 @@ class AsyncIfcImpl(AsyncIfc):
class StreamPtr():
'''Descr StreamPtr'''
def __init__(self, _stream):
def __init__(self, _stream, _ifc=None):
self.stream = _stream
self.ifc = _ifc
@property
def ifc(self):
return self._ifc
@ifc.setter
def ifc(self, value):
self._ifc = value
@property
def stream(self):
return self._stream
@@ -141,10 +138,6 @@ class StreamPtr():
@stream.setter
def stream(self, value):
self._stream = value
if value:
self._ifc = value.ifc
else:
self._ifc = None
class AsyncStream(AsyncIfcImpl):
@@ -177,8 +170,8 @@ class AsyncStream(AsyncIfcImpl):
self._writer.write(self.tx_fifo.get())
def __timeout(self) -> int:
if self.timeout_cb is callable:
return self.timeout_cb
if self.timeout_cb:
return self.timeout_cb()
return 360
async def loop(self) -> Self:
@@ -186,10 +179,7 @@ class AsyncStream(AsyncIfcImpl):
self.proc_start = time.time()
while True:
try:
proc = time.time() - self.proc_start
if proc > self.proc_max:
self.proc_max = proc
self.proc_start = None
self.__calc_proc_time()
dead_conn_to = self.__timeout()
await asyncio.wait_for(self.__async_read(),
dead_conn_to)
@@ -204,7 +194,6 @@ class AsyncStream(AsyncIfcImpl):
f'connection timeout ({dead_conn_to}s) '
f'for {self.l_addr}')
await self.disc()
self.close()
return self
except OSError as error:
@@ -212,14 +201,12 @@ class AsyncStream(AsyncIfcImpl):
f'{error} for l{self.l_addr} | '
f'r{self.r_addr}')
await self.disc()
self.close()
return self
except RuntimeError as error:
logger.info(f'[{self.node_id}:{self.conn_no}] '
f'{error} for {self.l_addr}')
await self.disc()
self.close()
return self
except Exception:
@@ -229,8 +216,16 @@ class AsyncStream(AsyncIfcImpl):
f"{traceback.format_exc()}")
await asyncio.sleep(0) # be cooperative to other task
def __calc_proc_time(self):
if self.proc_start:
proc = time.time() - self.proc_start
if proc > self.proc_max:
self.proc_max = proc
self.proc_start = None
async def disc(self) -> None:
"""Async disc handler for graceful disconnect"""
self.remote = None
if self._writer.is_closing():
return
logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
@@ -238,6 +233,7 @@ class AsyncStream(AsyncIfcImpl):
await self._writer.wait_closed()
def close(self) -> None:
logging.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
"""close handler for a no waiting disconnect
hint: must be called before releasing the connection instance
@@ -246,7 +242,6 @@ class AsyncStream(AsyncIfcImpl):
self._reader.feed_eof() # abort awaited read
if self._writer.is_closing():
return
logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
self._writer.close()
def healthy(self) -> bool:
@@ -271,7 +266,7 @@ class AsyncStream(AsyncIfcImpl):
self.proc_start = time.time()
self.rx_fifo += data
wait = self.rx_fifo() # call read in parent class
if wait > 0:
if wait and wait > 0:
await asyncio.sleep(wait)
else:
raise RuntimeError("Peer closed.")
@@ -292,21 +287,22 @@ class AsyncStream(AsyncIfcImpl):
except OSError as error:
if self.remote.stream:
rmt = self.remote.stream
self.remote.stream = None
logger.error(f'[{rmt.node_id}:{rmt.conn_no}] Fwd: {error} for '
f'l{rmt._ifc.l_addr} | r{rmt._ifc.r_addr}')
await rmt._ifc.disc()
rmt._ifc.close()
rmt = self.remote
logger.error(f'[{rmt.stream.node_id}:{rmt.stream.conn_no}] '
f'Fwd: {error} for '
f'l{rmt.ifc.l_addr} | r{rmt.ifc.r_addr}')
await rmt.ifc.disc()
if rmt.ifc.close_cb:
rmt.ifc.close_cb()
except RuntimeError as error:
if self.remote.stream:
rmt = self.remote.stream
self.remote.stream = None
logger.info(f'[{rmt.node_id}:{rmt.conn_no}] '
f'Fwd: {error} for {rmt._ifc.l_addr}')
await rmt._ifc.disc()
rmt._ifc.close()
rmt = self.remote
logger.info(f'[{rmt.stream.node_id}:{rmt.stream.conn_no}] '
f'Fwd: {error} for {rmt.ifc.l_addr}')
await rmt.ifc.disc()
if rmt.ifc.close_cb:
rmt.ifc.close_cb()
except Exception:
Infos.inc_counter('SW_Exception')
@@ -315,18 +311,24 @@ class AsyncStream(AsyncIfcImpl):
f"{traceback.format_exc()}")
def __del__(self):
logger.debug(
logger.info(
f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}")
class AsyncStreamServer(AsyncStream):
def __init__(self, reader: StreamReader, writer: StreamWriter,
async_publ_mqtt, async_create_remote,
async_publ_mqtt, create_remote,
rstream: "StreamPtr") -> None:
AsyncStream.__init__(self, reader, writer, rstream)
self.async_create_remote = async_create_remote
self.create_remote = create_remote
self.async_publ_mqtt = async_publ_mqtt
def close(self) -> None:
logging.debug('AsyncStreamServer.close()')
self.create_remote = None
self.async_publ_mqtt = None
super().close()
async def server_loop(self) -> None:
'''Loop for receiving messages from the inverter (server-side)'''
logger.info(f'[{self.node_id}:{self.conn_no}] '
@@ -341,7 +343,7 @@ class AsyncStreamServer(AsyncStream):
# if the server connection closes, we also have to disconnect
# the connection to te TSUN cloud
if self.remote.stream:
if self.remote and self.remote.stream:
logger.info(f'[{self.node_id}:{self.conn_no}] disc client '
f'connection: [{self.remote.ifc.node_id}:'
f'{self.remote.ifc.conn_no}]')
@@ -350,7 +352,7 @@ class AsyncStreamServer(AsyncStream):
async def _async_forward(self) -> None:
"""forward handler transmits data over the remote connection"""
if not self.remote.stream:
await self.async_create_remote()
await self.create_remote()
if self.remote.stream and \
self.remote.ifc.init_new_client_conn_cb():
await self.remote.ifc._AsyncStream__async_write()
@@ -365,24 +367,21 @@ class AsyncStreamServer(AsyncStream):
'''Publish all outstanding MQTT topics'''
try:
await self.async_publ_mqtt()
await Inverter._async_publ_mqtt_proxy_stat('proxy')
await Proxy._async_publ_mqtt_proxy_stat('proxy')
except Exception:
pass
def close(self) -> None:
"""close handler for a no waiting disconnect
hint: must be called before releasing the connection instance
"""
self.async_create_remote = None
self.async_publ_mqtt = None
super().close()
class AsyncStreamClient(AsyncStream):
def __init__(self, reader: StreamReader, writer: StreamWriter,
rstream: "StreamPtr") -> None:
rstream: "StreamPtr", close_cb) -> None:
AsyncStream.__init__(self, reader, writer, rstream)
self.close_cb = close_cb
def close(self) -> None:
logging.debug('AsyncStreamClient.close()')
self.close_cb = None
super().close()
async def client_loop(self, _: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)'''
@@ -391,21 +390,8 @@ class AsyncStreamClient(AsyncStream):
'Client loop stopped for'
f' l{self.l_addr}')
server_stream = self.remote.stream
# if the client connection closes, we don't touch the server
# connection. Instead we erase the client connection stream,
# thus on the next received packet from the inverter, we can
# establish a new connection to the TSUN cloud
if server_stream.remote.ifc == self:
# logging.debug(f'Client l{client_stream.l_addr} refs:'
# f' {gc.get_referrers(client_stream)}')
# than erase client connection
server_stream.remote.stream = None # erases stream and ifc link
# erase backlink to inverter
self.remote.stream = None
if self.close_cb:
self.close_cb()
async def _async_forward(self) -> None:
"""forward handler transmits data over the remote connection"""

View File

@@ -18,10 +18,11 @@ class ByteFifo:
self.__buf.extend(data)
return self
def __call__(self) -> None:
def __call__(self):
'''triggers the observer'''
if callable(self.__trigger_cb):
return self.__trigger_cb()
return None
def get(self, size: int = None) -> bytearray:
'''removes size numbers of byte and return them'''

View File

@@ -1,16 +0,0 @@
import logging
if __name__ == "app.src.gen3.connection_g3":
from app.src.gen3.talent import Talent
else: # pragma: no cover
from gen3.talent import Talent
logger = logging.getLogger('conn')
class ConnectionG3(Talent):
def __init__(self, addr, ifc, server_side, id_str=b'') -> None:
super().__init__(addr, server_side, ifc, id_str)
def close(self):
super().close()

View File

@@ -1,41 +1,13 @@
import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3.inverter_g3":
from app.src.inverter_base import InverterBase
from app.src.async_stream import StreamPtr
from app.src.async_stream import AsyncStreamServer
from app.src.gen3.connection_g3 import ConnectionG3
from app.src.gen3.talent import Talent
else: # pragma: no cover
from inverter_base import InverterBase
from async_stream import StreamPtr
from async_stream import AsyncStreamServer
from gen3.connection_g3 import ConnectionG3
logger_mqtt = logging.getLogger('mqtt')
from gen3.talent import Talent
class InverterG3(InverterBase):
def __init__(self, reader: StreamReader, writer: StreamWriter, addr):
super().__init__()
self.addr = addr
self.remote = StreamPtr(None)
ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
self.remote = StreamPtr(None)
self.local = StreamPtr(
ConnectionG3(addr, ifc, True)
)
async def async_create_remote(self) -> None:
await InverterBase.async_create_remote(
self, 'tsun', ConnectionG3)
def close(self) -> None:
logging.debug(f'InverterG3.close() {self.addr}')
self.local.stream.close()
# logging.info(f'Inverter refs: {gc.get_referrers(self)}')
def __init__(self, reader: StreamReader, writer: StreamWriter):
super().__init__(reader, writer, 'tsun', Talent)

View File

@@ -46,7 +46,8 @@ class Talent(Message):
MB_REGULAR_TIMEOUT = 60
TXT_UNKNOWN_CTRL = 'Unknown Ctrl'
def __init__(self, addr, server_side: bool, ifc: "AsyncIfc", id_str=b''):
def __init__(self, addr, ifc: "AsyncIfc", server_side: bool,
client_mode: bool = False, id_str=b''):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=15)
ifc.rx_set_cb(self.read)
ifc.prot_set_timeout_cb(self._timeout)
@@ -95,10 +96,6 @@ class Talent(Message):
'''
Our puplic methods
'''
def healthy(self) -> bool:
logger.debug('Talent healthy()')
return self.ifc.healthy()
def close(self) -> None:
logging.debug('Talent.close()')
if self.server_side:
@@ -116,11 +113,11 @@ class Talent(Message):
self.log_lvl.clear()
self.state = State.closed
self.mb_timer.close()
self.ifc.close()
self.ifc.rx_set_cb(None)
self.ifc.prot_set_timeout_cb(None)
self.ifc.prot_set_init_new_client_conn_cb(None)
self.ifc.prot_set_update_header_cb(None)
self.ifc = None
super().close()
def __set_serial_no(self, serial_no: str):
@@ -438,8 +435,8 @@ class Talent(Message):
result = struct.unpack_from('!q', self.ifc.rx_peek(),
self.header_len)
self.ts_offset = result[0]-ts
if self.remote.stream:
self.remote.stream.ts_offset = self.ts_offset
if self.ifc.remote.stream:
self.ifc.remote.stream.ts_offset = self.ts_offset
logger.debug(f'tsun-time: {int(result[0]):08x}'
f' proxy-time: {ts:08x}'
f' offset: {self.ts_offset}')
@@ -597,9 +594,8 @@ class Talent(Message):
self.header_len+self.data_len]
if self.ctrl.is_req():
if self.remote.stream.mb.recv_req(data[hdr_len:],
self.remote.stream.
msg_forward):
rstream = self.ifc.remote.stream
if rstream.mb.recv_req(data[hdr_len:], rstream.msg_forward):
self.inc_counter('Modbus_Command')
else:
self.inc_counter('Invalid_Msg_Format')

View File

@@ -1,18 +0,0 @@
import logging
if __name__ == "app.src.gen3plus.connection_g3p":
from app.src.gen3plus.solarman_v5 import SolarmanV5
else: # pragma: no cover
from gen3plus.solarman_v5 import SolarmanV5
logger = logging.getLogger('conn')
class ConnectionG3P(SolarmanV5):
def __init__(self, addr, ifc, server_side,
client_mode: bool = False) -> None:
super().__init__(addr, server_side, client_mode, ifc)
def close(self):
super().close()
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')

View File

@@ -1,41 +1,15 @@
import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3plus.inverter_g3p":
from app.src.inverter_base import InverterBase
from app.src.async_stream import StreamPtr
from app.src.async_stream import AsyncStreamServer
from app.src.gen3plus.connection_g3p import ConnectionG3P
from app.src.gen3plus.solarman_v5 import SolarmanV5
else: # pragma: no cover
from inverter_base import InverterBase
from async_stream import StreamPtr
from async_stream import AsyncStreamServer
from gen3plus.connection_g3p import ConnectionG3P
logger_mqtt = logging.getLogger('mqtt')
from gen3plus.solarman_v5 import SolarmanV5
class InverterG3P(InverterBase):
def __init__(self, reader: StreamReader, writer: StreamWriter, addr,
def __init__(self, reader: StreamReader, writer: StreamWriter,
client_mode: bool = False):
super().__init__()
self.addr = addr
self.remote = StreamPtr(None)
ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
self.local = StreamPtr(
ConnectionG3P(addr, ifc, True, client_mode)
)
async def async_create_remote(self) -> None:
await InverterBase.async_create_remote(
self, 'solarman', ConnectionG3P)
def close(self) -> None:
logging.debug(f'InverterG3P.close() {self.addr}')
self.local.stream.close()
# logger.debug (f'Inverter refs: {gc.get_referrers(self)}')
super().__init__(reader, writer, 'solarman',
SolarmanV5, client_mode)

View File

@@ -62,8 +62,8 @@ class SolarmanV5(Message):
HDR_FMT = '<BLLL'
'''format string for packing of the header'''
def __init__(self, addr, server_side: bool, client_mode: bool,
ifc: "AsyncIfc"):
def __init__(self, addr, ifc: "AsyncIfc",
server_side: bool, client_mode: bool):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=8)
ifc.rx_set_cb(self.read)
ifc.prot_set_timeout_cb(self._timeout)
@@ -155,10 +155,6 @@ class SolarmanV5(Message):
'''
Our puplic methods
'''
def healthy(self) -> bool:
logger.debug('SolarmanV5 healthy()')
return self.ifc.healthy()
def close(self) -> None:
logging.debug('Solarman.close()')
if self.server_side:
@@ -176,11 +172,11 @@ class SolarmanV5(Message):
self.log_lvl.clear()
self.state = State.closed
self.mb_timer.close()
self.ifc.close()
self.ifc.rx_set_cb(None)
self.ifc.prot_set_timeout_cb(None)
self.ifc.prot_set_init_new_client_conn_cb(None)
self.ifc.prot_set_update_header_cb(None)
self.ifc = None
super().close()
async def send_start_cmd(self, snr: int, host: str,
@@ -633,9 +629,9 @@ class SolarmanV5(Message):
self.forward_at_cmd_resp = True
elif ftype == self.MB_RTU_CMD:
if self.remote.stream.mb.recv_req(data[15:],
self.remote.stream.
__forward_msg):
rstream = self.ifc.remote.stream
if rstream.mb.recv_req(data[15:],
rstream.__forward_msg):
self.inc_counter('Modbus_Command')
else:
logger.error('Invalid Modbus Msg')

View File

@@ -1,30 +1,99 @@
import weakref
import asyncio
import logging
import traceback
import json
from aiomqtt import MqttCodeError
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.inverter_base":
from app.src.inverter import Inverter
from app.src.inverter_ifc import InverterIfc
from app.src.proxy import Proxy
from app.src.async_stream import StreamPtr
from app.src.async_stream import AsyncStreamClient
from app.src.async_stream import AsyncStreamServer
from app.src.config import Config
from app.src.infos import Infos
else: # pragma: no cover
from inverter import Inverter
from inverter_ifc import InverterIfc
from proxy import Proxy
from async_stream import StreamPtr
from async_stream import AsyncStreamClient
from async_stream import AsyncStreamServer
from config import Config
from infos import Infos
logger_mqtt = logging.getLogger('mqtt')
class InverterBase(Inverter):
def __init__(self):
self.__ha_restarts = -1
class InverterBase(InverterIfc, Proxy):
async def async_create_remote(self, inv_prot: str, conn_class) -> None:
def __init__(self, reader: StreamReader, writer: StreamWriter,
config_id: str, prot_class,
client_mode: bool = False):
Proxy.__init__(self)
self._registry.append(weakref.ref(self))
self.addr = writer.get_extra_info('peername')
self.config_id = config_id
self.prot_class = prot_class
self.__ha_restarts = -1
self.remote = StreamPtr(None)
ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.create_remote,
self.remote)
self.local = StreamPtr(
self.prot_class(self.addr, ifc, True, client_mode), ifc
)
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb) -> None:
logging.debug(f'InverterBase.__exit__() {self.addr}')
self.__del_remote()
self.local.stream.close()
self.local.stream = None
self.local.ifc.close()
self.local.ifc = None
def __del__(self) -> None:
logging.debug(f'InverterBase.__del__() {self.addr}')
def __del_remote(self):
if self.remote.stream:
self.remote.stream.close()
self.remote.stream = None
if self.remote.ifc:
self.remote.ifc.close()
self.remote.ifc = None
async def disc(self, shutdown_started=False) -> None:
if self.remote.stream:
self.remote.stream.shutdown_started = shutdown_started
if self.remote.ifc:
await self.remote.ifc.disc()
if self.local.stream:
self.local.stream.shutdown_started = shutdown_started
if self.local.ifc:
await self.local.ifc.disc()
def healthy(self) -> bool:
logging.debug('InverterBase healthy()')
if self.local.ifc and not self.local.ifc.healthy():
return False
if self.remote.ifc and not self.remote.ifc.healthy():
return False
return True
async def create_remote(self) -> None:
'''Establish a client connection to the TSUN cloud'''
tsun = Config.get(inv_prot)
tsun = Config.get(self.config_id)
host = tsun['host']
port = tsun['port']
addr = (host, port)
@@ -34,15 +103,18 @@ class InverterBase(Inverter):
logging.info(f'[{stream.node_id}] Connect to {addr}')
connect = asyncio.open_connection(host, port)
reader, writer = await connect
ifc = AsyncStreamClient(reader, writer,
self.remote)
ifc = AsyncStreamClient(
reader, writer, self.local, self.__del_remote)
self.remote.ifc = ifc
if hasattr(stream, 'id_str'):
self.remote.stream = conn_class(
addr, ifc, False, stream.id_str)
self.remote.stream = self.prot_class(
addr, ifc, server_side=False,
client_mode=False, id_str=stream.id_str)
else:
self.remote.stream = conn_class(
addr, ifc, False)
self.remote.stream = self.prot_class(
addr, ifc, server_side=False,
client_mode=False)
logging.info(f'[{self.remote.stream.node_id}:'
f'{self.remote.stream.conn_no}] '
@@ -60,7 +132,7 @@ class InverterBase(Inverter):
async def async_publ_mqtt(self) -> None:
'''publish data to MQTT broker'''
stream = self.local.stream
if not stream.unique_id:
if not stream or not stream.unique_id:
return
# check if new inverter or collector infos are available or when the
# home assistant has changed the status back to online
@@ -76,7 +148,7 @@ class InverterBase(Inverter):
for key in stream.new_data:
await self.__async_publ_mqtt_packet(stream, key)
for key in Infos.new_stat_data:
await Inverter._async_publ_mqtt_proxy_stat(key)
await Proxy._async_publ_mqtt_proxy_stat(key)
except MqttCodeError as error:
logging.error(f'Mqtt except: {error}')

40
app/src/inverter_ifc.py Normal file
View File

@@ -0,0 +1,40 @@
from abc import abstractmethod
import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.inverter_ifc":
from app.src.iter_registry import AbstractIterMeta
else: # pragma: no cover
from iter_registry import AbstractIterMeta
logger_mqtt = logging.getLogger('mqtt')
class InverterIfc(metaclass=AbstractIterMeta):
_registry = []
@abstractmethod
def __init__(self, reader: StreamReader, writer: StreamWriter,
config_id: str, prot_class,
client_mode: bool):
pass # pragma: no cover
@abstractmethod
def __enter__(self):
pass # pragma: no cover
@abstractmethod
def __exit__(self, exc_type, exc, tb):
pass # pragma: no cover
@abstractmethod
def healthy(self) -> bool:
pass # pragma: no cover
@abstractmethod
async def disc(self, shutdown_started=False) -> None:
pass # pragma: no cover
@abstractmethod
async def create_remote(self) -> None:
pass # pragma: no cover

9
app/src/iter_registry.py Normal file
View File

@@ -0,0 +1,9 @@
from abc import ABCMeta
class AbstractIterMeta(ABCMeta):
def __iter__(cls):
for ref in cls._registry:
obj = ref()
if obj is not None:
yield obj

View File

@@ -1,13 +1,15 @@
import logging
import weakref
from typing import Callable, Generator
from typing import Callable
from enum import Enum
if __name__ == "app.src.messages":
from app.src.protocol_ifc import ProtocolIfc
from app.src.infos import Infos, Register
from app.src.modbus import Modbus
else: # pragma: no cover
from protocol_ifc import ProtocolIfc
from infos import Infos, Register
from modbus import Modbus
@@ -66,14 +68,6 @@ def hex_dump_memory(level, info, data, data_len):
tracer.log(level, '\n'.join(lines))
class IterRegistry(type):
def __iter__(cls) -> Generator['Message', None, None]:
for ref in cls._registry:
obj = ref()
if obj is not None:
yield obj
class State(Enum):
'''state of the logical connection'''
init = 0
@@ -88,8 +82,7 @@ class State(Enum):
'''connection closed'''
class Message(metaclass=IterRegistry):
_registry = []
class Message(ProtocolIfc):
MAX_START_TIME = 400
'''maximum time without a received msg in sec'''
MAX_INV_IDLE_TIME = 120

View File

@@ -25,8 +25,9 @@ class ModbusConn():
'''Establish a client connection to the TSUN cloud'''
connection = asyncio.open_connection(self.host, self.port)
reader, writer = await connection
self.inverter = InverterG3P(reader, writer, self.addr,
self.inverter = InverterG3P(reader, writer,
client_mode=True)
self.inverter.__enter__()
stream = self.inverter.local.stream
logging.info(f'[{stream.node_id}:{stream.conn_no}] '
f'Connected to {self.addr}')
@@ -37,7 +38,7 @@ class ModbusConn():
async def __aexit__(self, exc_type, exc, tb):
Infos.dec_counter('Inverter_Cnt')
await self.inverter.local.ifc.publish_outstanding_mqtt()
self.inverter.close()
self.inverter.__exit__(exc_type, exc, tb)
class ModbusTcp():

21
app/src/protocol_ifc.py Normal file
View File

@@ -0,0 +1,21 @@
from abc import abstractmethod
if __name__ == "app.src.protocol_ifc":
from app.src.iter_registry import AbstractIterMeta
from app.src.async_ifc import AsyncIfc
else: # pragma: no cover
from iter_registry import AbstractIterMeta
from async_ifc import AsyncIfc
class ProtocolIfc(metaclass=AbstractIterMeta):
_registry = []
@abstractmethod
def __init__(self, addr, ifc: "AsyncIfc", server_side: bool,
client_mode: bool = False, id_str=b''):
pass # pragma: no cover
@abstractmethod
def close(self):
pass # pragma: no cover

View File

@@ -2,7 +2,7 @@ import asyncio
import logging
import json
if __name__ == "app.src.inverter":
if __name__ == "app.src.proxy":
from app.src.config import Config
from app.src.mqtt import Mqtt
from app.src.infos import Infos
@@ -14,8 +14,8 @@ else: # pragma: no cover
logger_mqtt = logging.getLogger('mqtt')
class Inverter():
'''class Inverter is a baseclass
class Proxy():
'''class Proxy is a baseclass
The class has some class method for managing common resources like a
connection to the MQTT broker or proxy error counter which are common
@@ -34,12 +34,12 @@ class Inverter():
destroyed
methods:
async_create_remote(): Establish a client connection to the TSUN cloud
create_remote(): Establish a client connection to the TSUN cloud
async_publ_mqtt(): Publish data to MQTT broker
'''
@classmethod
def class_init(cls) -> None:
logging.debug('Inverter.class_init')
logging.debug('Proxy.class_init')
# initialize the proxy statistics
Infos.static_init()
cls.db_stat = Infos()
@@ -61,7 +61,7 @@ class Inverter():
# reset at midnight when you restart the proxy just before
# midnight!
inverters = Config.get('inverters')
# logger.debug(f'Inverters: {inverters}')
# logger.debug(f'Proxys: {inverters}')
for inv in inverters.values():
if (type(inv) is dict):
node_id = inv['node_id']
@@ -100,7 +100,7 @@ class Inverter():
@classmethod
def class_close(cls, loop) -> None: # pragma: no cover
logging.debug('Inverter.class_close')
logging.debug('Proxy.class_close')
logging.info('Close MQTT Task')
loop.run_until_complete(cls.mqtt.close())
cls.mqtt = None

View File

@@ -5,8 +5,8 @@ import os
from asyncio import StreamReader, StreamWriter
from aiohttp import web
from logging import config # noqa F401
from messages import Message
from inverter import Inverter
from proxy import Proxy
from inverter_ifc import InverterIfc
from gen3.inverter_g3 import InverterG3
from gen3plus.inverter_g3p import InverterG3P
from scheduler import Schedule
@@ -38,9 +38,9 @@ async def healthy(request):
if proxy_is_up:
# logging.info('web reqeust healthy()')
for stream in Message:
for inverter in InverterIfc:
try:
res = stream.healthy()
res = inverter.healthy()
if not res:
return web.Response(status=503, text="I have a problem")
except Exception as err:
@@ -73,8 +73,8 @@ async def webserver(addr, port):
async def handle_client(reader: StreamReader, writer: StreamWriter, inv_class):
'''Handles a new incoming connection and starts an async loop'''
addr = writer.get_extra_info('peername')
await inv_class(reader, writer, addr).local.ifc.server_loop()
with inv_class(reader, writer) as inv:
await inv.local.ifc.server_loop()
async def handle_shutdown(web_task):
@@ -87,25 +87,13 @@ async def handle_shutdown(web_task):
#
# first, disc all open TCP connections gracefully
#
for stream in Message:
stream.shutdown_started = True
try:
await asyncio.wait_for(stream.disc(), 2)
except Exception:
pass
for inverter in InverterIfc:
await inverter.disc(True)
logging.info('Proxy disconnecting done')
#
# second, close all open TCP connections
#
for stream in Message:
stream.close()
await asyncio.sleep(0.1) # give time for closing
logging.info('Proxy closing done')
#
# third, cancel the web server
# second, cancel the web server
#
web_task.cancel()
await web_task
@@ -164,9 +152,9 @@ if __name__ == "__main__":
ConfigErr = Config.class_init()
if ConfigErr is not None:
logging.info(f'ConfigErr: {ConfigErr}')
Inverter.class_init()
Proxy.class_init()
Schedule.start()
mb_tcp = ModbusTcp(loop)
ModbusTcp(loop)
#
# Create tasks for our listening servers. These must be tasks! If we call
@@ -197,7 +185,7 @@ if __name__ == "__main__":
pass
finally:
logging.info("Event loop is stopped")
Inverter.class_close(loop)
Proxy.class_close(loop)
logging.debug('Close event loop')
loop.close()
logging.info(f'Finally, exit Server "{serv_name}"')