remove all eval() calls

This commit is contained in:
Stefan Allius
2024-10-25 23:41:25 +02:00
parent 10a18237c7
commit 4993676614
6 changed files with 56 additions and 63 deletions

View File

@@ -183,11 +183,8 @@ class InfosG3(Infos):
i += 1
def __modify_val(self, row, result):
if row:
if 'eval' in row:
result = eval(row['eval'])
if 'ratio' in row:
result = round(result * row['ratio'], 2)
if row and 'ratio' in row:
result = round(result * row['ratio'], 2)
return result
def __store_result(self, addr, result, info_id, node_id):

View File

@@ -1,11 +1,10 @@
import struct
from typing import Generator
if __name__ == "app.src.gen3plus.infos_g3p":
from app.src.infos import Infos, Register, ProxyMode
from app.src.infos import Infos, Register, ProxyMode, Fmt
else: # pragma: no cover
from infos import Infos, Register, ProxyMode
from infos import Infos, Register, ProxyMode, Fmt
class RegisterMap:
@@ -19,16 +18,16 @@ class RegisterMap:
0x4102001a: {'reg': Register.HEARTBEAT_INTERVAL, 'fmt': '<B', 'ratio': 1}, # noqa: E501
0x4102001c: {'reg': Register.SIGNAL_STRENGTH, 'fmt': '<B', 'ratio': 1, 'dep': ProxyMode.SERVER}, # noqa: E501
0x4102001e: {'reg': Register.CHIP_MODEL, 'fmt': '!40s'}, # noqa: E501
0x41020046: {'reg': Register.MAC_ADDR, 'fmt': '!BBBBBB', 'eval': '"%02x:%02x:%02x:%02x:%02x:%02x" % res'}, # noqa: E501
0x41020046: {'reg': Register.MAC_ADDR, 'fmt': '!BBBBBB', 'func': Fmt.mac}, # noqa: E501
0x4102004c: {'reg': Register.IP_ADDRESS, 'fmt': '!16s'}, # noqa: E501
0x4102005f: {'reg': Register.SENSOR_LIST, 'fmt': '<H', 'eval': "f'{result:04x}'"}, # noqa: E501
0x4102005f: {'reg': Register.SENSOR_LIST, 'fmt': '<H', 'func': Fmt.hex4}, # noqa: E501
0x41020064: {'reg': Register.COLLECTOR_FW_VERSION, 'fmt': '!40s'}, # noqa: E501
0x4201000c: {'reg': Register.SENSOR_LIST, 'fmt': '<H', 'eval': "f'{result:04x}'"}, # noqa: E501
0x4201000c: {'reg': Register.SENSOR_LIST, 'fmt': '<H', 'func': Fmt.hex4}, # noqa: E501
0x4201001c: {'reg': Register.POWER_ON_TIME, 'fmt': '<H', 'ratio': 1, 'dep': ProxyMode.SERVER}, # noqa: E501, or packet number
0x42010020: {'reg': Register.SERIAL_NUMBER, 'fmt': '!16s'}, # noqa: E501
0x420100c0: {'reg': Register.INVERTER_STATUS, 'fmt': '!H'}, # noqa: E501
0x420100d0: {'reg': Register.VERSION, 'fmt': '!H', 'eval': "f'V{(result>>12)}.{(result>>8)&0xf}.{(result>>4)&0xf}{result&0xf}'"}, # noqa: E501
0x420100d0: {'reg': Register.VERSION, 'fmt': '!H', 'func': Fmt.version}, # noqa: E501
0x420100d2: {'reg': Register.GRID_VOLTAGE, 'fmt': '!H', 'ratio': 0.1}, # noqa: E501
0x420100d4: {'reg': Register.GRID_CURRENT, 'fmt': '!H', 'ratio': 0.01}, # noqa: E501
0x420100d6: {'reg': Register.GRID_FREQUENCY, 'fmt': '!H', 'ratio': 0.01}, # noqa: E501
@@ -123,7 +122,7 @@ class InfosG3P(Infos):
if not isinstance(row, dict):
continue
info_id = row['reg']
result = self.__get_value(buf, addr, row)
result = Fmt.get_value(buf, addr, row)
keys, level, unit, must_incr = self._key_obj(info_id)
@@ -137,20 +136,3 @@ class InfosG3P(Infos):
if update:
self.tracer.log(level, f'[{node_id}] GEN3PLUS: {name}'
f' : {result}{unit}')
def __get_value(self, buf, idx, row):
'''Get a value from buf and interpret as in row'''
fmt = row['fmt']
res = struct.unpack_from(fmt, buf, idx)
result = res[0]
if isinstance(result, (bytearray, bytes)):
result = result.decode().split('\x00')[0]
if 'eval' in row:
result = eval(row['eval'])
if 'ratio' in row:
result = round(result * row['ratio'], 2)
if 'quotient' in row:
result = round(result/row['quotient'])
if 'offset' in row:
result = result + row['offset']
return result

