Dev 0.9 (#115)
* make timestamp handling stateless * adapt tests for stateless timestamp handling * initial version * add more type annotations * add more type annotations * fix Generator annotation for ha_proxy_confs * fix names of issue branches * add more type annotations * don't use depricated varn anymore * don't mark all test as async * fix imports * fix solarman unit tests - fake Mqtt class * print image build time during proxy start * update changelog * fix pytest collect warning * cleanup msg_get_time handler * addapt unit test * label debug images with debug * dump droped packages * fix warnings * add systemtest with invalid start byte * update changelog * update changelog * add exposed ports and healthcheck * add wget for healthcheck * add aiohttp * use config validation for healthcheck * add http server for healthcheck * calculate msg prossesing time * add healthy check methods * fix typo * log ConfigErr with DEBUG level * Update async_stream.py - check if processing time is < 5 sec * add a close handler to release internal resources * call modbus close hanlder on a close call * add exception handling for forward handler * update changelog * isolate Modbus fix * cleanup * update changelog * add heaithy handler * log unrelease references * add healtcheck * complete exposed port list * add wget for healtcheck * add aiohttp * use Enum class for State * calc processing time for healthcheck * add HTTP server for healthcheck * cleanup * Update CHANGELOG.md * updat changelog * add docstrings to state enum * set new state State.received * add healthy method * log healthcheck infos with DEBUG level * update changelog * S allius/issue100 (#101) * detect dead connections - disconnect connection on Msg receive timeout - improve connection trace (add connection id) * update changelog * fix merge conflict * fix unittests * S allius/issue108 (#109) * add more data types * adapt unittests * improve test coverage * fix linter warning * update changelog * S allius/issue102 (#110) * hotfix: don't send two MODBUS commands together * fix unit tests * remove read loop * optional sleep between msg read and sending rsp * wait after read 0.5s before sending a response * add pending state * fix state definitions * determine the connection timeout by the conn state * avoid sending MODBUS cmds in the inverter's reporting phase * update changelog * S allius/issue111 (#112) Synchronize regular MODBUS commands with the status of the inverter to prevent the inverter from crashing due to unexpected packets. * inital checkin * remove crontab entry for regular MODBUS cmds * add timer for regular MODBUS polling * fix Stop method call for already stopped timer * optimize MB_START_TIMEOUT value * cleanup * update changelog * fix buildx warnings * fix timer cleanup * fix Config.class_init() - return error string or None - release Schema structure after building thr config * add quit flag to docker push * fix timout calculation * rename python to debugpy * add asyncio log * cleanup shutdown - stop webserver on shutdown - enable asyncio debug mode for debug versions * update changelog * update changelog * fix exception in MODBUS timeout callback * update changelog
This commit is contained in:
@@ -1,44 +1,77 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import traceback
|
||||
from messages import hex_dump_memory
|
||||
import time
|
||||
from asyncio import StreamReader, StreamWriter
|
||||
from messages import hex_dump_memory, State
|
||||
from typing import Self
|
||||
from itertools import count
|
||||
|
||||
import gc
|
||||
logger = logging.getLogger('conn')
|
||||
|
||||
|
||||
class AsyncStream():
|
||||
_ids = count(0)
|
||||
MAX_PROC_TIME = 2
|
||||
'''maximum processing time for a received msg in sec'''
|
||||
MAX_START_TIME = 400
|
||||
'''maximum time without a received msg in sec'''
|
||||
MAX_INV_IDLE_TIME = 90
|
||||
'''maximum time without a received msg from the inverter in sec'''
|
||||
MAX_CLOUD_IDLE_TIME = 360
|
||||
'''maximum time without a received msg from cloud side in sec'''
|
||||
|
||||
def __init__(self, reader, writer, addr) -> None:
|
||||
def __init__(self, reader: StreamReader, writer: StreamWriter,
|
||||
addr) -> None:
|
||||
logger.debug('AsyncStream.__init__')
|
||||
self.reader = reader
|
||||
self.writer = writer
|
||||
self.addr = addr
|
||||
self.r_addr = ''
|
||||
self.l_addr = ''
|
||||
self.conn_no = next(self._ids)
|
||||
self.proc_start = None # start processing start timestamp
|
||||
self.proc_max = 0
|
||||
|
||||
async def server_loop(self, addr):
|
||||
def __timeout(self) -> int:
|
||||
if self.state == State.init:
|
||||
to = self.MAX_START_TIME
|
||||
else:
|
||||
if self.server_side:
|
||||
to = self.MAX_INV_IDLE_TIME
|
||||
else:
|
||||
to = self.MAX_CLOUD_IDLE_TIME
|
||||
return to
|
||||
|
||||
async def server_loop(self, addr: str) -> None:
|
||||
'''Loop for receiving messages from the inverter (server-side)'''
|
||||
logging.info(f'[{self.node_id}] Accept connection from {addr}')
|
||||
logger.info(f'[{self.node_id}:{self.conn_no}] '
|
||||
f'Accept connection from {addr}')
|
||||
self.inc_counter('Inverter_Cnt')
|
||||
await self.loop()
|
||||
self.dec_counter('Inverter_Cnt')
|
||||
logging.info(f'[{self.node_id}] Server loop stopped for'
|
||||
f' r{self.r_addr}')
|
||||
logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for'
|
||||
f' r{self.r_addr}')
|
||||
|
||||
# if the server connection closes, we also have to disconnect
|
||||
# the connection to te TSUN cloud
|
||||
if self.remoteStream:
|
||||
logging.debug("disconnect client connection")
|
||||
logger.info(f'[{self.node_id}:{self.conn_no}] disc client '
|
||||
f'connection: [{self.remoteStream.node_id}:'
|
||||
f'{self.remoteStream.conn_no}]')
|
||||
await self.remoteStream.disc()
|
||||
try:
|
||||
await self._async_publ_mqtt_proxy_stat('proxy')
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def client_loop(self, addr):
|
||||
async def client_loop(self, addr: str) -> None:
|
||||
'''Loop for receiving messages from the TSUN cloud (client-side)'''
|
||||
clientStream = await self.remoteStream.loop()
|
||||
logging.info(f'[{self.node_id}] Client loop stopped for'
|
||||
f' l{clientStream.l_addr}')
|
||||
logger.info(f'[{clientStream.node_id}:{clientStream.conn_no}] '
|
||||
'Client loop stopped for'
|
||||
f' l{clientStream.l_addr}')
|
||||
|
||||
# if the client connection closes, we don't touch the server
|
||||
# connection. Instead we erase the client connection stream,
|
||||
@@ -54,28 +87,45 @@ class AsyncStream():
|
||||
# than erase client connection
|
||||
self.remoteStream = None
|
||||
|
||||
async def loop(self):
|
||||
async def loop(self) -> Self:
|
||||
"""Async loop handler for precessing all received messages"""
|
||||
self.r_addr = self.writer.get_extra_info('peername')
|
||||
self.l_addr = self.writer.get_extra_info('sockname')
|
||||
|
||||
self.proc_start = time.time()
|
||||
while True:
|
||||
try:
|
||||
await self.__async_read()
|
||||
proc = time.time() - self.proc_start
|
||||
if proc > self.proc_max:
|
||||
self.proc_max = proc
|
||||
self.proc_start = None
|
||||
dead_conn_to = self.__timeout()
|
||||
await asyncio.wait_for(self.__async_read(),
|
||||
dead_conn_to)
|
||||
|
||||
if self.unique_id:
|
||||
await self.async_write()
|
||||
await self.__async_forward()
|
||||
await self.async_publ_mqtt()
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f'[{self.node_id}:{self.conn_no}] Dead '
|
||||
f'connection timeout ({dead_conn_to}s) '
|
||||
f'for {self.l_addr}')
|
||||
await self.disc()
|
||||
self.close()
|
||||
return self
|
||||
|
||||
except OSError as error:
|
||||
logger.error(f'[{self.node_id}] {error} for l{self.l_addr} | '
|
||||
logger.error(f'[{self.node_id}:{self.conn_no}] '
|
||||
f'{error} for l{self.l_addr} | '
|
||||
f'r{self.r_addr}')
|
||||
await self.disc()
|
||||
self.close()
|
||||
return self
|
||||
|
||||
except RuntimeError as error:
|
||||
logger.info(f"[{self.node_id}] {error} for {self.l_addr}")
|
||||
logger.info(f'[{self.node_id}:{self.conn_no}] '
|
||||
f'{error} for {self.l_addr}')
|
||||
await self.disc()
|
||||
self.close()
|
||||
return self
|
||||
@@ -86,31 +136,8 @@ class AsyncStream():
|
||||
f"Exception for {self.addr}:\n"
|
||||
f"{traceback.format_exc()}")
|
||||
|
||||
async def disc(self) -> None:
|
||||
if self.writer.is_closing():
|
||||
return
|
||||
logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
|
||||
self.writer.close()
|
||||
await self.writer.wait_closed()
|
||||
|
||||
def close(self):
|
||||
if self.writer.is_closing():
|
||||
return
|
||||
logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
|
||||
self.writer.close()
|
||||
|
||||
'''
|
||||
Our private methods
|
||||
'''
|
||||
async def __async_read(self) -> None:
|
||||
data = await self.reader.read(4096)
|
||||
if data:
|
||||
self._recv_buffer += data
|
||||
self.read() # call read in parent class
|
||||
else:
|
||||
raise RuntimeError("Peer closed.")
|
||||
|
||||
async def async_write(self, headline='Transmit to ') -> None:
|
||||
async def async_write(self, headline: str = 'Transmit to ') -> None:
|
||||
"""Async write handler to transmit the send_buffer"""
|
||||
if self._send_buffer:
|
||||
hex_dump_memory(logging.INFO, f'{headline}{self.addr}:',
|
||||
self._send_buffer, len(self._send_buffer))
|
||||
@@ -118,8 +145,57 @@ class AsyncStream():
|
||||
await self.writer.drain()
|
||||
self._send_buffer = bytearray(0) # self._send_buffer[sent:]
|
||||
|
||||
async def disc(self) -> None:
|
||||
"""Async disc handler for graceful disconnect"""
|
||||
if self.writer.is_closing():
|
||||
return
|
||||
logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
|
||||
self.writer.close()
|
||||
await self.writer.wait_closed()
|
||||
|
||||
def close(self) -> None:
|
||||
"""close handler for a no waiting disconnect
|
||||
|
||||
hint: must be called before releasing the connection instance
|
||||
"""
|
||||
self.reader.feed_eof() # abort awaited read
|
||||
if self.writer.is_closing():
|
||||
return
|
||||
logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
|
||||
self.writer.close()
|
||||
|
||||
def healthy(self) -> bool:
|
||||
elapsed = 0
|
||||
if self.proc_start is not None:
|
||||
elapsed = time.time() - self.proc_start
|
||||
if self.state == State.closed or elapsed > self.MAX_PROC_TIME:
|
||||
logging.debug(f'[{self.node_id}:{self.conn_no}:'
|
||||
f'{type(self).__name__}]'
|
||||
f' act:{round(1000*elapsed)}ms'
|
||||
f' max:{round(1000*self.proc_max)}ms')
|
||||
logging.debug(f'Healthy()) refs: {gc.get_referrers(self)}')
|
||||
return elapsed < 5
|
||||
|
||||
'''
|
||||
Our private methods
|
||||
'''
|
||||
async def __async_read(self) -> None:
|
||||
"""Async read handler to read received data from TCP stream"""
|
||||
data = await self.reader.read(4096)
|
||||
if data:
|
||||
self.proc_start = time.time()
|
||||
self._recv_buffer += data
|
||||
wait = self.read() # call read in parent class
|
||||
if wait > 0:
|
||||
await asyncio.sleep(wait)
|
||||
else:
|
||||
raise RuntimeError("Peer closed.")
|
||||
|
||||
async def __async_forward(self) -> None:
|
||||
if self._forward_buffer:
|
||||
"""forward handler transmits data over the remote connection"""
|
||||
if not self._forward_buffer:
|
||||
return
|
||||
try:
|
||||
if not self.remoteStream:
|
||||
await self.async_create_remote()
|
||||
if self.remoteStream:
|
||||
@@ -136,6 +212,30 @@ class AsyncStream():
|
||||
await self.remoteStream.writer.drain()
|
||||
self._forward_buffer = bytearray(0)
|
||||
|
||||
except OSError as error:
|
||||
if self.remoteStream:
|
||||
rmt = self.remoteStream
|
||||
self.remoteStream = None
|
||||
logger.error(f'[{rmt.node_id}:{rmt.conn_no}] Fwd: {error} for '
|
||||
f'l{rmt.l_addr} | r{rmt.r_addr}')
|
||||
await rmt.disc()
|
||||
rmt.close()
|
||||
|
||||
except RuntimeError as error:
|
||||
if self.remoteStream:
|
||||
rmt = self.remoteStream
|
||||
self.remoteStream = None
|
||||
logger.info(f'[{rmt.node_id}:{rmt.conn_no}] '
|
||||
f'Fwd: {error} for {rmt.l_addr}')
|
||||
await rmt.disc()
|
||||
rmt.close()
|
||||
|
||||
except Exception:
|
||||
self.inc_counter('SW_Exception')
|
||||
logger.error(
|
||||
f"Fwd Exception for {self.addr}:\n"
|
||||
f"{traceback.format_exc()}")
|
||||
|
||||
def __del__(self):
|
||||
logger.debug(
|
||||
f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}")
|
||||
|
||||
@@ -84,7 +84,7 @@ class Config():
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def class_init(cls): # pragma: no cover
|
||||
def class_init(cls) -> None | str: # pragma: no cover
|
||||
try:
|
||||
# make the default config transparaent by copying it
|
||||
# in the config.example file
|
||||
@@ -94,7 +94,9 @@ class Config():
|
||||
"config/config.example.toml")
|
||||
except Exception:
|
||||
pass
|
||||
cls.read()
|
||||
err_str = cls.read()
|
||||
del cls.conf_schema
|
||||
return err_str
|
||||
|
||||
@classmethod
|
||||
def _read_config_file(cls) -> dict: # pragma: no cover
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import logging
|
||||
# import gc
|
||||
from asyncio import StreamReader, StreamWriter
|
||||
from async_stream import AsyncStream
|
||||
from gen3.talent import Talent
|
||||
|
||||
@@ -8,12 +9,13 @@ logger = logging.getLogger('conn')
|
||||
|
||||
class ConnectionG3(AsyncStream, Talent):
|
||||
|
||||
def __init__(self, reader, writer, addr, remote_stream, server_side: bool,
|
||||
def __init__(self, reader: StreamReader, writer: StreamWriter,
|
||||
addr, remote_stream: 'ConnectionG3', server_side: bool,
|
||||
id_str=b'') -> None:
|
||||
AsyncStream.__init__(self, reader, writer, addr)
|
||||
Talent.__init__(self, server_side, id_str)
|
||||
|
||||
self.remoteStream = remote_stream
|
||||
self.remoteStream: 'ConnectionG3' = remote_stream
|
||||
|
||||
'''
|
||||
Our puplic methods
|
||||
@@ -29,6 +31,10 @@ class ConnectionG3(AsyncStream, Talent):
|
||||
async def async_publ_mqtt(self) -> None:
|
||||
pass
|
||||
|
||||
def healthy(self) -> bool:
|
||||
logger.debug('ConnectionG3 healthy()')
|
||||
return AsyncStream.healthy(self)
|
||||
|
||||
'''
|
||||
Our private methods
|
||||
'''
|
||||
|
||||
@@ -132,11 +132,24 @@ class InfosG3(Infos):
|
||||
errors='replace')
|
||||
ind += str_len+1
|
||||
|
||||
elif data_type == 0x00: # 'Nul' -> end
|
||||
i = elms # abort the loop
|
||||
|
||||
elif data_type == 0x41: # 'A' -> Nop ??
|
||||
# result = struct.unpack_from('!l', buf, ind)[0]
|
||||
ind += 0
|
||||
i += 1
|
||||
continue
|
||||
|
||||
elif data_type == 0x42: # 'B' -> byte, int8
|
||||
result = struct.unpack_from('!B', buf, ind)[0]
|
||||
ind += 1
|
||||
|
||||
elif data_type == 0x49: # 'I' -> int32
|
||||
result = struct.unpack_from('!l', buf, ind)[0]
|
||||
ind += 4
|
||||
|
||||
elif data_type == 0x53: # 'S' -> short
|
||||
elif data_type == 0x53: # 'S' -> short, int16
|
||||
result = struct.unpack_from('!h', buf, ind)[0]
|
||||
ind += 2
|
||||
|
||||
@@ -144,13 +157,14 @@ class InfosG3(Infos):
|
||||
result = round(struct.unpack_from('!f', buf, ind)[0], 2)
|
||||
ind += 4
|
||||
|
||||
elif data_type == 0x4c: # 'L' -> int64
|
||||
elif data_type == 0x4c: # 'L' -> long, int64
|
||||
result = struct.unpack_from('!q', buf, ind)[0]
|
||||
ind += 8
|
||||
|
||||
else:
|
||||
self.inc_counter('Invalid_Data_Type')
|
||||
logging.error(f"Infos.parse: data_type: {data_type}"
|
||||
f" @0x{addr:04x} No:{i}"
|
||||
" not supported")
|
||||
return
|
||||
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import traceback
|
||||
import json
|
||||
import asyncio
|
||||
from asyncio import StreamReader, StreamWriter
|
||||
from config import Config
|
||||
from inverter import Inverter
|
||||
from gen3.connection_g3 import ConnectionG3
|
||||
@@ -44,7 +45,7 @@ class InverterG3(Inverter, ConnectionG3):
|
||||
destroyed
|
||||
'''
|
||||
|
||||
def __init__(self, reader, writer, addr):
|
||||
def __init__(self, reader: StreamReader, writer: StreamWriter, addr):
|
||||
super().__init__(reader, writer, addr, None, True)
|
||||
self.__ha_restarts = -1
|
||||
|
||||
@@ -56,11 +57,14 @@ class InverterG3(Inverter, ConnectionG3):
|
||||
addr = (host, port)
|
||||
|
||||
try:
|
||||
logging.info(f'[{self.node_id}] Connected to {addr}')
|
||||
logging.info(f'[{self.node_id}] Connect to {addr}')
|
||||
connect = asyncio.open_connection(host, port)
|
||||
reader, writer = await connect
|
||||
self.remoteStream = ConnectionG3(reader, writer, addr, self,
|
||||
False, self.id_str)
|
||||
logging.info(f'[{self.remoteStream.node_id}:'
|
||||
f'{self.remoteStream.conn_no}] '
|
||||
f'Connected to {addr}')
|
||||
asyncio.create_task(self.client_loop(addr))
|
||||
|
||||
except (ConnectionRefusedError, TimeoutError) as error:
|
||||
@@ -121,7 +125,7 @@ class InverterG3(Inverter, ConnectionG3):
|
||||
def close(self) -> None:
|
||||
logging.debug(f'InverterG3.close() l{self.l_addr} | r{self.r_addr}')
|
||||
super().close() # call close handler in the parent class
|
||||
# logger.debug (f'Inverter refs: {gc.get_referrers(self)}')
|
||||
# logging.info(f'Inverter refs: {gc.get_referrers(self)}')
|
||||
|
||||
def __del__(self):
|
||||
logging.debug("InverterG3.__del__")
|
||||
|
||||
@@ -4,13 +4,15 @@ import time
|
||||
from datetime import datetime
|
||||
|
||||
if __name__ == "app.src.gen3.talent":
|
||||
from app.src.messages import hex_dump_memory, Message
|
||||
from app.src.messages import hex_dump_memory, Message, State
|
||||
from app.src.modbus import Modbus
|
||||
from app.src.my_timer import Timer
|
||||
from app.src.config import Config
|
||||
from app.src.gen3.infos_g3 import InfosG3
|
||||
else: # pragma: no cover
|
||||
from messages import hex_dump_memory, Message
|
||||
from messages import hex_dump_memory, Message, State
|
||||
from modbus import Modbus
|
||||
from my_timer import Timer
|
||||
from config import Config
|
||||
from gen3.infos_g3 import InfosG3
|
||||
|
||||
@@ -35,12 +37,16 @@ class Control:
|
||||
|
||||
|
||||
class Talent(Message):
|
||||
MB_START_TIMEOUT = 40
|
||||
MB_REGULAR_TIMEOUT = 60
|
||||
|
||||
def __init__(self, server_side: bool, id_str=b''):
|
||||
super().__init__(server_side, self.send_modbus_cb, mb_timeout=11)
|
||||
self.await_conn_resp_cnt = 0
|
||||
self.id_str = id_str
|
||||
self.contact_name = b''
|
||||
self.contact_mail = b''
|
||||
self.ts_offset = 0 # time offset between tsun cloud and local
|
||||
self.db = InfosG3()
|
||||
self.switch = {
|
||||
0x00: self.msg_contact_info,
|
||||
@@ -64,19 +70,20 @@ class Talent(Message):
|
||||
}
|
||||
self.modbus_elms = 0 # for unit tests
|
||||
self.node_id = 'G3' # will be overwritten in __set_serial_no
|
||||
# self.forwarding = Config.get('tsun')['enabled']
|
||||
self.mb_timer = Timer(self.mb_timout_cb, self.node_id)
|
||||
|
||||
'''
|
||||
Our puplic methods
|
||||
'''
|
||||
def close(self) -> None:
|
||||
logging.debug('Talent.close()')
|
||||
# we have refernces to methods of this class in self.switch
|
||||
# we have references to methods of this class in self.switch
|
||||
# so we have to erase self.switch, otherwise this instance can't be
|
||||
# deallocated by the garbage collector ==> we get a memory leak
|
||||
self.switch.clear()
|
||||
self.log_lvl.clear()
|
||||
self.state = self.STATE_CLOSED
|
||||
self.state = State.closed
|
||||
self.mb_timer.close()
|
||||
super().close()
|
||||
|
||||
def __set_serial_no(self, serial_no: str):
|
||||
@@ -105,7 +112,7 @@ class Talent(Message):
|
||||
|
||||
self.unique_id = serial_no
|
||||
|
||||
def read(self) -> None:
|
||||
def read(self) -> float:
|
||||
self._read()
|
||||
|
||||
if not self.header_valid:
|
||||
@@ -113,6 +120,9 @@ class Talent(Message):
|
||||
|
||||
if self.header_valid and len(self._recv_buffer) >= (self.header_len +
|
||||
self.data_len):
|
||||
if self.state == State.init:
|
||||
self.state = State.received # received 1st package
|
||||
|
||||
log_lvl = self.log_lvl.get(self.msg_id, logging.WARNING)
|
||||
if callable(log_lvl):
|
||||
log_lvl = log_lvl()
|
||||
@@ -123,7 +133,7 @@ class Talent(Message):
|
||||
self.__set_serial_no(self.id_str.decode("utf-8"))
|
||||
self.__dispatch_msg()
|
||||
self.__flush_recv_msg()
|
||||
return
|
||||
return 0.5 # wait 500ms before sending a response
|
||||
|
||||
def forward(self, buffer, buflen) -> None:
|
||||
tsun = Config.get('tsun')
|
||||
@@ -140,9 +150,9 @@ class Talent(Message):
|
||||
return
|
||||
|
||||
def send_modbus_cb(self, modbus_pdu: bytearray, log_lvl: int, state: str):
|
||||
if self.state != self.STATE_UP:
|
||||
logger.warn(f'[{self.node_id}] ignore MODBUS cmd,'
|
||||
' cause the state is not UP anymore')
|
||||
if self.state != State.up:
|
||||
logger.warning(f'[{self.node_id}] ignore MODBUS cmd,'
|
||||
' cause the state is not UP anymore')
|
||||
return
|
||||
|
||||
self.__build_header(0x70, 0x77)
|
||||
@@ -156,13 +166,25 @@ class Talent(Message):
|
||||
self.writer.write(self._send_buffer)
|
||||
self._send_buffer = bytearray(0) # self._send_buffer[sent:]
|
||||
|
||||
async def send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
|
||||
if self.state != self.STATE_UP:
|
||||
def _send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
|
||||
if self.state != State.up:
|
||||
logger.log(log_lvl, f'[{self.node_id}] ignore MODBUS cmd,'
|
||||
' as the state is not UP')
|
||||
return
|
||||
self.mb.build_msg(Modbus.INV_ADDR, func, addr, val, log_lvl)
|
||||
|
||||
async def send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
|
||||
self._send_modbus_cmd(func, addr, val, log_lvl)
|
||||
|
||||
def mb_timout_cb(self, exp_cnt):
|
||||
self.mb_timer.start(self.MB_REGULAR_TIMEOUT)
|
||||
|
||||
if 0 == (exp_cnt % 30):
|
||||
# logging.info("Regular Modbus Status request")
|
||||
self._send_modbus_cmd(Modbus.READ_REGS, 0x2007, 2, logging.DEBUG)
|
||||
else:
|
||||
self._send_modbus_cmd(Modbus.READ_REGS, 0x3008, 21, logging.DEBUG)
|
||||
|
||||
def _init_new_client_conn(self) -> bool:
|
||||
contact_name = self.contact_name
|
||||
contact_mail = self.contact_mail
|
||||
@@ -204,6 +226,24 @@ class Talent(Message):
|
||||
ts = (datetime.now() - datetime(1970, 1, 1)).total_seconds()
|
||||
return round(ts*1000)
|
||||
|
||||
def _update_header(self, _forward_buffer):
|
||||
'''update header for message before forwarding,
|
||||
add time offset to timestamp'''
|
||||
_len = len(_forward_buffer)
|
||||
result = struct.unpack_from('!lB', _forward_buffer, 0)
|
||||
id_len = result[1] # len of variable id string
|
||||
if _len < 2*id_len + 21:
|
||||
return
|
||||
|
||||
result = struct.unpack_from('!B', _forward_buffer, id_len+6)
|
||||
msg_code = result[0]
|
||||
if msg_code == 0x71 or msg_code == 0x04:
|
||||
result = struct.unpack_from('!q', _forward_buffer, 13+2*id_len)
|
||||
ts = result[0] + self.ts_offset
|
||||
logger.debug(f'offset: {self.ts_offset:08x}'
|
||||
f' proxy-time: {ts:08x}')
|
||||
struct.pack_into('!q', _forward_buffer, 13+2*id_len, ts)
|
||||
|
||||
# check if there is a complete header in the buffer, parse it
|
||||
# and set
|
||||
# self.header_len
|
||||
@@ -256,7 +296,8 @@ class Talent(Message):
|
||||
fnc = self.switch.get(self.msg_id, self.msg_unknown)
|
||||
if self.unique_id:
|
||||
logger.info(self.__flow_str(self.server_side, 'rx') +
|
||||
f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}')
|
||||
f' Ctl: {int(self.ctrl):#02x} ({self.state}) '
|
||||
f'Msg: {fnc.__name__!r}')
|
||||
fnc()
|
||||
else:
|
||||
logger.info(self.__flow_str(self.server_side, 'drop') +
|
||||
@@ -305,39 +346,37 @@ class Talent(Message):
|
||||
return True
|
||||
|
||||
def msg_get_time(self):
|
||||
tsun = Config.get('tsun')
|
||||
if tsun['enabled']:
|
||||
if self.ctrl.is_ind():
|
||||
if self.data_len >= 8:
|
||||
ts = self._timestamp()
|
||||
result = struct.unpack_from('!q', self._recv_buffer,
|
||||
self.header_len)
|
||||
logger.debug(f'tsun-time: {result[0]:08x}'
|
||||
f' proxy-time: {ts:08x}')
|
||||
else:
|
||||
logger.warning('Unknown Ctrl')
|
||||
self.inc_counter('Unknown_Ctrl')
|
||||
self.forward(self._recv_buffer, self.header_len+self.data_len)
|
||||
if self.ctrl.is_ind():
|
||||
if self.data_len == 0:
|
||||
self.state = State.pend # block MODBUS cmds
|
||||
self.mb_timer.start(self.MB_START_TIMEOUT)
|
||||
ts = self._timestamp()
|
||||
logger.debug(f'time: {ts:08x}')
|
||||
self.__build_header(0x91)
|
||||
self._send_buffer += struct.pack('!q', ts)
|
||||
self.__finish_send_msg()
|
||||
|
||||
elif self.data_len >= 8:
|
||||
ts = self._timestamp()
|
||||
result = struct.unpack_from('!q', self._recv_buffer,
|
||||
self.header_len)
|
||||
self.ts_offset = result[0]-ts
|
||||
logger.debug(f'tsun-time: {int(result[0]):08x}'
|
||||
f' proxy-time: {ts:08x}'
|
||||
f' offset: {self.ts_offset}')
|
||||
return # ignore received response
|
||||
else:
|
||||
if self.ctrl.is_ind():
|
||||
if self.data_len == 0:
|
||||
ts = self._timestamp()
|
||||
logger.debug(f'time: {ts:08x}')
|
||||
logger.warning('Unknown Ctrl')
|
||||
self.inc_counter('Unknown_Ctrl')
|
||||
|
||||
self.__build_header(0x91)
|
||||
self._send_buffer += struct.pack('!q', ts)
|
||||
self.__finish_send_msg()
|
||||
|
||||
else:
|
||||
logger.warning('Unknown Ctrl')
|
||||
self.inc_counter('Unknown_Ctrl')
|
||||
self.forward(self._recv_buffer, self.header_len+self.data_len)
|
||||
|
||||
def parse_msg_header(self):
|
||||
result = struct.unpack_from('!lB', self._recv_buffer, self.header_len)
|
||||
|
||||
data_id = result[0] # len of complete message
|
||||
id_len = result[1] # len of variable id string
|
||||
logger.debug(f'Data_ID: {data_id} id_len: {id_len}')
|
||||
logger.debug(f'Data_ID: 0x{data_id:08x} id_len: {id_len}')
|
||||
|
||||
msg_hdr_len = 5+id_len+9
|
||||
|
||||
@@ -356,7 +395,6 @@ class Talent(Message):
|
||||
self._send_buffer += b'\x01'
|
||||
self.__finish_send_msg()
|
||||
self.__process_data()
|
||||
self.state = self.STATE_UP
|
||||
|
||||
elif self.ctrl.is_resp():
|
||||
return # ignore received response
|
||||
@@ -372,7 +410,7 @@ class Talent(Message):
|
||||
self._send_buffer += b'\x01'
|
||||
self.__finish_send_msg()
|
||||
self.__process_data()
|
||||
self.state = self.STATE_UP
|
||||
self.state = State.up # allow MODBUS cmds
|
||||
|
||||
elif self.ctrl.is_resp():
|
||||
return # ignore received response
|
||||
@@ -432,8 +470,13 @@ class Talent(Message):
|
||||
else:
|
||||
self.inc_counter('Invalid_Msg_Format')
|
||||
elif self.ctrl.is_ind():
|
||||
# logger.debug(f'Modbus Ind MsgLen: {modbus_len}')
|
||||
self.modbus_elms = 0
|
||||
# logger.debug(f'Modbus Ind MsgLen: {modbus_len}')
|
||||
if not self.server_side:
|
||||
logger.warning('Unknown Message')
|
||||
self.inc_counter('Unknown_Msg')
|
||||
return
|
||||
|
||||
for key, update, _ in self.mb.recv_resp(self.db, data[
|
||||
hdr_len:],
|
||||
self.node_id):
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import logging
|
||||
# import gc
|
||||
from asyncio import StreamReader, StreamWriter
|
||||
from async_stream import AsyncStream
|
||||
from gen3plus.solarman_v5 import SolarmanV5
|
||||
|
||||
@@ -8,12 +9,13 @@ logger = logging.getLogger('conn')
|
||||
|
||||
class ConnectionG3P(AsyncStream, SolarmanV5):
|
||||
|
||||
def __init__(self, reader, writer, addr, remote_stream,
|
||||
def __init__(self, reader: StreamReader, writer: StreamWriter,
|
||||
addr, remote_stream: 'ConnectionG3P',
|
||||
server_side: bool) -> None:
|
||||
AsyncStream.__init__(self, reader, writer, addr)
|
||||
SolarmanV5.__init__(self, server_side)
|
||||
|
||||
self.remoteStream = remote_stream
|
||||
self.remoteStream: 'ConnectionG3P' = remote_stream
|
||||
|
||||
'''
|
||||
Our puplic methods
|
||||
@@ -29,6 +31,10 @@ class ConnectionG3P(AsyncStream, SolarmanV5):
|
||||
async def async_publ_mqtt(self) -> None:
|
||||
pass
|
||||
|
||||
def healthy(self) -> bool:
|
||||
logger.debug('ConnectionG3P healthy()')
|
||||
return AsyncStream.healthy(self)
|
||||
|
||||
'''
|
||||
Our private methods
|
||||
'''
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import traceback
|
||||
import json
|
||||
import asyncio
|
||||
from asyncio import StreamReader, StreamWriter
|
||||
from config import Config
|
||||
from inverter import Inverter
|
||||
from gen3plus.connection_g3p import ConnectionG3P
|
||||
@@ -44,7 +45,7 @@ class InverterG3P(Inverter, ConnectionG3P):
|
||||
destroyed
|
||||
'''
|
||||
|
||||
def __init__(self, reader, writer, addr):
|
||||
def __init__(self, reader: StreamReader, writer: StreamWriter, addr):
|
||||
super().__init__(reader, writer, addr, None, True)
|
||||
self.__ha_restarts = -1
|
||||
|
||||
@@ -56,11 +57,14 @@ class InverterG3P(Inverter, ConnectionG3P):
|
||||
addr = (host, port)
|
||||
|
||||
try:
|
||||
logging.info(f'[{self.node_id}] Connected to {addr}')
|
||||
logging.info(f'[{self.node_id}] Connect to {addr}')
|
||||
connect = asyncio.open_connection(host, port)
|
||||
reader, writer = await connect
|
||||
self.remoteStream = ConnectionG3P(reader, writer, addr, self,
|
||||
False)
|
||||
logging.info(f'[{self.remoteStream.node_id}:'
|
||||
f'{self.remoteStream.conn_no}] '
|
||||
f'Connected to {addr}')
|
||||
asyncio.create_task(self.client_loop(addr))
|
||||
|
||||
except (ConnectionRefusedError, TimeoutError) as error:
|
||||
|
||||
@@ -6,15 +6,17 @@ import asyncio
|
||||
from datetime import datetime
|
||||
|
||||
if __name__ == "app.src.gen3plus.solarman_v5":
|
||||
from app.src.messages import hex_dump_memory, Message
|
||||
from app.src.messages import hex_dump_memory, Message, State
|
||||
from app.src.modbus import Modbus
|
||||
from app.src.my_timer import Timer
|
||||
from app.src.config import Config
|
||||
from app.src.gen3plus.infos_g3p import InfosG3P
|
||||
from app.src.infos import Register
|
||||
else: # pragma: no cover
|
||||
from messages import hex_dump_memory, Message
|
||||
from messages import hex_dump_memory, Message, State
|
||||
from config import Config
|
||||
from modbus import Modbus
|
||||
from my_timer import Timer
|
||||
from gen3plus.infos_g3p import InfosG3P
|
||||
from infos import Register
|
||||
# import traceback
|
||||
@@ -51,6 +53,8 @@ class Sequence():
|
||||
class SolarmanV5(Message):
|
||||
AT_CMD = 1
|
||||
MB_RTU_CMD = 2
|
||||
MB_START_TIMEOUT = 40
|
||||
MB_REGULAR_TIMEOUT = 60
|
||||
|
||||
def __init__(self, server_side: bool):
|
||||
super().__init__(server_side, self.send_modbus_cb, mb_timeout=5)
|
||||
@@ -123,19 +127,20 @@ class SolarmanV5(Message):
|
||||
self.at_acl = g3p_cnf['at_acl']
|
||||
|
||||
self.node_id = 'G3P' # will be overwritten in __set_serial_no
|
||||
# self.forwarding = Config.get('solarman')['enabled']
|
||||
self.mb_timer = Timer(self.mb_timout_cb, self.node_id)
|
||||
|
||||
'''
|
||||
Our puplic methods
|
||||
'''
|
||||
def close(self) -> None:
|
||||
logging.debug('Solarman.close()')
|
||||
# we have refernces to methods of this class in self.switch
|
||||
# we have references to methods of this class in self.switch
|
||||
# so we have to erase self.switch, otherwise this instance can't be
|
||||
# deallocated by the garbage collector ==> we get a memory leak
|
||||
self.switch.clear()
|
||||
self.log_lvl.clear()
|
||||
self.state = self.STATE_CLOSED
|
||||
self.state = State.closed
|
||||
self.mb_timer.close()
|
||||
super().close()
|
||||
|
||||
def __set_serial_no(self, snr: int):
|
||||
@@ -169,7 +174,7 @@ class SolarmanV5(Message):
|
||||
|
||||
self.unique_id = serial_no
|
||||
|
||||
def read(self) -> None:
|
||||
def read(self) -> float:
|
||||
self._read()
|
||||
|
||||
if not self.header_valid:
|
||||
@@ -184,10 +189,13 @@ class SolarmanV5(Message):
|
||||
self._recv_buffer, self.header_len+self.data_len+2)
|
||||
if self.__trailer_is_ok(self._recv_buffer, self.header_len
|
||||
+ self.data_len + 2):
|
||||
if self.state == State.init:
|
||||
self.state = State.received
|
||||
|
||||
self.__set_serial_no(self.snr)
|
||||
self.__dispatch_msg()
|
||||
self.__flush_recv_msg()
|
||||
return
|
||||
return 0 # wait 0s before sending a response
|
||||
|
||||
def forward(self, buffer, buflen) -> None:
|
||||
tsun = Config.get('solarman')
|
||||
@@ -253,6 +261,10 @@ class SolarmanV5(Message):
|
||||
self.snr = result[4]
|
||||
|
||||
if start != 0xA5:
|
||||
hex_dump_memory(logging.ERROR,
|
||||
'Drop packet w invalid start byte from'
|
||||
f' {self.addr}:', buf, buf_len)
|
||||
|
||||
self.inc_counter('Invalid_Msg_Format')
|
||||
# erase broken recv buffer
|
||||
self._recv_buffer = bytearray()
|
||||
@@ -264,6 +276,9 @@ class SolarmanV5(Message):
|
||||
crc = buf[self.data_len+11]
|
||||
stop = buf[self.data_len+12]
|
||||
if stop != 0x15:
|
||||
hex_dump_memory(logging.ERROR,
|
||||
'Drop packet w invalid stop byte from '
|
||||
f'{self.addr}:', buf, buf_len)
|
||||
self.inc_counter('Invalid_Msg_Format')
|
||||
if len(self._recv_buffer) > (self.data_len+13):
|
||||
next_start = buf[self.data_len+13]
|
||||
@@ -338,9 +353,9 @@ class SolarmanV5(Message):
|
||||
self.__finish_send_msg()
|
||||
|
||||
def send_modbus_cb(self, pdu: bytearray, log_lvl: int, state: str):
|
||||
if self.state != self.STATE_UP:
|
||||
logger.warn(f'[{self.node_id}] ignore MODBUS cmd,'
|
||||
' cause the state is not UP anymore')
|
||||
if self.state != State.up:
|
||||
logger.warning(f'[{self.node_id}] ignore MODBUS cmd,'
|
||||
' cause the state is not UP anymore')
|
||||
return
|
||||
self.__build_header(0x4510)
|
||||
self._send_buffer += struct.pack('<BHLLL', self.MB_RTU_CMD,
|
||||
@@ -352,21 +367,33 @@ class SolarmanV5(Message):
|
||||
self.writer.write(self._send_buffer)
|
||||
self._send_buffer = bytearray(0) # self._send_buffer[sent:]
|
||||
|
||||
async def send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
|
||||
if self.state != self.STATE_UP:
|
||||
def _send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
|
||||
if self.state != State.up:
|
||||
logger.log(log_lvl, f'[{self.node_id}] ignore MODBUS cmd,'
|
||||
' as the state is not UP')
|
||||
return
|
||||
self.mb.build_msg(Modbus.INV_ADDR, func, addr, val, log_lvl)
|
||||
|
||||
async def send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
|
||||
self._send_modbus_cmd(func, addr, val, log_lvl)
|
||||
|
||||
def mb_timout_cb(self, exp_cnt):
|
||||
self.mb_timer.start(self.MB_REGULAR_TIMEOUT)
|
||||
|
||||
self._send_modbus_cmd(Modbus.READ_REGS, 0x3008, 21, logging.DEBUG)
|
||||
|
||||
if 0 == (exp_cnt % 30):
|
||||
# logging.info("Regular Modbus Status request")
|
||||
self._send_modbus_cmd(Modbus.READ_REGS, 0x2007, 2, logging.DEBUG)
|
||||
|
||||
def at_cmd_forbidden(self, cmd: str, connection: str) -> bool:
|
||||
return not cmd.startswith(tuple(self.at_acl[connection]['allow'])) or \
|
||||
cmd.startswith(tuple(self.at_acl[connection]['block']))
|
||||
|
||||
async def send_at_cmd(self, AT_cmd: str) -> None:
|
||||
if self.state != self.STATE_UP:
|
||||
logger.warn(f'[{self.node_id}] ignore AT+ cmd,'
|
||||
' as the state is not UP')
|
||||
if self.state != State.up:
|
||||
logger.warning(f'[{self.node_id}] ignore AT+ cmd,'
|
||||
' as the state is not UP')
|
||||
return
|
||||
AT_cmd = AT_cmd.strip()
|
||||
|
||||
@@ -375,8 +402,7 @@ class SolarmanV5(Message):
|
||||
node_id = self.node_id
|
||||
key = 'at_resp'
|
||||
logger.info(f'{key}: {data_json}')
|
||||
asyncio.ensure_future(
|
||||
self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json)) # noqa: E501
|
||||
await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501
|
||||
return
|
||||
|
||||
self.forward_at_cmd_resp = False
|
||||
@@ -465,7 +491,9 @@ class SolarmanV5(Message):
|
||||
self.__process_data(ftype)
|
||||
self.__forward_msg()
|
||||
self.__send_ack_rsp(0x1210, ftype)
|
||||
self.state = self.STATE_UP
|
||||
if self.state is not State.up:
|
||||
self.state = State.up
|
||||
self.mb_timer.start(self.MB_START_TIMEOUT)
|
||||
|
||||
def msg_sync_start(self):
|
||||
data = self._recv_buffer[self.header_len:]
|
||||
@@ -499,13 +527,15 @@ class SolarmanV5(Message):
|
||||
__forward_msg):
|
||||
self.inc_counter('Modbus_Command')
|
||||
else:
|
||||
logger.error('Invalid Modbus Msg')
|
||||
self.inc_counter('Invalid_Msg_Format')
|
||||
return
|
||||
|
||||
self.__forward_msg()
|
||||
|
||||
async def publish_mqtt(self, key, data):
|
||||
await self.mqtt.publish(key, data) # pragma: no cover
|
||||
def publish_mqtt(self, key, data):
|
||||
asyncio.ensure_future(
|
||||
self.mqtt.publish(key, data))
|
||||
|
||||
def get_cmd_rsp_log_lvl(self) -> int:
|
||||
ftype = self._recv_buffer[self.header_len]
|
||||
@@ -529,8 +559,7 @@ class SolarmanV5(Message):
|
||||
node_id = self.node_id
|
||||
key = 'at_resp'
|
||||
logger.info(f'{key}: {data_json}')
|
||||
asyncio.ensure_future(
|
||||
self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json)) # noqa: E501
|
||||
self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501
|
||||
return
|
||||
elif ftype == self.MB_RTU_CMD:
|
||||
valid = data[1]
|
||||
@@ -561,7 +590,9 @@ class SolarmanV5(Message):
|
||||
|
||||
self.__forward_msg()
|
||||
self.__send_ack_rsp(0x1710, ftype)
|
||||
self.state = self.STATE_UP
|
||||
if self.state is not State.up:
|
||||
self.state = State.up
|
||||
self.mb_timer.start(self.MB_START_TIMEOUT)
|
||||
|
||||
def msg_sync_end(self):
|
||||
data = self._recv_buffer[self.header_len:]
|
||||
|
||||
@@ -343,7 +343,7 @@ class Infos:
|
||||
dict[counter] -= 1
|
||||
|
||||
def ha_proxy_confs(self, ha_prfx: str, node_id: str, snr: str) \
|
||||
-> Generator[tuple[dict, str], None, None]:
|
||||
-> Generator[tuple[str, str, str, str], None, None]:
|
||||
'''Generator function yields json register struct for home-assistant
|
||||
auto configuration and the unique entity string, for all proxy
|
||||
registers
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[loggers]
|
||||
keys=root,tracer,mesg,conn,data,mqtt
|
||||
keys=root,tracer,mesg,conn,data,mqtt,asyncio
|
||||
|
||||
[handlers]
|
||||
keys=console_handler,file_handler_name1,file_handler_name2
|
||||
@@ -24,6 +24,12 @@ handlers=console_handler,file_handler_name1
|
||||
propagate=0
|
||||
qualname=mqtt
|
||||
|
||||
[logger_asyncio]
|
||||
level=INFO
|
||||
handlers=console_handler,file_handler_name1
|
||||
propagate=0
|
||||
qualname=asyncio
|
||||
|
||||
[logger_data]
|
||||
level=DEBUG
|
||||
handlers=file_handler_name1
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import logging
|
||||
import weakref
|
||||
from typing import Callable
|
||||
from typing import Callable, Generator
|
||||
from enum import Enum
|
||||
|
||||
|
||||
if __name__ == "app.src.messages":
|
||||
@@ -45,21 +46,32 @@ def hex_dump_memory(level, info, data, num):
|
||||
|
||||
|
||||
class IterRegistry(type):
|
||||
def __iter__(cls):
|
||||
def __iter__(cls) -> Generator['Message', None, None]:
|
||||
for ref in cls._registry:
|
||||
obj = ref()
|
||||
if obj is not None:
|
||||
yield obj
|
||||
|
||||
|
||||
class State(Enum):
|
||||
'''state of the logical connection'''
|
||||
init = 0
|
||||
'''just created'''
|
||||
received = 1
|
||||
'''at least one packet received'''
|
||||
up = 2
|
||||
'''at least one cmd-rsp transaction'''
|
||||
pend = 3
|
||||
'''inverter transaction pending, don't send MODBUS cmds'''
|
||||
closed = 4
|
||||
'''connection closed'''
|
||||
|
||||
|
||||
class Message(metaclass=IterRegistry):
|
||||
_registry = []
|
||||
STATE_INIT = 0
|
||||
STATE_UP = 2
|
||||
STATE_CLOSED = 3
|
||||
|
||||
def __init__(self, server_side: bool, send_modbus_cb:
|
||||
Callable[[bytes, int, str], None], mb_timeout):
|
||||
Callable[[bytes, int, str], None], mb_timeout: int):
|
||||
self._registry.append(weakref.ref(self))
|
||||
|
||||
self.server_side = server_side
|
||||
@@ -78,7 +90,7 @@ class Message(metaclass=IterRegistry):
|
||||
self._send_buffer = bytearray(0)
|
||||
self._forward_buffer = bytearray(0)
|
||||
self.new_data = {}
|
||||
self.state = self.STATE_INIT
|
||||
self.state = State.init
|
||||
|
||||
'''
|
||||
Empty methods, that have to be implemented in any child class which
|
||||
@@ -97,7 +109,7 @@ class Message(metaclass=IterRegistry):
|
||||
'''
|
||||
def close(self) -> None:
|
||||
if self.mb:
|
||||
del self.mb
|
||||
self.mb.close()
|
||||
self.mb = None
|
||||
pass # pragma: no cover
|
||||
|
||||
|
||||
@@ -105,7 +105,17 @@ class Modbus():
|
||||
self.req_pend = False
|
||||
self.tim = None
|
||||
|
||||
def close(self):
|
||||
"""free the queue and erase the callback handlers"""
|
||||
logging.debug('Modbus close:')
|
||||
self.__stop_timer()
|
||||
self.rsp_handler = None
|
||||
self.snd_handler = None
|
||||
while not self.que.empty:
|
||||
self.que.get_nowait()
|
||||
|
||||
def __del__(self):
|
||||
"""log statistics on the deleting of a MODBUS instance"""
|
||||
logging.debug(f'Modbus __del__:\n {self.counter}')
|
||||
|
||||
def build_msg(self, addr: int, func: int, reg: int, val: int,
|
||||
@@ -243,6 +253,7 @@ class Modbus():
|
||||
# logging.debug(f'Modbus stop timer {self}')
|
||||
if self.tim:
|
||||
self.tim.cancel()
|
||||
self.tim = None
|
||||
|
||||
def __timeout_cb(self) -> None:
|
||||
'''Rsponse timeout handler retransmit pdu or send next pdu'''
|
||||
|
||||
35
app/src/my_timer.py
Normal file
35
app/src/my_timer.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from itertools import count
|
||||
|
||||
|
||||
class Timer:
|
||||
def __init__(self, cb, id_str: str = ''):
|
||||
self.__timeout_cb = cb
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.tim = None
|
||||
self.id_str = id_str
|
||||
self.exp_count = count(0)
|
||||
|
||||
def start(self, timeout: float) -> None:
|
||||
'''Start timer with timeout seconds'''
|
||||
if self.tim:
|
||||
self.tim.cancel()
|
||||
self.tim = self.loop.call_later(timeout, self.__timeout)
|
||||
logging.debug(f'[{self.id_str}]Start timer')
|
||||
|
||||
def stop(self) -> None:
|
||||
'''Stop timer'''
|
||||
logging.debug(f'[{self.id_str}]Stop timer')
|
||||
if self.tim:
|
||||
self.tim.cancel()
|
||||
self.tim = None
|
||||
|
||||
def __timeout(self) -> None:
|
||||
'''timer expired handler'''
|
||||
logging.debug(f'[{self.id_str}]Timer expired')
|
||||
self.__timeout_cb(next(self.exp_count))
|
||||
|
||||
def close(self) -> None:
|
||||
self.stop()
|
||||
self.__timeout_cb = None
|
||||
@@ -3,8 +3,6 @@ import json
|
||||
from mqtt import Mqtt
|
||||
from aiocron import crontab
|
||||
from infos import ClrAtMidnight
|
||||
from modbus import Modbus
|
||||
from messages import Message
|
||||
|
||||
logger_mqtt = logging.getLogger('mqtt')
|
||||
|
||||
@@ -21,9 +19,6 @@ class Schedule:
|
||||
|
||||
crontab('0 0 * * *', func=cls.atmidnight, start=True)
|
||||
|
||||
# every minute
|
||||
crontab('* * * * *', func=cls.regular_modbus_cmds, start=True)
|
||||
|
||||
@classmethod
|
||||
async def atmidnight(cls) -> None:
|
||||
'''Clear daily counters at midnight'''
|
||||
@@ -33,15 +28,3 @@ class Schedule:
|
||||
logger_mqtt.debug(f'{key}: {data}')
|
||||
data_json = json.dumps(data)
|
||||
await cls.mqtt.publish(f"{key}", data_json)
|
||||
|
||||
@classmethod
|
||||
async def regular_modbus_cmds(cls):
|
||||
for m in Message:
|
||||
if m.server_side:
|
||||
fnc = getattr(m, "send_modbus_cmd", None)
|
||||
if callable(fnc):
|
||||
await fnc(Modbus.READ_REGS, 0x3008, 21, logging.DEBUG)
|
||||
# if 0 == (cls.count % 30):
|
||||
# # logging.info("Regular Modbus Status request")
|
||||
# await fnc(Modbus.READ_REGS, 0x2007, 2, logging.DEBUG)
|
||||
cls.count += 1
|
||||
|
||||
@@ -2,6 +2,8 @@ import logging
|
||||
import asyncio
|
||||
import signal
|
||||
import os
|
||||
from asyncio import StreamReader, StreamWriter
|
||||
from aiohttp import web
|
||||
from logging import config # noqa F401
|
||||
from messages import Message
|
||||
from inverter import Inverter
|
||||
@@ -10,25 +12,83 @@ from gen3plus.inverter_g3p import InverterG3P
|
||||
from scheduler import Schedule
|
||||
from config import Config
|
||||
|
||||
routes = web.RouteTableDef()
|
||||
proxy_is_up = False
|
||||
|
||||
async def handle_client(reader, writer):
|
||||
|
||||
@routes.get('/')
|
||||
async def hello(request):
|
||||
return web.Response(text="Hello, world")
|
||||
|
||||
|
||||
@routes.get('/-/ready')
|
||||
async def ready(request):
|
||||
if proxy_is_up:
|
||||
status = 200
|
||||
text = 'Is ready'
|
||||
else:
|
||||
status = 503
|
||||
text = 'Not ready'
|
||||
return web.Response(status=status, text=text)
|
||||
|
||||
|
||||
@routes.get('/-/healthy')
|
||||
async def healthy(request):
|
||||
|
||||
if proxy_is_up:
|
||||
# logging.info('web reqeust healthy()')
|
||||
for stream in Message:
|
||||
try:
|
||||
res = stream.healthy()
|
||||
if not res:
|
||||
return web.Response(status=503, text="I have a problem")
|
||||
except Exception as err:
|
||||
logging.info(f'Exception:{err}')
|
||||
|
||||
return web.Response(status=200, text="I'm fine")
|
||||
|
||||
|
||||
async def webserver(addr, port):
|
||||
'''coro running our webserver'''
|
||||
app = web.Application()
|
||||
app.add_routes(routes)
|
||||
runner = web.AppRunner(app)
|
||||
|
||||
await runner.setup()
|
||||
site = web.TCPSite(runner, addr, port)
|
||||
await site.start()
|
||||
logging.info(f'HTTP server listen on port: {port}')
|
||||
|
||||
try:
|
||||
# Normal interaction with aiohttp
|
||||
while True:
|
||||
await asyncio.sleep(3600) # sleep forever
|
||||
except asyncio.CancelledError:
|
||||
logging.info('HTTP server cancelled')
|
||||
await runner.cleanup()
|
||||
logging.debug('HTTP cleanup done')
|
||||
|
||||
|
||||
async def handle_client(reader: StreamReader, writer: StreamWriter):
|
||||
'''Handles a new incoming connection and starts an async loop'''
|
||||
|
||||
addr = writer.get_extra_info('peername')
|
||||
await InverterG3(reader, writer, addr).server_loop(addr)
|
||||
|
||||
|
||||
async def handle_client_v2(reader, writer):
|
||||
async def handle_client_v2(reader: StreamReader, writer: StreamWriter):
|
||||
'''Handles a new incoming connection and starts an async loop'''
|
||||
|
||||
addr = writer.get_extra_info('peername')
|
||||
await InverterG3P(reader, writer, addr).server_loop(addr)
|
||||
|
||||
|
||||
async def handle_shutdown(loop):
|
||||
async def handle_shutdown(web_task):
|
||||
'''Close all TCP connections and stop the event loop'''
|
||||
|
||||
logging.info('Shutdown due to SIGTERM')
|
||||
global proxy_is_up
|
||||
proxy_is_up = False
|
||||
|
||||
#
|
||||
# first, disc all open TCP connections gracefully
|
||||
@@ -38,7 +98,7 @@ async def handle_shutdown(loop):
|
||||
await asyncio.wait_for(stream.disc(), 2)
|
||||
except Exception:
|
||||
pass
|
||||
logging.info('Disconnecting done')
|
||||
logging.info('Proxy disconnecting done')
|
||||
|
||||
#
|
||||
# second, close all open TCP connections
|
||||
@@ -46,12 +106,20 @@ async def handle_shutdown(loop):
|
||||
for stream in Message:
|
||||
stream.close()
|
||||
|
||||
#
|
||||
# at last, we stop the loop
|
||||
#
|
||||
loop.stop()
|
||||
await asyncio.sleep(0.1) # give time for closing
|
||||
logging.info('Proxy closing done')
|
||||
|
||||
logging.info('Shutdown complete')
|
||||
#
|
||||
# third, cancel the web server
|
||||
#
|
||||
web_task.cancel()
|
||||
await web_task
|
||||
|
||||
#
|
||||
# at last, start a coro for stopping the loop
|
||||
#
|
||||
logging.debug("Stop event loop")
|
||||
loop.stop()
|
||||
|
||||
|
||||
def get_log_level() -> int:
|
||||
@@ -84,17 +152,28 @@ if __name__ == "__main__":
|
||||
logging.getLogger('conn').setLevel(log_level)
|
||||
logging.getLogger('data').setLevel(log_level)
|
||||
logging.getLogger('tracer').setLevel(log_level)
|
||||
logging.getLogger('asyncio').setLevel(log_level)
|
||||
# logging.getLogger('mqtt').setLevel(log_level)
|
||||
|
||||
# read config file
|
||||
Config.class_init()
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
# read config file
|
||||
ConfigErr = Config.class_init()
|
||||
if ConfigErr is not None:
|
||||
logging.info(f'ConfigErr: {ConfigErr}')
|
||||
Inverter.class_init()
|
||||
Schedule.start()
|
||||
|
||||
#
|
||||
# Create tasks for our listening servers. These must be tasks! If we call
|
||||
# start_server directly out of our main task, the eventloop will be blocked
|
||||
# and we can't receive and handle the UNIX signals!
|
||||
#
|
||||
loop.create_task(asyncio.start_server(handle_client, '0.0.0.0', 5005))
|
||||
loop.create_task(asyncio.start_server(handle_client_v2, '0.0.0.0', 10000))
|
||||
web_task = loop.create_task(webserver('0.0.0.0', 8127))
|
||||
|
||||
#
|
||||
# Register some UNIX Signal handler for a gracefully server shutdown
|
||||
# on Docker restart and stop
|
||||
@@ -102,22 +181,18 @@ if __name__ == "__main__":
|
||||
for signame in ('SIGINT', 'SIGTERM'):
|
||||
loop.add_signal_handler(getattr(signal, signame),
|
||||
lambda loop=loop: asyncio.create_task(
|
||||
handle_shutdown(loop)))
|
||||
|
||||
#
|
||||
# Create taska for our listening servera. These must be tasks! If we call
|
||||
# start_server directly out of our main task, the eventloop will be blocked
|
||||
# and we can't receive and handle the UNIX signals!
|
||||
#
|
||||
loop.create_task(asyncio.start_server(handle_client, '0.0.0.0', 5005))
|
||||
loop.create_task(asyncio.start_server(handle_client_v2, '0.0.0.0', 10000))
|
||||
handle_shutdown(web_task)))
|
||||
|
||||
loop.set_debug(log_level == logging.DEBUG)
|
||||
try:
|
||||
if ConfigErr is None:
|
||||
proxy_is_up = True
|
||||
loop.run_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
logging.info("Event loop is stopped")
|
||||
Inverter.class_close(loop)
|
||||
logging.info('Close event loop')
|
||||
logging.debug('Close event loop')
|
||||
loop.close()
|
||||
logging.info(f'Finally, exit Server "{serv_name}"')
|
||||
|
||||
Reference in New Issue
Block a user