from typing import Generator
from itertools import chain

from infos import Infos, Register, ProxyMode, Fmt


class RegisterFunc:
    '''Helper functions referenced by the 'calc' tables of RegisterMap.

    Each function is called as ``func(info, params)`` (see InfosG3P.calc)
    and returns the calculated value, or None if it cannot be calculated.
    '''

    @staticmethod
    def prod_sum(info: Infos, arr: list) -> None | int:
        '''Return the sum of products of stored register values.

        arguments:
        info:Infos ==> object providing get_db_value()
        arr:list   ==> list of factor lists; the values of all registers
                       of one inner list are multiplied, the products of
                       all inner lists are summed up

        Returns None as soon as one of the registers has no value.'''
        result = 0
        for factors in arr:
            prod = 1
            for factor in factors:
                val = info.get_db_value(factor)
                if val is None:
                    return None
                prod = prod * val
            result += prod
        return result

    @staticmethod
    def cmp_values(info: Infos, params: dict) -> None | int:
        '''Compare a stored register value with a constant.

        arguments:
        info:Infos  ==> object providing get_db_value()
        params:dict ==> 'reg': register to read,
                        'cmp_val': value to compare with,
                        'res': results for [less, equal, greater]

        Returns the matching entry of params['res'], or None if the
        comparison fails for any reason.'''
        try:
            val = info.get_db_value(params['reg'])
            if val < params['cmp_val']:
                return params['res'][0]
            if val == params['cmp_val']:
                return params['res'][1]
            return params['res'][2]
        except Exception:
            # deliberate best effort: a missing value (None is not
            # comparable), a missing key or a short 'res' list all mean
            # "no result"
            return None


