Merge branch 'refactoring-async-stream' of https://github.com/s-allius/tsun-gen3-proxy into titan-scan

This commit is contained in:
Stefan Allius
2024-09-27 19:28:39 +02:00
25 changed files with 1624 additions and 1079 deletions

View File

@@ -1,11 +1,119 @@
if __name__ == "app.src.async_ifc":
from app.src.byte_fifo import ByteFifo
else: # pragma: no cover
from byte_fifo import ByteFifo
from abc import ABC, abstractmethod
class AsyncIfc():
def __init__(self):
self.read = ByteFifo()
self.write = ByteFifo()
self.forward = ByteFifo()
class AsyncIfc(ABC):
@abstractmethod
def get_conn_no(self):
pass # pragma: no cover
@abstractmethod
def set_node_id(self, value: str):
pass # pragma: no cover
#
# TX - QUEUE
#
@abstractmethod
def tx_add(self, data: bytearray):
''' add data to transmit queue'''
pass # pragma: no cover
@abstractmethod
def tx_flush(self):
''' send transmit queue and clears it'''
pass # pragma: no cover
@abstractmethod
def tx_get(self, size: int = None) -> bytearray:
'''removes size numbers of bytes and return them'''
pass # pragma: no cover
@abstractmethod
def tx_peek(self, size: int = None) -> bytearray:
'''returns size numbers of byte without removing them'''
pass # pragma: no cover
@abstractmethod
def tx_log(self, level, info):
''' log the transmit queue'''
pass # pragma: no cover
@abstractmethod
def tx_clear(self):
''' clear transmit queue'''
pass # pragma: no cover
@abstractmethod
def tx_len(self):
''' get numner of bytes in the transmit queue'''
pass # pragma: no cover
#
# FORWARD - QUEUE
#
@abstractmethod
def fwd_add(self, data: bytearray):
''' add data to forward queue'''
pass # pragma: no cover
@abstractmethod
def fwd_flush(self):
''' send forward queue and clears it'''
pass # pragma: no cover
@abstractmethod
def fwd_log(self, level, info):
''' log the forward queue'''
pass # pragma: no cover
@abstractmethod
def fwd_clear(self):
''' clear forward queue'''
pass # pragma: no cover
#
# RX - QUEUE
#
@abstractmethod
def rx_get(self, size: int = None) -> bytearray:
'''removes size numbers of bytes and return them'''
pass # pragma: no cover
@abstractmethod
def rx_peek(self, size: int = None) -> bytearray:
'''returns size numbers of byte without removing them'''
pass # pragma: no cover
@abstractmethod
def rx_log(self, level, info):
''' logs the receive queue'''
pass # pragma: no cover
@abstractmethod
def rx_clear(self):
''' clear receive queue'''
pass # pragma: no cover
@abstractmethod
def rx_len(self):
''' get numner of bytes in the receive queue'''
pass # pragma: no cover
@abstractmethod
def rx_set_cb(self, callback):
pass # pragma: no cover
#
# Protocol Callbacks
#
@abstractmethod
def prot_set_timeout_cb(self, callback):
pass # pragma: no cover
@abstractmethod
def prot_set_init_new_client_conn_cb(self, callback):
pass # pragma: no cover
@abstractmethod
def prot_set_update_header_cb(self, callback):
pass # pragma: no cover

View File

