isolate Modbus fix
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
import logging
|
||||
import traceback
|
||||
import time
|
||||
from asyncio import StreamReader, StreamWriter
|
||||
from messages import hex_dump_memory
|
||||
from typing import Self
|
||||
@@ -18,8 +17,6 @@ class AsyncStream():
|
||||
self.addr = addr
|
||||
self.r_addr = ''
|
||||
self.l_addr = ''
|
||||
self.proc_start = None # start processing start timestamp
|
||||
self.proc_max = 0
|
||||
|
||||
async def server_loop(self, addr: str) -> None:
|
||||
'''Loop for receiving messages from the inverter (server-side)'''
|
||||
@@ -64,14 +61,8 @@ class AsyncStream():
|
||||
"""Async loop handler for precessing all received messages"""
|
||||
self.r_addr = self.writer.get_extra_info('peername')
|
||||
self.l_addr = self.writer.get_extra_info('sockname')
|
||||
self.proc_start = time.time()
|
||||
while True:
|
||||
try:
|
||||
proc = time.time() - self.proc_start
|
||||
if proc > self.proc_max:
|
||||
self.proc_max = proc
|
||||
self.proc_start = None
|
||||
|
||||
await self.__async_read()
|
||||
|
||||
if self.unique_id:
|
||||
@@ -126,15 +117,6 @@ class AsyncStream():
|
||||
logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
|
||||
self.writer.close()
|
||||
|
||||
def healthy(self) -> bool:
|
||||
elapsed = 0
|
||||
if self.proc_start is not None:
|
||||
elapsed = time.time() - self.proc_start
|
||||
logging.debug('async_stream healthy() elapsed: '
|
||||
f'{round(1000*elapsed)}ms'
|
||||
f' max:{round(1000*self.proc_max)}ms')
|
||||
return elapsed < 5
|
||||
|
||||
'''
|
||||
Our private methods
|
||||
'''
|
||||
@@ -142,7 +124,6 @@ class AsyncStream():
|
||||
"""Async read handler to read received data from TCP stream"""
|
||||
data = await self.reader.read(4096)
|
||||
if data:
|
||||
self.proc_start = time.time()
|
||||
self._recv_buffer += data
|
||||
self.read() # call read in parent class
|
||||
else:
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
import shutil
|
||||
import tomllib
|
||||
import logging
|
||||
from typing import Tuple
|
||||
from schema import Schema, And, Or, Use, Optional
|
||||
|
||||
|
||||
@@ -85,7 +84,7 @@ class Config():
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def class_init(cls) -> None | str: # pragma: no cover
|
||||
def class_init(cls): # pragma: no cover
|
||||
try:
|
||||
# make the default config transparaent by copying it
|
||||
# in the config.example file
|
||||
@@ -95,12 +94,11 @@ class Config():
|
||||
"config/config.example.toml")
|
||||
except Exception:
|
||||
pass
|
||||
return cls.read()
|
||||
cls.read()
|
||||
|
||||
@classmethod
|
||||
def _read_config_file(cls) -> Tuple[dict, None | str]: # pragma: no cover
|
||||
def _read_config_file(cls) -> dict: # pragma: no cover
|
||||
usr_config = {}
|
||||
err = None
|
||||
|
||||
try:
|
||||
with open("config/config.toml", "rb") as f:
|
||||
@@ -112,7 +110,7 @@ class Config():
|
||||
'\n To create the missing config.toml file, '
|
||||
'you can rename the template config.example.toml\n'
|
||||
' and customize it for your scenario.\n')
|
||||
return usr_config, err
|
||||
return usr_config
|
||||
|
||||
@classmethod
|
||||
def read(cls, path='') -> None | str:
|
||||
@@ -131,7 +129,7 @@ class Config():
|
||||
|
||||
# overwrite the default values, with values from
|
||||
# the config.toml file
|
||||
usr_config, err = cls._read_config_file()
|
||||
usr_config = cls._read_config_file()
|
||||
|
||||
# merge the default and the user config
|
||||
config = def_config.copy()
|
||||
|
||||
@@ -31,10 +31,6 @@ class ConnectionG3(AsyncStream, Talent):
|
||||
async def async_publ_mqtt(self) -> None:
|
||||
pass
|
||||
|
||||
def healthy(self) -> bool:
|
||||
logger.debug('ConnectionG3 healthy()')
|
||||
return AsyncStream.healthy(self)
|
||||
|
||||
'''
|
||||
Our private methods
|
||||
'''
|
||||
|
||||
@@ -31,10 +31,6 @@ class ConnectionG3P(AsyncStream, SolarmanV5):
|
||||
async def async_publ_mqtt(self) -> None:
|
||||
pass
|
||||
|
||||
def healthy(self) -> bool:
|
||||
logger.debug('ConnectionG3P healthy()')
|
||||
return AsyncStream.healthy(self)
|
||||
|
||||
'''
|
||||
Our private methods
|
||||
'''
|
||||
|
||||
@@ -3,7 +3,6 @@ import asyncio
|
||||
import signal
|
||||
import os
|
||||
from asyncio import StreamReader, StreamWriter
|
||||
from aiohttp import web
|
||||
from logging import config # noqa F401
|
||||
from messages import Message
|
||||
from inverter import Inverter
|
||||
@@ -12,51 +11,6 @@ from gen3plus.inverter_g3p import InverterG3P
|
||||
from scheduler import Schedule
|
||||
from config import Config
|
||||
|
||||
routes = web.RouteTableDef()
|
||||
proxy_is_up = False
|
||||
|
||||
|
||||
@routes.get('/')
|
||||
async def hello(request):
|
||||
return web.Response(text="Hello, world")
|
||||
|
||||
|
||||
@routes.get('/-/ready')
|
||||
async def ready(request):
|
||||
if proxy_is_up:
|
||||
status = 200
|
||||
text = 'Is ready'
|
||||
else:
|
||||
status = 503
|
||||
text = 'Not ready'
|
||||
return web.Response(status=status, text=text)
|
||||
|
||||
|
||||
@routes.get('/-/healthy')
|
||||
async def healthy(request):
|
||||
|
||||
if proxy_is_up:
|
||||
# logging.info('web request healthy()')
|
||||
for stream in Message:
|
||||
try:
|
||||
res = stream.healthy()
|
||||
if not res:
|
||||
return web.Response(status=503, text="I have a problem")
|
||||
except Exception as err:
|
||||
logging.info(f'Exception:{err}')
|
||||
|
||||
return web.Response(status=200, text="I'm fine")
|
||||
|
||||
|
||||
async def webserver(runner, addr, port):
|
||||
await runner.setup()
|
||||
site = web.TCPSite(runner, addr, port)
|
||||
await site.start()
|
||||
logging.info(f'HTTP server listen on port: {port}')
|
||||
|
||||
while True:
|
||||
await asyncio.sleep(3600) # sleep forever
|
||||
|
||||
|
||||
async def handle_client(reader: StreamReader, writer: StreamWriter):
|
||||
'''Handles a new incoming connection and starts an async loop'''
|
||||
@@ -72,12 +26,10 @@ async def handle_client_v2(reader: StreamReader, writer: StreamWriter):
|
||||
await InverterG3P(reader, writer, addr).server_loop(addr)
|
||||
|
||||
|
||||
async def handle_shutdown(loop, runner):
|
||||
async def handle_shutdown(loop):
|
||||
'''Close all TCP connections and stop the event loop'''
|
||||
|
||||
logging.info('Shutdown due to SIGTERM')
|
||||
await runner.cleanup()
|
||||
logging.info('HTTP server stopped')
|
||||
|
||||
#
|
||||
# first, disc all open TCP connections gracefully
|
||||
@@ -135,22 +87,15 @@ if __name__ == "__main__":
|
||||
logging.getLogger('tracer').setLevel(log_level)
|
||||
# logging.getLogger('mqtt').setLevel(log_level)
|
||||
|
||||
# read config file
|
||||
Config.class_init()
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
# read config file
|
||||
ConfigErr = Config.class_init()
|
||||
logging.debug(f'ConfigErr: {ConfigErr}')
|
||||
Inverter.class_init()
|
||||
Schedule.start()
|
||||
|
||||
#
|
||||
# Setup webserver application and runner
|
||||
#
|
||||
app = web.Application()
|
||||
app.add_routes(routes)
|
||||
runner = web.AppRunner(app)
|
||||
|
||||
#
|
||||
# Register some UNIX Signal handler for a gracefully server shutdown
|
||||
# on Docker restart and stop
|
||||
@@ -158,7 +103,7 @@ if __name__ == "__main__":
|
||||
for signame in ('SIGINT', 'SIGTERM'):
|
||||
loop.add_signal_handler(getattr(signal, signame),
|
||||
lambda loop=loop: asyncio.create_task(
|
||||
handle_shutdown(loop, runner)))
|
||||
handle_shutdown(loop)))
|
||||
|
||||
#
|
||||
# Create tasks for our listening servers. These must be tasks! If we call
|
||||
@@ -167,16 +112,12 @@ if __name__ == "__main__":
|
||||
#
|
||||
loop.create_task(asyncio.start_server(handle_client, '0.0.0.0', 5005))
|
||||
loop.create_task(asyncio.start_server(handle_client_v2, '0.0.0.0', 10000))
|
||||
loop.create_task(webserver(runner, '0.0.0.0', 8127))
|
||||
|
||||
try:
|
||||
if ConfigErr is None:
|
||||
proxy_is_up = True
|
||||
loop.run_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
proxy_is_up = False
|
||||
Inverter.class_close(loop)
|
||||
logging.info('Close event loop')
|
||||
loop.close()
|
||||
|
||||
Reference in New Issue
Block a user