from typing import Generator from itertools import chain from infos import Infos, Register, ProxyMode, Fmt class RegisterFunc: @staticmethod def prod_sum(info: Infos, arr: dict) -> None | int: result = 0 for sum in arr: prod = 1 for factor in sum: val = info.get_db_value(factor) if val is None: return None prod = prod * val result += prod return result class RegisterMap: # make the class read/only by using __slots__ __slots__ = () FMT_2_16BIT_VAL = '!HH' FMT_3_16BIT_VAL = '!HHH' FMT_4_16BIT_VAL = '!HHHH' map = { # 0x41020007: {'reg': Register.DEVICE_SNR, 'fmt': ' bool: if 'dep' in row: mode = row['dep'] if self.client_mode: return mode != ProxyMode.CLIENT else: return mode != ProxyMode.SERVER return False def ha_confs(self, ha_prfx: str, node_id: str, snr: str, sug_area: str = '') \ -> Generator[tuple[dict, str], None, None]: '''Generator function yields a json register struct for home-assistant auto configuration and a unique entity string arguments: prfx:str ==> MQTT prefix for the home assistant 'stat_t string snr:str ==> serial number of the inverter, used to build unique entity strings sug_area:str ==> suggested area string from the config file''' # iterate over RegisterMap.map and get the register values sensor = self.get_db_value(Register.SENSOR_LIST) if "3026" == sensor: reg_map = RegisterMap.map_3026 elif "02b0" == sensor: reg_map = RegisterMap.map_02b0 else: reg_map = {} items = reg_map.items() if 'calc' in reg_map: virt = reg_map['calc'].items() else: virt = {} for idx, row in chain(RegisterMap.map.items(), items, virt): if 'calc' == idx: continue info_id = row['reg'] if self.__hide_topic(row): res = self.ha_remove(info_id, node_id, snr) # noqa: E501 else: res = self.ha_conf(info_id, ha_prfx, node_id, snr, False, sug_area) # noqa: E501 if res: yield res def parse(self, buf, msg_type: int, rcv_ftype: int, sensor: int = 0, node_id: str = '') \ -> Generator[tuple[str, bool], None, None]: '''parse a data sequence received from the inverter and stores the values in Infos.db buf: buffer of the sequence to parse''' reg_map = RegisterSel.get(sensor) for idx, row in reg_map.items(): if 'calc' == idx: continue addr = idx & 0xffff ftype = (idx >> 16) & 0xff mtype = (idx >> 24) & 0xff if ftype != rcv_ftype or mtype != msg_type: continue if not isinstance(row, dict): continue info_id = row['reg'] result = Fmt.get_value(buf, addr, row) yield from self.__update_val(node_id, "GEN3PLUS", info_id, result) yield from self.calc(sensor, node_id) def calc(self, sensor: int = 0, node_id: str = '') \ -> Generator[tuple[str, bool], None, None]: '''calculate meta values from the stored values in Infos.db sensor: sensor_list number node_id: id-string for the node''' reg_map = RegisterSel.get(sensor) if 'calc' in reg_map: for row in reg_map['calc'].values(): info_id = row['reg'] result = row['func'](self, row['params']) yield from self.__update_val(node_id, "CALC", info_id, result) def __update_val(self, node_id, source: str, info_id, result): keys, level, unit, must_incr = self._key_obj(info_id) if keys: name, update = self.update_db(keys, must_incr, result) yield keys[0], update if update: self.tracer.log(level, f'[{node_id}] {source}: {name}' f' : {result}{unit}') def build(self, len, msg_type: int, rcv_ftype: int, sensor: int = 0): buf = bytearray(len) for idx, row in RegisterSel.get(sensor).items(): addr = idx & 0xffff ftype = (idx >> 16) & 0xff mtype = (idx >> 24) & 0xff if ftype != rcv_ftype or mtype != msg_type: continue if not isinstance(row, dict): continue if 'const' in row: val = row['const'] else: info_id = row['reg'] val = self.get_db_value(info_id) if not val: continue Fmt.set_value(buf, addr, row, val) return buf