add allow and block filter for AT+ commands

This commit is contained in:
Stefan Allius
2024-05-30 18:38:05 +02:00
parent 17c33601a0
commit 063850c7fb
5 changed files with 52 additions and 9 deletions

View File

@@ -49,3 +49,8 @@ monitor_sn = 2000000000 # The "Monitoring SN:" can be found on a sticker e
#pv3 = {type = 'RSM40-8-410M', manufacturer = 'Risen'} # Optional, PV module descr
#pv4 = {type = 'RSM40-8-410M', manufacturer = 'Risen'} # Optional, PV module descr
[gen3plus.at_acl]
tsun.allow = ['AT+Z', 'AT+UPURL', 'AT+SUPDATE']
tsun.block = []
mqtt.allow = ['AT+']
mqtt.block = []

View File

@@ -3,7 +3,7 @@
import shutil
import tomllib
import logging
from schema import Schema, And, Use, Optional
from schema import Schema, And, Or, Use, Optional
class Config():
@@ -38,6 +38,14 @@ class Config():
'proxy_node_id': Use(str),
'proxy_unique_id': Use(str)
},
'gen3plus': {
'at_acl': {
Or('mqtt', 'tsun'): {
'allow': [str],
Optional('block', default=[]): [str]
}
}
},
'inverters': {
'allow_all': Use(bool), And(Use(str), lambda s: len(s) == 16): {
Optional('monitor_sn', default=0): Use(int),
@@ -125,7 +133,8 @@ class Config():
# merge the default and the user config
config = def_config.copy()
for key in ['tsun', 'solarman', 'mqtt', 'ha', 'inverters']:
for key in ['tsun', 'solarman', 'mqtt', 'ha', 'inverters',
'gen3plus']:
if key in usr_config:
config[key] |= usr_config[key]

View File

@@ -91,8 +91,13 @@ class SolarmanV5(Message):
# MODbus or AT cmd
0x4510: self.msg_command_req, # from server
0x1510: self.msg_command_rsp, # from inverter
# 0x0510: self.msg_command_rsp, # from inverter
}
self.modbus_elms = 0 # for unit tests
g3p_cnf = Config.get('gen3plus')
if 'at_acl' in g3p_cnf:
self.at_acl = g3p_cnf['at_acl']
'''
Our puplic methods
@@ -320,9 +325,24 @@ class SolarmanV5(Message):
return
self.mb.build_msg(Modbus.INV_ADDR, func, addr, val, log_lvl)
def at_cmd_forbidden(self, cmd: str, connection: str) -> bool:
return not cmd.startswith(tuple(self.at_acl[connection]['allow'])) or \
cmd.startswith(tuple(self.at_acl[connection]['block']))
async def send_at_cmd(self, AT_cmd: str) -> None:
if self.state != self.STATE_UP:
return
AT_cmd = AT_cmd.strip()
if self.at_cmd_forbidden(cmd=AT_cmd, connection='mqtt'):
data_json = f'\'{AT_cmd}\' is forbidden'
node_id = self.node_id
key = 'at_resp'
logger.info(f'{key}: {data_json}')
asyncio.ensure_future(
self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json)) # noqa: E501
return
self.forward_at_cmd_resp = False
self.__build_header(0x4510)
self._send_buffer += struct.pack(f'<BHLLL{len(AT_cmd)}sc', self.AT_CMD,
@@ -432,6 +452,10 @@ class SolarmanV5(Message):
if ftype == self.AT_CMD:
self.inc_counter('AT_Command')
self.forward_at_cmd_resp = True
AT_cmd = data[15:].decode()
if self.at_cmd_forbidden(cmd=AT_cmd, connection='tsun'):
return
elif ftype == self.MB_RTU_CMD:
if self.remoteStream.mb.recv_req(data[15:],
self.__forward_msg()):

View File

@@ -31,10 +31,12 @@ def test_default_config():
assert True
except:
assert False
assert validated == {'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'node_id': '', 'monitor_sn': 0, 'suggested_area': ''}, 'Y170000000000001': {'monitor_sn': 2000000000, 'node_id': '', 'suggested_area': ''}}}
assert validated == {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'node_id': '', 'monitor_sn': 0, 'suggested_area': ''}, 'Y170000000000001': {'monitor_sn': 2000000000, 'node_id': '', 'suggested_area': ''}}}
def test_full_config():
cnf = {'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005},
'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []},
'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}},
'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000},
'mqtt': {'host': 'mqtt', 'port': 1883, 'user': '', 'passwd': ''},
'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'},
@@ -46,10 +48,12 @@ def test_full_config():
assert True
except:
assert False
assert validated == {'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'node_id': '', 'monitor_sn': 0, 'pv1': {'manufacturer': 'man1','type': 'type1'},'pv2': {'manufacturer': 'man2','type': 'type2'},'pv3': {'manufacturer': 'man3','type': 'type3'}, 'suggested_area': ''}, 'Y170000000000001': {'monitor_sn': 2000000000, 'node_id': '', 'suggested_area': ''}}}
assert validated == {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'node_id': '', 'monitor_sn': 0, 'pv1': {'manufacturer': 'man1','type': 'type1'},'pv2': {'manufacturer': 'man2','type': 'type2'},'pv3': {'manufacturer': 'man3','type': 'type3'}, 'suggested_area': ''}, 'Y170000000000001': {'monitor_sn': 2000000000, 'node_id': '', 'suggested_area': ''}}}
def test_mininum_config():
cnf = {'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005},
'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+']},
'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE']}}},
'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000},
'mqtt': {'host': 'mqtt', 'port': 1883, 'user': '', 'passwd': ''},
'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'},
@@ -62,7 +66,7 @@ def test_mininum_config():
assert True
except:
assert False
assert validated == {'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'node_id': '', 'monitor_sn': 0, 'suggested_area': ''}}}
assert validated == {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'node_id': '', 'monitor_sn': 0, 'suggested_area': ''}}}
def test_read_empty():
cnf = {}
@@ -70,7 +74,7 @@ def test_read_empty():
err = TstConfig.read('app/config/')
assert err == None
cnf = TstConfig.get()
assert cnf == {'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'suggested_area': '', 'monitor_sn': 0, 'node_id': ''}, 'Y170000000000001': {'monitor_sn': 2000000000, 'suggested_area': '', 'node_id': ''}}}
assert cnf == {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'suggested_area': '', 'monitor_sn': 0, 'node_id': ''}, 'Y170000000000001': {'monitor_sn': 2000000000, 'suggested_area': '', 'node_id': ''}}}
defcnf = TstConfig.def_config.get('solarman')
assert defcnf == {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}
@@ -92,7 +96,7 @@ def test_read_cnf1():
err = TstConfig.read('app/config/')
assert err == None
cnf = TstConfig.get()
assert cnf == {'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': False, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'suggested_area': '', 'monitor_sn': 0, 'node_id': ''}, 'Y170000000000001': {'monitor_sn': 2000000000, 'suggested_area': '', 'node_id': ''}}}
assert cnf == {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': False, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'suggested_area': '', 'monitor_sn': 0, 'node_id': ''}, 'Y170000000000001': {'monitor_sn': 2000000000, 'suggested_area': '', 'node_id': ''}}}
cnf = TstConfig.get('solarman')
assert cnf == {'enabled': False, 'host': 'iot.talent-monitoring.com', 'port': 10000}
defcnf = TstConfig.def_config.get('solarman')
@@ -105,7 +109,7 @@ def test_read_cnf2():
err = TstConfig.read('app/config/')
assert err == None
cnf = TstConfig.get()
assert cnf == {'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'suggested_area': '', 'monitor_sn': 0, 'node_id': ''}, 'Y170000000000001': {'monitor_sn': 2000000000, 'suggested_area': '', 'node_id': ''}}}
assert cnf == {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 10000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'suggested_area': '', 'monitor_sn': 0, 'node_id': ''}, 'Y170000000000001': {'monitor_sn': 2000000000, 'suggested_area': '', 'node_id': ''}}}
assert True == TstConfig.is_default('solarman')
def test_read_cnf3():
@@ -122,7 +126,7 @@ def test_read_cnf4():
err = TstConfig.read('app/config/')
assert err == None
cnf = TstConfig.get()
assert cnf == {'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 5000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'suggested_area': '', 'monitor_sn': 0, 'node_id': ''}, 'Y170000000000001': {'monitor_sn': 2000000000, 'suggested_area': '', 'node_id': ''}}}
assert cnf == {'gen3plus': {'at_acl': {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE'], 'block': []}}}, 'tsun': {'enabled': True, 'host': 'logger.talent-monitoring.com', 'port': 5005}, 'solarman': {'enabled': True, 'host': 'iot.talent-monitoring.com', 'port': 5000}, 'mqtt': {'host': 'mqtt', 'port': 1883, 'user': None, 'passwd': None}, 'ha': {'auto_conf_prefix': 'homeassistant', 'discovery_prefix': 'homeassistant', 'entity_prefix': 'tsun', 'proxy_node_id': 'proxy', 'proxy_unique_id': 'P170000000000001'}, 'inverters': {'allow_all': True, 'R170000000000001': {'suggested_area': '', 'monitor_sn': 0, 'node_id': ''}, 'Y170000000000001': {'monitor_sn': 2000000000, 'suggested_area': '', 'node_id': ''}}}
assert False == TstConfig.is_default('solarman')
def test_read_cnf5():

View File

@@ -41,6 +41,7 @@ class MemoryStream(SolarmanV5):
self.db.stat['proxy']['AT_Command'] = 0
self.test_exception_async_write = False
self.entity_prfx = ''
self.at_acl = {'mqtt': {'allow': ['AT+'], 'block': []}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE', 'AT+TIME'], 'block': []}}
def _timestamp(self):
return timestamp