class RegisterMap:
    # make the class read/only by using __slots__
    __slots__ = ()

    FMT_2_16BIT_VAL = '!HH'
    FMT_3_16BIT_VAL = '!HHH'
    FMT_4_16BIT_VAL = '!HHHH'

    # Integer keys are split by InfosG3P.parse()/build() into
    #   bits 24..31: message type, bits 16..23: frame type,
    #   bits 0..15: address inside the buffer
    # The key 'calc' holds the table of calculated (virtual) registers.
    map = {
        # NOTE(review): the reviewed text is damaged at this point; the
        # content between the following two comment fragments is missing
        # and must be restored from the original file:
        # 0x41020007: {'reg': Register.DEVICE_SNR, 'fmt': '
        # Battery Status: <0(Discharging), 0(Static), 0>(Loading)
        0x42010044: {'reg': Register.BATT_SOC, 'fmt': '!H', 'ratio': 0.01},  # noqa: E501, state of charge (SOC) in percent
        0x42010046: {'reg': Register.BATT_CELL1_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x42010048: {'reg': Register.BATT_CELL2_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x4201004a: {'reg': Register.BATT_CELL3_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x4201004c: {'reg': Register.BATT_CELL4_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x4201004e: {'reg': Register.BATT_CELL5_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x42010050: {'reg': Register.BATT_CELL6_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x42010052: {'reg': Register.BATT_CELL7_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x42010054: {'reg': Register.BATT_CELL8_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x42010056: {'reg': Register.BATT_CELL9_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x42010058: {'reg': Register.BATT_CELL10_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x4201005a: {'reg': Register.BATT_CELL11_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x4201005c: {'reg': Register.BATT_CELL12_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x4201005e: {'reg': Register.BATT_CELL13_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x42010060: {'reg': Register.BATT_CELL14_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x42010062: {'reg': Register.BATT_CELL15_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x42010064: {'reg': Register.BATT_CELL16_VOLT, 'fmt': '!H', 'ratio': 0.001},  # noqa: E501
        0x42010066: {'reg': Register.BATT_TEMP_1, 'fmt': '!h'},  # noqa: E501 Cell Temperature 1
        0x42010068: {'reg': Register.BATT_TEMP_2, 'fmt': '!h'},  # noqa: E501 Cell Temperature 2
        0x4201006a: {'reg': Register.BATT_TEMP_3, 'fmt': '!h'},  # noqa: E501 Cell Temperature 3
        0x4201006c: {'reg': Register.BATT_OUT_VOLT, 'fmt': '!H', 'ratio': 0.01},  # noqa: E501 Output Voltage
        0x4201006e: {'reg': Register.BATT_OUT_CUR, 'fmt': '!H', 'ratio': 0.01},  # noqa: E501 Output Current
        0x42010070: {'reg': Register.BATT_OUT_STATUS, 'fmt': '!H'},  # noqa: E501 Output Working Status: 0(Standby), 1(Work)
        0x42010072: {'reg': Register.BATT_TEMP_4, 'fmt': '!h'},  # noqa: E501, Environment temp
        0x42010074: {'reg': Register.BATT_ALARM, 'fmt': '!H'},  # noqa: E501 Warning Alarmcode 1, Bit 0..15
        0x42010076: {'reg': Register.BATT_HW_VERS, 'fmt': '!h'},  # noqa: E501 hardware version
        0x42010078: {'reg': Register.BATT_SW_VERS, 'fmt': '!h'},  # noqa: E501 software main version
        'calc': {
            1: {'reg': Register.BATT_PV_PWR, 'func': RegisterFunc.prod_sum,  # noqa: E501 Generated Power
                'params': [[Register.BATT_PV1_VOLT, Register.BATT_PV1_CUR],
                           [Register.BATT_PV2_VOLT, Register.BATT_PV2_CUR]]},
            2: {'reg': Register.BATT_PWR, 'func': RegisterFunc.prod_sum,  # noqa: E501
                'params': [[Register.BATT_VOLT, Register.BATT_CUR]]},
            3: {'reg': Register.BATT_OUT_PWR, 'func': RegisterFunc.prod_sum,  # noqa: E501 Supply Power => Power Supply State: 0(Idle), 0>(Power Supply)
                'params': [[Register.BATT_OUT_VOLT, Register.BATT_OUT_CUR]]},
            4: {'reg': Register.BATT_PWR_SUPL_STATE, 'func': RegisterFunc.cmp_values,  # noqa: E501
                'params': {'reg': Register.BATT_OUT_PWR, 'cmp_val': 0, 'res': [0, 0, 1]}},  # noqa: E501
            5: {'reg': Register.BATT_STATUS, 'func': RegisterFunc.cmp_values,  # noqa: E501
                'params': {'reg': Register.BATT_CUR, 'cmp_val': 0.0, 'res': [0, 1, 2]}}  # noqa: E501
        }
    }


class RegisterSel:
    '''Select the register map that belongs to a sensor number.'''

    # NOTE(review): RegisterMap.map_02b0 and RegisterMap.map_3026 are not
    # defined in the reviewed text; presumably they were lost together
    # with the damaged part of RegisterMap -- confirm against the original
    __sensor_map = {
        0x02b0: RegisterMap.map_02b0,
        0x3026: RegisterMap.map_3026,
    }

    @classmethod
    def get(cls, sensor: int):
        '''Return the map registered for `sensor`; unknown sensor numbers
        fall back to RegisterMap.map'''
        return cls.__sensor_map.get(sensor, RegisterMap.map)


class InfosG3P(Infos):
    '''Infos specialisation which parses, calculates and builds register
    values with the help of the RegisterMap tables.'''
    __slots__ = ('client_mode', )

    def __init__(self, client_mode: bool):
        '''Store the proxy mode and set the default values of the
        database.

        client_mode:bool ==> selects which rows with a 'dep' entry are
                             hidden, see __hide_topic()'''
        super().__init__()
        self.client_mode = client_mode
        self.set_db_def_value(Register.MANUFACTURER, 'TSUN')
        self.set_db_def_value(Register.EQUIPMENT_MODEL, 'TSOL-MSxx00')
        self.set_db_def_value(Register.CHIP_TYPE, 'IGEN TECH')
        self.set_db_def_value(Register.NO_INPUTS, 2)

    def __hide_topic(self, row: dict) -> bool:
        '''Return True if the row depends on a proxy mode ('dep' entry)
        which differs from the current mode; rows without a 'dep' entry
        are never hidden'''
        if 'dep' in row:
            mode = row['dep']
            if self.client_mode:
                return mode != ProxyMode.CLIENT
            else:
                return mode != ProxyMode.SERVER
        return False

    def ha_confs(self, ha_prfx: str, node_id: str, snr: str,
                 sug_area: str = '') \
            -> Generator[tuple[dict, str], None, None]:
        '''Generator function yields a json register struct for
        home-assistant auto configuration and a unique entity string

        arguments:
        ha_prfx:str ==> MQTT prefix for the home assistant 'stat_t string
        node_id:str ==> passed unchanged to ha_conf() / ha_remove()
        snr:str ==> serial number of the inverter, used to build unique
                    entity strings
        sug_area:str ==> suggested area string from the config file'''
        # iterate over RegisterMap.map and get the register values
        sensor = self.get_db_value(Register.SENSOR_LIST)
        if "3026" == sensor:
            reg_map = RegisterMap.map_3026
        elif "02b0" == sensor:
            reg_map = RegisterMap.map_02b0
        else:
            reg_map = {}
        items = reg_map.items()
        # calculated registers of the sensor specific map (if any)
        virt = reg_map.get('calc', {}).items()

        for idx, row in chain(RegisterMap.map.items(), items, virt):
            if 'calc' == idx:
                # the 'calc' entry is a table of rows, not a row itself
                continue
            info_id = row['reg']
            if self.__hide_topic(row):
                res = self.ha_remove(info_id, node_id, snr)
            else:
                res = self.ha_conf(info_id, ha_prfx, node_id, snr,
                                   False, sug_area)
            if res:
                yield res

    def parse(self, buf, msg_type: int, rcv_ftype: int,
              sensor: int = 0, node_id: str = '') \
            -> Generator[tuple[str, bool], None, None]:
        '''parse a data sequence received from the inverter and
        stores the values in Infos.db

        buf: buffer of the sequence to parse
        msg_type: only map keys with this value in bits 24..31 are used
        rcv_ftype: only map keys with this value in bits 16..23 are used
        sensor: sensor_list number, selects the register map
        node_id: id-string for the node, used for logging

        Yields the tuples of __update_val() for every parsed register,
        followed by the tuples of calc()'''
        reg_map = RegisterSel.get(sensor)
        for idx, row in reg_map.items():
            if 'calc' == idx:
                continue
            addr = idx & 0xffff
            ftype = (idx >> 16) & 0xff
            mtype = (idx >> 24) & 0xff
            if ftype != rcv_ftype or mtype != msg_type:
                continue
            if not isinstance(row, dict):
                continue
            info_id = row['reg']
            result = Fmt.get_value(buf, addr, row)
            yield from self.__update_val(node_id, "GEN3PLUS",
                                         info_id, result)
        yield from self.calc(sensor, node_id)

    def calc(self, sensor: int = 0, node_id: str = '') \
            -> Generator[tuple[str, bool], None, None]:
        '''calculate meta values from the stored values in Infos.db

        sensor: sensor_list number
        node_id: id-string for the node'''
        reg_map = RegisterSel.get(sensor)
        if 'calc' in reg_map:
            for row in reg_map['calc'].values():
                info_id = row['reg']
                result = row['func'](self, row['params'])
                yield from self.__update_val(node_id, "CALC",
                                             info_id, result)

    def __update_val(self, node_id, source: str, info_id, result):
        '''store `result` for the register `info_id` in the database

        Yields one tuple (first key, update flag returned by update_db())
        if _key_obj() knows the register; updated values are logged with
        `source` as origin'''
        keys, level, unit, must_incr = self._key_obj(info_id)
        if keys:
            name, update = self.update_db(keys, must_incr, result)
            yield keys[0], update
            if update:
                self.tracer.log(level, f'[{node_id}] {source}: {name}'
                                f' : {result}{unit}')

    def build(self, len, msg_type: int, rcv_ftype: int, sensor: int = 0):
        '''build a data sequence from the values stored in Infos.db

        len: size of the buffer to build
        msg_type: only map keys with this value in bits 24..31 are used
        rcv_ftype: only map keys with this value in bits 16..23 are used
        sensor: sensor_list number, selects the register map

        Returns a bytearray of size `len`; positions without a matching
        row or without a (truthy) value keep their initial zero bytes'''
        buf = bytearray(len)
        for idx, row in RegisterSel.get(sensor).items():
            if 'calc' == idx:
                # the table of calculated registers has a string key and
                # no address; the bit operations below would raise a
                # TypeError (parse() skips this entry the same way)
                continue
            addr = idx & 0xffff
            ftype = (idx >> 16) & 0xff
            mtype = (idx >> 24) & 0xff
            if ftype != rcv_ftype or mtype != msg_type:
                continue
            if not isinstance(row, dict):
                continue
            if 'const' in row:
                val = row['const']
            else:
                info_id = row['reg']
                val = self.get_db_value(info_id)
            if not val:
                continue
            Fmt.set_value(buf, addr, row, val)
        return buf