View File

@@ -1,5 +1,6 @@
import logging
import json
import struct
import os
from enum import Enum
from typing import Generator
@@ -123,6 +124,40 @@ class Register(Enum):
TEST_REG2 = 10001
class Fmt:
@staticmethod
def get_value(buf: bytes, idx: int, row: dict):
'''Get a value from buf and interpret as in row defined'''
fmt = row['fmt']
res = struct.unpack_from(fmt, buf, idx)
result = res[0]
if isinstance(result, (bytearray, bytes)):
result = result.decode().split('\x00')[0]
if 'func' in row:
result = row['func'](res)
if 'ratio' in row:
result = round(result * row['ratio'], 2)
if 'quotient' in row:
result = round(result/row['quotient'])
if 'offset' in row:
result = result + row['offset']
return result
@staticmethod
def hex4(val):
return f'{val[0]:04x}'
@staticmethod
def mac(val):
return "%02x:%02x:%02x:%02x:%02x:%02x" % val
@staticmethod
def version(val):
x = val[0]
return f'V{(x>>12)}.{(x>>8)&0xf}.{(x>>4)&0xf}{x&0xf:1X}'
# return f'V{x>>12}.{(x>>8)&0xf}.{(x>>4)&0xf}{val[0]&0xf}'
class ClrAtMidnight:
__clr_at_midnight = [Register.PV1_DAILY_GENERATION, Register.PV2_DAILY_GENERATION, Register.PV3_DAILY_GENERATION, Register.PV4_DAILY_GENERATION, Register.PV5_DAILY_GENERATION, Register.PV6_DAILY_GENERATION, Register.DAILY_GENERATION] # noqa: E501
db = {}

View File

