move code from app.py into server.py
This commit is contained in:
@@ -67,4 +67,4 @@ EXPOSE 5005 8127 10000
|
|||||||
|
|
||||||
# command to run on container start
|
# command to run on container start
|
||||||
ENTRYPOINT ["/root/entrypoint.sh"]
|
ENTRYPOINT ["/root/entrypoint.sh"]
|
||||||
CMD [ "python3", "./app.py" ]
|
CMD [ "python3", "./server.py" ]
|
||||||
|
|||||||
164
app/src/app.py
164
app/src/app.py
@@ -1,164 +0,0 @@
|
|||||||
import logging
|
|
||||||
import asyncio
|
|
||||||
import logging.handlers
|
|
||||||
from asyncio import StreamReader, StreamWriter
|
|
||||||
from quart import Quart, Response
|
|
||||||
from logging import config # noqa F401
|
|
||||||
from proxy import Proxy
|
|
||||||
from inverter_ifc import InverterIfc
|
|
||||||
from gen3.inverter_g3 import InverterG3
|
|
||||||
from gen3plus.inverter_g3p import InverterG3P
|
|
||||||
from scheduler import Schedule
|
|
||||||
from server import Server
|
|
||||||
from web import Web
|
|
||||||
from web.wrapper import url_for
|
|
||||||
|
|
||||||
from modbus_tcp import ModbusTcp
|
|
||||||
|
|
||||||
|
|
||||||
class ProxyState:
|
|
||||||
_is_up = False
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def is_up() -> bool:
|
|
||||||
return ProxyState._is_up
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def set_up(value: bool):
|
|
||||||
ProxyState._is_up = value
|
|
||||||
|
|
||||||
|
|
||||||
class HypercornLogHndl:
|
|
||||||
access_hndl = []
|
|
||||||
error_hndl = []
|
|
||||||
must_fix = False
|
|
||||||
HYPERC_ERR = 'hypercorn.error'
|
|
||||||
HYPERC_ACC = 'hypercorn.access'
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def save(cls):
|
|
||||||
cls.access_hndl = logging.getLogger(
|
|
||||||
cls.HYPERC_ACC).handlers
|
|
||||||
cls.error_hndl = logging.getLogger(
|
|
||||||
cls.HYPERC_ERR).handlers
|
|
||||||
cls.must_fix = True
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def restore(cls):
|
|
||||||
if not cls.must_fix:
|
|
||||||
return
|
|
||||||
cls.must_fix = False
|
|
||||||
access_hndl = logging.getLogger(
|
|
||||||
cls.HYPERC_ACC).handlers
|
|
||||||
if access_hndl != cls.access_hndl:
|
|
||||||
print(' * Fix hypercorn.access setting')
|
|
||||||
logging.getLogger(
|
|
||||||
cls.HYPERC_ACC).handlers = cls.access_hndl
|
|
||||||
|
|
||||||
error_hndl = logging.getLogger(
|
|
||||||
cls.HYPERC_ERR).handlers
|
|
||||||
if error_hndl != cls.error_hndl:
|
|
||||||
print(' * Fix hypercorn.error setting')
|
|
||||||
logging.getLogger(
|
|
||||||
cls.HYPERC_ERR).handlers = cls.error_hndl
|
|
||||||
|
|
||||||
|
|
||||||
app = Quart(__name__,
|
|
||||||
template_folder='web/templates',
|
|
||||||
static_folder='web/static')
|
|
||||||
app.secret_key = 'JKLdks.dajlKKKdladkflKwolafallsdfl'
|
|
||||||
app.jinja_env.globals.update(url_for=url_for)
|
|
||||||
|
|
||||||
|
|
||||||
@app.route('/-/ready')
|
|
||||||
async def ready():
|
|
||||||
if ProxyState.is_up():
|
|
||||||
status = 200
|
|
||||||
text = 'Is ready'
|
|
||||||
else:
|
|
||||||
status = 503
|
|
||||||
text = 'Not ready'
|
|
||||||
return Response(status=status, response=text)
|
|
||||||
|
|
||||||
|
|
||||||
@app.route('/-/healthy')
|
|
||||||
async def healthy():
|
|
||||||
|
|
||||||
if ProxyState.is_up():
|
|
||||||
# logging.info('web reqeust healthy()')
|
|
||||||
for inverter in InverterIfc:
|
|
||||||
try:
|
|
||||||
res = inverter.healthy()
|
|
||||||
if not res:
|
|
||||||
return Response(status=503, response="I have a problem")
|
|
||||||
except Exception as err:
|
|
||||||
logging.info(f'Exception:{err}')
|
|
||||||
|
|
||||||
return Response(status=200, response="I'm fine")
|
|
||||||
|
|
||||||
|
|
||||||
async def handle_client(reader: StreamReader, writer: StreamWriter, inv_class):
|
|
||||||
'''Handles a new incoming connection and starts an async loop'''
|
|
||||||
|
|
||||||
with inv_class(reader, writer) as inv:
|
|
||||||
await inv.local.ifc.server_loop()
|
|
||||||
|
|
||||||
|
|
||||||
@app.before_serving
|
|
||||||
async def startup_app():
|
|
||||||
HypercornLogHndl.save()
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
Proxy.class_init()
|
|
||||||
Schedule.start()
|
|
||||||
ModbusTcp(loop)
|
|
||||||
|
|
||||||
for inv_class, port in [(InverterG3, 5005), (InverterG3P, 10000)]:
|
|
||||||
logging.info(f'listen on port: {port} for inverters')
|
|
||||||
loop.create_task(asyncio.start_server(lambda r, w, i=inv_class:
|
|
||||||
handle_client(r, w, i),
|
|
||||||
'0.0.0.0', port))
|
|
||||||
ProxyState.set_up(True)
|
|
||||||
|
|
||||||
|
|
||||||
@app.before_request
|
|
||||||
async def startup_request():
|
|
||||||
HypercornLogHndl.restore()
|
|
||||||
|
|
||||||
|
|
||||||
@app.after_serving
|
|
||||||
async def handle_shutdown(): # pragma: no cover
|
|
||||||
'''Close all TCP connections and stop the event loop'''
|
|
||||||
|
|
||||||
logging.info('Shutdown due to SIGTERM')
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
ProxyState.set_up(False)
|
|
||||||
|
|
||||||
#
|
|
||||||
# first, disc all open TCP connections gracefully
|
|
||||||
#
|
|
||||||
for inverter in InverterIfc:
|
|
||||||
await inverter.disc(True)
|
|
||||||
|
|
||||||
logging.info('Proxy disconnecting done')
|
|
||||||
|
|
||||||
await Proxy.class_close(loop)
|
|
||||||
|
|
||||||
|
|
||||||
server = Server(app, __name__ == "__main__")
|
|
||||||
Web(app, server.trans_path, server.rel_urls)
|
|
||||||
|
|
||||||
if __name__ == "__main__": # pragma: no cover
|
|
||||||
|
|
||||||
try:
|
|
||||||
logging.info("Start Quart")
|
|
||||||
app.run(host='0.0.0.0', port=8127, use_reloader=False,
|
|
||||||
debug=Server.log_level == logging.DEBUG)
|
|
||||||
logging.info("Quart stopped")
|
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
except asyncio.exceptions.CancelledError:
|
|
||||||
logging.info("Quart cancelled")
|
|
||||||
|
|
||||||
finally:
|
|
||||||
logging.info(f'Finally, exit Server "{Server.serv_name}"')
|
|
||||||
@@ -1,11 +1,25 @@
|
|||||||
import logging
|
import logging
|
||||||
import logging.handlers
|
import logging.handlers
|
||||||
|
from logging import config # noqa F401
|
||||||
|
import asyncio
|
||||||
|
from asyncio import StreamReader, StreamWriter
|
||||||
import os
|
import os
|
||||||
import argparse
|
import argparse
|
||||||
|
from quart import Quart, Response
|
||||||
|
|
||||||
from cnf.config import Config
|
from cnf.config import Config
|
||||||
from cnf.config_read_env import ConfigReadEnv
|
from cnf.config_read_env import ConfigReadEnv
|
||||||
from cnf.config_read_toml import ConfigReadToml
|
from cnf.config_read_toml import ConfigReadToml
|
||||||
from cnf.config_read_json import ConfigReadJson
|
from cnf.config_read_json import ConfigReadJson
|
||||||
|
from web import Web
|
||||||
|
from web.wrapper import url_for
|
||||||
|
from proxy import Proxy
|
||||||
|
from inverter_ifc import InverterIfc
|
||||||
|
from gen3.inverter_g3 import InverterG3
|
||||||
|
from gen3plus.inverter_g3p import InverterG3P
|
||||||
|
from scheduler import Schedule
|
||||||
|
|
||||||
|
from modbus_tcp import ModbusTcp
|
||||||
|
|
||||||
|
|
||||||
class Server():
|
class Server():
|
||||||
@@ -135,3 +149,150 @@ class Server():
|
|||||||
logging.info(f"LOG_LVL : {log_lvl}")
|
logging.info(f"LOG_LVL : {log_lvl}")
|
||||||
|
|
||||||
return switch.get(log_lvl, None)
|
return switch.get(log_lvl, None)
|
||||||
|
|
||||||
|
|
||||||
|
class ProxyState:
|
||||||
|
_is_up = False
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def is_up() -> bool:
|
||||||
|
return ProxyState._is_up
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def set_up(value: bool):
|
||||||
|
ProxyState._is_up = value
|
||||||
|
|
||||||
|
|
||||||
|
class HypercornLogHndl:
|
||||||
|
access_hndl = []
|
||||||
|
error_hndl = []
|
||||||
|
must_fix = False
|
||||||
|
HYPERC_ERR = 'hypercorn.error'
|
||||||
|
HYPERC_ACC = 'hypercorn.access'
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def save(cls):
|
||||||
|
cls.access_hndl = logging.getLogger(
|
||||||
|
cls.HYPERC_ACC).handlers
|
||||||
|
cls.error_hndl = logging.getLogger(
|
||||||
|
cls.HYPERC_ERR).handlers
|
||||||
|
cls.must_fix = True
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def restore(cls):
|
||||||
|
if not cls.must_fix:
|
||||||
|
return
|
||||||
|
cls.must_fix = False
|
||||||
|
access_hndl = logging.getLogger(
|
||||||
|
cls.HYPERC_ACC).handlers
|
||||||
|
if access_hndl != cls.access_hndl:
|
||||||
|
print(' * Fix hypercorn.access setting')
|
||||||
|
logging.getLogger(
|
||||||
|
cls.HYPERC_ACC).handlers = cls.access_hndl
|
||||||
|
|
||||||
|
error_hndl = logging.getLogger(
|
||||||
|
cls.HYPERC_ERR).handlers
|
||||||
|
if error_hndl != cls.error_hndl:
|
||||||
|
print(' * Fix hypercorn.error setting')
|
||||||
|
logging.getLogger(
|
||||||
|
cls.HYPERC_ERR).handlers = cls.error_hndl
|
||||||
|
|
||||||
|
|
||||||
|
app = Quart(__name__,
|
||||||
|
template_folder='web/templates',
|
||||||
|
static_folder='web/static')
|
||||||
|
app.secret_key = 'JKLdks.dajlKKKdladkflKwolafallsdfl'
|
||||||
|
app.jinja_env.globals.update(url_for=url_for)
|
||||||
|
server = Server(app, __name__ == "__main__")
|
||||||
|
Web(app, server.trans_path, server.rel_urls)
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/-/ready')
|
||||||
|
async def ready():
|
||||||
|
if ProxyState.is_up():
|
||||||
|
status = 200
|
||||||
|
text = 'Is ready'
|
||||||
|
else:
|
||||||
|
status = 503
|
||||||
|
text = 'Not ready'
|
||||||
|
return Response(status=status, response=text)
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/-/healthy')
|
||||||
|
async def healthy():
|
||||||
|
|
||||||
|
if ProxyState.is_up():
|
||||||
|
# logging.info('web reqeust healthy()')
|
||||||
|
for inverter in InverterIfc:
|
||||||
|
try:
|
||||||
|
res = inverter.healthy()
|
||||||
|
if not res:
|
||||||
|
return Response(status=503, response="I have a problem")
|
||||||
|
except Exception as err:
|
||||||
|
logging.info(f'Exception:{err}')
|
||||||
|
|
||||||
|
return Response(status=200, response="I'm fine")
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_client(reader: StreamReader, writer: StreamWriter, inv_class):
|
||||||
|
'''Handles a new incoming connection and starts an async loop'''
|
||||||
|
|
||||||
|
with inv_class(reader, writer) as inv:
|
||||||
|
await inv.local.ifc.server_loop()
|
||||||
|
|
||||||
|
|
||||||
|
@app.before_serving
|
||||||
|
async def startup_app():
|
||||||
|
HypercornLogHndl.save()
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
Proxy.class_init()
|
||||||
|
Schedule.start()
|
||||||
|
ModbusTcp(loop)
|
||||||
|
|
||||||
|
for inv_class, port in [(InverterG3, 5005), (InverterG3P, 10000)]:
|
||||||
|
logging.info(f'listen on port: {port} for inverters')
|
||||||
|
loop.create_task(asyncio.start_server(lambda r, w, i=inv_class:
|
||||||
|
handle_client(r, w, i),
|
||||||
|
'0.0.0.0', port))
|
||||||
|
ProxyState.set_up(True)
|
||||||
|
|
||||||
|
|
||||||
|
@app.before_request
|
||||||
|
async def startup_request():
|
||||||
|
HypercornLogHndl.restore()
|
||||||
|
|
||||||
|
|
||||||
|
@app.after_serving
|
||||||
|
async def handle_shutdown(): # pragma: no cover
|
||||||
|
'''Close all TCP connections and stop the event loop'''
|
||||||
|
|
||||||
|
logging.info('Shutdown due to SIGTERM')
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
ProxyState.set_up(False)
|
||||||
|
|
||||||
|
#
|
||||||
|
# first, disc all open TCP connections gracefully
|
||||||
|
#
|
||||||
|
for inverter in InverterIfc:
|
||||||
|
await inverter.disc(True)
|
||||||
|
|
||||||
|
logging.info('Proxy disconnecting done')
|
||||||
|
|
||||||
|
await Proxy.class_close(loop)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": # pragma: no cover
|
||||||
|
|
||||||
|
try:
|
||||||
|
logging.info("Start Quart")
|
||||||
|
app.run(host='0.0.0.0', port=8127, use_reloader=False,
|
||||||
|
debug=server.log_level == logging.DEBUG)
|
||||||
|
logging.info("Quart stopped")
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
except asyncio.exceptions.CancelledError:
|
||||||
|
logging.info("Quart cancelled")
|
||||||
|
|
||||||
|
finally:
|
||||||
|
logging.info(f'Finally, exit Server "{server.serv_name}"')
|
||||||
|
|||||||
@@ -3,124 +3,127 @@ import pytest
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
from mock import patch
|
from mock import patch
|
||||||
from app import app, ProxyState
|
from server import app, Server, ProxyState
|
||||||
from server import Server
|
|
||||||
|
|
||||||
pytest_plugins = ('pytest_asyncio',)
|
pytest_plugins = ('pytest_asyncio',)
|
||||||
|
|
||||||
|
|
||||||
class FakeServer(Server):
|
class TestServerClass:
|
||||||
def __init__(self):
|
class FakeServer(Server):
|
||||||
pass # don't call the suoer(.__init__ for unit tests
|
def __init__(self):
|
||||||
|
pass # don't call the suoer(.__init__ for unit tests
|
||||||
|
|
||||||
def test_get_log_level():
|
def test_get_log_level(self):
|
||||||
s = FakeServer()
|
s = self.FakeServer()
|
||||||
|
|
||||||
with patch.dict(os.environ, {}):
|
with patch.dict(os.environ, {}):
|
||||||
log_lvl = s.get_log_level()
|
log_lvl = s.get_log_level()
|
||||||
assert log_lvl == None
|
assert log_lvl == None
|
||||||
|
|
||||||
with patch.dict(os.environ, {'LOG_LVL': 'DEBUG'}):
|
with patch.dict(os.environ, {'LOG_LVL': 'DEBUG'}):
|
||||||
log_lvl = s.get_log_level()
|
log_lvl = s.get_log_level()
|
||||||
assert log_lvl == logging.DEBUG
|
assert log_lvl == logging.DEBUG
|
||||||
|
|
||||||
with patch.dict(os.environ, {'LOG_LVL': 'INFO'}):
|
with patch.dict(os.environ, {'LOG_LVL': 'INFO'}):
|
||||||
log_lvl = s.get_log_level()
|
log_lvl = s.get_log_level()
|
||||||
assert log_lvl == logging.INFO
|
assert log_lvl == logging.INFO
|
||||||
|
|
||||||
with patch.dict(os.environ, {'LOG_LVL': 'WARN'}):
|
with patch.dict(os.environ, {'LOG_LVL': 'WARN'}):
|
||||||
log_lvl = s.get_log_level()
|
log_lvl = s.get_log_level()
|
||||||
assert log_lvl == logging.WARNING
|
assert log_lvl == logging.WARNING
|
||||||
|
|
||||||
with patch.dict(os.environ, {'LOG_LVL': 'ERROR'}):
|
with patch.dict(os.environ, {'LOG_LVL': 'ERROR'}):
|
||||||
log_lvl = s.get_log_level()
|
log_lvl = s.get_log_level()
|
||||||
assert log_lvl == logging.ERROR
|
assert log_lvl == logging.ERROR
|
||||||
|
|
||||||
with patch.dict(os.environ, {'LOG_LVL': 'UNKNOWN'}):
|
with patch.dict(os.environ, {'LOG_LVL': 'UNKNOWN'}):
|
||||||
log_lvl = s.get_log_level()
|
log_lvl = s.get_log_level()
|
||||||
assert log_lvl == None
|
assert log_lvl == None
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
def test_default_args(self):
|
||||||
async def test_ready():
|
s = self.FakeServer()
|
||||||
"""Test the ready route."""
|
assert s.config_path == './config/'
|
||||||
|
assert s.json_config == ''
|
||||||
|
assert s.toml_config == ''
|
||||||
|
assert s.trans_path == '../translations/'
|
||||||
|
assert s.rel_urls == False
|
||||||
|
assert s.log_path == './log/'
|
||||||
|
assert s.log_backups == 0
|
||||||
|
|
||||||
ProxyState.set_up(False)
|
def test_parse_args_empty(self):
|
||||||
client = app.test_client()
|
s = self.FakeServer()
|
||||||
response = await client.get('/-/ready')
|
s.parse_args([])
|
||||||
assert response.status_code == 503
|
assert s.config_path == './config/'
|
||||||
result = await response.get_data()
|
assert s.json_config == None
|
||||||
assert result == b"Not ready"
|
assert s.toml_config == None
|
||||||
|
assert s.trans_path == '../translations/'
|
||||||
|
assert s.rel_urls == False
|
||||||
|
assert s.log_path == './log/'
|
||||||
|
assert s.log_backups == 0
|
||||||
|
|
||||||
ProxyState.set_up(True)
|
def test_parse_args_short(self):
|
||||||
response = await client.get('/-/ready')
|
s = self.FakeServer()
|
||||||
assert response.status_code == 200
|
s.parse_args(['-r', '-c', '/tmp/my-config', '-j', 'cnf.jsn', '-t', 'cnf.tml', '-tr', '/my/trans/', '-l', '/my_logs/', '-b', '3'])
|
||||||
result = await response.get_data()
|
assert s.config_path == '/tmp/my-config'
|
||||||
assert result == b"Is ready"
|
assert s.json_config == 'cnf.jsn'
|
||||||
|
assert s.toml_config == 'cnf.tml'
|
||||||
|
assert s.trans_path == '/my/trans/'
|
||||||
|
assert s.rel_urls == True
|
||||||
|
assert s.log_path == '/my_logs/'
|
||||||
|
assert s.log_backups == 3
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
def test_parse_args_long(self):
|
||||||
async def test_healthy():
|
s = self.FakeServer()
|
||||||
"""Test the healthy route."""
|
s.parse_args(['--rel_urls', '--config_path', '/tmp/my-config', '--json_config', 'cnf.jsn',
|
||||||
|
'--toml_config', 'cnf.tml', '--trans_path', '/my/trans/', '--log_path', '/my_logs/',
|
||||||
|
'--log_backups', '3'])
|
||||||
|
assert s.config_path == '/tmp/my-config'
|
||||||
|
assert s.json_config == 'cnf.jsn'
|
||||||
|
assert s.toml_config == 'cnf.tml'
|
||||||
|
assert s.trans_path == '/my/trans/'
|
||||||
|
assert s.rel_urls == True
|
||||||
|
assert s.log_path == '/my_logs/'
|
||||||
|
assert s.log_backups == 3
|
||||||
|
|
||||||
ProxyState.set_up(False)
|
def test_parse_args_invalid(self):
|
||||||
client = app.test_client()
|
s = self.FakeServer()
|
||||||
response = await client.get('/-/healthy')
|
with pytest.raises(SystemExit) as exc_info:
|
||||||
assert response.status_code == 200
|
s.parse_args(['--inalid', '/tmp/my-config'])
|
||||||
result = await response.get_data()
|
assert exc_info.value.code == 2
|
||||||
assert result == b"I'm fine"
|
|
||||||
|
|
||||||
ProxyState.set_up(True)
|
|
||||||
response = await client.get('/-/healthy')
|
|
||||||
assert response.status_code == 200
|
|
||||||
result = await response.get_data()
|
|
||||||
assert result == b"I'm fine"
|
|
||||||
|
|
||||||
def test_default_args():
|
class TestApp:
|
||||||
s = FakeServer()
|
@pytest.mark.asyncio
|
||||||
assert s.config_path == './config/'
|
async def test_ready(self):
|
||||||
assert s.json_config == ''
|
"""Test the ready route."""
|
||||||
assert s.toml_config == ''
|
|
||||||
assert s.trans_path == '../translations/'
|
|
||||||
assert s.rel_urls == False
|
|
||||||
assert s.log_path == './log/'
|
|
||||||
assert s.log_backups == 0
|
|
||||||
|
|
||||||
def test_parse_args_empty():
|
ProxyState.set_up(False)
|
||||||
s = FakeServer()
|
client = app.test_client()
|
||||||
s.parse_args([])
|
response = await client.get('/-/ready')
|
||||||
assert s.config_path == './config/'
|
assert response.status_code == 503
|
||||||
assert s.json_config == None
|
result = await response.get_data()
|
||||||
assert s.toml_config == None
|
assert result == b"Not ready"
|
||||||
assert s.trans_path == '../translations/'
|
|
||||||
assert s.rel_urls == False
|
|
||||||
assert s.log_path == './log/'
|
|
||||||
assert s.log_backups == 0
|
|
||||||
|
|
||||||
def test_parse_args_short():
|
ProxyState.set_up(True)
|
||||||
s = FakeServer()
|
response = await client.get('/-/ready')
|
||||||
s.parse_args(['-r', '-c', '/tmp/my-config', '-j', 'cnf.jsn', '-t', 'cnf.tml', '-tr', '/my/trans/', '-l', '/my_logs/', '-b', '3'])
|
assert response.status_code == 200
|
||||||
assert s.config_path == '/tmp/my-config'
|
result = await response.get_data()
|
||||||
assert s.json_config == 'cnf.jsn'
|
assert result == b"Is ready"
|
||||||
assert s.toml_config == 'cnf.tml'
|
|
||||||
assert s.trans_path == '/my/trans/'
|
|
||||||
assert s.rel_urls == True
|
|
||||||
assert s.log_path == '/my_logs/'
|
|
||||||
assert s.log_backups == 3
|
|
||||||
|
|
||||||
def test_parse_args_long():
|
@pytest.mark.asyncio
|
||||||
s = FakeServer()
|
async def test_healthy(self):
|
||||||
s.parse_args(['--rel_urls', '--config_path', '/tmp/my-config', '--json_config', 'cnf.jsn',
|
"""Test the healthy route."""
|
||||||
'--toml_config', 'cnf.tml', '--trans_path', '/my/trans/', '--log_path', '/my_logs/',
|
|
||||||
'--log_backups', '3'])
|
ProxyState.set_up(False)
|
||||||
assert s.config_path == '/tmp/my-config'
|
client = app.test_client()
|
||||||
assert s.json_config == 'cnf.jsn'
|
response = await client.get('/-/healthy')
|
||||||
assert s.toml_config == 'cnf.tml'
|
assert response.status_code == 200
|
||||||
assert s.trans_path == '/my/trans/'
|
result = await response.get_data()
|
||||||
assert s.rel_urls == True
|
assert result == b"I'm fine"
|
||||||
assert s.log_path == '/my_logs/'
|
|
||||||
assert s.log_backups == 3
|
ProxyState.set_up(True)
|
||||||
|
response = await client.get('/-/healthy')
|
||||||
|
assert response.status_code == 200
|
||||||
|
result = await response.get_data()
|
||||||
|
assert result == b"I'm fine"
|
||||||
|
|
||||||
def test_parse_args_invalid():
|
|
||||||
s = FakeServer()
|
|
||||||
with pytest.raises(SystemExit) as exc_info:
|
|
||||||
s.parse_args(['--inalid', '/tmp/my-config'])
|
|
||||||
assert exc_info.value.code == 2
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
# test_with_pytest.py
|
# test_with_pytest.py
|
||||||
import pytest
|
import pytest
|
||||||
from app import app
|
from server import app
|
||||||
from web import Web, web
|
from web import Web, web
|
||||||
from async_stream import AsyncStreamClient
|
from async_stream import AsyncStreamClient
|
||||||
from gen3plus.inverter_g3p import InverterG3P
|
from gen3plus.inverter_g3p import InverterG3P
|
||||||
|
|||||||
@@ -30,4 +30,4 @@ cd /home/proxy || exit
|
|||||||
export VERSION=$(cat /proxy-version.txt)
|
export VERSION=$(cat /proxy-version.txt)
|
||||||
|
|
||||||
echo "Start Proxyserver..."
|
echo "Start Proxyserver..."
|
||||||
python3 app.py --rel_urls --json_config=/data/options.json --log_path=/homeassistant/tsun-proxy/logs/ --config_path=/homeassistant/tsun-proxy/ --log_backups=2
|
python3 server.py --rel_urls --json_config=/data/options.json --log_path=/homeassistant/tsun-proxy/logs/ --config_path=/homeassistant/tsun-proxy/ --log_backups=2
|
||||||
|
|||||||
Reference in New Issue
Block a user