S allius/issue217 2 (#230)

* add some reader classes to get the configuration

* adapt unittests

* get config from json or toml file

* loop over all config readers to get the configuration

* rename config test files

* use relative paths for coverage test in vscode

* do not throw an error for missing config files

* remove obsolete tests

* use dotted key notation for pv sub dictonary

* log config reading progress

* remove create_config_toml.py

* remove obsolete tests for the ha_addon

* disable mosquitto tests if the server is down

* ignore main method for test coverage

* increase test coverage

* pytest-cov: use relative_files only on github, so coverage will work with vscode locally

* remove unneeded imports

* add missing test cases

* disable branch coverage, cause its not reachable
This commit is contained in:
Stefan Allius
2024-12-08 13:25:04 +01:00
committed by GitHub
parent ac7b02bde9
commit b335881500
22 changed files with 942 additions and 624 deletions

3
.cover_ghaction_rc Normal file
View File

@@ -0,0 +1,3 @@
[run]
branch = True
relative_files = True

View File

@@ -1,3 +1,2 @@
[run]
branch = True
relative_files = True

9
.env_example Normal file
View File

@@ -0,0 +1,9 @@
# example file for the .env file. The .env set private values
# which are needed for builing containers
# registry for debug an dev container
PRIVAT_CONTAINER_REGISTRY=docker.io/<user>/
# registry for official container (preview, rc, rel)
PUBLIC_CONTAINER_REGISTRY=ghcr.io/<user>/
PUBLIC_CR_KEY=

View File

@@ -54,7 +54,7 @@ jobs:
flake8 --exit-zero --ignore=C901,E121,E123,E126,E133,E226,E241,E242,E704,W503,W504,W505 --format=pylint --output-file=output_flake.txt --exclude=*.pyc app/src/
- name: Test with pytest
run: |
python -m pytest app ha_addons --cov=app/src --cov=ha_addons/ha_addon/rootfs/home --cov-report=xml
python -m pytest app --cov=app/src --cov-config=.cover_ghaction_rc --cov-report=xml
coverage report
- name: Analyze with SonarCloud
if: ${{ env.SONAR_TOKEN != 0 }}

16
.vscode/settings.json vendored
View File

@@ -1,22 +1,20 @@
{
"python.analysis.extraPaths": [
"app/src",
"app/tests",
".venv/lib",
"ha_addons/ha_addon/rootfs" ],
"app/src",
"app/tests",
".venv/lib",
],
"python.testing.pytestArgs": [
"-vvv",
"-vvv",
"--cov=app/src",
"--cov=ha_addons/ha_addon/rootfs",
"--cov-report=xml",
"app",
"system_tests",
"ha_addons"
"system_tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"flake8.args": [
"--extend-exclude=app/tests/*.py,system_tests/*.py,ha_addons/ha_addon/tests/*.py"
"--extend-exclude=app/tests/*.py,system_tests/*.py"
],
"sonarlint.connectedMode.project": {
"connectionId": "s-allius",

View File

@@ -1,24 +1,47 @@
'''Config module handles the proxy configuration in the config.toml file'''
'''Config module handles the proxy configuration'''
import tomllib
import shutil
import logging
from abc import ABC, abstractmethod
from schema import Schema, And, Or, Use, Optional
class ConfigIfc(ABC):
'''Abstract basis class for config readers'''
def __init__(self):
Config.add(self)
@abstractmethod
def get_config(cls) -> dict: # pragma: no cover
def get_config(self) -> dict: # pragma: no cover
'''get the unverified config from the reader'''
pass
@abstractmethod
def descr(self) -> str: # pragma: no cover
'''return a descriction of the source, e.g. the file name'''
pass
def _extend_key(self, conf, key, val):
'''split a dotted dict key into a hierarchical dict tree '''
lst = key.split('.')
d = conf
for i, idx in enumerate(lst, 1): # pragma: no branch
if i == len(lst):
d[idx] = val
break
if idx not in d:
d[idx] = {}
d = d[idx]
class Config():
'''Static class Config is reads and sanitize the config.
'''Static class Config build and sanitize the internal config dictenary.
Read config.toml file and sanitize it with read().
Get named parts of the config with get()'''
act_config = {}
def_config = {}
Using config readers, a partial configuration is added to config.
Config readers are a derivation of the abstract ConfigIfc reader.
When a config reader is instantiated, theits `get_config` method is
called automatically and afterwards the config will be merged.
'''
conf_schema = Schema({
'tsun': {
@@ -34,8 +57,10 @@ class Config():
'mqtt': {
'host': Use(str),
'port': And(Use(int), lambda n: 1024 <= n <= 65535),
'user': And(Use(str), Use(lambda s: s if len(s) > 0 else None)),
'passwd': And(Use(str), Use(lambda s: s if len(s) > 0 else None))
'user': Or(None, And(Use(str),
Use(lambda s: s if len(s) > 0 else None))),
'passwd': Or(None, And(Use(str),
Use(lambda s: s if len(s) > 0 else None)))
},
'ha': {
'auto_conf_prefix': Use(str),
@@ -99,52 +124,74 @@ class Config():
)
@classmethod
def init(cls, ifc: ConfigIfc, path='') -> None | str:
cls.ifc = ifc
cls.act_config = {}
def init(cls, def_reader: ConfigIfc) -> None | str:
'''Initialise the Proxy-Config
Copy the internal default config file into the config directory
and initialise the Config with the default configuration '''
cls.err = None
cls.def_config = {}
return cls.read(path)
try:
# make the default config transparaent by copying it
# in the config.example file
logging.debug('Copy Default Config to config.example.toml')
shutil.copy2("default_config.toml",
"config/config.example.toml")
except Exception:
pass
# read example config file as default configuration
try:
def_config = def_reader.get_config()
cls.def_config = cls.conf_schema.validate(def_config)
logging.info(f'Read from {def_reader.descr()} => ok')
except Exception as error:
cls.err = f'Config.read: {error}'
logging.error(
f"Can't read from {def_reader.descr()} => error\n {error}")
cls.act_config = cls.def_config.copy()
@classmethod
def read(cls, path) -> None | str:
'''Read config file, merge it with the default config
def add(cls, reader: ConfigIfc):
'''Merge the config from the Config Reader into the config
Checks if a default config exists. If no default configuration exists,
the Config.init method has not yet been called.This is normal for the very
first Config Reader which creates the default config and must be ignored
here. The default config reader is handled in the Config.init method'''
if hasattr(cls, 'def_config'):
cls.__parse(reader)
@classmethod
def get_error(cls) -> None | str:
'''return the last error as a string or None if there is no error'''
return cls.err
@classmethod
def __parse(cls, reader) -> None | str:
'''Read config from the reader, merge it with the default config
and sanitize the result'''
err = None
config = {}
logger = logging.getLogger('data')
res = 'ok'
try:
# read example config file as default configuration
cls.def_config = {}
with open(f"{path}default_config.toml", "rb") as f:
def_config = tomllib.load(f)
cls.def_config = cls.conf_schema.validate(def_config)
# overwrite the default values, with values from
# the config.toml file
usr_config = cls.ifc.get_config()
# merge the default and the user config
config = def_config.copy()
rd_config = reader.get_config()
config = cls.act_config.copy()
for key in ['tsun', 'solarman', 'mqtt', 'ha', 'inverters',
'gen3plus']:
if key in usr_config:
config[key] |= usr_config[key]
try:
cls.act_config = cls.conf_schema.validate(config)
except Exception as error:
err = f'Config.read: {error}'
logging.error(err)
# logging.debug(f'Readed config: "{cls.act_config}" ')
if key in rd_config:
config[key] = config[key] | rd_config[key]
cls.act_config = cls.conf_schema.validate(config)
except FileNotFoundError:
res = 'n/a'
except Exception as error:
err = f'Config.read: {error}'
logger.error(err)
cls.act_config = {}
cls.err = f'error: {error}'
logging.error(
f"Can't read from {reader.descr()} => error\n {error}")
return err
logging.info(f'Read from {reader.descr()} => {res}')
return cls.err
@classmethod
def get(cls, member: str = None):

View File

@@ -1,34 +0,0 @@
'''Config module handles the proxy configuration in the config.toml file'''
import shutil
import tomllib
import logging
from cnf.config import ConfigIfc
class ConfigIfcProxy(ConfigIfc):
def __init__(self): # pragma: no cover
try:
# make the default config transparaent by copying it
# in the config.example file
logging.info('Copy Default Config to config.example.toml')
shutil.copy2("default_config.toml",
"config/config.example.toml")
except Exception:
pass
def get_config(self, cnf_file="config/config.toml") -> dict:
usr_config = {}
try:
with open(cnf_file, "rb") as f:
usr_config = tomllib.load(f)
except Exception as error:
err = f'Config.read: {error}'
logging.error(err)
logging.info(
'\n To create the missing config.toml file, '
'you can rename the template config.example.toml\n'
' and customize it for your scenario.\n')
return usr_config

View File

@@ -0,0 +1,25 @@
'''Config Reader module which handles config values from the environment'''
import os
from cnf.config import ConfigIfc
class ConfigReadEnv(ConfigIfc):
'''Reader for environment values of the configuration'''
def get_config(self) -> dict:
conf = {}
data = [
('mqtt.host', 'MQTT_HOST'),
('mqtt.port', 'MQTT_PORT'),
('mqtt.user', 'MQTT_USER'),
('mqtt.passwd', 'MQTT_PASSWORD'),
]
for key, env_var in data:
val = os.getenv(env_var)
if val:
self._extend_key(conf, key, val)
return conf
def descr(self):
return "Read environment"

View File

@@ -0,0 +1,46 @@
'''Config Reader module which handles *.json config files'''
import json
from cnf.config import ConfigIfc
class ConfigReadJson(ConfigIfc):
'''Reader for json config files'''
def __init__(self, cnf_file='/data/options.json'):
'''Read a json file and add the settings to the config'''
if not isinstance(cnf_file, str):
return
self.cnf_file = cnf_file
super().__init__()
def convert_inv(self, conf, inv):
if 'serial' in inv:
snr = inv['serial']
del inv['serial']
conf[snr] = {}
for key, val in inv.items():
self._extend_key(conf[snr], key, val)
def convert_inv_arr(self, conf, key, val: list):
if key not in conf:
conf[key] = {}
for elm in val:
self.convert_inv(conf[key], elm)
def convert_to_obj(self, data):
conf = {}
for key, val in data.items():
if key == 'inverters' and isinstance(val, list):
self.convert_inv_arr(conf, key, val)
else:
self._extend_key(conf, key, val)
return conf
def get_config(self) -> dict:
with open(self.cnf_file) as f:
data = json.load(f)
return self.convert_to_obj(data)
def descr(self):
return self.cnf_file

View File

@@ -0,0 +1,21 @@
'''Config Reader module which handles *.toml config files'''
import tomllib
from cnf.config import ConfigIfc
class ConfigReadToml(ConfigIfc):
'''Reader for toml config files'''
def __init__(self, cnf_file):
'''Read a toml file and add the settings to the config'''
if not isinstance(cnf_file, str):
return
self.cnf_file = cnf_file
super().__init__()
def get_config(self) -> dict:
with open(self.cnf_file, "rb") as f:
return tomllib.load(f)
def descr(self):
return self.cnf_file

View File

@@ -2,6 +2,7 @@ import logging
import asyncio
import signal
import os
import argparse
from asyncio import StreamReader, StreamWriter
from aiohttp import web
from logging import config # noqa F401
@@ -11,7 +12,9 @@ from gen3.inverter_g3 import InverterG3
from gen3plus.inverter_g3p import InverterG3P
from scheduler import Schedule
from cnf.config import Config
from cnf.config_ifc_proxy import ConfigIfcProxy
from cnf.config_read_env import ConfigReadEnv
from cnf.config_read_toml import ConfigReadToml
from cnf.config_read_json import ConfigReadJson
from modbus_tcp import ModbusTcp
routes = web.RouteTableDef()
@@ -117,6 +120,8 @@ def get_log_level() -> int:
'''checks if LOG_LVL is set in the environment and returns the
corresponding logging.LOG_LEVEL'''
log_level = os.getenv('LOG_LVL', 'INFO')
logging.info(f"LOG_LVL : {log_level}")
if log_level == 'DEBUG':
log_level = logging.DEBUG
elif log_level == 'WARN':
@@ -126,7 +131,17 @@ def get_log_level() -> int:
return log_level
if __name__ == "__main__":
if __name__ == "__main__": # pragma: no cover
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--config_path', type=str,
default='./config/',
help='set path for the configuration files')
parser.add_argument('-j', '--json_config', type=str,
help='read user config from json-file')
parser.add_argument('-t', '--toml_config', type=str,
help='read user config from toml-file')
parser.add_argument('--add_on', action='store_true')
args = parser.parse_args()
#
# Setup our daily, rotating logger
#
@@ -135,9 +150,14 @@ if __name__ == "__main__":
logging.config.fileConfig('logging.ini')
logging.info(f'Server "{serv_name} - {version}" will be started')
logging.info(f"AddOn: {args.add_on}")
logging.info(f"config_path: {args.config_path}")
logging.info(f"json_config: {args.json_config}")
logging.info(f"toml_config: {args.toml_config}")
log_level = get_log_level()
logging.info('******')
# set lowest-severity for 'root', 'msg', 'conn' and 'data' logger
log_level = get_log_level()
logging.getLogger().setLevel(log_level)
logging.getLogger('msg').setLevel(log_level)
logging.getLogger('conn').setLevel(log_level)
@@ -150,9 +170,18 @@ if __name__ == "__main__":
asyncio.set_event_loop(loop)
# read config file
ConfigErr = Config.init(ConfigIfcProxy())
Config.init(ConfigReadToml("default_config.toml"))
ConfigReadEnv()
ConfigReadJson(args.config_path + "config.json")
ConfigReadToml(args.config_path + "config.toml")
ConfigReadJson(args.json_config)
ConfigReadToml(args.toml_config)
ConfigErr = Config.get_error()
if ConfigErr is not None:
logging.info(f'ConfigErr: {ConfigErr}')
logging.info('******')
Proxy.class_init()
Schedule.start()
ModbusTcp(loop)

View File

@@ -1,8 +1,47 @@
# test_with_pytest.py
import pytest
import tomllib
import json
from mock import patch
from schema import SchemaMissingKeyError
from cnf.config import Config, ConfigIfc
from cnf.config_read_toml import ConfigReadToml
class FakeBuffer:
rd = str()
test_buffer = FakeBuffer
class FakeFile():
def __init__(self):
self.buf = test_buffer
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
pass
class FakeOptionsFile(FakeFile):
def __init__(self, OpenTextMode):
super().__init__()
self.bin_mode = 'b' in OpenTextMode
def read(self):
if self.bin_mode:
return bytearray(self.buf.rd.encode('utf-8')).copy()
else:
return self.buf.rd.copy()
def patch_open():
def new_open(file: str, OpenTextMode="rb"):
if file == "_no__file__no_":
raise FileNotFoundError
return FakeOptionsFile(OpenTextMode)
with patch('builtins.open', new_open) as conn:
yield conn
class TstConfig(ConfigIfc):
@@ -11,7 +50,7 @@ class TstConfig(ConfigIfc):
cls.act_config = cnf
@classmethod
def get_config(cls) -> dict:
def add_config(cls) -> dict:
return cls.act_config
@@ -23,6 +62,40 @@ def test_empty_config():
except SchemaMissingKeyError:
pass
@pytest.fixture
def ConfigDefault():
return {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'},
'inverters': {
'allow_all': False,
'R170000000000001': {
'suggested_area': '',
'modbus_polling': False,
'monitor_sn': 0,
'node_id': '',
'pv1': {'manufacturer': 'Risen',
'type': 'RSM40-8-395M'},
'pv2': {'manufacturer': 'Risen',
'type': 'RSM40-8-395M'},
'sensor_list': 688
},
'Y170000000000001': {
'modbus_polling': True,
'monitor_sn': 2000000000,
'suggested_area': '',
'node_id': '',
'pv1': {'manufacturer': 'Risen',
'type': 'RSM40-8-410M'},
'pv2': {'manufacturer': 'Risen',
'type': 'RSM40-8-410M'},
'pv3': {'manufacturer': 'Risen',
'type': 'RSM40-8-410M'},
'pv4': {'manufacturer': 'Risen',
'type': 'RSM40-8-410M'},
'sensor_list': 688
}
}
}
@pytest.fixture
def ConfigComplete():
return {
@@ -70,38 +143,9 @@ def ConfigComplete():
}
}
@pytest.fixture
def ConfigMinimum():
return {
'gen3plus': {
'at_acl': {
'mqtt': {'allow': ['AT+'], 'block': []},
'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'],
'block': []}
}
},
'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com',
'port': 5005},
'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000},
'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None},
'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'},
'inverters': {
'allow_all': True,
'R170000000000001': {'node_id': '',
'modbus_polling': True,
'monitor_sn': 0,
'suggested_area': '',
'sensor_list': 688}}}
def test_default_config():
with open("app/config/default_config.toml", "rb") as f:
cnf = tomllib.load(f)
try:
validated = Config.conf_schema.validate(cnf)
except Exception:
assert False
Config.init(ConfigReadToml("app/config/default_config.toml"))
validated = Config.def_config
assert validated == {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'},
'inverters': {
'allow_all': False,
@@ -146,76 +190,53 @@ def test_full_config(ConfigComplete):
assert False
assert validated == ConfigComplete
def test_mininum_config(ConfigMinimum):
cnf = {'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005},
'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+']},
'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE']}}},
'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000},
'mqtt': {'host': 'mqtt', 'port': 1883, 'user': '', 'passwd': ''},
'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'},
'inverters': {'allow_all': True,
'R170000000000001': {}}
}
def test_read_empty(ConfigDefault):
test_buffer.rd = ""
Config.init(ConfigReadToml("app/config/default_config.toml"))
for _ in patch_open():
ConfigReadToml("config/config.toml")
err = Config.get_error()
try:
validated = Config.conf_schema.validate(cnf)
except Exception:
assert False
assert validated == ConfigMinimum
def test_read_empty():
cnf = {}
err = Config.init(TstConfig(cnf), 'app/config/')
assert err == None
cnf = Config.get()
assert cnf == {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'},
'inverters': {
'allow_all': False,
'R170000000000001': {
'suggested_area': '',
'modbus_polling': False,
'monitor_sn': 0,
'node_id': '',
'pv1': {'manufacturer': 'Risen',
'type': 'RSM40-8-395M'},
'pv2': {'manufacturer': 'Risen',
'type': 'RSM40-8-395M'},
'sensor_list': 688
},
'Y170000000000001': {
'modbus_polling': True,
'monitor_sn': 2000000000,
'suggested_area': '',
'node_id': '',
'pv1': {'manufacturer': 'Risen',
'type': 'RSM40-8-410M'},
'pv2': {'manufacturer': 'Risen',
'type': 'RSM40-8-410M'},
'pv3': {'manufacturer': 'Risen',
'type': 'RSM40-8-410M'},
'pv4': {'manufacturer': 'Risen',
'type': 'RSM40-8-410M'},
'sensor_list': 688
}
}
}
assert cnf == ConfigDefault
defcnf = Config.def_config.get('solarman')
assert defcnf == {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}
assert True == Config.is_default('solarman')
def test_no_file():
cnf = {}
err = Config.init(TstConfig(cnf), '')
Config.init(ConfigReadToml("default_config.toml"))
err = Config.get_error()
assert err == "Config.read: [Errno 2] No such file or directory: 'default_config.toml'"
cnf = Config.get()
assert cnf == {}
defcnf = Config.def_config.get('solarman')
assert defcnf == None
def test_no_file2():
Config.init(ConfigReadToml("app/config/default_config.toml"))
assert Config.err == None
ConfigReadToml("_no__file__no_")
err = Config.get_error()
assert err == None
def test_invalid_filename():
Config.init(ConfigReadToml("app/config/default_config.toml"))
assert Config.err == None
ConfigReadToml(None)
err = Config.get_error()
assert err == None
def test_read_cnf1():
cnf = {'solarman' : {'enabled': False}}
err = Config.init(TstConfig(cnf), 'app/config/')
test_buffer.rd = "solarman.enabled = false"
Config.init(ConfigReadToml("app/config/default_config.toml"))
for _ in patch_open():
ConfigReadToml("config/config.toml")
err = Config.get_error()
assert err == None
cnf = Config.get()
assert cnf == {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': False, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'},
@@ -256,8 +277,13 @@ def test_read_cnf1():
assert False == Config.is_default('solarman')
def test_read_cnf2():
cnf = {'solarman' : {'enabled': 'FALSE'}}
err = Config.init(TstConfig(cnf), 'app/config/')
test_buffer.rd = "solarman.enabled = 'FALSE'"
Config.init(ConfigReadToml("app/config/default_config.toml"))
for _ in patch_open():
ConfigReadToml("config/config.toml")
err = Config.get_error()
assert err == None
cnf = Config.get()
assert cnf == {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'},
@@ -293,16 +319,26 @@ def test_read_cnf2():
}
assert True == Config.is_default('solarman')
def test_read_cnf3():
cnf = {'solarman' : {'port': 'FALSE'}}
err = Config.init(TstConfig(cnf), 'app/config/')
assert err == 'Config.read: Key \'solarman\' error:\nKey \'port\' error:\nint(\'FALSE\') raised ValueError("invalid literal for int() with base 10: \'FALSE\'")'
def test_read_cnf3(ConfigDefault):
test_buffer.rd = "solarman.port = 'FALSE'"
Config.init(ConfigReadToml("app/config/default_config.toml"))
for _ in patch_open():
ConfigReadToml("config/config.toml")
err = Config.get_error()
assert err == 'error: Key \'solarman\' error:\nKey \'port\' error:\nint(\'FALSE\') raised ValueError("invalid literal for int() with base 10: \'FALSE\'")'
cnf = Config.get()
assert cnf == {}
assert cnf == ConfigDefault
def test_read_cnf4():
cnf = {'solarman' : {'port': 5000}}
err = Config.init(TstConfig(cnf), 'app/config/')
test_buffer.rd = "solarman.port = 5000"
Config.init(ConfigReadToml("app/config/default_config.toml"))
for _ in patch_open():
ConfigReadToml("config/config.toml")
err = Config.get_error()
assert err == None
cnf = Config.get()
assert cnf == {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 5000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'},
@@ -339,11 +375,19 @@ def test_read_cnf4():
assert False == Config.is_default('solarman')
def test_read_cnf5():
cnf = {'solarman' : {'port': 1023}}
err = Config.init(TstConfig(cnf), 'app/config/')
test_buffer.rd = "solarman.port = 1023"
Config.init(ConfigReadToml("app/config/default_config.toml"))
for _ in patch_open():
ConfigReadToml("config/config.toml")
err = Config.get_error()
assert err != None
def test_read_cnf6():
cnf = {'solarman' : {'port': 65536}}
err = Config.init(TstConfig(cnf), 'app/config/')
test_buffer.rd = "solarman.port = 65536"
Config.init(ConfigReadToml("app/config/default_config.toml"))
for _ in patch_open():
ConfigReadToml("config/config.toml")
err = Config.get_error()
assert err != None

View File

@@ -1,53 +0,0 @@
# test_with_pytest.py
import tomllib
from schema import SchemaMissingKeyError
from cnf.config_ifc_proxy import ConfigIfcProxy
class CnfIfc(ConfigIfcProxy):
def __init__(self):
pass
def test_no_config():
cnf_ifc = CnfIfc()
cnf = cnf_ifc.get_config("")
assert cnf == {}
def test_get_config():
cnf_ifc = CnfIfc()
cnf = cnf_ifc.get_config("app/config/default_config.toml")
assert cnf == {
'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}},
'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005},
'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000},
'mqtt': {'host': 'mqtt', 'port': 1883, 'user': '', 'passwd': ''},
'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'},
'inverters': {
'allow_all': False,
'R170000000000001': {
'node_id': '',
'pv1': {'manufacturer': 'Risen',
'type': 'RSM40-8-395M'},
'pv2': {'manufacturer': 'Risen',
'type': 'RSM40-8-395M'},
'modbus_polling': False,
'suggested_area': ''
},
'Y170000000000001': {
'modbus_polling': True,
'monitor_sn': 2000000000,
'node_id': '',
'pv1': {'manufacturer': 'Risen',
'type': 'RSM40-8-410M'},
'pv2': {'manufacturer': 'Risen',
'type': 'RSM40-8-410M'},
'pv3': {'manufacturer': 'Risen',
'type': 'RSM40-8-410M'},
'pv4': {'manufacturer': 'Risen',
'type': 'RSM40-8-410M'},
'suggested_area': ''
}
}
}

View File

@@ -0,0 +1,53 @@
# test_with_pytest.py
import pytest
import os
from mock import patch
from cnf.config import Config
from cnf.config_read_toml import ConfigReadToml
from cnf.config_read_env import ConfigReadEnv
def patch_getenv():
def new_getenv(key: str, defval=None):
"""Get an environment variable, return None if it doesn't exist.
The optional second argument can specify an alternate default. key,
default and the result are str."""
if key == 'MQTT_PASSWORD':
return 'passwd'
elif key == 'MQTT_PORT':
return 1234
elif key == 'MQTT_HOST':
return ""
return defval
with patch.object(os, 'getenv', new_getenv) as conn:
yield conn
def test_extend_key():
cnf_rd = ConfigReadEnv()
conf = {}
cnf_rd._extend_key(conf, "mqtt.user", "testuser")
assert conf == {
'mqtt': {
'user': 'testuser',
},
}
conf = {}
cnf_rd._extend_key(conf, "mqtt", "testuser")
assert conf == {
'mqtt': 'testuser',
}
conf = {}
cnf_rd._extend_key(conf, "", "testuser")
assert conf == {'': 'testuser'}
def test_read_env_config():
Config.init(ConfigReadToml("app/config/default_config.toml"))
assert Config.get('mqtt') == {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}
for _ in patch_getenv():
ConfigReadEnv()
assert Config.get_error() == None
assert Config.get('mqtt') == {'host': 'mqtt', 'port': 1234, 'user': None, 'passwd': 'passwd'}

View File

@@ -0,0 +1,404 @@
# test_with_pytest.py
import pytest
from mock import patch
from cnf.config import Config
from cnf.config_read_json import ConfigReadJson
from cnf.config_read_toml import ConfigReadToml
from test_config import ConfigDefault, ConfigComplete
class CnfIfc(ConfigReadJson):
def __init__(self):
pass
class FakeBuffer:
rd = str()
wr = str()
test_buffer = FakeBuffer
class FakeFile():
def __init__(self):
self.buf = test_buffer
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
pass
class FakeOptionsFile(FakeFile):
def __init__(self, OpenTextMode):
super().__init__()
self.bin_mode = 'b' in OpenTextMode
def read(self):
print(f"Fake.read: bmode:{self.bin_mode}")
if self.bin_mode:
return bytearray(self.buf.rd.encode('utf-8')).copy()
else:
print(f"Fake.read: str:{self.buf.rd}")
return self.buf.rd
def patch_open():
def new_open(file: str, OpenTextMode="r"):
if file == "_no__file__no_":
raise FileNotFoundError
return FakeOptionsFile(OpenTextMode)
with patch('builtins.open', new_open) as conn:
yield conn
@pytest.fixture
def ConfigTomlEmpty():
return {
'mqtt': {'host': 'mqtt', 'port': 1883, 'user': '', 'passwd': ''},
'ha': {'auto_conf_prefix': 'homeassistant',
'discovery_prefix': 'homeassistant',
'entity_prefix': 'tsun',
'proxy_node_id': 'proxy',
'proxy_unique_id': 'P170000000000001'},
'solarman': {
'enabled': True,
'host': 'iot.talent-monitoring.com',
'port': 10000,
},
'tsun': {
'enabled': True,
'host': 'logger.talent-monitoring.com',
'port': 5005,
},
'inverters': {
'allow_all': False
},
'gen3plus': {'at_acl': {'tsun': {'allow': [], 'block': []},
'mqtt': {'allow': [], 'block': []}}},
}
def test_no_config(ConfigDefault):
test_buffer.rd = "" # empty buffer, no json
Config.init(ConfigReadToml("app/config/default_config.toml"))
for _ in patch_open():
ConfigReadJson()
err = Config.get_error()
assert err == 'error: Expecting value: line 1 column 1 (char 0)'
cnf = Config.get()
assert cnf == ConfigDefault
def test_no_file(ConfigDefault):
test_buffer.rd = "" # empty buffer, no json
Config.init(ConfigReadToml("app/config/default_config.toml"))
for _ in patch_open():
ConfigReadJson("_no__file__no_")
err = Config.get_error()
assert err == None
cnf = Config.get()
assert cnf == ConfigDefault
def test_invalid_filename(ConfigDefault):
test_buffer.rd = "" # empty buffer, no json
Config.init(ConfigReadToml("app/config/default_config.toml"))
for _ in patch_open():
ConfigReadJson(None)
err = Config.get_error()
assert err == None
cnf = Config.get()
assert cnf == ConfigDefault
def test_cnv1():
"""test dotted key converting"""
tst = {
"gen3plus.at_acl.mqtt.block": [
"AT+SUPDATE",
"AT+"
]
}
cnf = ConfigReadJson()
obj = cnf.convert_to_obj(tst)
assert obj == {
'gen3plus': {
'at_acl': {
'mqtt': {
'block': [
'AT+SUPDATE',
"AT+"
],
},
},
},
}
def test_cnv2():
"""test a valid list with serials in inverters"""
tst = {
"inverters": [
{
"serial": "R170000000000001",
},
{
"serial": "Y170000000000001",
}
],
}
cnf = ConfigReadJson()
obj = cnf.convert_to_obj(tst)
assert obj == {
'inverters': {
'R170000000000001': {},
'Y170000000000001': {}
},
}
def test_cnv3():
"""test the combination of a list and a scalar in inverters"""
tst = {
"inverters": [
{
"serial": "R170000000000001",
},
{
"serial": "Y170000000000001",
}
],
"inverters.allow_all": False,
}
cnf = ConfigReadJson()
obj = cnf.convert_to_obj(tst)
assert obj == {
'inverters': {
'R170000000000001': {},
'Y170000000000001': {},
'allow_all': False,
},
}
def test_cnv4():
tst = {
"inverters": [
{
"serial": "R170000000000001",
"node_id": "PV-Garage/",
"suggested_area": "Garage",
"modbus_polling": False,
"pv1_manufacturer": "man1",
"pv1_type": "type1",
"pv2_manufacturer": "man2",
"pv2_type": "type2",
"sensor_list": 688
},
{
"serial": "Y170000000000001",
"monitor_sn": 2000000000,
"node_id": "PV-Garage2/",
"suggested_area": "Garage2",
"modbus_polling": True,
"client_mode_host": "InverterIP",
"client_mode_port": 1234,
"pv1_manufacturer": "man1",
"pv1_type": "type1",
"pv2_manufacturer": "man2",
"pv2_type": "type2",
"pv3_manufacturer": "man3",
"pv3_type": "type3",
"pv4_manufacturer": "man4",
"pv4_type": "type4",
"sensor_list": 688
}
],
"tsun.enabled": True,
"solarman.enabled": True,
"inverters.allow_all": False,
"gen3plus.at_acl.tsun.allow": [
"AT+Z",
"AT+UPURL",
"AT+SUPDATE"
],
"gen3plus.at_acl.tsun.block": [
"AT+SUPDATE"
],
"gen3plus.at_acl.mqtt.allow": [
"AT+"
],
"gen3plus.at_acl.mqtt.block": [
"AT+SUPDATE"
]
}
cnf = ConfigReadJson()
obj = cnf.convert_to_obj(tst)
assert obj == {
'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': ['AT+SUPDATE']},
'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'],
'block': ['AT+SUPDATE']}}},
'inverters': {'R170000000000001': {'modbus_polling': False,
'node_id': 'PV-Garage/',
'pv1_manufacturer': 'man1',
'pv1_type': 'type1',
'pv2_manufacturer': 'man2',
'pv2_type': 'type2',
'sensor_list': 688,
'suggested_area': 'Garage'},
'Y170000000000001': {'client_mode_host': 'InverterIP',
'client_mode_port': 1234,
'modbus_polling': True,
'monitor_sn': 2000000000,
'node_id': 'PV-Garage2/',
'pv1_manufacturer': 'man1',
'pv1_type': 'type1',
'pv2_manufacturer': 'man2',
'pv2_type': 'type2',
'pv3_manufacturer': 'man3',
'pv3_type': 'type3',
'pv4_manufacturer': 'man4',
'pv4_type': 'type4',
'sensor_list': 688,
'suggested_area': 'Garage2'},
'allow_all': False},
'solarman': {'enabled': True},
'tsun': {'enabled': True}
}
def test_cnv5():
"""test a invalid list with missing serials"""
tst = {
"inverters": [
{
"node_id": "PV-Garage1/",
},
{
"serial": "Y170000000000001",
"node_id": "PV-Garage2/",
}
],
}
cnf = ConfigReadJson()
obj = cnf.convert_to_obj(tst)
assert obj == {
'inverters': {
'Y170000000000001': {'node_id': 'PV-Garage2/'}
},
}
def test_cnv6():
"""test overwritting a value in inverters"""
tst = {
"inverters": [{
"serial": "Y170000000000001",
"node_id": "PV-Garage2/",
}],
}
tst2 = {
"inverters": [{
"serial": "Y170000000000001",
"node_id": "PV-Garden/",
}],
}
cnf = ConfigReadJson()
conf = {}
for key, val in tst.items():
cnf.convert_inv_arr(conf, key, val)
assert conf == {
'inverters': {
'Y170000000000001': {'node_id': 'PV-Garage2/'}
},
}
for key, val in tst2.items():
cnf.convert_inv_arr(conf, key, val)
assert conf == {
'inverters': {
'Y170000000000001': {'node_id': 'PV-Garden/'}
},
}
def test_empty_config(ConfigDefault):
test_buffer.rd = "{}" # empty json
Config.init(ConfigReadToml("app/config/default_config.toml"))
for _ in patch_open():
ConfigReadJson()
err = Config.get_error()
assert err == None
cnf = Config.get()
assert cnf == ConfigDefault
def test_full_config(ConfigComplete):
test_buffer.rd = """
{
"inverters": [
{
"serial": "R170000000000001",
"node_id": "PV-Garage/",
"suggested_area": "Garage",
"modbus_polling": false,
"pv1.manufacturer": "man1",
"pv1.type": "type1",
"pv2.manufacturer": "man2",
"pv2.type": "type2",
"sensor_list": 688
},
{
"serial": "Y170000000000001",
"monitor_sn": 2000000000,
"node_id": "PV-Garage2/",
"suggested_area": "Garage2",
"modbus_polling": true,
"client_mode_host": "InverterIP",
"client_mode_port": 1234,
"pv1.manufacturer": "man1",
"pv1.type": "type1",
"pv2.manufacturer": "man2",
"pv2.type": "type2",
"pv3.manufacturer": "man3",
"pv3.type": "type3",
"pv4.manufacturer": "man4",
"pv4.type": "type4",
"sensor_list": 688
}
],
"tsun.enabled": true,
"solarman.enabled": true,
"inverters.allow_all": false,
"gen3plus.at_acl.tsun.allow": [
"AT+Z",
"AT+UPURL",
"AT+SUPDATE"
],
"gen3plus.at_acl.tsun.block": [
"AT+SUPDATE"
],
"gen3plus.at_acl.mqtt.allow": [
"AT+"
],
"gen3plus.at_acl.mqtt.block": [
"AT+SUPDATE"
]
}
"""
Config.init(ConfigReadToml("app/config/default_config.toml"))
for _ in patch_open():
ConfigReadJson()
err = Config.get_error()
assert err == None
cnf = Config.get()
assert cnf == ConfigComplete

View File

@@ -12,6 +12,8 @@ from modbus import Modbus
from gen3plus.solarman_v5 import SolarmanV5
from cnf.config import Config
NO_MOSQUITTO_TEST = False
'''disable all tests with connections to test.mosquitto.org'''
pytest_plugins = ('pytest_asyncio',)
@@ -69,8 +71,12 @@ def spy_modbus_cmd_client():
def test_native_client(test_hostname, test_port):
"""Sanity check: Make sure the paho-mqtt client can connect to the test
MQTT server.
MQTT server. Otherwise the test set NO_MOSQUITTO_TEST to True and disable
all test cases which depends on the test.mosquitto.org server
"""
global NO_MOSQUITTO_TEST
if NO_MOSQUITTO_TEST:
pytest.skip('skipping, since Mosquitto is not reliable at the moment')
import paho.mqtt.client as mqtt
import threading
@@ -82,10 +88,62 @@ def test_native_client(test_hostname, test_port):
on_connect = threading.Event()
c.on_connect = Mock(side_effect=lambda *_: on_connect.set())
c.connect_async(test_hostname, test_port)
assert on_connect.wait(10)
if not on_connect.wait(3):
NO_MOSQUITTO_TEST = True # skip all mosquitto tests
pytest.skip('skipping, since Mosquitto is not reliable at the moment')
finally:
c.loop_stop()
@pytest.mark.asyncio
async def test_mqtt_connection(config_mqtt_conn):
global NO_MOSQUITTO_TEST
if NO_MOSQUITTO_TEST:
pytest.skip('skipping, since Mosquitto is not reliable at the moment')
_ = config_mqtt_conn
assert asyncio.get_running_loop()
on_connect = asyncio.Event()
async def cb():
on_connect.set()
try:
m = Mqtt(cb)
assert m.task
assert await asyncio.wait_for(on_connect.wait(), 5)
# await asyncio.sleep(1)
assert 0 == m.ha_restarts
await m.publish('homeassistant/status', 'online')
except TimeoutError:
assert False
finally:
await m.close()
await m.publish('homeassistant/status', 'online')
@pytest.mark.asyncio
async def test_ha_reconnect(config_mqtt_conn):
global NO_MOSQUITTO_TEST
if NO_MOSQUITTO_TEST:
pytest.skip('skipping, since Mosquitto is not reliable at the moment')
_ = config_mqtt_conn
on_connect = asyncio.Event()
async def cb():
on_connect.set()
try:
m = Mqtt(cb)
msg = aiomqtt.Message(topic= 'homeassistant/status', payload= b'offline', qos= 0, retain = False, mid= 0, properties= None)
await m.dispatch_msg(msg)
assert not on_connect.is_set()
msg = aiomqtt.Message(topic= 'homeassistant/status', payload= b'online', qos= 0, retain = False, mid= 0, properties= None)
await m.dispatch_msg(msg)
assert on_connect.is_set()
finally:
await m.close()
@pytest.mark.asyncio
async def test_mqtt_no_config(config_no_conn):
_ = config_no_conn
@@ -110,29 +168,6 @@ async def test_mqtt_no_config(config_no_conn):
finally:
await m.close()
@pytest.mark.asyncio
async def test_mqtt_connection(config_mqtt_conn):
_ = config_mqtt_conn
assert asyncio.get_running_loop()
on_connect = asyncio.Event()
async def cb():
on_connect.set()
try:
m = Mqtt(cb)
assert m.task
assert await asyncio.wait_for(on_connect.wait(), 5)
# await asyncio.sleep(1)
assert 0 == m.ha_restarts
await m.publish('homeassistant/status', 'online')
except TimeoutError:
assert False
finally:
await m.close()
await m.publish('homeassistant/status', 'online')
@pytest.mark.asyncio
async def test_msg_dispatch(config_mqtt_conn, spy_modbus_cmd):
_ = config_mqtt_conn
@@ -209,26 +244,6 @@ async def test_msg_ignore_client_conn(config_mqtt_conn, spy_modbus_cmd_client):
finally:
await m.close()
@pytest.mark.asyncio
async def test_ha_reconnect(config_mqtt_conn):
_ = config_mqtt_conn
on_connect = asyncio.Event()
async def cb():
on_connect.set()
try:
m = Mqtt(cb)
msg = aiomqtt.Message(topic= 'homeassistant/status', payload= b'offline', qos= 0, retain = False, mid= 0, properties= None)
await m.dispatch_msg(msg)
assert not on_connect.is_set()
msg = aiomqtt.Message(topic= 'homeassistant/status', payload= b'online', qos= 0, retain = False, mid= 0, properties= None)
await m.dispatch_msg(msg)
assert on_connect.is_set()
finally:
await m.close()
@pytest.mark.asyncio
async def test_ignore_unknown_func(config_mqtt_conn):
'''don't dispatch for unknwon function names'''

24
app/tests/test_server.py Normal file
View File

@@ -0,0 +1,24 @@
# test_with_pytest.py
import pytest
import logging
import os
from mock import patch
from server import get_log_level
def test_get_log_level():
with patch.dict(os.environ, {'LOG_LVL': ''}):
log_lvl = get_log_level()
assert log_lvl == logging.INFO
with patch.dict(os.environ, {'LOG_LVL': 'DEBUG'}):
log_lvl = get_log_level()
assert log_lvl == logging.DEBUG
with patch.dict(os.environ, {'LOG_LVL': 'WARN'}):
log_lvl = get_log_level()
assert log_lvl == logging.WARNING
with patch.dict(os.environ, {'LOG_LVL': 'UNKNOWN'}):
log_lvl = get_log_level()
assert log_lvl == logging.INFO

View File

@@ -39,18 +39,18 @@ schema:
# type: str
# manufacturer: str
# daher diese variante
pv1_manufacturer: str?
pv1_type: str?
pv2_manufacturer: str?
pv2_type: str?
pv3_manufacturer: str?
pv3_type: str?
pv4_manufacturer: str?
pv4_type: str?
pv5_manufacturer: str?
pv5_type: str?
pv6_manufacturer: str?
pv6_type: str?
pv1.manufacturer: str?
pv1.type: str?
pv2.manufacturer: str?
pv2.type: str?
pv3.manufacturer: str?
pv3.type: str?
pv4.manufacturer: str?
pv4.type: str?
pv5.manufacturer: str?
pv5.type: str?
pv6.manufacturer: str?
pv6.type: str?
tsun.enabled: bool
solarman.enabled: bool
inverters.allow_all: bool
@@ -92,10 +92,10 @@ options:
# - string: PV2
# type: SF-M18/144550
# manufacturer: Shinefar
pv1_manufacturer: Shinefar
pv1_type: SF-M18/144550
pv2_manufacturer: Shinefar
pv2_type: SF-M18/144550
pv1.manufacturer: Shinefar
pv1.type: SF-M18/144550
pv2.manufacturer: Shinefar
pv2.type: SF-M18/144550
tsun.enabled: true # set default
solarman.enabled: true # set default
inverters.allow_all: false # set default

View File

@@ -1,115 +0,0 @@
import json
import os
# Dieses file übernimmt die Add-On Konfiguration und schreibt sie in die
# Konfigurationsdatei des tsun-proxy
# Die Addon Konfiguration wird in der Datei /data/options.json bereitgestellt
# Die Konfiguration wird in der Datei /home/proxy/config/config.toml
# gespeichert
# Übernehme die Umgebungsvariablen
# alternativ kann auch auf die homeassistant supervisor API zugegriffen werden
def create_config():
data = {}
data['mqtt.host'] = os.getenv('MQTT_HOST', "mqtt")
data['mqtt.port'] = os.getenv('MQTT_PORT', 1883)
data['mqtt.user'] = os.getenv('MQTT_USER', "")
data['mqtt.passwd'] = os.getenv('MQTT_PASSWORD', "")
# Lese die Add-On Konfiguration aus der Datei /data/options.json
# with open('data/options.json') as json_file:
with open('/data/options.json') as json_file:
try:
options_data = json.load(json_file)
data.update(options_data)
except json.JSONDecodeError:
pass
# Schreibe die Add-On Konfiguration in die Datei /home/proxy/config/config.toml # noqa: E501
# with open('./config/config.toml', 'w+') as f:
with open('/home/proxy/config/config.toml', 'w+') as f:
f.write(f"""
mqtt.host = '{data.get('mqtt.host')}' # URL or IP address of the mqtt broker
mqtt.port = {data.get('mqtt.port')}
mqtt.user = '{data.get('mqtt.user')}'
mqtt.passwd = '{data.get('mqtt.passwd')}'
ha.auto_conf_prefix = '{data.get('ha.auto_conf_prefix', 'homeassistant')}' # MQTT prefix for subscribing for homeassistant status updates
ha.discovery_prefix = '{data.get('ha.discovery_prefix', 'homeassistant')}' # MQTT prefix for discovery topic
ha.entity_prefix = '{data.get('ha.entity_prefix', 'tsun')}' # MQTT topic prefix for publishing inverter values
ha.proxy_node_id = '{data.get('ha.proxy_node_id', 'proxy')}' # MQTT node id, for the proxy_node_id
ha.proxy_unique_id = '{data.get('ha.proxy_unique_id', 'P170000000000001')}' # MQTT unique id, to identify a proxy instance
tsun.enabled = {str(data.get('tsun.enabled', True)).lower()}
tsun.host = '{data.get('tsun.host', 'logger.talent-monitoring.com')}'
tsun.port = {data.get('tsun.port', 5005)}
solarman.enabled = {str(data.get('solarman.enabled', True)).lower()}
solarman.host = '{data.get('solarman.host', 'iot.talent-monitoring.com')}'
solarman.port = {data.get('solarman.port', 10000)}
inverters.allow_all = {str(data.get('inverters.allow_all', False)).lower()}
""") # noqa: E501
if 'inverters' in data:
for inverter in data['inverters']:
f.write(f"""
[inverters."{inverter['serial']}"]
node_id = '{inverter['node_id']}'
suggested_area = '{inverter['suggested_area']}'
modbus_polling = {str(inverter['modbus_polling']).lower()}
# check if inverter has monitor_sn key. if not, skip monitor_sn
{f"monitor_sn = '{inverter['monitor_sn']}'" if 'monitor_sn' in inverter else ''}
# check if inverter has 'pv1_type' and 'pv1_manufacturer' keys. if not, skip pv1
{f"pv1 = {{type = '{inverter['pv1_type']}', manufacturer = '{inverter['pv1_manufacturer']}'}}" if 'pv1_type' in inverter and 'pv1_manufacturer' in inverter else ''}
# check if inverter has 'pv2_type' and 'pv2_manufacturer' keys. if not, skip pv2
{f"pv2 = {{type = '{inverter['pv2_type']}', manufacturer = '{inverter['pv2_manufacturer']}'}}" if 'pv2_type' in inverter and 'pv2_manufacturer' in inverter else ''}
# check if inverter has 'pv3_type' and 'pv3_manufacturer' keys. if not, skip pv3
{f"pv3 = {{type = '{inverter['pv3_type']}', manufacturer = '{inverter['pv3_manufacturer']}'}}" if 'pv3_type' in inverter and 'pv3_manufacturer' in inverter else ''}
# check if inverter has 'pv4_type' and 'pv4_manufacturer' keys. if not, skip pv4
{f"pv4 = {{type = '{inverter['pv4_type']}', manufacturer = '{inverter['pv4_manufacturer']}'}}" if 'pv4_type' in inverter and 'pv4_manufacturer' in inverter else ''}
# check if inverter has 'pv5_type' and 'pv5_manufacturer' keys. if not, skip pv5
{f"pv5 = {{type = '{inverter['pv5_type']}', manufacturer = '{inverter['pv5_manufacturer']}'}}" if 'pv5_type' in inverter and 'pv5_manufacturer' in inverter else ''}
# check if inverter has 'pv6_type' and 'pv6_manufacturer' keys. if not, skip pv6
{f"pv6 = {{type = '{inverter['pv6_type']}', manufacturer = '{inverter['pv6_manufacturer']}'}}" if 'pv6_type' in inverter and 'pv6_manufacturer' in inverter else ''}
""") # noqa: E501
# add filters
f.write("""
[gen3plus.at_acl]
# filter for received commands from the internet
tsun.allow = [""")
if 'gen3plus.at_acl.tsun.allow' in data:
for rule in data['gen3plus.at_acl.tsun.allow']:
f.write(f"'{rule}',")
f.write("]\ntsun.block = [")
if 'gen3plus.at_acl.tsun.block' in data:
for rule in data['gen3plus.at_acl.tsun.block']:
f.write(f"'{rule}',")
f.write("""]
# filter for received commands from the MQTT broker
mqtt.allow = [""")
if 'gen3plus.at_acl.mqtt.allow' in data:
for rule in data['gen3plus.at_acl.mqtt.allow']:
f.write(f"'{rule}',")
f.write("]\nmqtt.block = [")
if 'gen3plus.at_acl.mqtt.block' in data:
for rule in data['gen3plus.at_acl.mqtt.block']:
f.write(f"'{rule}',")
f.write("]")
if __name__ == "__main__": # pragma: no cover
create_config()

View File

@@ -27,12 +27,9 @@ cd /home || exit
mkdir -p proxy/log
mkdir -p proxy/config
echo "Create config.toml..."
python3 create_config_toml.py
cd /home/proxy || exit
export VERSION=$(cat /proxy-version.txt)
echo "Start Proxyserver..."
python3 server.py
python3 server.py --json_config=/data/options.json

View File

@@ -1,194 +0,0 @@
# test_with_pytest.py
import pytest
import tomllib
from mock import patch
from cnf.config import Config
from home.create_config_toml import create_config
from test_config import ConfigComplete, ConfigMinimum
class FakeBuffer:
rd = bytearray()
wr = str()
test_buffer = FakeBuffer
class FakeFile():
def __init__(self):
self.buf = test_buffer
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
pass
class FakeOptionsFile(FakeFile):
def read(self):
return self.buf.rd
class FakeConfigFile(FakeFile):
def write(self, data: str):
self.buf.wr += data
@pytest.fixture
def patch_open():
def new_open(file: str, OpenTextMode="r"):
if file == '/data/options.json':
return FakeOptionsFile()
elif file == '/home/proxy/config/config.toml':
# write_buffer += 'bla1'.encode('utf-8')
return FakeConfigFile()
raise TimeoutError
with patch('builtins.open', new_open) as conn:
yield conn
@pytest.fixture
def ConfigTomlEmpty():
return {
'gen3plus': {'at_acl': {'mqtt': {'allow': [], 'block': []},
'tsun': {'allow': [], 'block': []}}},
'ha': {'auto_conf_prefix': 'homeassistant',
'discovery_prefix': 'homeassistant',
'entity_prefix': 'tsun',
'proxy_node_id': 'proxy',
'proxy_unique_id': 'P170000000000001'},
'inverters': {
'allow_all': False
},
'mqtt': {'host': 'mqtt', 'passwd': '', 'port': 1883, 'user': ''},
'solarman': {
'enabled': True,
'host': 'iot.talent-monitoring.com',
'port': 10000,
},
'tsun': {
'enabled': True,
'host': 'logger.talent-monitoring.com',
'port': 5005,
},
}
def test_no_config(patch_open, ConfigTomlEmpty):
_ = patch_open
test_buffer.wr = ""
test_buffer.rd = "" # empty buffer, no json
create_config()
cnf = tomllib.loads(test_buffer.wr)
assert cnf == ConfigTomlEmpty
def test_empty_config(patch_open, ConfigTomlEmpty):
_ = patch_open
test_buffer.wr = ""
test_buffer.rd = "{}" # empty json
create_config()
cnf = tomllib.loads(test_buffer.wr)
assert cnf == ConfigTomlEmpty
def test_full_config(patch_open, ConfigComplete):
_ = patch_open
test_buffer.wr = ""
test_buffer.rd = """
{
"inverters": [
{
"serial": "R170000000000001",
"node_id": "PV-Garage",
"suggested_area": "Garage",
"modbus_polling": false,
"pv1_manufacturer": "man1",
"pv1_type": "type1",
"pv2_manufacturer": "man2",
"pv2_type": "type2"
},
{
"serial": "Y170000000000001",
"monitor_sn": 2000000000,
"node_id": "PV-Garage2",
"suggested_area": "Garage2",
"modbus_polling": true,
"client_mode_host": "InverterIP",
"client_mode_port": 1234,
"pv1_manufacturer": "man1",
"pv1_type": "type1",
"pv2_manufacturer": "man2",
"pv2_type": "type2",
"pv3_manufacturer": "man3",
"pv3_type": "type3",
"pv4_manufacturer": "man4",
"pv4_type": "type4"
}
],
"tsun.enabled": true,
"solarman.enabled": true,
"inverters.allow_all": false,
"gen3plus.at_acl.tsun.allow": [
"AT+Z",
"AT+UPURL",
"AT+SUPDATE"
],
"gen3plus.at_acl.tsun.block": [
"AT+SUPDATE"
],
"gen3plus.at_acl.mqtt.allow": [
"AT+"
],
"gen3plus.at_acl.mqtt.block": [
"AT+SUPDATE"
]
}
"""
create_config()
cnf = tomllib.loads(test_buffer.wr)
validated = Config.conf_schema.validate(cnf)
assert validated == ConfigComplete
def test_minimum_config(patch_open, ConfigMinimum):
_ = patch_open
test_buffer.wr = ""
test_buffer.rd = """
{
"inverters": [
{
"serial": "R170000000000001",
"monitor_sn": 0,
"node_id": "",
"suggested_area": "",
"modbus_polling": true,
"client_mode_host": "InverterIP",
"client_mode_port": 1234
}
],
"tsun.enabled": true,
"solarman.enabled": true,
"inverters.allow_all": true,
"gen3plus.at_acl.tsun.allow": [
"AT+Z",
"AT+UPURL",
"AT+SUPDATE"
],
"gen3plus.at_acl.mqtt.allow": [
"AT+"
]
}
"""
create_config()
cnf = tomllib.loads(test_buffer.wr)
validated = Config.conf_schema.validate(cnf)
assert validated == ConfigMinimum

View File

@@ -7,13 +7,13 @@ sonar.projectName=tsun-gen3-proxy
# Path is relative to the sonar-project.properties file. Replace "\" by "/" on Windows.
sonar.sources=app/src/,ha_addons/ha_addon/rootfs/home/
sonar.sources=app/src/
# Encoding of the source code. Default is default system encoding
#sonar.sourceEncoding=UTF-8
sonar.python.version=3.12
sonar.tests=system_tests/,app/tests/,ha_addons/ha_addon/tests/
sonar.tests=system_tests/,app/tests/
sonar.exclusions=**/.vscode/**/*
# Name your criteria
sonar.issue.ignore.multicriteria=e1,e2