@@ -17,9 +17,9 @@ import asyncio
from typing import Generator, Callable
if __name__ == "app.src.modbus":
from app.src.infos import Register
from app.src.infos import Register, Fmt
else: # pragma: no cover
from infos import Register
from infos import Register, Fmt
logger = logging.getLogger('data')
@@ -44,11 +44,11 @@ class Modbus():
0x202c: {'reg': Register.OUTPUT_COEFFICIENT, 'fmt': '!H', 'ratio': 100/1024}, # noqa: E501
0x3000: {'reg': Register.INVERTER_STATUS, 'fmt': '!H'}, # noqa: E501
0x3008: {'reg': Register.VERSION, 'fmt': '!H', 'eval': "f'V{(result>>12)}.{(result>>8)&0xf}.{(result>>4)&0xf}{result&0xf:1X}'"}, # noqa: E501
0x3008: {'reg': Register.VERSION, 'fmt': '!H', 'func': Fmt.version}, # noqa: E501
0x3009: {'reg': Register.GRID_VOLTAGE, 'fmt': '!H', 'ratio': 0.1}, # noqa: E501
0x300a: {'reg': Register.GRID_CURRENT, 'fmt': '!H', 'ratio': 0.01}, # noqa: E501
0x300b: {'reg': Register.GRID_FREQUENCY, 'fmt': '!H', 'ratio': 0.01}, # noqa: E501
0x300c: {'reg': Register.INVERTER_TEMP, 'fmt': '!H', 'eval': 'result-40'}, # noqa: E501
0x300c: {'reg': Register.INVERTER_TEMP, 'fmt': '!H', 'offset': -40}, # noqa: E501
# 0x300d
0x300e: {'reg': Register.RATED_POWER, 'fmt': '!H', 'ratio': 1}, # noqa: E501
0x300f: {'reg': Register.OUTPUT_POWER, 'fmt': '!H', 'ratio': 0.1}, # noqa: E501
@@ -229,17 +229,6 @@ class Modbus():
return False
def __get_value(self, buf: bytes, idx: int, row: dict):
'''get a value from the received buffer'''
val = struct.unpack_from(row['fmt'], buf, idx)
result = val[0]
if 'eval' in row:
result = eval(row['eval'])
if 'ratio' in row:
result = round(result * row['ratio'], 2)
return result
def __process_data(self, info_db, buf: bytes, first_reg, elmlen):
'''Generator over received registers, updates the db'''
for i in range(0, elmlen):
@@ -249,7 +238,7 @@ class Modbus():
info_id = row['reg']
keys, level, unit, must_incr = info_db._key_obj(info_id)
if keys:
result = self.__get_value(buf, 3+2*i, row)
result = Fmt.get_value(buf, 3+2*i, row)
name, update = info_db.update_db(keys, must_incr,
result)
yield keys[0], update, result

View File

@@ -520,15 +520,3 @@ def test_invalid_data_type(invalid_data_seq):
val = i.dev_value(Register.INVALID_DATA_TYPE) # check invalid data type counter
assert val == 1
def test_result_eval(inv_data_seq2: bytes):
# add eval to convert temperature from °F to °C
RegisterMap.map[0x00000514]['eval'] = '(result-32)/1.8'
i = InfosG3()
for _, _ in i.parse (inv_data_seq2):
pass # side effect is calling generator i.parse()
assert math.isclose(-5.0, round (i.get_db_value(Register.INVERTER_TEMP, 0),4), rel_tol=1e-09, abs_tol=1e-09)
del RegisterMap.map[0x00000514]['eval'] # remove eval

View File

@@ -256,12 +256,12 @@ def test_build_ha_conf4():
assert tests==1
def test_exception_and_eval(inverter_data: bytes):
def test_exception_and_calc(inverter_data: bytes):
# add eval to convert temperature from °F to °C
# patch table to convert temperature from °F to °C
ofs = RegisterMap.map[0x420100d8]['offset']
del RegisterMap.map[0x420100d8]['offset']
RegisterMap.map[0x420100d8]['eval'] = '(result-32)/1.8'
RegisterMap.map[0x420100d8]['quotient'] = 1.8
RegisterMap.map[0x420100d8]['offset'] = -32/1.8
# map PV1_VOLTAGE to invalid register
RegisterMap.map[0x420100e0]['reg'] = Register.TEST_REG2
# set invalid maping entry for OUTPUT_POWER (string instead of dict type)
@@ -274,7 +274,9 @@ def test_exception_and_eval(inverter_data: bytes):
for key, update in i.parse (inverter_data, 0x42, 1):
pass # side effect is calling generator i.parse()
assert math.isclose(12.2222, round (i.get_db_value(Register.INVERTER_TEMP, 0),4), rel_tol=1e-09, abs_tol=1e-09)
del RegisterMap.map[0x420100d8]['eval'] # remove eval
del RegisterMap.map[0x420100d8]['quotient']
del RegisterMap.map[0x420100d8]['offset']
RegisterMap.map[0x420100e0]['reg'] = Register.PV1_VOLTAGE # reset mapping
RegisterMap.map[0x420100de] = backup # reset mapping