Compare commits

..

1 Commits

Author SHA1 Message Date
renovate[bot]
a837c26e80 Update dependency aiomqtt to v2.4.0 2025-05-04 17:19:58 +00:00
17 changed files with 305 additions and 648 deletions

View File

@@ -1,3 +1,2 @@
[run]
branch = True
omit = app/src/web/templates/*.html.j2

View File

@@ -7,10 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [unreleased]
- fix a lot of pytest-asyncio problems in the unit tests
- Cleanup startup code for Quart and the Proxy
- Redirect the hypercorn traces to a separate log-file
- Configure the dashboard trace handler by the logging.ini file
- Dashboard: add Notes page and table for important messages
- Dashboard: add Log-File page
- Dashboard: add Connection page

View File

@@ -1,15 +1,16 @@
[loggers]
keys=root,tracer,mesg,conn,data,mqtt,asyncio,hypercorn_access,hypercorn_error
keys=root,tracer,mesg,conn,data,mqtt,asyncio
[handlers]
keys=console_handler,file_handler_name1,file_handler_name2,file_handler_name3,dashboard
keys=console_handler,file_handler_name1,file_handler_name2
[formatters]
keys=console_formatter,file_formatter
[logger_root]
level=DEBUG
handlers=console_handler,file_handler_name1,dashboard
handlers=console_handler,file_handler_name1
[logger_conn]
level=DEBUG
@@ -19,13 +20,13 @@ qualname=conn
[logger_mqtt]
level=INFO
handlers=console_handler,file_handler_name1,dashboard
handlers=console_handler,file_handler_name1
propagate=0
qualname=mqtt
[logger_asyncio]
level=INFO
handlers=console_handler,file_handler_name1,dashboard
handlers=console_handler,file_handler_name1
propagate=0
qualname=asyncio
@@ -48,18 +49,6 @@ handlers=file_handler_name2
propagate=0
qualname=tracer
[logger_hypercorn_access]
level=INFO
handlers=file_handler_name3
propagate=0
qualname=hypercorn.access
[logger_hypercorn_error]
level=INFO
handlers=file_handler_name1,dashboard
propagate=0
qualname=hypercorn.error
[handler_console_handler]
class=StreamHandler
level=DEBUG
@@ -77,16 +66,6 @@ level=NOTSET
formatter=file_formatter
args=(handlers.log_path + 'trace.log', when:='midnight', backupCount:=handlers.log_backups)
[handler_file_handler_name3]
class=handlers.TimedRotatingFileHandler
level=NOTSET
formatter=file_formatter
args=(handlers.log_path + 'access.log', when:='midnight', backupCount:=handlers.log_backups)
[handler_dashboard]
level=WARNING
class=web.log_handler.LogHandler
[formatter_console_formatter]
format=%(asctime)s %(levelname)5s | %(name)4s | %(message)s
datefmt=%Y-%m-%d %H:%M:%S

View File

@@ -1,161 +1,26 @@
import logging
import logging.handlers
from logging import config # noqa F401
import asyncio
from asyncio import StreamReader, StreamWriter
import logging.handlers
import os
import argparse
from asyncio import StreamReader, StreamWriter
from quart import Quart, Response
from logging import config # noqa F401
from proxy import Proxy
from inverter_ifc import InverterIfc
from gen3.inverter_g3 import InverterG3
from gen3plus.inverter_g3p import InverterG3P
from scheduler import Schedule
from cnf.config import Config
from cnf.config_read_env import ConfigReadEnv
from cnf.config_read_toml import ConfigReadToml
from cnf.config_read_json import ConfigReadJson
from web import Web
from web.wrapper import url_for
from proxy import Proxy
from inverter_ifc import InverterIfc
from gen3.inverter_g3 import InverterG3
from gen3plus.inverter_g3p import InverterG3P
from scheduler import Schedule
from modbus_tcp import ModbusTcp
class Server():
serv_name = ''
version = ''
src_dir = ''
####
# The following default values are used for the unit tests only, since
# `Server.parse_args()' will not be called during test setup.
# Ofcorse, we can call `Server.parse_args()' in a test case explicitly
# to overwrite this values
config_path = './config/'
json_config = ''
toml_config = ''
trans_path = '../translations/'
rel_urls = False
log_path = './log/'
log_backups = 0
log_level = None
def __init__(self, app, parse_args: bool):
''' Applikation Setup
1. Read cli arguments
2. Init the logging system by the ini file
3. Log the config parms
4. Set the log-levels
5. Read the build the config for the app
'''
self.serv_name = os.getenv('SERVICE_NAME', 'proxy')
self.version = os.getenv('VERSION', 'unknown')
self.src_dir = os.path.dirname(__file__) + '/'
if parse_args: # pragma: no cover
self.parse_args(None)
self.init_logging_system()
self.build_config()
@app.context_processor
def utility_processor():
return dict(version=self.version)
def parse_args(self, arg_list: list[str] | None):
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config_path', type=str,
default='./config/',
help='set path for the configuration files')
parser.add_argument('-j', '--json_config', type=str,
help='read user config from json-file')
parser.add_argument('-t', '--toml_config', type=str,
help='read user config from toml-file')
parser.add_argument('-l', '--log_path', type=str,
default='./log/',
help='set path for the logging files')
parser.add_argument('-b', '--log_backups', type=int,
default=0,
help='set max number of daily log-files')
parser.add_argument('-tr', '--trans_path', type=str,
default='../translations/',
help='set path for the translations files')
parser.add_argument('-r', '--rel_urls', action="store_true",
help='use relative dashboard urls')
args = parser.parse_args(arg_list)
self.config_path = args.config_path
self.json_config = args.json_config
self.toml_config = args.toml_config
self.trans_path = args.trans_path
self.rel_urls = args.rel_urls
self.log_path = args.log_path
self.log_backups = args.log_backups
def init_logging_system(self):
setattr(logging.handlers, "log_path", self.log_path)
setattr(logging.handlers, "log_backups", self.log_backups)
os.makedirs(self.log_path, exist_ok=True)
logging.config.fileConfig(self.src_dir + 'logging.ini')
logging.info(
f'Server "{self.serv_name} - {self.version}" will be started')
logging.info(f'current dir: {os.getcwd()}')
logging.info(f"config_path: {self.config_path}")
logging.info(f"json_config: {self.json_config}")
logging.info(f"toml_config: {self.toml_config}")
logging.info(f"trans_path: {self.trans_path}")
logging.info(f"rel_urls: {self.rel_urls}")
logging.info(f"log_path: {self.log_path}")
if self.log_backups == 0:
logging.info("log_backups: unlimited")
else:
logging.info(f"log_backups: {self.log_backups} days")
self.log_level = self.get_log_level()
logging.info('******')
if self.log_level:
# set lowest-severity for 'root', 'msg', 'conn' and 'data' logger
logging.getLogger().setLevel(self.log_level)
logging.getLogger('msg').setLevel(self.log_level)
logging.getLogger('conn').setLevel(self.log_level)
logging.getLogger('data').setLevel(self.log_level)
logging.getLogger('tracer').setLevel(self.log_level)
logging.getLogger('asyncio').setLevel(self.log_level)
# logging.getLogger('mqtt').setLevel(self.log_level)
def build_config(self):
# read config file
Config.init(ConfigReadToml(self.src_dir + "cnf/default_config.toml"),
log_path=self.log_path)
ConfigReadEnv()
ConfigReadJson(self.config_path + "config.json")
ConfigReadToml(self.config_path + "config.toml")
ConfigReadJson(self.json_config)
ConfigReadToml(self.toml_config)
config_err = Config.get_error()
if config_err is not None:
logging.info(f'config_err: {config_err}')
return
logging.info('******')
def get_log_level(self) -> int | None:
'''checks if LOG_LVL is set in the environment and returns the
corresponding logging.LOG_LEVEL'''
switch = {
'DEBUG': logging.DEBUG,
'WARN': logging.WARNING,
'INFO': logging.INFO,
'ERROR': logging.ERROR,
}
log_lvl = os.getenv('LOG_LVL', None)
logging.info(f"LOG_LVL : {log_lvl}")
return switch.get(log_lvl, None)
class ProxyState:
_is_up = False
@@ -168,48 +33,11 @@ class ProxyState:
ProxyState._is_up = value
class HypercornLogHndl:
access_hndl = []
error_hndl = []
must_fix = False
HYPERC_ERR = 'hypercorn.error'
HYPERC_ACC = 'hypercorn.access'
@classmethod
def save(cls):
cls.access_hndl = logging.getLogger(
cls.HYPERC_ACC).handlers
cls.error_hndl = logging.getLogger(
cls.HYPERC_ERR).handlers
cls.must_fix = True
@classmethod
def restore(cls):
if not cls.must_fix:
return
cls.must_fix = False
access_hndl = logging.getLogger(
cls.HYPERC_ACC).handlers
if access_hndl != cls.access_hndl:
print(' * Fix hypercorn.access setting')
logging.getLogger(
cls.HYPERC_ACC).handlers = cls.access_hndl
error_hndl = logging.getLogger(
cls.HYPERC_ERR).handlers
if error_hndl != cls.error_hndl:
print(' * Fix hypercorn.error setting')
logging.getLogger(
cls.HYPERC_ERR).handlers = cls.error_hndl
app = Quart(__name__,
template_folder='web/templates',
static_folder='web/static')
app.secret_key = 'JKLdks.dajlKKKdladkflKwolafallsdfl'
app.jinja_env.globals.update(url_for=url_for)
server = Server(app, __name__ == "__main__")
Web(app, server.trans_path, server.rel_urls)
@app.route('/-/ready')
@@ -239,36 +67,13 @@ async def healthy():
return Response(status=200, response="I'm fine")
async def handle_client(reader: StreamReader,
writer: StreamWriter,
inv_class): # pragma: no cover
async def handle_client(reader: StreamReader, writer: StreamWriter, inv_class):
'''Handles a new incoming connection and starts an async loop'''
with inv_class(reader, writer) as inv:
await inv.local.ifc.server_loop()
@app.before_serving
async def startup_app(): # pragma: no cover
HypercornLogHndl.save()
loop = asyncio.get_event_loop()
Proxy.class_init()
Schedule.start()
ModbusTcp(loop)
for inv_class, port in [(InverterG3, 5005), (InverterG3P, 10000)]:
logging.info(f'listen on port: {port} for inverters')
loop.create_task(asyncio.start_server(lambda r, w, i=inv_class:
handle_client(r, w, i),
'0.0.0.0', port))
ProxyState.set_up(True)
@app.before_request
async def startup_request():
HypercornLogHndl.restore()
@app.after_serving
async def handle_shutdown(): # pragma: no cover
'''Close all TCP connections and stop the event loop'''
@@ -285,15 +90,136 @@ async def handle_shutdown(): # pragma: no cover
logging.info('Proxy disconnecting done')
#
# now cancel all remaining (pending) tasks
#
for task in asyncio.all_tasks():
if task == asyncio.current_task():
continue
task.cancel()
logging.info('Proxy cancelling done')
await Proxy.class_close(loop)
if __name__ == "__main__": # pragma: no cover
def get_log_level() -> int | None:
'''checks if LOG_LVL is set in the environment and returns the
corresponding logging.LOG_LEVEL'''
switch = {
'DEBUG': logging.DEBUG,
'WARN': logging.WARNING,
'INFO': logging.INFO,
'ERROR': logging.ERROR,
}
log_level = os.getenv('LOG_LVL', None)
logging.info(f"LOG_LVL : {log_level}")
return switch.get(log_level, None)
def main(): # pragma: no cover
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config_path', type=str,
default='./config/',
help='set path for the configuration files')
parser.add_argument('-j', '--json_config', type=str,
help='read user config from json-file')
parser.add_argument('-t', '--toml_config', type=str,
help='read user config from toml-file')
parser.add_argument('-l', '--log_path', type=str,
default='./log/',
help='set path for the logging files')
parser.add_argument('-b', '--log_backups', type=int,
default=0,
help='set max number of daily log-files')
parser.add_argument('-tr', '--trans_path', type=str,
default='../translations/',
help='set path for the translations files')
parser.add_argument('-r', '--rel_urls', type=bool,
default=False,
help='use relative dashboard urls')
args = parser.parse_args()
#
# Setup our daily, rotating logger
#
serv_name = os.getenv('SERVICE_NAME', 'proxy')
version = os.getenv('VERSION', 'unknown')
@app.context_processor
def utility_processor():
return dict(version=version)
setattr(logging.handlers, "log_path", args.log_path)
setattr(logging.handlers, "log_backups", args.log_backups)
os.makedirs(args.log_path, exist_ok=True)
src_dir = os.path.dirname(__file__) + '/'
logging.config.fileConfig(src_dir + 'logging.ini')
logging.info(f'Server "{serv_name} - {version}" will be started')
logging.info(f'current dir: {os.getcwd()}')
logging.info(f"config_path: {args.config_path}")
logging.info(f"json_config: {args.json_config}")
logging.info(f"toml_config: {args.toml_config}")
logging.info(f"trans_path: {args.trans_path}")
logging.info(f"rel_urls: {args.rel_urls}")
logging.info(f"log_path: {args.log_path}")
if args.log_backups == 0:
logging.info("log_backups: unlimited")
else:
logging.info(f"log_backups: {args.log_backups} days")
log_level = get_log_level()
logging.info('******')
if log_level:
# set lowest-severity for 'root', 'msg', 'conn' and 'data' logger
logging.getLogger().setLevel(log_level)
logging.getLogger('msg').setLevel(log_level)
logging.getLogger('conn').setLevel(log_level)
logging.getLogger('data').setLevel(log_level)
logging.getLogger('tracer').setLevel(log_level)
logging.getLogger('asyncio').setLevel(log_level)
# logging.getLogger('mqtt').setLevel(log_level)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# read config file
Config.init(ConfigReadToml(src_dir + "cnf/default_config.toml"),
log_path=args.log_path)
ConfigReadEnv()
ConfigReadJson(args.config_path + "config.json")
ConfigReadToml(args.config_path + "config.toml")
ConfigReadJson(args.json_config)
ConfigReadToml(args.toml_config)
config_err = Config.get_error()
if config_err is not None:
logging.info(f'config_err: {config_err}')
return
logging.info('******')
Proxy.class_init()
Schedule.start()
ModbusTcp(loop)
Web(app, args.trans_path, args.rel_urls)
#
# Create tasks for our listening servers. These must be tasks! If we call
# start_server directly out of our main task, the eventloop will be blocked
# and we can't receive and handle the UNIX signals!
#
for inv_class, port in [(InverterG3, 5005), (InverterG3P, 10000)]:
logging.info(f'listen on port: {port} for inverters')
loop.create_task(asyncio.start_server(lambda r, w, i=inv_class:
handle_client(r, w, i),
'0.0.0.0', port))
loop.set_debug(log_level == logging.DEBUG)
try:
ProxyState.set_up(True)
logging.info("Start Quart")
app.run(host='0.0.0.0', port=8127, use_reloader=False,
debug=server.log_level == logging.DEBUG)
app.run(host='0.0.0.0', port=8127, use_reloader=False, loop=loop,
debug=log_level == logging.DEBUG)
logging.info("Quart stopped")
except KeyboardInterrupt:
@@ -302,4 +228,10 @@ if __name__ == "__main__": # pragma: no cover
logging.info("Quart cancelled")
finally:
logging.info(f'Finally, exit Server "{server.serv_name}"')
logging.debug('Close event loop')
loop.close()
logging.info(f'Finally, exit Server "{serv_name}"')
if __name__ == "__main__": # pragma: no cover
main()

View File

@@ -7,6 +7,8 @@ Usage:
from quart import Quart, Blueprint
from quart_babel import Babel
from utils import load_modules
from .log_handler import LogHandler
import logging
web = Blueprint('web', __name__)
@@ -30,3 +32,8 @@ class Web:
locale_selector=get_locale,
timezone_selector=get_tz,
default_translation_directories=translation_directories)
h = LogHandler()
logging.getLogger().addHandler(h)
for name in logging.root.manager.loggerDict:
logging.getLogger(name).addHandler(h)

View File

@@ -1 +0,0 @@
mqtt.port = ":1883"

View File

@@ -1,20 +0,0 @@
import pytest_asyncio
import asyncio
@pytest_asyncio.fixture
async def my_loop():
event_loop = asyncio.get_running_loop()
yield event_loop
# Collect all tasks and cancel those that are not 'done'.
tasks = asyncio.all_tasks(event_loop)
tasks = [t for t in tasks if not t.done()]
for task in tasks:
task.cancel()
# Wait for all tasks to complete, ignoring any CancelledErrors
try:
await asyncio.wait(tasks)
except asyncio.exceptions.CancelledError:
pass

View File

@@ -113,9 +113,7 @@ def patch_unhealthy_remote():
with patch.object(AsyncStreamClient, 'healthy', new_healthy) as conn:
yield conn
@pytest.mark.asyncio
async def test_inverter_iter(my_loop):
_ = my_loop
def test_inverter_iter():
InverterBase._registry.clear()
cnt = 0
reader = FakeReader()
@@ -218,8 +216,7 @@ def test_unhealthy_remote(patch_unhealthy_remote):
assert cnt == 0
@pytest.mark.asyncio
async def test_remote_conn(my_loop, config_conn, patch_open_connection):
_ = my_loop
async def test_remote_conn(config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
@@ -245,9 +242,8 @@ async def test_remote_conn(my_loop, config_conn, patch_open_connection):
assert cnt == 0
@pytest.mark.asyncio
async def test_remote_conn_to_private(my_loop, config_conn, patch_open_connection):
async def test_remote_conn_to_private(config_conn, patch_open_connection):
'''check DNS resolving of the TSUN FQDN to a local address'''
_ = my_loop
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
@@ -284,9 +280,8 @@ async def test_remote_conn_to_private(my_loop, config_conn, patch_open_connectio
@pytest.mark.asyncio
async def test_remote_conn_to_loopback(my_loop, config_conn, patch_open_connection):
async def test_remote_conn_to_loopback(config_conn, patch_open_connection):
'''check DNS resolving of the TSUN FQDN to the loopback address'''
_ = my_loop
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
@@ -322,9 +317,8 @@ async def test_remote_conn_to_loopback(my_loop, config_conn, patch_open_connecti
assert cnt == 0
@pytest.mark.asyncio
async def test_remote_conn_to_none(my_loop, config_conn, patch_open_connection):
async def test_remote_conn_to_none(config_conn, patch_open_connection):
'''check if get_extra_info() return None in case of an error'''
_ = my_loop
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
@@ -360,8 +354,7 @@ async def test_remote_conn_to_none(my_loop, config_conn, patch_open_connection):
assert cnt == 0
@pytest.mark.asyncio
async def test_unhealthy_remote(my_loop, config_conn, patch_open_connection, patch_unhealthy_remote):
_ = my_loop
async def test_unhealthy_remote(config_conn, patch_open_connection, patch_unhealthy_remote):
_ = config_conn
_ = patch_open_connection
_ = patch_unhealthy_remote
@@ -398,10 +391,10 @@ async def test_unhealthy_remote(my_loop, config_conn, patch_open_connection, pat
assert cnt == 0
@pytest.mark.asyncio
async def test_remote_disc(my_loop, config_conn, patch_open_connection):
_ = my_loop
async def test_remote_disc(config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
reader = FakeReader()
writer = FakeWriter()

View File

@@ -99,8 +99,7 @@ def patch_healthy():
with patch.object(AsyncStream, 'healthy') as conn:
yield conn
@pytest.mark.asyncio
async def test_method_calls(my_loop, patch_healthy):
def test_method_calls(patch_healthy):
spy = patch_healthy
reader = FakeReader()
writer = FakeWriter()
@@ -120,7 +119,7 @@ async def test_method_calls(my_loop, patch_healthy):
assert cnt == 0
@pytest.mark.asyncio
async def test_remote_conn(my_loop, config_conn, patch_open_connection):
async def test_remote_conn(config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
@@ -138,7 +137,7 @@ async def test_remote_conn(my_loop, config_conn, patch_open_connection):
assert cnt == 0
@pytest.mark.asyncio
async def test_remote_except(my_loop, config_conn, patch_open_connection):
async def test_remote_except(config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
@@ -165,7 +164,7 @@ async def test_remote_except(my_loop, config_conn, patch_open_connection):
assert cnt == 0
@pytest.mark.asyncio
async def test_mqtt_publish(my_loop, config_conn, patch_open_connection):
async def test_mqtt_publish(config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
@@ -192,7 +191,7 @@ async def test_mqtt_publish(my_loop, config_conn, patch_open_connection):
assert Infos.new_stat_data['proxy'] == False
@pytest.mark.asyncio
async def test_mqtt_err(my_loop, config_conn, patch_open_connection, patch_mqtt_err):
async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err):
_ = config_conn
_ = patch_open_connection
_ = patch_mqtt_err
@@ -209,7 +208,7 @@ async def test_mqtt_err(my_loop, config_conn, patch_open_connection, patch_mqtt_
assert stream.new_data['inverter'] == True
@pytest.mark.asyncio
async def test_mqtt_except(my_loop, config_conn, patch_open_connection, patch_mqtt_except):
async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except):
_ = config_conn
_ = patch_open_connection
_ = patch_mqtt_except

View File

@@ -94,8 +94,7 @@ def patch_open_connection():
with patch.object(asyncio, 'open_connection', new_open) as conn:
yield conn
@pytest.mark.asyncio
async def test_method_calls(my_loop, config_conn):
def test_method_calls(config_conn):
_ = config_conn
reader = FakeReader()
writer = FakeWriter()
@@ -106,7 +105,7 @@ async def test_method_calls(my_loop, config_conn):
assert inverter.local.ifc
@pytest.mark.asyncio
async def test_remote_conn(my_loop, config_conn, patch_open_connection):
async def test_remote_conn(config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
@@ -117,7 +116,7 @@ async def test_remote_conn(my_loop, config_conn, patch_open_connection):
assert inverter.remote.stream
@pytest.mark.asyncio
async def test_remote_except(my_loop, config_conn, patch_open_connection):
async def test_remote_except(config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
@@ -139,7 +138,7 @@ async def test_remote_except(my_loop, config_conn, patch_open_connection):
@pytest.mark.asyncio
async def test_mqtt_publish(my_loop, config_conn, patch_open_connection):
async def test_mqtt_publish(config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
@@ -166,7 +165,7 @@ async def test_mqtt_publish(my_loop, config_conn, patch_open_connection):
assert Infos.new_stat_data['proxy'] == False
@pytest.mark.asyncio
async def test_mqtt_err(my_loop, config_conn, patch_open_connection, patch_mqtt_err):
async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err):
_ = config_conn
_ = patch_open_connection
_ = patch_mqtt_err
@@ -183,7 +182,7 @@ async def test_mqtt_err(my_loop, config_conn, patch_open_connection, patch_mqtt_
assert stream.new_data['inverter'] == True
@pytest.mark.asyncio
async def test_mqtt_except(my_loop, config_conn, patch_open_connection, patch_mqtt_except):
async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except):
_ = config_conn
_ = patch_open_connection
_ = patch_mqtt_except

View File

@@ -19,8 +19,7 @@ class ModbusTestHelper(Modbus):
def resp_handler(self):
self.recv_responses += 1
@pytest.mark.asyncio
async def test_modbus_crc():
def test_modbus_crc():
'''Check CRC-16 calculation'''
mb = Modbus(None)
assert 0x0b02 == mb._Modbus__calc_crc(b'\x01\x06\x20\x08\x00\x04')
@@ -38,8 +37,7 @@ async def test_modbus_crc():
msg += b'\x00\x00\x00\x00\x00\x00\x00\xe6\xef'
assert 0 == mb._Modbus__calc_crc(msg)
@pytest.mark.asyncio
async def test_build_modbus_pdu():
def test_build_modbus_pdu():
'''Check building and sending a MODBUS RTU'''
mb = ModbusTestHelper()
mb.build_msg(1,6,0x2000,0x12)
@@ -51,8 +49,7 @@ async def test_build_modbus_pdu():
assert mb.last_len == 18
assert mb.err == 0
@pytest.mark.asyncio
async def test_recv_req():
def test_recv_req():
'''Receive a valid request, which must transmitted'''
mb = ModbusTestHelper()
assert mb.recv_req(b'\x01\x06\x20\x00\x00\x12\x02\x07')
@@ -61,8 +58,7 @@ async def test_recv_req():
assert mb.last_len == 0x12
assert mb.err == 0
@pytest.mark.asyncio
async def test_recv_req_crc_err():
def test_recv_req_crc_err():
'''Receive a request with invalid CRC, which must be dropped'''
mb = ModbusTestHelper()
assert not mb.recv_req(b'\x01\x06\x20\x00\x00\x12\x02\x08')
@@ -72,8 +68,7 @@ async def test_recv_req_crc_err():
assert mb.last_len == 0
assert mb.err == 1
@pytest.mark.asyncio
async def test_recv_resp_crc_err():
def test_recv_resp_crc_err():
'''Receive a response with invalid CRC, which must be dropped'''
mb = ModbusTestHelper()
# simulate a transmitted request
@@ -94,8 +89,7 @@ async def test_recv_resp_crc_err():
mb._Modbus__stop_timer()
assert not mb.req_pend
@pytest.mark.asyncio
async def test_recv_resp_invalid_addr():
def test_recv_resp_invalid_addr():
'''Receive a response with wrong server addr, which must be dropped'''
mb = ModbusTestHelper()
mb.req_pend = True
@@ -119,8 +113,7 @@ async def test_recv_resp_invalid_addr():
mb._Modbus__stop_timer()
assert not mb.req_pend
@pytest.mark.asyncio
async def test_recv_recv_fcode():
def test_recv_recv_fcode():
'''Receive a response with wrong function code, which must be dropped'''
mb = ModbusTestHelper()
mb.build_msg(1,4,0x300e,2)
@@ -142,8 +135,7 @@ async def test_recv_recv_fcode():
mb._Modbus__stop_timer()
assert not mb.req_pend
@pytest.mark.asyncio
async def test_recv_resp_len():
def test_recv_resp_len():
'''Receive a response with wrong data length, which must be dropped'''
mb = ModbusTestHelper()
mb.build_msg(1,3,0x300e,3)
@@ -166,8 +158,7 @@ async def test_recv_resp_len():
mb._Modbus__stop_timer()
assert not mb.req_pend
@pytest.mark.asyncio
async def test_recv_unexpect_resp():
def test_recv_unexpect_resp():
'''Receive a response when we havb't sent a request'''
mb = ModbusTestHelper()
assert not mb.req_pend
@@ -183,8 +174,7 @@ async def test_recv_unexpect_resp():
assert mb.req_pend == False
assert mb.que.qsize() == 0
@pytest.mark.asyncio
async def test_parse_resp():
def test_parse_resp():
'''Receive matching response and parse the values'''
mb = ModbusTestHelper()
mb.build_msg(1,3,0x3007,6)
@@ -210,8 +200,7 @@ async def test_parse_resp():
assert mb.que.qsize() == 0
assert not mb.req_pend
@pytest.mark.asyncio
async def test_queue():
def test_queue():
mb = ModbusTestHelper()
mb.build_msg(1,3,0x3022,4)
assert mb.que.qsize() == 0
@@ -229,8 +218,7 @@ async def test_queue():
mb._Modbus__stop_timer()
assert not mb.req_pend
@pytest.mark.asyncio
async def test_queue2():
def test_queue2():
'''Check queue handling for build_msg() calls'''
mb = ModbusTestHelper()
mb.build_msg(1,3,0x3007,6)
@@ -279,8 +267,7 @@ async def test_queue2():
assert mb.que.qsize() == 0
assert not mb.req_pend
@pytest.mark.asyncio
async def test_queue3():
def test_queue3():
'''Check queue handling for recv_req() calls'''
mb = ModbusTestHelper()
assert mb.recv_req(b'\x01\x03\x30\x07\x00\x06{\t', mb.resp_handler)
@@ -337,7 +324,7 @@ async def test_queue3():
assert not mb.req_pend
@pytest.mark.asyncio
async def test_timeout(my_loop):
async def test_timeout():
'''Test MODBUS response timeout and RTU retransmitting'''
assert asyncio.get_running_loop()
mb = ModbusTestHelper()
@@ -384,8 +371,7 @@ async def test_timeout(my_loop):
assert mb.retry_cnt == 0
assert mb.send_calls == 4
@pytest.mark.asyncio
async def test_recv_unknown_data():
def test_recv_unknown_data():
'''Receive a response with an unknwon register'''
mb = ModbusTestHelper()
assert 0x9000 not in mb.mb_reg_mapping
@@ -404,8 +390,7 @@ async def test_recv_unknown_data():
del mb.mb_reg_mapping[0x9000]
@pytest.mark.asyncio
async def test_close():
def test_close():
'''Check queue handling for build_msg() calls'''
mb = ModbusTestHelper()
mb.build_msg(1,3,0x3007,6)

View File

@@ -3,216 +3,66 @@ import pytest
import logging
import os
from mock import patch
from server import app, Server, ProxyState, HypercornLogHndl
from server import get_log_level, app, ProxyState
pytest_plugins = ('pytest_asyncio',)
def test_get_log_level():
class TestServerClass:
class FakeServer(Server):
def __init__(self):
pass # don't call the suoer(.__init__ for unit tests
with patch.dict(os.environ, {}):
log_lvl = get_log_level()
assert log_lvl == None
def test_get_log_level(self):
s = self.FakeServer()
with patch.dict(os.environ, {'LOG_LVL': 'DEBUG'}):
log_lvl = get_log_level()
assert log_lvl == logging.DEBUG
with patch.dict(os.environ, {}):
log_lvl = s.get_log_level()
assert log_lvl == None
with patch.dict(os.environ, {'LOG_LVL': 'INFO'}):
log_lvl = get_log_level()
assert log_lvl == logging.INFO
with patch.dict(os.environ, {'LOG_LVL': 'DEBUG'}):
log_lvl = s.get_log_level()
assert log_lvl == logging.DEBUG
with patch.dict(os.environ, {'LOG_LVL': 'WARN'}):
log_lvl = get_log_level()
assert log_lvl == logging.WARNING
with patch.dict(os.environ, {'LOG_LVL': 'INFO'}):
log_lvl = s.get_log_level()
assert log_lvl == logging.INFO
with patch.dict(os.environ, {'LOG_LVL': 'ERROR'}):
log_lvl = get_log_level()
assert log_lvl == logging.ERROR
with patch.dict(os.environ, {'LOG_LVL': 'WARN'}):
log_lvl = s.get_log_level()
assert log_lvl == logging.WARNING
with patch.dict(os.environ, {'LOG_LVL': 'UNKNOWN'}):
log_lvl = get_log_level()
assert log_lvl == None
with patch.dict(os.environ, {'LOG_LVL': 'ERROR'}):
log_lvl = s.get_log_level()
assert log_lvl == logging.ERROR
@pytest.mark.asyncio
async def test_ready():
"""Test the ready route."""
with patch.dict(os.environ, {'LOG_LVL': 'UNKNOWN'}):
log_lvl = s.get_log_level()
assert log_lvl == None
ProxyState.set_up(False)
client = app.test_client()
response = await client.get('/-/ready')
assert response.status_code == 503
result = await response.get_data()
assert result == b"Not ready"
def test_default_args(self):
s = self.FakeServer()
assert s.config_path == './config/'
assert s.json_config == ''
assert s.toml_config == ''
assert s.trans_path == '../translations/'
assert s.rel_urls == False
assert s.log_path == './log/'
assert s.log_backups == 0
ProxyState.set_up(True)
response = await client.get('/-/ready')
assert response.status_code == 200
result = await response.get_data()
assert result == b"Is ready"
def test_parse_args_empty(self):
s = self.FakeServer()
s.parse_args([])
assert s.config_path == './config/'
assert s.json_config == None
assert s.toml_config == None
assert s.trans_path == '../translations/'
assert s.rel_urls == False
assert s.log_path == './log/'
assert s.log_backups == 0
@pytest.mark.asyncio
async def test_healthy():
"""Test the healthy route."""
def test_parse_args_short(self):
s = self.FakeServer()
s.parse_args(['-r', '-c', '/tmp/my-config', '-j', 'cnf.jsn', '-t', 'cnf.tml', '-tr', '/my/trans/', '-l', '/my_logs/', '-b', '3'])
assert s.config_path == '/tmp/my-config'
assert s.json_config == 'cnf.jsn'
assert s.toml_config == 'cnf.tml'
assert s.trans_path == '/my/trans/'
assert s.rel_urls == True
assert s.log_path == '/my_logs/'
assert s.log_backups == 3
def test_parse_args_long(self):
s = self.FakeServer()
s.parse_args(['--rel_urls', '--config_path', '/tmp/my-config', '--json_config', 'cnf.jsn',
'--toml_config', 'cnf.tml', '--trans_path', '/my/trans/', '--log_path', '/my_logs/',
'--log_backups', '3'])
assert s.config_path == '/tmp/my-config'
assert s.json_config == 'cnf.jsn'
assert s.toml_config == 'cnf.tml'
assert s.trans_path == '/my/trans/'
assert s.rel_urls == True
assert s.log_path == '/my_logs/'
assert s.log_backups == 3
def test_parse_args_invalid(self):
s = self.FakeServer()
with pytest.raises(SystemExit) as exc_info:
s.parse_args(['--inalid', '/tmp/my-config'])
assert exc_info.value.code == 2
def test_init_logging_system(self):
s = self.FakeServer()
s.src_dir = 'app/src/'
s.init_logging_system()
assert s.log_backups == 0
assert s.log_level == None
assert logging.handlers.log_path == './log/'
assert logging.handlers.log_backups == 0
assert logging.getLogger().level == logging.DEBUG
assert logging.getLogger('msg').level == logging.DEBUG
assert logging.getLogger('conn').level == logging.DEBUG
assert logging.getLogger('data').level == logging.DEBUG
assert logging.getLogger('tracer').level == logging.INFO
assert logging.getLogger('asyncio').level == logging.INFO
assert logging.getLogger('hypercorn.access').level == logging.INFO
assert logging.getLogger('hypercorn.error').level == logging.INFO
os.environ["LOG_LVL"] = "WARN"
s.parse_args(['--log_backups', '3'])
s.init_logging_system()
assert s.log_backups == 3
assert s.log_level == logging.WARNING
assert logging.handlers.log_backups == 3
assert logging.getLogger().level == s.log_level
assert logging.getLogger('msg').level == s.log_level
assert logging.getLogger('conn').level == s.log_level
assert logging.getLogger('data').level == s.log_level
assert logging.getLogger('tracer').level == s.log_level
assert logging.getLogger('asyncio').level == s.log_level
assert logging.getLogger('hypercorn.access').level == logging.INFO
assert logging.getLogger('hypercorn.error').level == logging.INFO
def test_build_config_error(self, caplog):
s = self.FakeServer()
s.src_dir = 'app/src/'
s.toml_config = 'app/tests/cnf/invalid_config.toml'
with caplog.at_level(logging.ERROR):
s.build_config()
assert "Can't read from app/tests/cnf/invalid_config.toml" in caplog.text
assert "Key 'port' error:" in caplog.text
class TestHypercornLogHndl:
class FakeServer(Server):
def __init__(self):
pass # don't call the suoer(.__init__ for unit tests
def test_save_and_restore(self, capsys):
s = self.FakeServer()
s.src_dir = 'app/src/'
s.init_logging_system()
h = HypercornLogHndl()
assert h.must_fix == False
assert len(h.access_hndl) == 0
assert len(h.error_hndl) == 0
h.save()
assert h.must_fix == True
assert len(h.access_hndl) == 1
assert len(h.error_hndl) == 2
assert h.access_hndl == logging.getLogger('hypercorn.access').handlers
assert h.error_hndl == logging.getLogger('hypercorn.error').handlers
logging.getLogger('hypercorn.access').handlers = []
logging.getLogger('hypercorn.error').handlers = []
h.restore()
assert h.must_fix == False
assert h.access_hndl == logging.getLogger('hypercorn.access').handlers
assert h.error_hndl == logging.getLogger('hypercorn.error').handlers
output = capsys.readouterr().out.rstrip()
assert "* Fix hypercorn.access setting" in output
assert "* Fix hypercorn.error setting" in output
h.restore() # second restore do nothing
assert h.must_fix == False
output = capsys.readouterr().out.rstrip()
assert output == ''
h.save() # save the same values second time
assert h.must_fix == True
h.restore() # restore without changing the handlers
assert h.must_fix == False
output = capsys.readouterr().out.rstrip()
assert output == ''
class TestApp:
@pytest.mark.asyncio
async def test_ready(self):
"""Test the ready route."""
ProxyState.set_up(False)
client = app.test_client()
response = await client.get('/-/ready')
assert response.status_code == 503
result = await response.get_data()
assert result == b"Not ready"
ProxyState.set_up(True)
response = await client.get('/-/ready')
assert response.status_code == 200
result = await response.get_data()
assert result == b"Is ready"
@pytest.mark.asyncio
async def test_healthy(self):
"""Test the healthy route."""
ProxyState.set_up(False)
client = app.test_client()
response = await client.get('/-/healthy')
assert response.status_code == 200
result = await response.get_data()
assert result == b"I'm fine"
ProxyState.set_up(True)
response = await client.get('/-/healthy')
assert response.status_code == 200
result = await response.get_data()
assert result == b"I'm fine"
ProxyState.set_up(False)
client = app.test_client()
response = await client.get('/-/healthy')
assert response.status_code == 200
result = await response.get_data()
assert result == b"I'm fine"
ProxyState.set_up(True)
response = await client.get('/-/healthy')
assert response.status_code == 200
result = await response.get_data()
assert result == b"I'm fine"

View File

@@ -79,7 +79,6 @@ class MemoryStream(SolarmanV5):
self.key = ''
self.data = ''
self.msg_recvd = []
def write_cb(self):
if self.test_exception_async_write:
@@ -856,8 +855,7 @@ def config_tsun_scan_dcu():
def config_tsun_dcu1():
Config.act_config = {'solarman':{'enabled': True},'batteries':{'4100000000000001':{'monitor_sn': 2070233888, 'node_id':'inv1/', 'modbus_polling': True, 'suggested_area':'roof', 'sensor_list': 0}}}
@pytest.mark.asyncio
async def test_read_message(device_ind_msg):
def test_read_message(device_ind_msg):
Config.act_config = {'solarman':{'enabled': True}}
m = MemoryStream(device_ind_msg, (0,))
m.read() # read complete msg, and dispatch msg
@@ -875,12 +873,10 @@ async def test_read_message(device_ind_msg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_invalid_start_byte(invalid_start_byte, device_ind_msg):
def test_invalid_start_byte(invalid_start_byte, device_ind_msg):
# received a message with wrong start byte plus an valid message
# the complete receive buffer must be cleared to
# find the next valid message
Config.act_config = {'solarman':{'enabled': True}}
m = MemoryStream(invalid_start_byte, (0,))
m.append_msg(device_ind_msg)
m.read() # read complete msg, and dispatch msg
@@ -898,12 +894,10 @@ async def test_invalid_start_byte(invalid_start_byte, device_ind_msg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1
m.close()
@pytest.mark.asyncio
async def test_invalid_stop_byte(invalid_stop_byte):
def test_invalid_stop_byte(invalid_stop_byte):
# received a message with wrong stop byte
# the complete receive buffer must be cleared to
# find the next valid message
Config.act_config = {'solarman':{'enabled': True}}
m = MemoryStream(invalid_stop_byte, (0,))
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since start byte is wrong
@@ -920,11 +914,9 @@ async def test_invalid_stop_byte(invalid_stop_byte):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1
m.close()
@pytest.mark.asyncio
async def test_invalid_stop_byte2(invalid_stop_byte, device_ind_msg):
def test_invalid_stop_byte2(invalid_stop_byte, device_ind_msg):
# received a message with wrong stop byte plus an valid message
# only the first message must be discarded
Config.act_config = {'solarman':{'enabled': True}}
m = MemoryStream(invalid_stop_byte, (0,))
m.append_msg(device_ind_msg)
@@ -947,13 +939,11 @@ async def test_invalid_stop_byte2(invalid_stop_byte, device_ind_msg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1
m.close()
@pytest.mark.asyncio
async def test_invalid_stop_start_byte(invalid_stop_byte, invalid_start_byte):
def test_invalid_stop_start_byte(invalid_stop_byte, invalid_start_byte):
# received a message with wrong stop byte plus an invalid message
# with fron start byte
# the complete receive buffer must be cleared to
# find the next valid message
Config.act_config = {'solarman':{'enabled': True}}
m = MemoryStream(invalid_stop_byte, (0,))
m.append_msg(invalid_start_byte)
m.read() # read complete msg, and dispatch msg
@@ -971,11 +961,9 @@ async def test_invalid_stop_start_byte(invalid_stop_byte, invalid_start_byte):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1
m.close()
@pytest.mark.asyncio
async def test_invalid_checksum(invalid_checksum, device_ind_msg):
def test_invalid_checksum(invalid_checksum, device_ind_msg):
# received a message with wrong checksum plus an valid message
# only the first message must be discarded
Config.act_config = {'solarman':{'enabled': True}}
m = MemoryStream(invalid_checksum, (0,))
m.append_msg(device_ind_msg)
@@ -997,8 +985,7 @@ async def test_invalid_checksum(invalid_checksum, device_ind_msg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1
m.close()
@pytest.mark.asyncio
async def test_read_message_twice(config_no_tsun_inv1, device_ind_msg, device_rsp_msg):
def test_read_message_twice(config_no_tsun_inv1, device_ind_msg, device_rsp_msg):
_ = config_no_tsun_inv1
m = MemoryStream(device_ind_msg, (0,))
m.append_msg(device_ind_msg)
@@ -1019,9 +1006,7 @@ async def test_read_message_twice(config_no_tsun_inv1, device_ind_msg, device_rs
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_read_message_in_chunks(device_ind_msg):
Config.act_config = {'solarman':{'enabled': True}}
def test_read_message_in_chunks(device_ind_msg):
m = MemoryStream(device_ind_msg, (4,11,0))
m.read() # read 4 bytes, header incomplere
assert not m.header_valid # must be invalid, since header not complete
@@ -1042,8 +1027,7 @@ async def test_read_message_in_chunks(device_ind_msg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_read_message_in_chunks2(my_loop, config_tsun_inv1, device_ind_msg):
def test_read_message_in_chunks2(config_tsun_inv1, device_ind_msg):
_ = config_tsun_inv1
m = MemoryStream(device_ind_msg, (4,10,0))
m.read() # read 4 bytes, header incomplere
@@ -1068,8 +1052,7 @@ async def test_read_message_in_chunks2(my_loop, config_tsun_inv1, device_ind_msg
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_read_two_messages(my_loop, config_tsun_allow_all, device_ind_msg, device_rsp_msg, inverter_ind_msg, inverter_rsp_msg):
def test_read_two_messages(config_tsun_allow_all, device_ind_msg, device_rsp_msg, inverter_ind_msg, inverter_rsp_msg):
_ = config_tsun_allow_all
m = MemoryStream(device_ind_msg, (0,))
m.append_msg(inverter_ind_msg)
@@ -1097,8 +1080,7 @@ async def test_read_two_messages(my_loop, config_tsun_allow_all, device_ind_msg,
assert m.ifc.tx_fifo.get()==b''
m.close()
@pytest.mark.asyncio
async def test_read_two_messages2(my_loop, config_tsun_allow_all, inverter_ind_msg, inverter_ind_msg_81, inverter_rsp_msg, inverter_rsp_msg_81):
def test_read_two_messages2(config_tsun_allow_all, inverter_ind_msg, inverter_ind_msg_81, inverter_rsp_msg, inverter_rsp_msg_81):
_ = config_tsun_allow_all
m = MemoryStream(inverter_ind_msg, (0,))
m.append_msg(inverter_ind_msg_81)
@@ -1123,8 +1105,7 @@ async def test_read_two_messages2(my_loop, config_tsun_allow_all, inverter_ind_m
assert m.ifc.tx_fifo.get()==b''
m.close()
@pytest.mark.asyncio
async def test_read_two_messages3(my_loop, config_tsun_allow_all, device_ind_msg2, device_rsp_msg2, inverter_ind_msg, inverter_rsp_msg):
def test_read_two_messages3(config_tsun_allow_all, device_ind_msg2, device_rsp_msg2, inverter_ind_msg, inverter_rsp_msg):
# test device message received after the inverter masg
_ = config_tsun_allow_all
m = MemoryStream(inverter_ind_msg, (0,))
@@ -1153,8 +1134,7 @@ async def test_read_two_messages3(my_loop, config_tsun_allow_all, device_ind_msg
assert m.ifc.tx_fifo.get()==b''
m.close()
@pytest.mark.asyncio
async def test_read_two_messages4(my_loop, config_tsun_dcu1, dcu_dev_ind_msg, dcu_dev_rsp_msg, dcu_data_ind_msg, dcu_data_rsp_msg):
def test_read_two_messages4(config_tsun_dcu1, dcu_dev_ind_msg, dcu_dev_rsp_msg, dcu_data_ind_msg, dcu_data_rsp_msg):
_ = config_tsun_dcu1
m = MemoryStream(dcu_dev_ind_msg, (0,))
m.append_msg(dcu_data_ind_msg)
@@ -1182,8 +1162,7 @@ async def test_read_two_messages4(my_loop, config_tsun_dcu1, dcu_dev_ind_msg, dc
assert m.ifc.tx_fifo.get()==b''
m.close()
@pytest.mark.asyncio
async def test_unkown_frame_code(my_loop, config_tsun_inv1, inverter_ind_msg_81, inverter_rsp_msg_81):
def test_unkown_frame_code(config_tsun_inv1, inverter_ind_msg_81, inverter_rsp_msg_81):
_ = config_tsun_inv1
m = MemoryStream(inverter_ind_msg_81, (0,))
m.read() # read complete msg, and dispatch msg
@@ -1201,8 +1180,7 @@ async def test_unkown_frame_code(my_loop, config_tsun_inv1, inverter_ind_msg_81,
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_unkown_message(my_loop, config_tsun_inv1, unknown_msg):
def test_unkown_message(config_tsun_inv1, unknown_msg):
_ = config_tsun_inv1
m = MemoryStream(unknown_msg, (0,))
m.read() # read complete msg, and dispatch msg
@@ -1220,8 +1198,7 @@ async def test_unkown_message(my_loop, config_tsun_inv1, unknown_msg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_device_rsp(my_loop, config_tsun_inv1, device_rsp_msg):
def test_device_rsp(config_tsun_inv1, device_rsp_msg):
_ = config_tsun_inv1
m = MemoryStream(device_rsp_msg, (0,), False)
m.read() # read complete msg, and dispatch msg
@@ -1239,8 +1216,7 @@ async def test_device_rsp(my_loop, config_tsun_inv1, device_rsp_msg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_inverter_rsp(my_loop, config_tsun_inv1, inverter_rsp_msg):
def test_inverter_rsp(config_tsun_inv1, inverter_rsp_msg):
_ = config_tsun_inv1
m = MemoryStream(inverter_rsp_msg, (0,), False)
m.read() # read complete msg, and dispatch msg
@@ -1258,8 +1234,7 @@ async def test_inverter_rsp(my_loop, config_tsun_inv1, inverter_rsp_msg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_heartbeat_ind(my_loop, config_tsun_inv1, heartbeat_ind_msg, heartbeat_rsp_msg):
def test_heartbeat_ind(config_tsun_inv1, heartbeat_ind_msg, heartbeat_rsp_msg):
_ = config_tsun_inv1
m = MemoryStream(heartbeat_ind_msg, (0,))
m.read() # read complete msg, and dispatch msg
@@ -1276,8 +1251,7 @@ async def test_heartbeat_ind(my_loop, config_tsun_inv1, heartbeat_ind_msg, heart
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_heartbeat_ind2(my_loop, config_tsun_inv1, heartbeat_ind_msg, heartbeat_rsp_msg):
def test_heartbeat_ind2(config_tsun_inv1, heartbeat_ind_msg, heartbeat_rsp_msg):
_ = config_tsun_inv1
m = MemoryStream(heartbeat_ind_msg, (0,))
m.no_forwarding = True
@@ -1295,8 +1269,7 @@ async def test_heartbeat_ind2(my_loop, config_tsun_inv1, heartbeat_ind_msg, hear
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_heartbeat_rsp(my_loop, config_tsun_inv1, heartbeat_rsp_msg):
def test_heartbeat_rsp(config_tsun_inv1, heartbeat_rsp_msg):
_ = config_tsun_inv1
m = MemoryStream(heartbeat_rsp_msg, (0,), False)
m.read() # read complete msg, and dispatch msg
@@ -1314,8 +1287,7 @@ async def test_heartbeat_rsp(my_loop, config_tsun_inv1, heartbeat_rsp_msg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_sync_start_ind(my_loop, config_tsun_inv1, sync_start_ind_msg, sync_start_rsp_msg, sync_start_fwd_msg):
def test_sync_start_ind(config_tsun_inv1, sync_start_ind_msg, sync_start_rsp_msg, sync_start_fwd_msg):
_ = config_tsun_inv1
m = MemoryStream(sync_start_ind_msg, (0,))
m.read() # read complete msg, and dispatch msg
@@ -1338,8 +1310,7 @@ async def test_sync_start_ind(my_loop, config_tsun_inv1, sync_start_ind_msg, syn
m.close()
@pytest.mark.asyncio
async def test_sync_start_rsp(my_loop, config_tsun_inv1, sync_start_rsp_msg):
def test_sync_start_rsp(config_tsun_inv1, sync_start_rsp_msg):
_ = config_tsun_inv1
m = MemoryStream(sync_start_rsp_msg, (0,), False)
m.read() # read complete msg, and dispatch msg
@@ -1357,8 +1328,7 @@ async def test_sync_start_rsp(my_loop, config_tsun_inv1, sync_start_rsp_msg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_sync_end_ind(my_loop, config_tsun_inv1, sync_end_ind_msg, sync_end_rsp_msg):
def test_sync_end_ind(config_tsun_inv1, sync_end_ind_msg, sync_end_rsp_msg):
_ = config_tsun_inv1
m = MemoryStream(sync_end_ind_msg, (0,))
m.read() # read complete msg, and dispatch msg
@@ -1375,8 +1345,7 @@ async def test_sync_end_ind(my_loop, config_tsun_inv1, sync_end_ind_msg, sync_en
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_sync_end_rsp(my_loop, config_tsun_inv1, sync_end_rsp_msg):
def test_sync_end_rsp(config_tsun_inv1, sync_end_rsp_msg):
_ = config_tsun_inv1
m = MemoryStream(sync_end_rsp_msg, (0,), False)
m.read() # read complete msg, and dispatch msg
@@ -1394,8 +1363,7 @@ async def test_sync_end_rsp(my_loop, config_tsun_inv1, sync_end_rsp_msg):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_build_modell_600(my_loop, config_tsun_allow_all, inverter_ind_msg):
def test_build_modell_600(config_tsun_allow_all, inverter_ind_msg):
_ = config_tsun_allow_all
m = MemoryStream(inverter_ind_msg, (0,))
assert 0 == m.sensor_list
@@ -1414,8 +1382,7 @@ async def test_build_modell_600(my_loop, config_tsun_allow_all, inverter_ind_msg
assert m.ifc.tx_fifo.get()==b''
m.close()
@pytest.mark.asyncio
async def test_build_modell_1600(my_loop, config_tsun_allow_all, inverter_ind_msg1600):
def test_build_modell_1600(config_tsun_allow_all, inverter_ind_msg1600):
_ = config_tsun_allow_all
m = MemoryStream(inverter_ind_msg1600, (0,))
assert 0 == m.db.get_db_value(Register.MAX_DESIGNED_POWER, 0)
@@ -1427,8 +1394,7 @@ async def test_build_modell_1600(my_loop, config_tsun_allow_all, inverter_ind_ms
assert 'TSOL-MS1600' == m.db.get_db_value(Register.EQUIPMENT_MODEL, 0)
m.close()
@pytest.mark.asyncio
async def test_build_modell_1800(my_loop, config_tsun_allow_all, inverter_ind_msg1800):
def test_build_modell_1800(config_tsun_allow_all, inverter_ind_msg1800):
_ = config_tsun_allow_all
m = MemoryStream(inverter_ind_msg1800, (0,))
assert 0 == m.db.get_db_value(Register.MAX_DESIGNED_POWER, 0)
@@ -1440,8 +1406,7 @@ async def test_build_modell_1800(my_loop, config_tsun_allow_all, inverter_ind_ms
assert 'TSOL-MS1800' == m.db.get_db_value(Register.EQUIPMENT_MODEL, 0)
m.close()
@pytest.mark.asyncio
async def test_build_modell_2000(my_loop, config_tsun_allow_all, inverter_ind_msg2000):
def test_build_modell_2000(config_tsun_allow_all, inverter_ind_msg2000):
_ = config_tsun_allow_all
m = MemoryStream(inverter_ind_msg2000, (0,))
assert 0 == m.db.get_db_value(Register.MAX_DESIGNED_POWER, 0)
@@ -1453,8 +1418,7 @@ async def test_build_modell_2000(my_loop, config_tsun_allow_all, inverter_ind_ms
assert 'TSOL-MS2000' == m.db.get_db_value(Register.EQUIPMENT_MODEL, 0)
m.close()
@pytest.mark.asyncio
async def test_build_modell_800(my_loop, config_tsun_allow_all, inverter_ind_msg800):
def test_build_modell_800(config_tsun_allow_all, inverter_ind_msg800):
_ = config_tsun_allow_all
m = MemoryStream(inverter_ind_msg800, (0,))
assert 0 == m.db.get_db_value(Register.MAX_DESIGNED_POWER, 0)
@@ -1466,8 +1430,7 @@ async def test_build_modell_800(my_loop, config_tsun_allow_all, inverter_ind_msg
assert 'TSOL-MSxx00' == m.db.get_db_value(Register.EQUIPMENT_MODEL, 0)
m.close()
@pytest.mark.asyncio
async def test_build_logger_modell(my_loop, config_tsun_allow_all, device_ind_msg):
def test_build_logger_modell(config_tsun_allow_all, device_ind_msg):
_ = config_tsun_allow_all
m = MemoryStream(device_ind_msg, (0,))
assert 0 == m.db.get_db_value(Register.COLLECTOR_FW_VERSION, 0)
@@ -1478,8 +1441,7 @@ async def test_build_logger_modell(my_loop, config_tsun_allow_all, device_ind_ms
assert 'V1.1.00.0B' == m.db.get_db_value(Register.COLLECTOR_FW_VERSION, 0).rstrip('\00')
m.close()
@pytest.mark.asyncio
async def test_msg_iterator(my_loop, config_tsun_inv1):
def test_msg_iterator():
Message._registry.clear()
m1 = SolarmanV5(None, ('test1.local', 1234), ifc=AsyncIfcImpl(), server_side=True, client_mode=False)
m2 = SolarmanV5(None, ('test2.local', 1234), ifc=AsyncIfcImpl(), server_side=True, client_mode=False)
@@ -1500,8 +1462,7 @@ async def test_msg_iterator(my_loop, config_tsun_inv1):
assert test1 == 1
assert test2 == 1
@pytest.mark.asyncio
async def test_proxy_counter(my_loop, config_tsun_inv1):
def test_proxy_counter():
m = SolarmanV5(None, ('test.local', 1234), ifc=AsyncIfcImpl(), server_side=True, client_mode=False)
assert m.new_data == {}
m.db.stat['proxy']['Unknown_Msg'] = 0
@@ -1520,7 +1481,7 @@ async def test_proxy_counter(my_loop, config_tsun_inv1):
m.close()
@pytest.mark.asyncio
async def test_msg_build_modbus_req(my_loop, config_tsun_inv1, device_ind_msg, device_rsp_msg, inverter_ind_msg, inverter_rsp_msg, msg_modbus_cmd):
async def test_msg_build_modbus_req(config_tsun_inv1, device_ind_msg, device_rsp_msg, inverter_ind_msg, inverter_rsp_msg, msg_modbus_cmd):
_ = config_tsun_inv1
m = MemoryStream(device_ind_msg, (0,), True)
m.read()
@@ -1555,7 +1516,7 @@ async def test_msg_build_modbus_req(my_loop, config_tsun_inv1, device_ind_msg, d
m.close()
@pytest.mark.asyncio
async def test_at_cmd(my_loop, config_tsun_allow_all, device_ind_msg, device_rsp_msg, inverter_ind_msg, inverter_rsp_msg, at_command_ind_msg, at_command_rsp_msg):
async def test_at_cmd(config_tsun_allow_all, device_ind_msg, device_rsp_msg, inverter_ind_msg, inverter_rsp_msg, at_command_ind_msg, at_command_rsp_msg):
_ = config_tsun_allow_all
m = MemoryStream(device_ind_msg, (0,), True)
m.read() # read device ind
@@ -1615,7 +1576,7 @@ async def test_at_cmd(my_loop, config_tsun_allow_all, device_ind_msg, device_rsp
m.close()
@pytest.mark.asyncio
async def test_at_cmd_blocked(my_loop, config_tsun_allow_all, device_ind_msg, device_rsp_msg, inverter_ind_msg, inverter_rsp_msg, at_command_ind_msg):
async def test_at_cmd_blocked(config_tsun_allow_all, device_ind_msg, device_rsp_msg, inverter_ind_msg, inverter_rsp_msg, at_command_ind_msg):
_ = config_tsun_allow_all
m = MemoryStream(device_ind_msg, (0,), True)
m.read()
@@ -1649,8 +1610,7 @@ async def test_at_cmd_blocked(my_loop, config_tsun_allow_all, device_ind_msg, de
assert Proxy.mqtt.data == "'AT+WEBU' is forbidden"
m.close()
@pytest.mark.asyncio
async def test_at_cmd_ind(my_loop, config_tsun_inv1, at_command_ind_msg, at_command_rsp_msg):
def test_at_cmd_ind(config_tsun_inv1, at_command_ind_msg, at_command_rsp_msg):
_ = config_tsun_inv1
m = MemoryStream(at_command_ind_msg, (0,), False)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
@@ -1685,8 +1645,7 @@ async def test_at_cmd_ind(my_loop, config_tsun_inv1, at_command_ind_msg, at_comm
m.close()
@pytest.mark.asyncio
async def test_at_cmd_ind_block(my_loop, config_tsun_inv1, at_command_ind_msg_block):
def test_at_cmd_ind_block(config_tsun_inv1, at_command_ind_msg_block):
_ = config_tsun_inv1
m = MemoryStream(at_command_ind_msg_block, (0,), False)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
@@ -1714,8 +1673,7 @@ async def test_at_cmd_ind_block(my_loop, config_tsun_inv1, at_command_ind_msg_bl
assert Proxy.mqtt.data == ""
m.close()
@pytest.mark.asyncio
async def test_msg_at_command_rsp1(my_loop, config_tsun_inv1, at_command_rsp_msg):
def test_msg_at_command_rsp1(config_tsun_inv1, at_command_rsp_msg):
_ = config_tsun_inv1
m = MemoryStream(at_command_rsp_msg)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
@@ -1734,8 +1692,7 @@ async def test_msg_at_command_rsp1(my_loop, config_tsun_inv1, at_command_rsp_msg
assert m.db.stat['proxy']['Modbus_Command'] == 0
m.close()
@pytest.mark.asyncio
async def test_msg_at_command_rsp2(my_loop, config_tsun_inv1, at_command_rsp_msg):
def test_msg_at_command_rsp2(config_tsun_inv1, at_command_rsp_msg):
_ = config_tsun_inv1
m = MemoryStream(at_command_rsp_msg)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
@@ -1756,8 +1713,7 @@ async def test_msg_at_command_rsp2(my_loop, config_tsun_inv1, at_command_rsp_msg
assert Proxy.mqtt.data == "+ok"
m.close()
@pytest.mark.asyncio
async def test_msg_at_command_rsp3(my_loop, config_tsun_inv1, at_command_interim_rsp_msg):
def test_msg_at_command_rsp3(config_tsun_inv1, at_command_interim_rsp_msg):
_ = config_tsun_inv1
m = MemoryStream(at_command_interim_rsp_msg)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
@@ -1782,8 +1738,7 @@ async def test_msg_at_command_rsp3(my_loop, config_tsun_inv1, at_command_interim
assert Proxy.mqtt.data == ""
m.close()
@pytest.mark.asyncio
async def test_msg_modbus_req(my_loop, config_tsun_inv1, msg_modbus_cmd, msg_modbus_cmd_fwd):
def test_msg_modbus_req(config_tsun_inv1, msg_modbus_cmd, msg_modbus_cmd_fwd):
_ = config_tsun_inv1
m = MemoryStream(b'')
m.snr = get_sn_int()
@@ -1811,8 +1766,7 @@ async def test_msg_modbus_req(my_loop, config_tsun_inv1, msg_modbus_cmd, msg_mod
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_msg_modbus_req_seq(my_loop, config_tsun_inv1, msg_modbus_cmd_seq):
def test_msg_modbus_req_seq(config_tsun_inv1, msg_modbus_cmd_seq):
_ = config_tsun_inv1
m = MemoryStream(b'')
m.snr = get_sn_int()
@@ -1840,8 +1794,7 @@ async def test_msg_modbus_req_seq(my_loop, config_tsun_inv1, msg_modbus_cmd_seq)
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_msg_modbus_req2(my_loop, config_tsun_inv1, msg_modbus_cmd_crc_err):
def test_msg_modbus_req2(config_tsun_inv1, msg_modbus_cmd_crc_err):
_ = config_tsun_inv1
m = MemoryStream(b'')
m.snr = get_sn_int()
@@ -1868,8 +1821,7 @@ async def test_msg_modbus_req2(my_loop, config_tsun_inv1, msg_modbus_cmd_crc_err
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1
m.close()
@pytest.mark.asyncio
async def test_msg_unknown_cmd_req(my_loop, config_tsun_inv1, msg_unknown_cmd):
def test_msg_unknown_cmd_req(config_tsun_inv1, msg_unknown_cmd):
_ = config_tsun_inv1
m = MemoryStream(msg_unknown_cmd, (0,), False)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
@@ -1891,8 +1843,7 @@ async def test_msg_unknown_cmd_req(my_loop, config_tsun_inv1, msg_unknown_cmd):
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
m.close()
@pytest.mark.asyncio
async def test_msg_modbus_rsp1(my_loop, config_tsun_inv1, msg_modbus_rsp):
def test_msg_modbus_rsp1(config_tsun_inv1, msg_modbus_rsp):
'''Modbus response without a valid Modbus request must be dropped'''
_ = config_tsun_inv1
m = MemoryStream(msg_modbus_rsp)
@@ -1911,8 +1862,7 @@ async def test_msg_modbus_rsp1(my_loop, config_tsun_inv1, msg_modbus_rsp):
assert m.db.stat['proxy']['Modbus_Command'] == 0
m.close()
@pytest.mark.asyncio
async def test_msg_modbus_rsp2(my_loop, config_tsun_inv1, msg_modbus_rsp):
def test_msg_modbus_rsp2(config_tsun_inv1, msg_modbus_rsp):
'''Modbus response with a valid Modbus request must be forwarded'''
_ = config_tsun_inv1 # setup config structure
m = MemoryStream(msg_modbus_rsp)
@@ -1949,8 +1899,7 @@ async def test_msg_modbus_rsp2(my_loop, config_tsun_inv1, msg_modbus_rsp):
m.close()
@pytest.mark.asyncio
async def test_msg_modbus_rsp3(my_loop, config_tsun_inv1, msg_modbus_rsp):
def test_msg_modbus_rsp3(config_tsun_inv1, msg_modbus_rsp):
'''Modbus response with a valid Modbus request must be forwarded'''
_ = config_tsun_inv1
m = MemoryStream(msg_modbus_rsp)
@@ -1986,8 +1935,7 @@ async def test_msg_modbus_rsp3(my_loop, config_tsun_inv1, msg_modbus_rsp):
m.close()
@pytest.mark.asyncio
async def test_msg_unknown_rsp(my_loop, config_tsun_inv1, msg_unknown_cmd_rsp):
def test_msg_unknown_rsp(config_tsun_inv1, msg_unknown_cmd_rsp):
_ = config_tsun_inv1
m = MemoryStream(msg_unknown_cmd_rsp)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
@@ -2005,8 +1953,7 @@ async def test_msg_unknown_rsp(my_loop, config_tsun_inv1, msg_unknown_cmd_rsp):
assert m.db.stat['proxy']['Modbus_Command'] == 0
m.close()
@pytest.mark.asyncio
async def test_msg_modbus_invalid(my_loop, config_tsun_inv1, msg_modbus_invalid):
def test_msg_modbus_invalid(config_tsun_inv1, msg_modbus_invalid):
_ = config_tsun_inv1
m = MemoryStream(msg_modbus_invalid, (0,), False)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
@@ -2020,8 +1967,7 @@ async def test_msg_modbus_invalid(my_loop, config_tsun_inv1, msg_modbus_invalid)
assert m.db.stat['proxy']['Modbus_Command'] == 0
m.close()
@pytest.mark.asyncio
async def test_msg_modbus_fragment(my_loop, config_tsun_inv1, msg_modbus_rsp):
def test_msg_modbus_fragment(config_tsun_inv1, msg_modbus_rsp):
_ = config_tsun_inv1
# receive more bytes than expected (7 bytes from the next msg)
m = MemoryStream(msg_modbus_rsp+b'\x00\x00\x00\x45\x10\x52\x31', (0,))
@@ -2047,7 +1993,7 @@ async def test_msg_modbus_fragment(my_loop, config_tsun_inv1, msg_modbus_rsp):
m.close()
@pytest.mark.asyncio
async def test_modbus_polling(my_loop, config_tsun_inv1, heartbeat_ind_msg, heartbeat_rsp_msg):
async def test_modbus_polling(config_tsun_inv1, heartbeat_ind_msg, heartbeat_rsp_msg):
_ = config_tsun_inv1
assert asyncio.get_running_loop()
m = MemoryStream(heartbeat_ind_msg, (0,))
@@ -2160,7 +2106,7 @@ async def test_modbus_scaning(config_tsun_scan, heartbeat_ind_msg, heartbeat_rsp
m.close()
@pytest.mark.asyncio
async def test_start_client_mode(my_loop, config_tsun_inv1, str_test_ip):
async def test_start_client_mode(config_tsun_inv1, str_test_ip):
_ = config_tsun_inv1
assert asyncio.get_running_loop()
m = MemoryStream(b'')
@@ -2264,8 +2210,7 @@ async def test_start_client_mode_scan(config_tsun_scan_dcu, str_test_ip, dcu_mod
m.close()
@pytest.mark.asyncio
async def test_timeout(my_loop, config_tsun_inv1):
def test_timeout(config_tsun_inv1):
_ = config_tsun_inv1
m = MemoryStream(b'')
assert m.state == State.init
@@ -2278,8 +2223,7 @@ async def test_timeout(my_loop, config_tsun_inv1):
m.state = State.closed
m.close()
@pytest.mark.asyncio
async def test_fnc_dispatch(my_loop, config_tsun_inv1):
def test_fnc_dispatch():
def msg():
return
@@ -2300,8 +2244,7 @@ async def test_fnc_dispatch(my_loop, config_tsun_inv1):
assert _obj == m.msg_unknown
assert _str == "'msg_unknown'"
@pytest.mark.asyncio
async def test_timestamp(my_loop, config_tsun_inv1):
def test_timestamp():
m = MemoryStream(b'')
ts = m._timestamp()
ts_emu = m._emu_timestamp()
@@ -2328,7 +2271,7 @@ class InverterTest(InverterBase):
@pytest.mark.asyncio
async def test_proxy_at_cmd(my_loop, config_tsun_inv1, patch_open_connection, at_command_ind_msg, at_command_rsp_msg):
async def test_proxy_at_cmd(config_tsun_inv1, patch_open_connection, at_command_ind_msg, at_command_rsp_msg):
_ = config_tsun_inv1
_ = patch_open_connection
assert asyncio.get_running_loop()
@@ -2366,7 +2309,7 @@ async def test_proxy_at_cmd(my_loop, config_tsun_inv1, patch_open_connection, at
assert Proxy.mqtt.data == ""
@pytest.mark.asyncio
async def test_proxy_at_blocked(my_loop, config_tsun_inv1, patch_open_connection, at_command_ind_msg_block, at_command_rsp_msg):
async def test_proxy_at_blocked(config_tsun_inv1, patch_open_connection, at_command_ind_msg_block, at_command_rsp_msg):
_ = config_tsun_inv1
_ = patch_open_connection
assert asyncio.get_running_loop()

View File

@@ -9,9 +9,6 @@ from infos import Infos, Register
from test_solarman import FakeIfc, FakeInverter, MemoryStream, get_sn_int, get_sn, correct_checksum, config_tsun_inv1, msg_modbus_rsp
from test_infos_g3p import str_test_ip, bytes_test_ip
pytest_plugins = ('pytest_asyncio',)
timestamp = 0x3224c8bc
class InvStream(MemoryStream):
@@ -128,17 +125,17 @@ def heartbeat_ind():
msg = b'\xa5\x01\x00\x10G\x00\x01\x00\x00\x00\x00\x00Y\x15'
return msg
@pytest.mark.asyncio
async def test_emu_init_close(my_loop, config_tsun_inv1):
_ = config_tsun_inv1
assert asyncio.get_running_loop()
def test_emu_init_close():
# received a message with wrong start byte plus an valid message
# the complete receive buffer must be cleared to
# find the next valid message
inv = InvStream()
cld = CldStream(inv)
cld.close()
@pytest.mark.asyncio
async def test_emu_start(my_loop, config_tsun_inv1, msg_modbus_rsp, str_test_ip, device_ind_msg):
async def test_emu_start(config_tsun_inv1, msg_modbus_rsp, str_test_ip, device_ind_msg):
_ = config_tsun_inv1
assert asyncio.get_running_loop()
inv = InvStream(msg_modbus_rsp)
@@ -155,8 +152,7 @@ async def test_emu_start(my_loop, config_tsun_inv1, msg_modbus_rsp, str_test_ip,
assert inv.ifc.fwd_fifo.peek() == device_ind_msg
cld.close()
@pytest.mark.asyncio
async def test_snd_hb(my_loop, config_tsun_inv1, heartbeat_ind):
def test_snd_hb(config_tsun_inv1, heartbeat_ind):
_ = config_tsun_inv1
inv = InvStream()
cld = CldStream(inv)
@@ -167,7 +163,7 @@ async def test_snd_hb(my_loop, config_tsun_inv1, heartbeat_ind):
cld.close()
@pytest.mark.asyncio
async def test_snd_inv_data(my_loop, config_tsun_inv1, inverter_ind_msg, inverter_rsp_msg):
async def test_snd_inv_data(config_tsun_inv1, inverter_ind_msg, inverter_rsp_msg):
_ = config_tsun_inv1
inv = InvStream()
inv.db.set_db_def_value(Register.INVERTER_STATUS, 1)
@@ -209,7 +205,7 @@ async def test_snd_inv_data(my_loop, config_tsun_inv1, inverter_ind_msg, inverte
cld.close()
@pytest.mark.asyncio
async def test_rcv_invalid(my_loop, config_tsun_inv1, inverter_ind_msg, inverter_rsp_msg):
async def test_rcv_invalid(config_tsun_inv1, inverter_ind_msg, inverter_rsp_msg):
_ = config_tsun_inv1
inv = InvStream()
assert asyncio.get_running_loop() == inv.mb_timer.loop

View File

@@ -1048,8 +1048,7 @@ def msg_inverter_ms3000_ind(): # Data indication from the controller
msg += b'\x53\x00\x66' # | S.f'
return msg
@pytest.mark.asyncio
async def test_read_message(msg_contact_info):
def test_read_message(msg_contact_info):
Config.act_config = {'tsun':{'enabled': True}}
m = MemoryStream(msg_contact_info, (0,))
m.read() # read complete msg, and dispatch msg

View File

@@ -15,6 +15,7 @@ pytest_plugins = ('pytest_asyncio',)
@pytest.fixture(scope="session")
def client():
app.secret_key = 'super secret key'
Web(app, '../transfer', False)
return app.test_client()
@pytest.fixture

View File

@@ -30,4 +30,4 @@ cd /home/proxy || exit
export VERSION=$(cat /proxy-version.txt)
echo "Start Proxyserver..."
python3 server.py --rel_urls --json_config=/data/options.json --log_path=/homeassistant/tsun-proxy/logs/ --config_path=/homeassistant/tsun-proxy/ --log_backups=2
python3 server.py --rel_urls=True --json_config=/data/options.json --log_path=/homeassistant/tsun-proxy/logs/ --config_path=/homeassistant/tsun-proxy/ --log_backups=2