@@ -7,19 +7,147 @@ from typing import Self
from itertools import count
if __name__ == "app.src.async_stream":
from app.src.inverter import Inverter
from app.src.byte_fifo import ByteFifo
from app.src.async_ifc import AsyncIfc
from app.src.messages import hex_dump_memory, State
from app.src.infos import Infos
else: # pragma: no cover
from inverter import Inverter
from byte_fifo import ByteFifo
from async_ifc import AsyncIfc
from messages import hex_dump_memory, State
from infos import Infos
import gc
logger = logging.getLogger('conn')
class AsyncStream():
class AsyncIfcImpl(AsyncIfc):
_ids = count(0)
def __init__(self) -> None:
logger.debug('AsyncIfcImpl.__init__')
self.fwd_fifo = ByteFifo()
self.tx_fifo = ByteFifo()
self.rx_fifo = ByteFifo()
self.conn_no = next(self._ids)
self.node_id = ''
self.timeout_cb = None
self.init_new_client_conn_cb = None
self.update_header_cb = None
def close(self):
self.timeout_cb = None
self.fwd_fifo.reg_trigger(None)
self.tx_fifo.reg_trigger(None)
self.rx_fifo.reg_trigger(None)
def set_node_id(self, value: str):
self.node_id = value
def get_conn_no(self):
return self.conn_no
def tx_add(self, data: bytearray):
''' add data to transmit queue'''
self.tx_fifo += data
def tx_flush(self):
''' send transmit queue and clears it'''
self.tx_fifo()
def tx_get(self, size: int = None) -> bytearray:
'''removes size numbers of bytes and return them'''
return self.tx_fifo.get(size)
def tx_peek(self, size: int = None) -> bytearray:
'''returns size numbers of byte without removing them'''
return self.tx_fifo.peek(size)
def tx_log(self, level, info):
''' log the transmit queue'''
self.tx_fifo.logging(level, info)
def tx_clear(self):
''' clear transmit queue'''
self.tx_fifo.clear()
def tx_len(self):
''' get numner of bytes in the transmit queue'''
return len(self.tx_fifo)
def fwd_add(self, data: bytearray):
''' add data to forward queue'''
self.fwd_fifo += data
def fwd_flush(self):
''' send forward queue and clears it'''
self.fwd_fifo()
def fwd_log(self, level, info):
''' log the forward queue'''
self.fwd_fifo.logging(level, info)
def fwd_clear(self):
''' clear forward queue'''
self.fwd_fifo.clear()
def rx_get(self, size: int = None) -> bytearray:
'''removes size numbers of bytes and return them'''
return self.rx_fifo.get(size)
def rx_peek(self, size: int = None) -> bytearray:
'''returns size numbers of byte without removing them'''
return self.rx_fifo.peek(size)
def rx_log(self, level, info):
''' logs the receive queue'''
self.rx_fifo.logging(level, info)
def rx_clear(self):
''' clear receive queue'''
self.rx_fifo.clear()
def rx_len(self):
''' get numner of bytes in the receive queue'''
return len(self.rx_fifo)
def rx_set_cb(self, callback):
self.rx_fifo.reg_trigger(callback)
def prot_set_timeout_cb(self, callback):
self.timeout_cb = callback
def prot_set_init_new_client_conn_cb(self, callback):
self.init_new_client_conn_cb = callback
def prot_set_update_header_cb(self, callback):
self.update_header_cb = callback
class StreamPtr():
'''Descr StreamPtr'''
def __init__(self, _stream):
self.stream = _stream
@property
def ifc(self):
return self._ifc
@property
def stream(self):
return self._stream
@stream.setter
def stream(self, value):
self._stream = value
if value:
self._ifc = value.ifc
else:
self._ifc = None
class AsyncStream(AsyncIfcImpl):
MAX_PROC_TIME = 2
'''maximum processing time for a received msg in sec'''
MAX_START_TIME = 400
@@ -30,81 +158,29 @@ class AsyncStream():
'''maximum default time without a received msg in sec'''
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, ifc: "AsyncIfc") -> None:
addr, rstream: "StreamPtr") -> None:
AsyncIfcImpl.__init__(self)
logger.debug('AsyncStream.__init__')
ifc.write.reg_trigger(self.__write_cb)
self.ifc = ifc
self.remote = rstream
self.tx_fifo.reg_trigger(self.__write_cb)
self._reader = reader
self._writer = writer
self.addr = addr
self.r_addr = ''
self.l_addr = ''
self.conn_no = next(self._ids)
self.proc_start = None # start processing start timestamp
self.proc_max = 0
self.async_publ_mqtt = None # will be set AsyncStreamServer only
def __write_cb(self):
self._writer.write(self.ifc.write.get())
self._writer.write(self.tx_fifo.get())
def __timeout(self) -> int:
if self.state == State.init or self.state == State.received:
to = self.MAX_START_TIME
elif self.state == State.up and \
self.server_side and self.modbus_polling:
to = self.MAX_INV_IDLE_TIME
else:
to = self.MAX_DEF_IDLE_TIME
return to
async def publish_outstanding_mqtt(self):
'''Publish all outstanding MQTT topics'''
try:
if self.unique_id:
await self.async_publ_mqtt()
await self._async_publ_mqtt_proxy_stat('proxy')
except Exception:
pass
async def server_loop(self, addr: str) -> None:
'''Loop for receiving messages from the inverter (server-side)'''
logger.info(f'[{self.node_id}:{self.conn_no}] '
f'Accept connection from {addr}')
self.inc_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
await self.loop()
self.dec_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for'
f' r{self.r_addr}')
# if the server connection closes, we also have to disconnect
# the connection to te TSUN cloud
if self.remote_stream:
logger.info(f'[{self.node_id}:{self.conn_no}] disc client '
f'connection: [{self.remote_stream.node_id}:'
f'{self.remote_stream.conn_no}]')
await self.remote_stream.disc()
async def client_loop(self, _: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)'''
client_stream = await self.remote_stream.loop()
logger.info(f'[{client_stream.node_id}:{client_stream.conn_no}] '
'Client loop stopped for'
f' l{client_stream.l_addr}')
# if the client connection closes, we don't touch the server
# connection. Instead we erase the client connection stream,
# thus on the next received packet from the inverter, we can
# establish a new connection to the TSUN cloud
# erase backlink to inverter
client_stream.remote_stream = None
if self.remote_stream == client_stream:
# logging.debug(f'Client l{client_stream.l_addr} refs:'
# f' {gc.get_referrers(client_stream)}')
# than erase client connection
self.remote_stream = None
if self.timeout_cb is callable:
return self.timeout_cb
return 360
async def loop(self) -> Self:
"""Async loop handler for precessing all received messages"""
@@ -121,9 +197,9 @@ class AsyncStream():
await asyncio.wait_for(self.__async_read(),
dead_conn_to)
if self.unique_id:
await self.async_write()
await self.__async_forward()
await self.__async_write()
await self.__async_forward()
if self.async_publ_mqtt:
await self.async_publ_mqtt()
except asyncio.TimeoutError:
@@ -150,19 +226,12 @@ class AsyncStream():
return self
except Exception:
self.inc_counter('SW_Exception')
Infos.inc_counter('SW_Exception')
logger.error(
f"Exception for {self.addr}:\n"
f"{traceback.format_exc()}")
await asyncio.sleep(0) # be cooperative to other task
async def async_write(self, headline: str = 'Transmit to ') -> None:
"""Async write handler to transmit the send_buffer"""
if len(self.ifc.write) > 0:
self.ifc.write.logging(logging.INFO, f'{headline}{self.addr}:')
self._writer.write(self.ifc.write.get())
await self._writer.drain()
async def disc(self) -> None:
"""Async disc handler for graceful disconnect"""
if self._writer.is_closing():
@@ -176,18 +245,18 @@ class AsyncStream():
hint: must be called before releasing the connection instance
"""
super().close()
self._reader.feed_eof() # abort awaited read
if self._writer.is_closing():
return
logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
self.ifc.write.reg_trigger(None)
self._writer.close()
def healthy(self) -> bool:
elapsed = 0
if self.proc_start is not None:
elapsed = time.time() - self.proc_start
if self.state == State.closed or elapsed > self.MAX_PROC_TIME:
if elapsed > self.MAX_PROC_TIME:
logging.debug(f'[{self.node_id}:{self.conn_no}:'
f'{type(self).__name__}]'
f' act:{round(1000*elapsed)}ms'
@@ -203,54 +272,47 @@ class AsyncStream():
data = await self._reader.read(4096)
if data:
self.proc_start = time.time()
self.ifc.read += data
wait = self.ifc.read() # call read in parent class
self.rx_fifo += data
wait = self.rx_fifo() # call read in parent class
if wait > 0:
await asyncio.sleep(wait)
else:
raise RuntimeError("Peer closed.")
async def __async_write(self, headline: str = 'Transmit to ') -> None:
"""Async write handler to transmit the send_buffer"""
if len(self.tx_fifo) > 0:
self.tx_fifo.logging(logging.INFO, f'{headline}{self.addr}:')
self._writer.write(self.tx_fifo.get())
await self._writer.drain()
async def __async_forward(self) -> None:
"""forward handler transmits data over the remote connection"""
if not self._forward_buffer:
if len(self.fwd_fifo) == 0:
return
try:
if not self.remote_stream:
await self.async_create_remote()
if self.remote_stream:
if self.remote_stream._init_new_client_conn():
await self.remote_stream.async_write()
if self.remote_stream:
self.remote_stream._update_header(self._forward_buffer)
hex_dump_memory(logging.INFO,
f'Forward to {self.remote_stream.addr}:',
self._forward_buffer,
len(self._forward_buffer))
self.remote_stream._writer.write(self._forward_buffer)
await self.remote_stream._writer.drain()
self._forward_buffer = bytearray(0)
await self._async_forward()
except OSError as error:
if self.remote_stream:
rmt = self.remote_stream
self.remote_stream = None
if self.remote.stream:
rmt = self.remote.stream
self.remote.stream = None
logger.error(f'[{rmt.node_id}:{rmt.conn_no}] Fwd: {error} for '
f'l{rmt.l_addr} | r{rmt.r_addr}')
await rmt.disc()
rmt.close()
f'l{rmt._ifc.l_addr} | r{rmt._ifc.r_addr}')
await rmt._ifc.disc()
rmt._ifc.close()
except RuntimeError as error:
if self.remote_stream:
rmt = self.remote_stream
self.remote_stream = None
if self.remote.stream:
rmt = self.remote.stream
self.remote.stream = None
logger.info(f'[{rmt.node_id}:{rmt.conn_no}] '
f'Fwd: {error} for {rmt.l_addr}')
await rmt.disc()
rmt.close()
f'Fwd: {error} for {rmt._ifc.l_addr}')
await rmt._ifc.disc()
rmt._ifc.close()
except Exception:
self.inc_counter('SW_Exception')
Infos.inc_counter('SW_Exception')
logger.error(
f"Fwd Exception for {self.addr}:\n"
f"{traceback.format_exc()}")
@@ -258,3 +320,103 @@ class AsyncStream():
def __del__(self):
logger.debug(
f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}")
class AsyncStreamServer(AsyncStream):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, async_publ_mqtt, async_create_remote,
rstream: "StreamPtr") -> None:
AsyncStream.__init__(self, reader, writer, addr,
rstream)
self.async_create_remote = async_create_remote
self.async_publ_mqtt = async_publ_mqtt
async def server_loop(self, addr: str) -> None:
'''Loop for receiving messages from the inverter (server-side)'''
logger.info(f'[{self.node_id}:{self.conn_no}] '
f'Accept connection from {addr}')
Infos.inc_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
await self.loop()
Infos.dec_counter('Inverter_Cnt')
await self.publish_outstanding_mqtt()
logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for'
f' r{self.r_addr}')
# if the server connection closes, we also have to disconnect
# the connection to te TSUN cloud
if self.remote.stream:
logger.info(f'[{self.node_id}:{self.conn_no}] disc client '
f'connection: [{self.remote.ifc.node_id}:'
f'{self.remote.ifc.conn_no}]')
await self.remote.ifc.disc()
async def _async_forward(self) -> None:
"""forward handler transmits data over the remote connection"""
if not self.remote.stream:
await self.async_create_remote()
if self.remote.stream and \
self.remote.ifc.init_new_client_conn_cb():
await self.remote.ifc._AsyncStream__async_write()
if self.remote.stream:
self.remote.ifc.update_header_cb(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.ifc.addr}:')
self.remote.ifc._writer.write(self.fwd_fifo.get())
await self.remote.ifc._writer.drain()
async def publish_outstanding_mqtt(self):
'''Publish all outstanding MQTT topics'''
try:
await self.async_publ_mqtt()
await Inverter._async_publ_mqtt_proxy_stat('proxy')
except Exception:
pass
def close(self) -> None:
"""close handler for a no waiting disconnect
hint: must be called before releasing the connection instance
"""
self.async_create_remote = None
self.async_publ_mqtt = None
super().close()
class AsyncStreamClient(AsyncStream):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: "StreamPtr") -> None:
AsyncStream.__init__(self, reader, writer, addr,
rstream)
async def client_loop(self, _: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)'''
await self.loop()
logger.info(f'[{self.node_id}:{self.conn_no}] '
'Client loop stopped for'
f' l{self.l_addr}')
server_stream = self.remote.stream
# if the client connection closes, we don't touch the server
# connection. Instead we erase the client connection stream,
# thus on the next received packet from the inverter, we can
# establish a new connection to the TSUN cloud
if server_stream.remote.ifc == self:
# logging.debug(f'Client l{client_stream.l_addr} refs:'
# f' {gc.get_referrers(client_stream)}')
# than erase client connection
server_stream.remote.stream = None # erases stream and ifc link
# erase backlink to inverter
self.remote.stream = None
async def _async_forward(self) -> None:
"""forward handler transmits data over the remote connection"""
if self.remote.stream:
self.remote.ifc.update_header_cb(self.fwd_fifo.peek())
self.fwd_fifo.logging(logging.INFO, 'Forward to '
f'{self.remote.ifc.addr}:')
self.remote.ifc._writer.write(self.fwd_fifo.get())
await self.remote.ifc._writer.drain()

