Compare commits
3 Commits
renovate/a
...
s-allius/i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d8b9a65d3 | ||
|
|
774c4c82fa | ||
|
|
8bf8c2e85d |
@@ -1,4 +1,4 @@
|
||||
aiomqtt==2.3.2
|
||||
schema==0.7.7
|
||||
aiocron==2.1
|
||||
aiohttp==3.11.16
|
||||
quart==0.20
|
||||
@@ -96,8 +96,8 @@ class Proxy():
|
||||
Infos.new_stat_data[key] = False
|
||||
|
||||
@classmethod
|
||||
def class_close(cls, loop) -> None: # pragma: no cover
|
||||
async def class_close(cls, loop) -> None: # pragma: no cover
|
||||
logging.debug('Proxy.class_close')
|
||||
logging.info('Close MQTT Task')
|
||||
loop.run_until_complete(cls.mqtt.close())
|
||||
await cls.mqtt.close()
|
||||
cls.mqtt = None
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
import logging
|
||||
import asyncio
|
||||
import logging.handlers
|
||||
import signal
|
||||
import os
|
||||
import argparse
|
||||
from asyncio import StreamReader, StreamWriter
|
||||
from aiohttp import web
|
||||
from quart import Quart, Response
|
||||
from logging import config # noqa F401
|
||||
from proxy import Proxy
|
||||
from inverter_ifc import InverterIfc
|
||||
@@ -18,61 +17,52 @@ from cnf.config_read_toml import ConfigReadToml
|
||||
from cnf.config_read_json import ConfigReadJson
|
||||
from modbus_tcp import ModbusTcp
|
||||
|
||||
routes = web.RouteTableDef()
|
||||
proxy_is_up = False
|
||||
|
||||
class ProxyState:
|
||||
_is_up = False
|
||||
|
||||
@staticmethod
|
||||
def is_up() -> bool:
|
||||
return ProxyState._is_up
|
||||
|
||||
@staticmethod
|
||||
def set_up(value: bool):
|
||||
ProxyState._is_up = value
|
||||
|
||||
|
||||
@routes.get('/')
|
||||
async def hello(request):
|
||||
return web.Response(text="Hello, world")
|
||||
app = Quart(__name__)
|
||||
|
||||
|
||||
@routes.get('/-/ready')
|
||||
async def ready(request):
|
||||
if proxy_is_up:
|
||||
@app.route('/')
|
||||
async def hello():
|
||||
return Response(response="Hello, world")
|
||||
|
||||
|
||||
@app.route('/-/ready')
|
||||
async def ready():
|
||||
if ProxyState.is_up():
|
||||
status = 200
|
||||
text = 'Is ready'
|
||||
else:
|
||||
status = 503
|
||||
text = 'Not ready'
|
||||
return web.Response(status=status, text=text)
|
||||
return Response(status=status, response=text)
|
||||
|
||||
|
||||
@routes.get('/-/healthy')
|
||||
async def healthy(request):
|
||||
@app.route('/-/healthy')
|
||||
async def healthy():
|
||||
|
||||
if proxy_is_up:
|
||||
if ProxyState.is_up():
|
||||
# logging.info('web reqeust healthy()')
|
||||
for inverter in InverterIfc:
|
||||
try:
|
||||
res = inverter.healthy()
|
||||
if not res:
|
||||
return web.Response(status=503, text="I have a problem")
|
||||
return Response(status=503, response="I have a problem")
|
||||
except Exception as err:
|
||||
logging.info(f'Exception:{err}')
|
||||
|
||||
return web.Response(status=200, text="I'm fine")
|
||||
|
||||
|
||||
async def webserver(addr, port):
|
||||
'''coro running our webserver'''
|
||||
app = web.Application()
|
||||
app.add_routes(routes)
|
||||
runner = web.AppRunner(app)
|
||||
|
||||
await runner.setup()
|
||||
site = web.TCPSite(runner, addr, port)
|
||||
await site.start()
|
||||
logging.info(f'HTTP server listen on port: {port}')
|
||||
|
||||
try:
|
||||
# Normal interaction with aiohttp
|
||||
while True:
|
||||
await asyncio.sleep(3600) # sleep forever
|
||||
except asyncio.CancelledError:
|
||||
logging.info('HTTP server cancelled')
|
||||
await runner.cleanup()
|
||||
logging.debug('HTTP cleanup done')
|
||||
return Response(status=200, response="I'm fine")
|
||||
|
||||
|
||||
async def handle_client(reader: StreamReader, writer: StreamWriter, inv_class):
|
||||
@@ -82,12 +72,13 @@ async def handle_client(reader: StreamReader, writer: StreamWriter, inv_class):
|
||||
await inv.local.ifc.server_loop()
|
||||
|
||||
|
||||
async def handle_shutdown(loop, web_task):
|
||||
@app.after_serving
|
||||
async def handle_shutdown(): # pragma: no cover
|
||||
'''Close all TCP connections and stop the event loop'''
|
||||
|
||||
logging.info('Shutdown due to SIGTERM')
|
||||
global proxy_is_up
|
||||
proxy_is_up = False
|
||||
loop = asyncio.get_event_loop()
|
||||
ProxyState.set_up(False)
|
||||
|
||||
#
|
||||
# first, disc all open TCP connections gracefully
|
||||
@@ -97,24 +88,16 @@ async def handle_shutdown(loop, web_task):
|
||||
|
||||
logging.info('Proxy disconnecting done')
|
||||
|
||||
#
|
||||
# second, cancel the web server
|
||||
#
|
||||
web_task.cancel()
|
||||
await web_task
|
||||
|
||||
#
|
||||
# now cancel all remaining (pending) tasks
|
||||
#
|
||||
pending = asyncio.all_tasks()
|
||||
for task in pending:
|
||||
for task in asyncio.all_tasks():
|
||||
if task == asyncio.current_task():
|
||||
continue
|
||||
task.cancel()
|
||||
logging.info('Proxy cancelling done')
|
||||
|
||||
#
|
||||
# at last, start a coro for stopping the loop
|
||||
#
|
||||
logging.debug("Stop event loop")
|
||||
loop.stop()
|
||||
await Proxy.class_close(loop)
|
||||
|
||||
|
||||
def get_log_level() -> int | None:
|
||||
@@ -214,27 +197,20 @@ def main(): # pragma: no cover
|
||||
loop.create_task(asyncio.start_server(lambda r, w, i=inv_class:
|
||||
handle_client(r, w, i),
|
||||
'0.0.0.0', port))
|
||||
web_task = loop.create_task(webserver('0.0.0.0', 8127))
|
||||
|
||||
#
|
||||
# Register some UNIX Signal handler for a gracefully server shutdown
|
||||
# on Docker restart and stop
|
||||
#
|
||||
for signame in ('SIGINT', 'SIGTERM'):
|
||||
loop.add_signal_handler(getattr(signal, signame),
|
||||
lambda loop=loop: asyncio.create_task(
|
||||
handle_shutdown(loop, web_task)))
|
||||
|
||||
loop.set_debug(log_level == logging.DEBUG)
|
||||
try:
|
||||
global proxy_is_up
|
||||
proxy_is_up = True
|
||||
loop.run_forever()
|
||||
ProxyState.set_up(True)
|
||||
logging.info("Start Quart")
|
||||
app.run(host='0.0.0.0', port=8127, use_reloader=False, loop=loop)
|
||||
logging.info("Quart stopped")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except asyncio.exceptions.CancelledError:
|
||||
logging.info("Quart cancelled")
|
||||
|
||||
finally:
|
||||
logging.info("Event loop is stopped")
|
||||
Proxy.class_close(loop)
|
||||
logging.debug('Close event loop')
|
||||
loop.close()
|
||||
logging.info(f'Finally, exit Server "{serv_name}"')
|
||||
|
||||
@@ -3,7 +3,9 @@ import pytest
|
||||
import logging
|
||||
import os
|
||||
from mock import patch
|
||||
from server import get_log_level
|
||||
from server import get_log_level, app, ProxyState
|
||||
|
||||
pytest_plugins = ('pytest_asyncio',)
|
||||
|
||||
def test_get_log_level():
|
||||
|
||||
@@ -30,3 +32,47 @@ def test_get_log_level():
|
||||
with patch.dict(os.environ, {'LOG_LVL': 'UNKNOWN'}):
|
||||
log_lvl = get_log_level()
|
||||
assert log_lvl == None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_home():
|
||||
"""Test the home route."""
|
||||
client = app.test_client()
|
||||
response = await client.get('/')
|
||||
assert response.status_code == 200
|
||||
result = await response.get_data()
|
||||
assert result == b"Hello, world"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ready():
|
||||
"""Test the ready route."""
|
||||
|
||||
ProxyState.set_up(False)
|
||||
client = app.test_client()
|
||||
response = await client.get('/-/ready')
|
||||
assert response.status_code == 503
|
||||
result = await response.get_data()
|
||||
assert result == b"Not ready"
|
||||
|
||||
ProxyState.set_up(True)
|
||||
response = await client.get('/-/ready')
|
||||
assert response.status_code == 200
|
||||
result = await response.get_data()
|
||||
assert result == b"Is ready"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_healthy():
|
||||
"""Test the healthy route."""
|
||||
|
||||
ProxyState.set_up(False)
|
||||
client = app.test_client()
|
||||
response = await client.get('/-/healthy')
|
||||
assert response.status_code == 200
|
||||
result = await response.get_data()
|
||||
assert result == b"I'm fine"
|
||||
|
||||
ProxyState.set_up(True)
|
||||
response = await client.get('/-/healthy')
|
||||
assert response.status_code == 200
|
||||
result = await response.get_data()
|
||||
assert result == b"I'm fine"
|
||||
|
||||
Reference in New Issue
Block a user