View File

@@ -2,36 +2,18 @@ import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3.connection_g3":
from app.src.async_ifc import AsyncIfc
from app.src.async_stream import AsyncStream
from app.src.async_stream import AsyncStreamServer
from app.src.async_stream import AsyncStreamClient, StreamPtr
from app.src.gen3.talent import Talent
else: # pragma: no cover
from async_ifc import AsyncIfc
from async_stream import AsyncStream
from async_stream import AsyncStreamServer
from async_stream import AsyncStreamClient, StreamPtr
from gen3.talent import Talent
logger = logging.getLogger('conn')
class ConnectionG3(AsyncStream, Talent):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, remote_stream: 'ConnectionG3', server_side: bool,
id_str=b'') -> None:
self._ifc = AsyncIfc()
AsyncStream.__init__(self, reader, writer, addr, self._ifc)
Talent.__init__(self, server_side, self._ifc, id_str)
self.remote_stream: 'ConnectionG3' = remote_stream
'''
Our puplic methods
'''
def close(self):
AsyncStream.close(self)
Talent.close(self)
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
class ConnectionG3(Talent):
async def async_create_remote(self) -> None:
pass # virtual interface # pragma: no cover
@@ -40,10 +22,40 @@ class ConnectionG3(AsyncStream, Talent):
def healthy(self) -> bool:
logger.debug('ConnectionG3 healthy()')
return AsyncStream.healthy(self)
return self._ifc.healthy()
'''
Our private methods
'''
def __del__(self):
super().__del__()
def close(self):
self._ifc.close()
Talent.close(self)
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
class ConnectionG3Server(ConnectionG3):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: 'ConnectionG3Client',
id_str=b'') -> None:
server_side = True
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamServer(reader, writer, addr,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
Talent.__init__(self, server_side, self._ifc, id_str)
class ConnectionG3Client(ConnectionG3):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: 'ConnectionG3Server',
id_str=b'') -> None:
server_side = False
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamClient(reader, writer, addr,
self.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
Talent.__init__(self, server_side, self._ifc, id_str)

View File

@@ -8,19 +8,21 @@ from aiomqtt import MqttCodeError
if __name__ == "app.src.gen3.inverter_g3":
from app.src.config import Config
from app.src.inverter import Inverter
from app.src.gen3.connection_g3 import ConnectionG3
from app.src.gen3.connection_g3 import ConnectionG3Server
from app.src.gen3.connection_g3 import ConnectionG3Client
from app.src.infos import Infos
else: # pragma: no cover
from config import Config
from inverter import Inverter
from gen3.connection_g3 import ConnectionG3
from gen3.connection_g3 import ConnectionG3Server
from gen3.connection_g3 import ConnectionG3Client
from infos import Infos
logger_mqtt = logging.getLogger('mqtt')
class InverterG3(Inverter, ConnectionG3):
class InverterG3(Inverter, ConnectionG3Server):
'''class Inverter is a derivation of an Async_Stream
The class has some class method for managing common resources like a
@@ -51,8 +53,9 @@ class InverterG3(Inverter, ConnectionG3):
'''
def __init__(self, reader: StreamReader, writer: StreamWriter, addr):
super().__init__(reader, writer, addr, None, True)
super().__init__(reader, writer, addr, None)
self.__ha_restarts = -1
self.addr = addr
async def async_create_remote(self) -> None:
'''Establish a client connection to the TSUN cloud'''
@@ -65,12 +68,12 @@ class InverterG3(Inverter, ConnectionG3):
logging.info(f'[{self.node_id}] Connect to {addr}')
connect = asyncio.open_connection(host, port)
reader, writer = await connect
self.remote_stream = ConnectionG3(reader, writer, addr, self,
False, self.id_str)
logging.info(f'[{self.remote_stream.node_id}:'
f'{self.remote_stream.conn_no}] '
self.remote.stream = ConnectionG3Client(reader, writer, addr, self,
self.id_str)
logging.info(f'[{self.remote.stream.node_id}:'
f'{self.remote.stream.conn_no}] '
f'Connected to {addr}')
asyncio.create_task(self.client_loop(addr))
asyncio.create_task(self.remote.ifc.client_loop(addr))
except (ConnectionRefusedError, TimeoutError) as error:
logging.info(f'{error}')
@@ -82,6 +85,8 @@ class InverterG3(Inverter, ConnectionG3):
async def async_publ_mqtt(self) -> None:
'''publish data to MQTT broker'''
if not self.unique_id:
return
# check if new inverter or collector infos are available or when the
# home assistant has changed the status back to online
try:
@@ -96,7 +101,7 @@ class InverterG3(Inverter, ConnectionG3):
for key in self.new_data:
await self.__async_publ_mqtt_packet(key)
for key in Infos.new_stat_data:
await self._async_publ_mqtt_proxy_stat(key)
await Inverter._async_publ_mqtt_proxy_stat(key)
except MqttCodeError as error:
logging.error(f'Mqtt except: {error}')
@@ -128,10 +133,6 @@ class InverterG3(Inverter, ConnectionG3):
self.db.reg_clr_at_midnight(f'{self.entity_prfx}{self.node_id}')
def close(self) -> None:
logging.debug(f'InverterG3.close() l{self.l_addr} | r{self.r_addr}')
logging.debug(f'InverterG3.close() {self.addr}')
super().close() # call close handler in the parent class
# logging.info(f'Inverter refs: {gc.get_referrers(self)}')
def __del__(self):
logging.debug("InverterG3.__del__")
super().__del__()

View File

@@ -6,7 +6,7 @@ from tzlocal import get_localzone
if __name__ == "app.src.gen3.talent":
from app.src.async_ifc import AsyncIfc
from app.src.messages import hex_dump_memory, Message, State
from app.src.messages import Message, State
from app.src.modbus import Modbus
from app.src.my_timer import Timer
from app.src.config import Config
@@ -14,7 +14,7 @@ if __name__ == "app.src.gen3.talent":
from app.src.infos import Register
else: # pragma: no cover
from async_ifc import AsyncIfc
from messages import hex_dump_memory, Message, State
from messages import Message, State
from modbus import Modbus
from my_timer import Timer
from config import Config
@@ -48,7 +48,10 @@ class Talent(Message):
def __init__(self, server_side: bool, ifc: "AsyncIfc", id_str=b''):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=15)
ifc.read.reg_trigger(self.read)
ifc.rx_set_cb(self.read)
ifc.prot_set_timeout_cb(self._timeout)
ifc.prot_set_init_new_client_conn_cb(self._init_new_client_conn)
ifc.prot_set_update_header_cb(self._update_header)
self.ifc = ifc
self.await_conn_resp_cnt = 0
self.id_str = id_str
@@ -107,7 +110,10 @@ class Talent(Message):
self.log_lvl.clear()
self.state = State.closed
self.mb_timer.close()
self.ifc.read.reg_trigger(None)
self.ifc.rx_set_cb(None)
self.ifc.prot_set_timeout_cb(None)
self.ifc.prot_set_init_new_client_conn_cb(None)
self.ifc.prot_set_update_header_cb(None)
super().close()
def __set_serial_no(self, serial_no: str):
@@ -143,10 +149,10 @@ class Talent(Message):
self._read()
while True:
if not self.header_valid:
self.__parse_header(self.ifc.read.peek(), len(self.ifc.read))
self.__parse_header(self.ifc.rx_peek(), self.ifc.rx_len())
if self.header_valid and \
len(self.ifc.read) >= (self.header_len + self.data_len):
self.ifc.rx_len() >= (self.header_len + self.data_len):
if self.state == State.init:
self.state = State.received # received 1st package
@@ -154,10 +160,10 @@ class Talent(Message):
if callable(log_lvl):
log_lvl = log_lvl()
self.ifc.read.logging(log_lvl, f'Received from {self.addr}:'
f' BufLen: {len(self.ifc.read)}'
f' HdrLen: {self.header_len}'
f' DtaLen: {self.data_len}')
self.ifc.rx_log(log_lvl, f'Received from {self.addr}:'
f' BufLen: {self.ifc.rx_len()}'
f' HdrLen: {self.header_len}'
f' DtaLen: {self.data_len}')
self.__set_serial_no(self.id_str.decode("utf-8"))
self.__dispatch_msg()
@@ -170,10 +176,9 @@ class Talent(Message):
tsun = Config.get('tsun')
if tsun['enabled']:
buflen = self.header_len+self.data_len
buffer = self.ifc.read.peek(buflen)
self._forward_buffer += buffer
hex_dump_memory(logging.DEBUG, 'Store for forwarding:',
buffer, buflen)
buffer = self.ifc.rx_peek(buflen)
self.ifc.fwd_add(buffer)
self.ifc.fwd_log(logging.DEBUG, 'Store for forwarding:')
fnc = self.switch.get(self.msg_id, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'forwrd') +
@@ -182,20 +187,18 @@ class Talent(Message):
def forward_snd(self) -> None:
'''add the build send msg to the forwarding queue'''
tsun = Config.get('tsun')
rest = self.ifc.write.get(self.send_msg_ofs)
buffer = self.ifc.write.get(len(self.ifc.write))
rest = self.ifc.tx_get(self.send_msg_ofs)
buffer = self.ifc.tx_get()
if tsun['enabled']:
_len = len(buffer)
struct.pack_into('!l', buffer, 0, _len-4)
buflen = _len
self._forward_buffer += buffer
hex_dump_memory(logging.INFO, 'Store for forwarding:',
buffer, buflen)
self.ifc.fwd_add(buffer)
self.ifc.fwd_log(logging.INFO, 'Store for forwarding:')
fnc = self.switch.get(self.msg_id, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'forwrd') +
f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}')
self.ifc.write += rest
self.ifc.tx_add(rest)
def send_modbus_cb(self, modbus_pdu: bytearray, log_lvl: int, state: str):
if self.state != State.up:
@@ -204,13 +207,13 @@ class Talent(Message):
return
self.__build_header(0x70, 0x77)
self.ifc.write += b'\x00\x01\xa3\x28' # magic ?
self.ifc.write += struct.pack('!B', len(modbus_pdu))
self.ifc.write += modbus_pdu
self.ifc.tx_add(b'\x00\x01\xa3\x28') # magic ?
self.ifc.tx_add(struct.pack('!B', len(modbus_pdu)))
self.ifc.tx_add(modbus_pdu)
self.__finish_send_msg()
self.ifc.write.logging(log_lvl, f'Send Modbus {state}:{self.addr}:')
self.ifc.write()
self.ifc.tx_log(log_lvl, f'Send Modbus {state}:{self.addr}:')
self.ifc.tx_flush()
def _send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
if self.state != State.up:
@@ -238,9 +241,9 @@ class Talent(Message):
self.msg_id = 0
self.await_conn_resp_cnt += 1
self.__build_header(0x91)
self.ifc.write += struct.pack(f'!{len(contact_name)+1}p'
f'{len(contact_mail)+1}p',
contact_name, contact_mail)
self.ifc.tx_add(struct.pack(f'!{len(contact_name)+1}p'
f'{len(contact_mail)+1}p',
contact_name, contact_mail))
self.__finish_send_msg()
return True
@@ -324,7 +327,7 @@ class Talent(Message):
self.inc_counter('Invalid_Msg_Format')
# erase broken recv buffer
self.ifc.read.clear()
self.ifc.rx_clear()
return
hdr_len = 5+id_len+2
@@ -345,16 +348,16 @@ class Talent(Message):
def __build_header(self, ctrl, msg_id=None) -> None:
if not msg_id:
msg_id = self.msg_id
self.send_msg_ofs = len(self.ifc.write)
self.ifc.write += struct.pack(f'!l{len(self.id_str)+1}pBB',
0, self.id_str, ctrl, msg_id)
self.send_msg_ofs = self.ifc.tx_len()
self.ifc.tx_add(struct.pack(f'!l{len(self.id_str)+1}pBB',
0, self.id_str, ctrl, msg_id))
fnc = self.switch.get(msg_id, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'tx') +
f' Ctl: {int(ctrl):#02x} Msg: {fnc.__name__!r}')
def __finish_send_msg(self) -> None:
_len = len(self.ifc.write) - self.send_msg_ofs
struct.pack_into('!l', self.ifc.write.peek(), self.send_msg_ofs,
_len = self.ifc.tx_len() - self.send_msg_ofs
struct.pack_into('!l', self.ifc.tx_peek(), self.send_msg_ofs,
_len-4)
def __dispatch_msg(self) -> None:
@@ -369,7 +372,7 @@ class Talent(Message):
f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}')
def __flush_recv_msg(self) -> None:
self.ifc.read.get(self.header_len+self.data_len)
self.ifc.rx_get(self.header_len+self.data_len)
self.header_valid = False
'''
@@ -379,7 +382,7 @@ class Talent(Message):
if self.ctrl.is_ind():
if self.server_side and self.__process_contact_info():
self.__build_header(0x91)
self.ifc.write += b'\x01'
self.ifc.tx_add(b'\x01')
self.__finish_send_msg()
# don't forward this contact info here, we will build one
# when the remote connection is established
@@ -393,7 +396,7 @@ class Talent(Message):
self.forward()
def __process_contact_info(self) -> bool:
buf = self.ifc.read.peek()
buf = self.ifc.rx_peek()
result = struct.unpack_from('!B', buf, self.header_len)
name_len = result[0]
if self.data_len == 1: # this is a response withone status byte
@@ -420,16 +423,16 @@ class Talent(Message):
ts = self._timestamp()
logger.debug(f'time: {ts:08x}')
self.__build_header(0x91)
self.ifc.write += struct.pack('!q', ts)
self.ifc.tx_add(struct.pack('!q', ts))
self.__finish_send_msg()
elif self.data_len >= 8:
ts = self._timestamp()
result = struct.unpack_from('!q', self.ifc.read.peek(),
result = struct.unpack_from('!q', self.ifc.rx_peek(),
self.header_len)
self.ts_offset = result[0]-ts
if self.remote_stream:
self.remote_stream.ts_offset = self.ts_offset
if self.remote.stream:
self.remote.stream.ts_offset = self.ts_offset
logger.debug(f'tsun-time: {int(result[0]):08x}'
f' proxy-time: {ts:08x}'
f' offset: {self.ts_offset}')
@@ -449,10 +452,10 @@ class Talent(Message):
self.db.set_db_def_value(Register.POLLING_INTERVAL,
self.mb_timeout)
self.__build_header(0x99)
self.ifc.write += b'\x02'
self.ifc.tx_add(b'\x02')
self.__finish_send_msg()
result = struct.unpack_from('!Bq', self.ifc.read.peek(),
result = struct.unpack_from('!Bq', self.ifc.rx_peek(),
self.header_len)
resp_code = result[0]
ts = result[1]+self.ts_offset
@@ -460,11 +463,11 @@ class Talent(Message):
f' tsun-time: {ts:08x}'
f' offset: {self.ts_offset}')
self.__build_header(0x91)
self.ifc.write += struct.pack('!Bq', resp_code, ts)
self.ifc.tx_add(struct.pack('!Bq', resp_code, ts))
self.forward_snd()
return
elif self.ctrl.is_resp():
result = struct.unpack_from('!B', self.ifc.read.peek(),
result = struct.unpack_from('!B', self.ifc.rx_peek(),
self.header_len)
resp_code = result[0]
logging.debug(f'TimeActRespCode: {resp_code}')
@@ -476,7 +479,7 @@ class Talent(Message):
self.forward()
def parse_msg_header(self):
result = struct.unpack_from('!lB', self.ifc.read.peek(),
result = struct.unpack_from('!lB', self.ifc.rx_peek(),
self.header_len)
data_id = result[0] # len of complete message
@@ -485,7 +488,7 @@ class Talent(Message):
msg_hdr_len = 5+id_len+9
result = struct.unpack_from(f'!{id_len+1}pBq', self.ifc.read.peek(),
result = struct.unpack_from(f'!{id_len+1}pBq', self.ifc.rx_peek(),
self.header_len + 4)
timestamp = result[2]
@@ -498,7 +501,7 @@ class Talent(Message):
def msg_collector_data(self):
if self.ctrl.is_ind():
self.__build_header(0x99)
self.ifc.write += b'\x01'
self.ifc.tx_add(b'\x01')
self.__finish_send_msg()
self.__process_data()
@@ -513,7 +516,7 @@ class Talent(Message):
def msg_inverter_data(self):
if self.ctrl.is_ind():
self.__build_header(0x99)
self.ifc.write += b'\x01'
self.ifc.tx_add(b'\x01')
self.__finish_send_msg()
self.__process_data()
self.state = State.up # allow MODBUS cmds
@@ -533,7 +536,7 @@ class Talent(Message):
def __process_data(self):
msg_hdr_len, ts = self.parse_msg_header()
for key, update in self.db.parse(self.ifc.read.peek(), self.header_len
for key, update in self.db.parse(self.ifc.rx_peek(), self.header_len
+ msg_hdr_len, self.node_id):
if update:
self._set_mqtt_timestamp(key, self._utcfromts(ts))
@@ -553,7 +556,7 @@ class Talent(Message):
msg_hdr_len = 5
result = struct.unpack_from('!lBB', self.ifc.read.peek(),
result = struct.unpack_from('!lBB', self.ifc.rx_peek(),
self.header_len)
modbus_len = result[1]
return msg_hdr_len, modbus_len
@@ -562,7 +565,7 @@ class Talent(Message):
msg_hdr_len = 6
result = struct.unpack_from('!lBBB', self.ifc.read.peek(),
result = struct.unpack_from('!lBBB', self.ifc.rx_peek(),
self.header_len)
modbus_len = result[2]
return msg_hdr_len, modbus_len
@@ -583,12 +586,12 @@ class Talent(Message):
self.__msg_modbus(hdr_len)
def __msg_modbus(self, hdr_len):
data = self.ifc.read.peek()[self.header_len:
self.header_len+self.data_len]
data = self.ifc.rx_peek()[self.header_len:
self.header_len+self.data_len]
if self.ctrl.is_req():
if self.remote_stream.mb.recv_req(data[hdr_len:],
self.remote_stream.
if self.remote.stream.mb.recv_req(data[hdr_len:],
self.remote.stream.
msg_forward):
self.inc_counter('Modbus_Command')
else:

View File

@@ -2,37 +2,18 @@ import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3plus.connection_g3p":
from app.src.async_ifc import AsyncIfc
from app.src.async_stream import AsyncStream
from app.src.async_stream import AsyncStreamServer
from app.src.async_stream import AsyncStreamClient, StreamPtr
from app.src.gen3plus.solarman_v5 import SolarmanV5
else: # pragma: no cover
from async_ifc import AsyncIfc
from async_stream import AsyncStream
from async_stream import AsyncStreamServer
from async_stream import AsyncStreamClient, StreamPtr
from gen3plus.solarman_v5 import SolarmanV5
logger = logging.getLogger('conn')
class ConnectionG3P(AsyncStream, SolarmanV5):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, remote_stream: 'ConnectionG3P',
server_side: bool,
client_mode: bool) -> None:
self._ifc = AsyncIfc()
AsyncStream.__init__(self, reader, writer, addr, self._ifc)
SolarmanV5.__init__(self, server_side, client_mode, self._ifc)
self.remote_stream: 'ConnectionG3P' = remote_stream
'''
Our puplic methods
'''
def close(self):
AsyncStream.close(self)
SolarmanV5.close(self)
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
class ConnectionG3P(SolarmanV5):
async def async_create_remote(self) -> None:
pass # virtual interface # pragma: no cover
@@ -41,10 +22,40 @@ class ConnectionG3P(AsyncStream, SolarmanV5):
def healthy(self) -> bool:
logger.debug('ConnectionG3P healthy()')
return AsyncStream.healthy(self)
return self._ifc.healthy()
'''
Our private methods
'''
def __del__(self):
super().__del__()
def close(self):
self._ifc.close()
SolarmanV5.close(self)
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
class ConnectionG3PServer(ConnectionG3P):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: 'ConnectionG3PClient',
client_mode: bool) -> None:
server_side = True
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamServer(reader, writer, addr,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
SolarmanV5.__init__(self, server_side, client_mode, self._ifc)
class ConnectionG3PClient(ConnectionG3P):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: 'ConnectionG3PServer') -> None:
server_side = False
client_mode = False
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamClient(reader, writer, addr, self.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
SolarmanV5.__init__(self, server_side, client_mode, self._ifc)

View File

@@ -8,19 +8,21 @@ from aiomqtt import MqttCodeError
if __name__ == "app.src.gen3plus.inverter_g3p":
from app.src.config import Config
from app.src.inverter import Inverter
from app.src.gen3plus.connection_g3p import ConnectionG3P
from app.src.gen3plus.connection_g3p import ConnectionG3PServer
from app.src.gen3plus.connection_g3p import ConnectionG3PClient
from app.src.infos import Infos
else: # pragma: no cover
from config import Config
from inverter import Inverter
from gen3plus.connection_g3p import ConnectionG3P
from gen3plus.connection_g3p import ConnectionG3PServer
from gen3plus.connection_g3p import ConnectionG3PClient
from infos import Infos
logger_mqtt = logging.getLogger('mqtt')
class InverterG3P(Inverter, ConnectionG3P):
class InverterG3P(Inverter, ConnectionG3PServer):
'''class Inverter is a derivation of an Async_Stream
The class has some class method for managing common resources like a
@@ -53,8 +55,9 @@ class InverterG3P(Inverter, ConnectionG3P):
def __init__(self, reader: StreamReader, writer: StreamWriter, addr,
client_mode: bool = False):
super().__init__(reader, writer, addr, None,
server_side=True, client_mode=client_mode)
client_mode=client_mode)
self.__ha_restarts = -1
self.addr = addr
async def async_create_remote(self) -> None:
'''Establish a client connection to the TSUN cloud'''
@@ -67,13 +70,12 @@ class InverterG3P(Inverter, ConnectionG3P):
logging.info(f'[{self.node_id}] Connect to {addr}')
connect = asyncio.open_connection(host, port)
reader, writer = await connect
self.remote_stream = ConnectionG3P(reader, writer, addr, self,
server_side=False,
client_mode=False)
logging.info(f'[{self.remote_stream.node_id}:'
f'{self.remote_stream.conn_no}] '
self.remote.stream = ConnectionG3PClient(reader, writer,
addr, self)
logging.info(f'[{self.remote.stream.node_id}:'
f'{self.remote.stream.conn_no}] '
f'Connected to {addr}')
asyncio.create_task(self.client_loop(addr))
asyncio.create_task(self.remote.ifc.client_loop(addr))
except (ConnectionRefusedError, TimeoutError) as error:
logging.info(f'{error}')
@@ -85,6 +87,9 @@ class InverterG3P(Inverter, ConnectionG3P):
async def async_publ_mqtt(self) -> None:
'''publish data to MQTT broker'''
if not self.unique_id:
return
# check if new inverter or collector infos are available or when the
# home assistant has changed the status back to online
try:
@@ -99,7 +104,7 @@ class InverterG3P(Inverter, ConnectionG3P):
for key in self.new_data:
await self.__async_publ_mqtt_packet(key)
for key in Infos.new_stat_data:
await self._async_publ_mqtt_proxy_stat(key)
await Inverter._async_publ_mqtt_proxy_stat(key)
except MqttCodeError as error:
logging.error(f'Mqtt except: {error}')
@@ -131,10 +136,6 @@ class InverterG3P(Inverter, ConnectionG3P):
self.db.reg_clr_at_midnight(f'{self.entity_prfx}{self.node_id}')
def close(self) -> None:
logging.debug(f'InverterG3P.close() l{self.l_addr} | r{self.r_addr}')
logging.debug(f'InverterG3P.close() {self.addr}')
super().close() # call close handler in the parent class
# logger.debug (f'Inverter refs: {gc.get_referrers(self)}')
def __del__(self):
logging.debug("InverterG3P.__del__")
super().__del__()

View File

@@ -64,7 +64,11 @@ class SolarmanV5(Message):
def __init__(self, server_side: bool, client_mode: bool, ifc: "AsyncIfc"):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=8)
ifc.read.reg_trigger(self.read)
ifc.rx_set_cb(self.read)
ifc.prot_set_timeout_cb(self._timeout)
ifc.prot_set_init_new_client_conn_cb(self._init_new_client_conn)
ifc.prot_set_update_header_cb(self._update_header)
self.ifc = ifc
self.header_len = 11 # overwrite construcor in class Message
self.control = 0
@@ -165,7 +169,10 @@ class SolarmanV5(Message):
self.log_lvl.clear()
self.state = State.closed
self.mb_timer.close()
self.ifc.read.reg_trigger(None)
self.ifc.rx_set_cb(None)
self.ifc.prot_set_timeout_cb(None)
self.ifc.prot_set_init_new_client_conn_cb(None)
self.ifc.prot_set_update_header_cb(None)
super().close()
async def send_start_cmd(self, snr: int, host: str,
@@ -250,10 +257,10 @@ class SolarmanV5(Message):
self._read()
while True:
if not self.header_valid:
self.__parse_header(self.ifc.read.peek(),
len(self.ifc.read))
self.__parse_header(self.ifc.rx_peek(),
self.ifc.rx_len())
if self.header_valid and len(self.ifc.read) >= \
if self.header_valid and self.ifc.rx_len() >= \
(self.header_len + self.data_len+2):
self.__process_complete_received_msg()
self.__flush_recv_msg()
@@ -264,10 +271,10 @@ class SolarmanV5(Message):
log_lvl = self.log_lvl.get(self.control, logging.WARNING)
if callable(log_lvl):
log_lvl = log_lvl()
self.ifc.read.logging(log_lvl, f'Received from {self.addr}:')
self.ifc.rx_log(log_lvl, f'Received from {self.addr}:')
# self._recv_buffer, self.header_len +
# self.data_len+2)
if self.__trailer_is_ok(self.ifc.read.peek(), self.header_len
if self.__trailer_is_ok(self.ifc.rx_peek(), self.header_len
+ self.data_len + 2):
if self.state == State.init:
self.state = State.received
@@ -280,9 +287,8 @@ class SolarmanV5(Message):
return
tsun = Config.get('solarman')
if tsun['enabled']:
self._forward_buffer += buffer[:buflen]
hex_dump_memory(logging.DEBUG, 'Store for forwarding:',
buffer, buflen)
self.ifc.fwd_add(buffer[:buflen])
self.ifc.fwd_log(logging.DEBUG, 'Store for forwarding:')
fnc = self.switch.get(self.control, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'forwrd') +
@@ -338,7 +344,7 @@ class SolarmanV5(Message):
self.inc_counter('Invalid_Msg_Format')
# erase broken recv buffer
self.ifc.read.clear()
self.ifc.rx_clear()
return
self.header_valid = True
@@ -350,11 +356,11 @@ class SolarmanV5(Message):
'Drop packet w invalid stop byte from '
f'{self.addr}:', buf, buf_len)
self.inc_counter('Invalid_Msg_Format')
if len(self.ifc.read) > (self.data_len+13):
if self.ifc.rx_len() > (self.data_len+13):
next_start = buf[self.data_len+13]
if next_start != 0xa5:
# erase broken recv buffer
self.ifc.read.clear()
self.ifc.rx_clear()
return False
@@ -370,22 +376,22 @@ class SolarmanV5(Message):
def __build_header(self, ctrl) -> None:
'''build header for new transmit message'''
self.send_msg_ofs = len(self.ifc.write)
self.send_msg_ofs = self.ifc.tx_len()
self.ifc.write += struct.pack(
'<BHHHL', 0xA5, 0, ctrl, self.seq.get_send(), self.snr)
self.ifc.tx_add(struct.pack(
'<BHHHL', 0xA5, 0, ctrl, self.seq.get_send(), self.snr))
fnc = self.switch.get(ctrl, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'tx') +
f' Ctl: {int(ctrl):#04x} Msg: {fnc.__name__!r}')
def __finish_send_msg(self) -> None:
'''finish the transmit message, set lenght and checksum'''
_len = len(self.ifc.write) - self.send_msg_ofs
struct.pack_into('<H', self.ifc.write.peek(), self.send_msg_ofs+1,
_len = self.ifc.tx_len() - self.send_msg_ofs
struct.pack_into('<H', self.ifc.tx_peek(), self.send_msg_ofs+1,
_len-11)
check = sum(self.ifc.write.peek()[
check = sum(self.ifc.tx_peek()[
self.send_msg_ofs+1:self.send_msg_ofs + _len]) & 0xff
self.ifc.write += struct.pack('<BB', check, 0x15) # crc & stop
self.ifc.tx_add(struct.pack('<BB', check, 0x15)) # crc & stop
def _update_header(self, _forward_buffer):
'''update header for message before forwarding,
@@ -416,14 +422,14 @@ class SolarmanV5(Message):
f' Msg: {fnc.__name__!r}')
def __flush_recv_msg(self) -> None:
self.ifc.read.get(self.header_len + self.data_len+2)
self.ifc.rx_get(self.header_len + self.data_len+2)
self.header_valid = False
def __send_ack_rsp(self, msgtype, ftype, ack=1):
self.__build_header(msgtype)
self.ifc.write += struct.pack('<BBLL', ftype, ack,
self._timestamp(),
self._heartbeat())
self.ifc.tx_add(struct.pack('<BBLL', ftype, ack,
self._timestamp(),
self._heartbeat()))
self.__finish_send_msg()
def send_modbus_cb(self, pdu: bytearray, log_lvl: int, state: str):
@@ -432,12 +438,12 @@ class SolarmanV5(Message):
' cause the state is not UP anymore')
return
self.__build_header(0x4510)
self.ifc.write += struct.pack('<BHLLL', self.MB_RTU_CMD,
self.sensor_list, 0, 0, 0)
self.ifc.write += pdu
self.ifc.tx_add(struct.pack('<BHLLL', self.MB_RTU_CMD,
self.sensor_list, 0, 0, 0))
self.ifc.tx_add(pdu)
self.__finish_send_msg()
self.ifc.write.logging(log_lvl, f'Send Modbus {state}:{self.addr}:')
self.ifc.write()
self.ifc.tx_log(log_lvl, f'Send Modbus {state}:{self.addr}:')
self.ifc.tx_flush()
def _send_modbus_cmd(self, mb_no, func, addr, val, log_lvl) -> None:
if self.state != State.up:
@@ -493,17 +499,18 @@ class SolarmanV5(Message):
self.forward_at_cmd_resp = False
self.__build_header(0x4510)
self.ifc.write += struct.pack(f'<BHLLL{len(at_cmd)}sc', self.AT_CMD,
0x0002, 0, 0, 0,
at_cmd.encode('utf-8'), b'\r')
self.ifc.tx_add(struct.pack(f'<BHLLL{len(at_cmd)}sc', self.AT_CMD,
0x0002, 0, 0, 0,
at_cmd.encode('utf-8'), b'\r'))
self.__finish_send_msg()
self.ifc.tx_log(logging.INFO, 'Send AT Command:')
try:
await self.async_write('Send AT Command:')
self.ifc.tx_flush()
except Exception:
self.ifc.write.clear()
self.ifc.tx_clear()
def __forward_msg(self):
self.forward(self.ifc.read.peek(), self.header_len+self.data_len+2)
self.forward(self.ifc.rx_peek(), self.header_len+self.data_len+2)
def __build_model_name(self):
db = self.db
@@ -524,7 +531,7 @@ class SolarmanV5(Message):
def __process_data(self, ftype, ts):
inv_update = False
msg_type = self.control >> 8
for key, update in self.db.parse(self.ifc.read.peek(), msg_type, ftype,
for key, update in self.db.parse(self.ifc.rx_peek(), msg_type, ftype,
self.node_id):
if update:
if key == 'inverter':
@@ -543,7 +550,7 @@ class SolarmanV5(Message):
self.__forward_msg()
def msg_dev_ind(self):
data = self.ifc.read.peek()[self.header_len:]
data = self.ifc.rx_peek()[self.header_len:]
result = struct.unpack_from(self.HDR_FMT, data, 0)
ftype = result[0] # always 2
total = result[1]
@@ -564,7 +571,7 @@ class SolarmanV5(Message):
self.__send_ack_rsp(0x1110, ftype)
def msg_data_ind(self):
data = self.ifc.read.peek()
data = self.ifc.rx_peek()
result = struct.unpack_from('<BHLLLHL', data, self.header_len)
ftype = result[0] # 1 or 0x81
sensor = result[1]
@@ -592,7 +599,7 @@ class SolarmanV5(Message):
self.new_state_up()
def msg_sync_start(self):
data = self.ifc.read.peek()[self.header_len:]
data = self.ifc.rx_peek()[self.header_len:]
result = struct.unpack_from(self.HDR_FMT, data, 0)
ftype = result[0]
total = result[1]
@@ -605,8 +612,8 @@ class SolarmanV5(Message):
self.__send_ack_rsp(0x1310, ftype)
def msg_command_req(self):
data = self.ifc.read.peek()[self.header_len:
self.header_len+self.data_len]
data = self.ifc.rx_peek()[self.header_len:
self.header_len+self.data_len]
result = struct.unpack_from('<B', data, 0)
ftype = result[0]
if ftype == self.AT_CMD:
@@ -618,8 +625,8 @@ class SolarmanV5(Message):
self.forward_at_cmd_resp = True
elif ftype == self.MB_RTU_CMD:
if self.remote_stream.mb.recv_req(data[15:],
self.remote_stream.
if self.remote.stream.mb.recv_req(data[15:],
self.remote.stream.
__forward_msg):
self.inc_counter('Modbus_Command')
else:
@@ -634,7 +641,7 @@ class SolarmanV5(Message):
self.mqtt.publish(key, data))
def get_cmd_rsp_log_lvl(self) -> int:
ftype = self.ifc.read.peek()[self.header_len]
ftype = self.ifc.rx_peek()[self.header_len]
if ftype == self.AT_CMD:
if self.forward_at_cmd_resp:
return logging.INFO
@@ -646,8 +653,8 @@ class SolarmanV5(Message):
return logging.WARNING
def msg_command_rsp(self):
data = self.ifc.read.peek()[self.header_len:
self.header_len+self.data_len]
data = self.ifc.rx_peek()[self.header_len:
self.header_len+self.data_len]
ftype = data[0]
if ftype == self.AT_CMD:
if not self.forward_at_cmd_resp:
@@ -690,7 +697,7 @@ class SolarmanV5(Message):
self.__build_model_name()
def msg_hbeat_ind(self):
data = self.ifc.read.peek()[self.header_len:]
data = self.ifc.rx_peek()[self.header_len:]
result = struct.unpack_from('<B', data, 0)
ftype = result[0]
@@ -699,7 +706,7 @@ class SolarmanV5(Message):
self.new_state_up()
def msg_sync_end(self):
data = self.ifc.read.peek()[self.header_len:]
data = self.ifc.rx_peek()[self.header_len:]
result = struct.unpack_from(self.HDR_FMT, data, 0)
ftype = result[0]
total = result[1]
@@ -712,7 +719,7 @@ class SolarmanV5(Message):
self.__send_ack_rsp(0x1810, ftype)
def msg_response(self):
data = self.ifc.read.peek()[self.header_len:]
data = self.ifc.rx_peek()[self.header_len:]
result = struct.unpack_from('<BBLL', data, 0)
ftype = result[0] # always 2
valid = result[1] == 1 # status

View File

@@ -383,12 +383,14 @@ class Infos:
'''inc proxy statistic counter'''
db_dict = cls.stat['proxy']
db_dict[counter] += 1
cls.new_stat_data['proxy'] = True
@classmethod
def dec_counter(cls, counter: str) -> None:
'''dec proxy statistic counter'''
db_dict = cls.stat['proxy']
db_dict[counter] -= 1
cls.new_stat_data['proxy'] = True
def ha_proxy_confs(self, ha_prfx: str, node_id: str, snr: str) \
-> Generator[tuple[str, str, str, str], None, None]:

View File

@@ -90,6 +90,12 @@ class State(Enum):
class Message(metaclass=IterRegistry):
_registry = []
MAX_START_TIME = 400
'''maximum time without a received msg in sec'''
MAX_INV_IDLE_TIME = 120
'''maximum time without a received msg from the inverter in sec'''
MAX_DEF_IDLE_TIME = 360
'''maximum default time without a received msg in sec'''
def __init__(self, server_side: bool, send_modbus_cb:
Callable[[bytes, int, str], None], mb_timeout: int):
@@ -105,13 +111,21 @@ class Message(metaclass=IterRegistry):
self.header_len = 0
self.data_len = 0
self.unique_id = 0
self.node_id = '' # will be overwritten in the child class's __init__
self._node_id = ''
self.sug_area = ''
self._forward_buffer = bytearray(0)
self.new_data = {}
self.state = State.init
self.shutdown_started = False
@property
def node_id(self):
return self._node_id
@node_id.setter
def node_id(self, value):
self._node_id = value
self.ifc.set_node_id(value)
'''
Empty methods, that have to be implemented in any child class which
don't use asyncio
@@ -120,10 +134,6 @@ class Message(metaclass=IterRegistry):
# to our _recv_buffer
return # pragma: no cover
def _update_header(self, _forward_buffer):
'''callback for updating the header of the forward buffer'''
pass # pragma: no cover
def _set_mqtt_timestamp(self, key, ts: float | None):
if key not in self.new_data or \
not self.new_data[key]:
@@ -139,6 +149,16 @@ class Message(metaclass=IterRegistry):
# logger.info(f'update: key: {key} ts:{tstr}'
self.db.set_db_def_value(info_id, round(ts))
def _timeout(self) -> int:
if self.state == State.init or self.state == State.received:
to = self.MAX_START_TIME
elif self.state == State.up and \
self.server_side and self.modbus_polling:
to = self.MAX_INV_IDLE_TIME
else:
to = self.MAX_DEF_IDLE_TIME
return to
'''
Our puplic methods
'''

View File

@@ -5,9 +5,11 @@ import asyncio
if __name__ == "app.src.modbus_tcp":
from app.src.config import Config
from app.src.gen3plus.inverter_g3p import InverterG3P
from app.src.infos import Infos
else: # pragma: no cover
from config import Config
from gen3plus.inverter_g3p import InverterG3P
from infos import Infos
logger = logging.getLogger('conn')
@@ -27,13 +29,14 @@ class ModbusConn():
client_mode=True)
logging.info(f'[{self.stream.node_id}:{self.stream.conn_no}] '
f'Connected to {self.addr}')
self.stream.inc_counter('Inverter_Cnt')
await self.stream.publish_outstanding_mqtt()
Infos.inc_counter('Inverter_Cnt')
await self.stream._ifc.publish_outstanding_mqtt()
return self.stream
async def __aexit__(self, exc_type, exc, tb):
self.stream.dec_counter('Inverter_Cnt')
await self.stream.publish_outstanding_mqtt()
Infos.dec_counter('Inverter_Cnt')
await self.stream._ifc.publish_outstanding_mqtt()
self.stream.close()
class ModbusTcp():
@@ -60,7 +63,7 @@ class ModbusTcp():
try:
async with ModbusConn(host, port) as stream:
await stream.send_start_cmd(snr, host)
await stream.loop()
await stream._ifc.loop()
logger.info(f'[{stream.node_id}:{stream.conn_no}] '
f'Connection closed - Shutdown: '
f'{stream.shutdown_started}')

View File

@@ -1,13 +0,0 @@
class ObjectFactory:
def __init__(self):
self._builders = {}
def register_builder(self, key, builder):
self._builders[key] = builder
def create(self, key, **kwargs):
builder = self._builders.get(key)
if not builder:
raise ValueError(key)
return builder(**kwargs)

View File

@@ -74,14 +74,14 @@ async def handle_client(reader: StreamReader, writer: StreamWriter):
'''Handles a new incoming connection and starts an async loop'''
addr = writer.get_extra_info('peername')
await InverterG3(reader, writer, addr).server_loop(addr)
await InverterG3(reader, writer, addr)._ifc.server_loop(addr)
async def handle_client_v2(reader: StreamReader, writer: StreamWriter):
'''Handles a new incoming connection and starts an async loop'''
addr = writer.get_extra_info('peername')
await InverterG3P(reader, writer, addr).server_loop(addr)
await InverterG3P(reader, writer, addr)._ifc.server_loop(addr)
async def handle_shutdown(web_task):