diff --git a/app/src/gen3/infos_g3.py b/app/src/gen3/infos_g3.py new file mode 100644 index 0000000..b8dd908 --- /dev/null +++ b/app/src/gen3/infos_g3.py @@ -0,0 +1,236 @@ + +import struct +import json +import logging +from typing import Generator + +if __name__ == "app.src.gen3.infos_g3": + from app.src.infos import Infos +else: # pragma: no cover + from infos import Infos + + +class InfosG3(Infos): + def ha_confs(self, ha_prfx, node_id, snr, singleton: bool, sug_area='') \ + -> Generator[tuple[dict, str], None, None]: + '''Generator function yields a json register struct for home-assistant + auto configuration and a unique entity string + + arguments: + prfx:str ==> MQTT prefix for the home assistant 'stat_t string + snr:str ==> serial number of the inverter, used to build unique + entity strings + sug_area:str ==> suggested area string from the config file''' + tab = self.info_defs + for key in tab: + row = tab[key] + if 'singleton' in row: + if singleton != row['singleton']: + continue + elif singleton: + continue + prfx = ha_prfx + node_id + + # check if we have details for home assistant + if 'ha' in row: + ha = row['ha'] + if 'comp' in ha: + component = ha['comp'] + else: + component = 'sensor' + attr = {} + if 'name' in ha: + attr['name'] = ha['name'] + else: + attr['name'] = row['name'][-1] + + attr['stat_t'] = prfx + row['name'][0] + attr['dev_cla'] = ha['dev_cla'] + attr['stat_cla'] = ha['stat_cla'] + attr['uniq_id'] = ha['id']+snr + if 'val_tpl' in ha: + attr['val_tpl'] = ha['val_tpl'] + elif 'fmt' in ha: + attr['val_tpl'] = '{{value_json' + f"['{row['name'][-1]}'] {ha['fmt']}" + '}}' # eg. 'val_tpl': "{{ value_json['Output_Power']|float }} # noqa: E501 + else: + self.inc_counter('Internal_Error') + logging.error(f"Infos._info_defs: the row for {key} do" + " not have a 'val_tpl' nor a 'fmt' value") + + # add unit_of_meas only, if status_class isn't none. If + # status_cla is None we want a number format and not line + # graph in home assistant. A unit will change the number + # format to a line graph + if 'unit' in row and attr['stat_cla'] is not None: + attr['unit_of_meas'] = row['unit'] # 'unit_of_meas' + if 'icon' in ha: + attr['ic'] = ha['icon'] # icon for the entity + if 'nat_prc' in ha: + attr['sug_dsp_prc'] = ha['nat_prc'] # precison of floats + if 'ent_cat' in ha: + attr['ent_cat'] = ha['ent_cat'] # diagnostic, config + + # enabled_by_default is deactivated, since it avoid the via + # setup of the devices. It seems, that there is a bug in home + # assistant. tested with 'Home Assistant 2023.10.4' + # if 'en' in ha: # enabled_by_default + # attr['en'] = ha['en'] + + if 'dev' in ha: + device = self.info_devs[ha['dev']] + + if 'dep' in device and self.ignore_this_device(device['dep']): # noqa: E501 + continue + + dev = {} + + # the same name for 'name' and 'suggested area', so we get + # dedicated devices in home assistant with short value + # name and headline + if (sug_area == '' or + ('singleton' in device and device['singleton'])): + dev['name'] = device['name'] + dev['sa'] = device['name'] + else: + dev['name'] = device['name']+' - '+sug_area + dev['sa'] = device['name']+' - '+sug_area + + if 'via' in device: # add the link to the parent device + via = device['via'] + if via in self.info_devs: + via_dev = self.info_devs[via] + if 'singleton' in via_dev and via_dev['singleton']: + dev['via_device'] = via + else: + dev['via_device'] = f"{via}_{snr}" + else: + self.inc_counter('Internal_Error') + logging.error(f"Infos._info_defs: the row for " + f"{key} has an invalid via value: " + f"{via}") + + for key in ('mdl', 'mf', 'sw', 'hw'): # add optional + # values fpr 'modell', 'manufacturer', 'sw version' and + # 'hw version' + if key in device: + data = self.dev_value(device[key]) + if data is not None: + dev[key] = data + + if 'singleton' in device and device['singleton']: + dev['ids'] = [f"{ha['dev']}"] + else: + dev['ids'] = [f"{ha['dev']}_{snr}"] + + attr['dev'] = dev + + origin = {} + origin['name'] = self.app_name + origin['sw'] = self.version + attr['o'] = origin + else: + self.inc_counter('Internal_Error') + logging.error(f"Infos._info_defs: the row for {key} " + "missing 'dev' value for ha register") + + yield json.dumps(attr), component, node_id, attr['uniq_id'] + + def parse(self, buf, ind=0) -> Generator[tuple[str, bool], None, None]: + '''parse a data sequence received from the inverter and + stores the values in Infos.db + + buf: buffer of the sequence to parse''' + result = struct.unpack_from('!l', buf, ind) + elms = result[0] + i = 0 + ind += 4 + while i < elms: + result = struct.unpack_from('!lB', buf, ind) + info_id = result[0] + data_type = result[1] + ind += 5 + keys, level, unit, must_incr, new_val = self._key_obj(info_id) + + if data_type == 0x54: # 'T' -> Pascal-String + str_len = buf[ind] + result = struct.unpack_from(f'!{str_len+1}p', buf, + ind)[0].decode(encoding='ascii', + errors='replace') + ind += str_len+1 + + elif data_type == 0x49: # 'I' -> int32 + # if new_val: + # struct.pack_into('!l', buf, ind, new_val) + result = struct.unpack_from('!l', buf, ind)[0] + ind += 4 + + elif data_type == 0x53: # 'S' -> short + # if new_val: + # struct.pack_into('!h', buf, ind, new_val) + result = struct.unpack_from('!h', buf, ind)[0] + ind += 2 + + elif data_type == 0x46: # 'F' -> float32 + # if new_val: + # struct.pack_into('!f', buf, ind, new_val) + result = round(struct.unpack_from('!f', buf, ind)[0], 2) + ind += 4 + + elif data_type == 0x4c: # 'L' -> int64 + # if new_val: + # struct.pack_into('!q', buf, ind, new_val) + result = struct.unpack_from('!q', buf, ind)[0] + ind += 8 + + else: + self.inc_counter('Invalid_Data_Type') + logging.error(f"Infos.parse: data_type: {data_type}" + " not supported") + return + + if keys: + dict = self.db + name = '' + + for key in keys[:-1]: + if key not in dict: + dict[key] = {} + dict = dict[key] + name += key + '.' + + if keys[-1] not in dict: + update = (not must_incr or result > 0) + else: + if must_incr: + update = dict[keys[-1]] < result + else: + update = dict[keys[-1]] != result + + if update: + dict[keys[-1]] = result + name += keys[-1] + yield keys[0], update + else: + update = False + name = str(f'info-id.0x{info_id:x}') + + self.tracer.log(level, f'{name} : {result}{unit}' + f' update: {update}') + + i += 1 + + def ignore_this_device(self, dep: dict) -> bool: + '''Checks the equation in the dep dict + + returns 'False' only if the equation is valid; + 'True' in any other case''' + if 'reg' in dep: + value = self.dev_value(dep['reg']) + if not value: + return True + + if 'gte' in dep: + return not value >= dep['gte'] + elif 'less_eq' in dep: + return not value <= dep['less_eq'] + return True diff --git a/app/src/gen3/talent.py b/app/src/gen3/talent.py index cac7ca2..0bc08a8 100644 --- a/app/src/gen3/talent.py +++ b/app/src/gen3/talent.py @@ -6,9 +6,11 @@ from datetime import datetime if __name__ == "app.src.gen3.talent": from app.src.messages import hex_dump_memory, Message from app.src.config import Config + from app.src.gen3.infos_g3 import InfosG3 else: # pragma: no cover from messages import hex_dump_memory, Message from config import Config + from gen3.infos_g3 import InfosG3 logger = logging.getLogger('msg') @@ -38,6 +40,7 @@ class Talent(Message): self.id_str = id_str self.contact_name = b'' self.contact_mail = b'' + self.db = InfosG3() self.switch = { 0x00: self.msg_contact_info, 0x13: self.msg_ota_update, diff --git a/app/src/gen3plus/infos_g3p.py b/app/src/gen3plus/infos_g3p.py new file mode 100644 index 0000000..fff6efa --- /dev/null +++ b/app/src/gen3plus/infos_g3p.py @@ -0,0 +1,236 @@ + +import struct +import json +import logging +from typing import Generator + +if __name__ == "app.src.gen3plus.infos_g3p": + from app.src.infos import Infos +else: # pragma: no cover + from infos import Infos + + +class InfosG3P(Infos): + def ha_confs(self, ha_prfx, node_id, snr, singleton: bool, sug_area='') \ + -> Generator[tuple[dict, str], None, None]: + '''Generator function yields a json register struct for home-assistant + auto configuration and a unique entity string + + arguments: + prfx:str ==> MQTT prefix for the home assistant 'stat_t string + snr:str ==> serial number of the inverter, used to build unique + entity strings + sug_area:str ==> suggested area string from the config file''' + tab = self.info_defs + for key in tab: + row = tab[key] + if 'singleton' in row: + if singleton != row['singleton']: + continue + elif singleton: + continue + prfx = ha_prfx + node_id + + # check if we have details for home assistant + if 'ha' in row: + ha = row['ha'] + if 'comp' in ha: + component = ha['comp'] + else: + component = 'sensor' + attr = {} + if 'name' in ha: + attr['name'] = ha['name'] + else: + attr['name'] = row['name'][-1] + + attr['stat_t'] = prfx + row['name'][0] + attr['dev_cla'] = ha['dev_cla'] + attr['stat_cla'] = ha['stat_cla'] + attr['uniq_id'] = ha['id']+snr + if 'val_tpl' in ha: + attr['val_tpl'] = ha['val_tpl'] + elif 'fmt' in ha: + attr['val_tpl'] = '{{value_json' + f"['{row['name'][-1]}'] {ha['fmt']}" + '}}' # eg. 'val_tpl': "{{ value_json['Output_Power']|float }} # noqa: E501 + else: + self.inc_counter('Internal_Error') + logging.error(f"Infos._info_defs: the row for {key} do" + " not have a 'val_tpl' nor a 'fmt' value") + + # add unit_of_meas only, if status_class isn't none. If + # status_cla is None we want a number format and not line + # graph in home assistant. A unit will change the number + # format to a line graph + if 'unit' in row and attr['stat_cla'] is not None: + attr['unit_of_meas'] = row['unit'] # 'unit_of_meas' + if 'icon' in ha: + attr['ic'] = ha['icon'] # icon for the entity + if 'nat_prc' in ha: + attr['sug_dsp_prc'] = ha['nat_prc'] # precison of floats + if 'ent_cat' in ha: + attr['ent_cat'] = ha['ent_cat'] # diagnostic, config + + # enabled_by_default is deactivated, since it avoid the via + # setup of the devices. It seems, that there is a bug in home + # assistant. tested with 'Home Assistant 2023.10.4' + # if 'en' in ha: # enabled_by_default + # attr['en'] = ha['en'] + + if 'dev' in ha: + device = self.info_devs[ha['dev']] + + if 'dep' in device and self.ignore_this_device(device['dep']): # noqa: E501 + continue + + dev = {} + + # the same name for 'name' and 'suggested area', so we get + # dedicated devices in home assistant with short value + # name and headline + if (sug_area == '' or + ('singleton' in device and device['singleton'])): + dev['name'] = device['name'] + dev['sa'] = device['name'] + else: + dev['name'] = device['name']+' - '+sug_area + dev['sa'] = device['name']+' - '+sug_area + + if 'via' in device: # add the link to the parent device + via = device['via'] + if via in self.info_devs: + via_dev = self.info_devs[via] + if 'singleton' in via_dev and via_dev['singleton']: + dev['via_device'] = via + else: + dev['via_device'] = f"{via}_{snr}" + else: + self.inc_counter('Internal_Error') + logging.error(f"Infos._info_defs: the row for " + f"{key} has an invalid via value: " + f"{via}") + + for key in ('mdl', 'mf', 'sw', 'hw'): # add optional + # values fpr 'modell', 'manufacturer', 'sw version' and + # 'hw version' + if key in device: + data = self.dev_value(device[key]) + if data is not None: + dev[key] = data + + if 'singleton' in device and device['singleton']: + dev['ids'] = [f"{ha['dev']}"] + else: + dev['ids'] = [f"{ha['dev']}_{snr}"] + + attr['dev'] = dev + + origin = {} + origin['name'] = self.app_name + origin['sw'] = self.version + attr['o'] = origin + else: + self.inc_counter('Internal_Error') + logging.error(f"Infos._info_defs: the row for {key} " + "missing 'dev' value for ha register") + + yield json.dumps(attr), component, node_id, attr['uniq_id'] + + def parse(self, buf, ind=0) -> Generator[tuple[str, bool], None, None]: + '''parse a data sequence received from the inverter and + stores the values in Infos.db + + buf: buffer of the sequence to parse''' + result = struct.unpack_from('!l', buf, ind) + elms = result[0] + i = 0 + ind += 4 + while i < elms: + result = struct.unpack_from('!lB', buf, ind) + info_id = result[0] + data_type = result[1] + ind += 5 + keys, level, unit, must_incr, new_val = self._key_obj(info_id) + + if data_type == 0x54: # 'T' -> Pascal-String + str_len = buf[ind] + result = struct.unpack_from(f'!{str_len+1}p', buf, + ind)[0].decode(encoding='ascii', + errors='replace') + ind += str_len+1 + + elif data_type == 0x49: # 'I' -> int32 + # if new_val: + # struct.pack_into('!l', buf, ind, new_val) + result = struct.unpack_from('!l', buf, ind)[0] + ind += 4 + + elif data_type == 0x53: # 'S' -> short + # if new_val: + # struct.pack_into('!h', buf, ind, new_val) + result = struct.unpack_from('!h', buf, ind)[0] + ind += 2 + + elif data_type == 0x46: # 'F' -> float32 + # if new_val: + # struct.pack_into('!f', buf, ind, new_val) + result = round(struct.unpack_from('!f', buf, ind)[0], 2) + ind += 4 + + elif data_type == 0x4c: # 'L' -> int64 + # if new_val: + # struct.pack_into('!q', buf, ind, new_val) + result = struct.unpack_from('!q', buf, ind)[0] + ind += 8 + + else: + self.inc_counter('Invalid_Data_Type') + logging.error(f"Infos.parse: data_type: {data_type}" + " not supported") + return + + if keys: + dict = self.db + name = '' + + for key in keys[:-1]: + if key not in dict: + dict[key] = {} + dict = dict[key] + name += key + '.' + + if keys[-1] not in dict: + update = (not must_incr or result > 0) + else: + if must_incr: + update = dict[keys[-1]] < result + else: + update = dict[keys[-1]] != result + + if update: + dict[keys[-1]] = result + name += keys[-1] + yield keys[0], update + else: + update = False + name = str(f'info-id.0x{info_id:x}') + + self.tracer.log(level, f'{name} : {result}{unit}' + f' update: {update}') + + i += 1 + + def ignore_this_device(self, dep: dict) -> bool: + '''Checks the equation in the dep dict + + returns 'False' only if the equation is valid; + 'True' in any other case''' + if 'reg' in dep: + value = self.dev_value(dep['reg']) + if not value: + return True + + if 'gte' in dep: + return not value >= dep['gte'] + elif 'less_eq' in dep: + return not value <= dep['less_eq'] + return True diff --git a/app/src/gen3plus/solarman_v5.py b/app/src/gen3plus/solarman_v5.py index c45c589..6a371ff 100644 --- a/app/src/gen3plus/solarman_v5.py +++ b/app/src/gen3plus/solarman_v5.py @@ -6,9 +6,11 @@ from datetime import datetime if __name__ == "app.src.gen3plus.solarman_v5": from app.src.messages import hex_dump_memory, Message from app.src.config import Config + from app.src.gen3plus.infos_g3p import InfosG3P else: # pragma: no cover from messages import hex_dump_memory, Message from config import Config + from gen3plus.infos_g3p import InfosG3P # import traceback logger = logging.getLogger('msg') @@ -25,6 +27,7 @@ class SolarmanV5(Message): self.snr = 0 # self.await_conn_resp_cnt = 0 # self.id_str = id_str + self.db = InfosG3P() self.switch = { 0x4110: self.msg_dev_ind, # hello 0x1110: self.msg_dev_rsp, diff --git a/app/src/infos.py b/app/src/infos.py index 33e7df7..a0dab8e 100644 --- a/app/src/infos.py +++ b/app/src/infos.py @@ -1,5 +1,3 @@ -import struct -import json import logging import os @@ -16,13 +14,13 @@ class Infos: logging.info('Initialize proxy statistics') # init proxy counter in the class.stat dictionary cls.stat['proxy'] = {} - for key in cls.__info_defs: - name = cls.__info_defs[key]['name'] + for key in cls._info_defs: + name = cls._info_defs[key]['name'] if name[0] == 'proxy': cls.stat['proxy'][name[1]] = 0 # add values from the environment to the device definition table - prxy = cls.__info_devs['proxy'] + prxy = cls._info_devs['proxy'] prxy['sw'] = cls.version prxy['mdl'] = cls.app_name @@ -30,7 +28,7 @@ class Infos: self.db = {} self.tracer = logging.getLogger('data') - __info_devs = { + _info_devs = { 'proxy': {'singleton': True, 'name': 'Proxy', 'mf': 'Stefan Allius'}, # noqa: E501 'controller': {'via': 'proxy', 'name': 'Controller', 'mdl': 0x00092f90, 'mf': 0x000927c0, 'sw': 0x00092ba8}, # noqa: E501 'inverter': {'via': 'controller', 'name': 'Micro Inverter', 'mdl': 0x00000032, 'mf': 0x00000014, 'sw': 0x0000001e}, # noqa: E501 @@ -42,7 +40,7 @@ class Infos: __comm_type_val_tpl = "{%set com_types = ['n/a','Wi-Fi', 'G4', 'G5', 'GPRS'] %}{{com_types[value_json['Communication_Type']|int(0)]|default(value_json['Communication_Type'])}}" # noqa: E501 - __info_defs = { + _info_defs = { # collector values used for device registration: 0x00092ba8: {'name': ['collector', 'Collector_Fw_Version'], 'level': logging.INFO, 'unit': ''}, # noqa: E501 0x000927c0: {'name': ['collector', 'Chip_Type'], 'level': logging.DEBUG, 'unit': ''}, # noqa: E501 @@ -131,6 +129,22 @@ class Infos: } + @property + def info_devs(self) -> dict: + return self._info_devs + + @info_devs.setter + def info_devs(self, value: dict) -> None: + self._info_devs = value + + @property + def info_defs(self) -> dict: + return self._info_defs + + @info_defs.setter + def info_defs(self, value: dict) -> None: + self._info_defs = value + def dev_value(self, idx: str | int) -> str | int | float | None: '''returns the stored device value from our database @@ -141,8 +155,8 @@ class Infos: ''' if type(idx) is str: return idx # return idx as a fixed value - elif idx in self.__info_defs: - row = self.__info_defs[idx] + elif idx in self._info_defs: + row = self._info_defs[idx] if 'singleton' in row and row['singleton']: dict = self.stat else: @@ -158,145 +172,6 @@ class Infos: return None # unknwon idx, not in __info_defs - def ignore_this_device(self, dep: dict) -> bool: - '''Checks the equation in the dep dict - - returns 'False' only if the equation is valid; - 'True' in any other case''' - if 'reg' in dep: - value = self.dev_value(dep['reg']) - if not value: - return True - - if 'gte' in dep: - return not value >= dep['gte'] - elif 'less_eq' in dep: - return not value <= dep['less_eq'] - return True - - def ha_confs(self, ha_prfx, node_id, snr, singleton: bool, sug_area=''): - '''Generator function yields a json register struct for home-assistant - auto configuration and a unique entity string - - arguments: - prfx:str ==> MQTT prefix for the home assistant 'stat_t string - snr:str ==> serial number of the inverter, used to build unique - entity strings - sug_area:str ==> suggested area string from the config file''' - tab = self.__info_defs - for key in tab: - row = tab[key] - if 'singleton' in row: - if singleton != row['singleton']: - continue - elif singleton: - continue - prfx = ha_prfx + node_id - - # check if we have details for home assistant - if 'ha' in row: - ha = row['ha'] - if 'comp' in ha: - component = ha['comp'] - else: - component = 'sensor' - attr = {} - if 'name' in ha: - attr['name'] = ha['name'] - else: - attr['name'] = row['name'][-1] - - attr['stat_t'] = prfx + row['name'][0] - attr['dev_cla'] = ha['dev_cla'] - attr['stat_cla'] = ha['stat_cla'] - attr['uniq_id'] = ha['id']+snr - if 'val_tpl' in ha: - attr['val_tpl'] = ha['val_tpl'] - elif 'fmt' in ha: - attr['val_tpl'] = '{{value_json' + f"['{row['name'][-1]}'] {ha['fmt']}" + '}}' # eg. 'val_tpl': "{{ value_json['Output_Power']|float }} # noqa: E501 - else: - self.inc_counter('Internal_Error') - logging.error(f"Infos.__info_defs: the row for {key} do" - " not have a 'val_tpl' nor a 'fmt' value") - - # add unit_of_meas only, if status_class isn't none. If - # status_cla is None we want a number format and not line - # graph in home assistant. A unit will change the number - # format to a line graph - if 'unit' in row and attr['stat_cla'] is not None: - attr['unit_of_meas'] = row['unit'] # 'unit_of_meas' - if 'icon' in ha: - attr['ic'] = ha['icon'] # icon for the entity - if 'nat_prc' in ha: - attr['sug_dsp_prc'] = ha['nat_prc'] # precison of floats - if 'ent_cat' in ha: - attr['ent_cat'] = ha['ent_cat'] # diagnostic, config - - # enabled_by_default is deactivated, since it avoid the via - # setup of the devices. It seems, that there is a bug in home - # assistant. tested with 'Home Assistant 2023.10.4' - # if 'en' in ha: # enabled_by_default - # attr['en'] = ha['en'] - - if 'dev' in ha: - device = self.__info_devs[ha['dev']] - - if 'dep' in device and self.ignore_this_device(device['dep']): # noqa: E501 - continue - - dev = {} - - # the same name for 'name' and 'suggested area', so we get - # dedicated devices in home assistant with short value - # name and headline - if (sug_area == '' or - ('singleton' in device and device['singleton'])): - dev['name'] = device['name'] - dev['sa'] = device['name'] - else: - dev['name'] = device['name']+' - '+sug_area - dev['sa'] = device['name']+' - '+sug_area - - if 'via' in device: # add the link to the parent device - via = device['via'] - if via in self.__info_devs: - via_dev = self.__info_devs[via] - if 'singleton' in via_dev and via_dev['singleton']: - dev['via_device'] = via - else: - dev['via_device'] = f"{via}_{snr}" - else: - self.inc_counter('Internal_Error') - logging.error(f"Infos.__info_defs: the row for " - f"{key} has an invalid via value: " - f"{via}") - - for key in ('mdl', 'mf', 'sw', 'hw'): # add optional - # values fpr 'modell', 'manufacturer', 'sw version' and - # 'hw version' - if key in device: - data = self.dev_value(device[key]) - if data is not None: - dev[key] = data - - if 'singleton' in device and device['singleton']: - dev['ids'] = [f"{ha['dev']}"] - else: - dev['ids'] = [f"{ha['dev']}_{snr}"] - - attr['dev'] = dev - - origin = {} - origin['name'] = self.app_name - origin['sw'] = self.version - attr['o'] = origin - else: - self.inc_counter('Internal_Error') - logging.error(f"Infos.__info_defs: the row for {key} " - "missing 'dev' value for ha register") - - yield json.dumps(attr), component, node_id, attr['uniq_id'] - def inc_counter(self, counter: str) -> None: '''inc proxy statistic counter''' dict = self.stat['proxy'] @@ -307,9 +182,9 @@ class Infos: dict = self.stat['proxy'] dict[counter] -= 1 - def __key_obj(self, id) -> list: - d = self.__info_defs.get(id, {'name': None, 'level': logging.DEBUG, - 'unit': ''}) + def _key_obj(self, id) -> list: + d = self._info_defs.get(id, {'name': None, 'level': logging.DEBUG, + 'unit': ''}) if 'ha' in d and 'must_incr' in d['ha']: must_incr = d['ha']['must_incr'] else: @@ -319,87 +194,3 @@ class Infos: # new_val = d['new_value'] return d['name'], d['level'], d['unit'], must_incr, new_val - - def parse(self, buf, ind=0): # -> None | tuple[str,bool]: - '''parse a data sequence received from the inverter and - stores the values in Infos.db - - buf: buffer of the sequence to parse''' - result = struct.unpack_from('!l', buf, ind) - elms = result[0] - i = 0 - ind += 4 - while i < elms: - result = struct.unpack_from('!lB', buf, ind) - info_id = result[0] - data_type = result[1] - ind += 5 - keys, level, unit, must_incr, new_val = self.__key_obj(info_id) - - if data_type == 0x54: # 'T' -> Pascal-String - str_len = buf[ind] - result = struct.unpack_from(f'!{str_len+1}p', buf, - ind)[0].decode(encoding='ascii', - errors='replace') - ind += str_len+1 - - elif data_type == 0x49: # 'I' -> int32 - # if new_val: - # struct.pack_into('!l', buf, ind, new_val) - result = struct.unpack_from('!l', buf, ind)[0] - ind += 4 - - elif data_type == 0x53: # 'S' -> short - # if new_val: - # struct.pack_into('!h', buf, ind, new_val) - result = struct.unpack_from('!h', buf, ind)[0] - ind += 2 - - elif data_type == 0x46: # 'F' -> float32 - # if new_val: - # struct.pack_into('!f', buf, ind, new_val) - result = round(struct.unpack_from('!f', buf, ind)[0], 2) - ind += 4 - - elif data_type == 0x4c: # 'L' -> int64 - # if new_val: - # struct.pack_into('!q', buf, ind, new_val) - result = struct.unpack_from('!q', buf, ind)[0] - ind += 8 - - else: - self.inc_counter('Invalid_Data_Type') - logging.error(f"Infos.parse: data_type: {data_type}" - " not supported") - return - - if keys: - dict = self.db - name = '' - - for key in keys[:-1]: - if key not in dict: - dict[key] = {} - dict = dict[key] - name += key + '.' - - if keys[-1] not in dict: - update = (not must_incr or result > 0) - else: - if must_incr: - update = dict[keys[-1]] < result - else: - update = dict[keys[-1]] != result - - if update: - dict[keys[-1]] = result - name += keys[-1] - yield keys[0], update - else: - update = False - name = str(f'info-id.0x{info_id:x}') - - self.tracer.log(level, f'{name} : {result}{unit}' - f' update: {update}') - - i += 1 diff --git a/app/src/messages.py b/app/src/messages.py index f348446..ab0b6c5 100644 --- a/app/src/messages.py +++ b/app/src/messages.py @@ -64,7 +64,6 @@ class Message(metaclass=IterRegistry): self._recv_buffer = bytearray(0) self._send_buffer = bytearray(0) self._forward_buffer = bytearray(0) - self.db = Infos() self.new_data = {} ''' diff --git a/app/tests/test_infos.py b/app/tests/test_infos.py index 923c6e2..4a23400 100644 --- a/app/tests/test_infos.py +++ b/app/tests/test_infos.py @@ -1,6 +1,6 @@ # test_with_pytest.py import pytest, json -from app.src.infos import Infos +from app.src.gen3.infos_g3 import InfosG3 @pytest.fixture def ContrDataSeq(): # Get Time Request message @@ -176,7 +176,7 @@ def InvDataSeq2_Zero(): # Data indication from the controller def test_parse_control(ContrDataSeq): - i = Infos() + i = InfosG3() for key, result in i.parse (ContrDataSeq): pass @@ -184,7 +184,7 @@ def test_parse_control(ContrDataSeq): {"collector": {"Collector_Fw_Version": "RSW_400_V1.00.06", "Chip_Type": "Raymon", "Chip_Model": "RSW-1-10001", "Trace_URL": "t.raymoniot.com", "Logger_URL": "logger.talent-monitoring.com"}, "controller": {"Collect_Interval": 1, "Signal_Strength": 100, "Power_On_Time": 29, "Communication_Type": 1, "Connect_Count": 1, "Data_Up_Interval": 300}}) def test_parse_control2(Contr2DataSeq): - i = Infos() + i = InfosG3() for key, result in i.parse (Contr2DataSeq): pass @@ -192,7 +192,7 @@ def test_parse_control2(Contr2DataSeq): {"collector": {"Collector_Fw_Version": "RSW_400_V1.00.20", "Chip_Type": "Raymon", "Chip_Model": "RSW-1-10001", "Trace_URL": "t.raymoniot.com", "Logger_URL": "logger.talent-monitoring.com"}, "controller": {"Collect_Interval": 1, "Signal_Strength": 16, "Power_On_Time": 334, "Communication_Type": 1, "Connect_Count": 1, "Data_Up_Interval": 300}}) def test_parse_inverter(InvDataSeq): - i = Infos() + i = InfosG3() for key, result in i.parse (InvDataSeq): pass @@ -200,7 +200,7 @@ def test_parse_inverter(InvDataSeq): {"inverter": {"Product_Name": "Microinv", "Manufacturer": "TSUN", "Version": "V5.0.11", "Serial_Number": "T170000000000001", "Equipment_Model": "TSOL-MS600"}}) def test_parse_cont_and_invert(ContrDataSeq, InvDataSeq): - i = Infos() + i = InfosG3() for key, result in i.parse (ContrDataSeq): pass @@ -214,7 +214,7 @@ def test_parse_cont_and_invert(ContrDataSeq, InvDataSeq): def test_build_ha_conf1(ContrDataSeq): - i = Infos() + i = InfosG3() i.static_init() # initialize counter tests = 0 @@ -270,7 +270,7 @@ def test_build_ha_conf1(ContrDataSeq): assert tests==5 def test_build_ha_conf2(ContrDataSeq, InvDataSeq, InvDataSeq2): - i = Infos() + i = InfosG3() for key, result in i.parse (ContrDataSeq): pass for key, result in i.parse (InvDataSeq): @@ -308,7 +308,7 @@ def test_build_ha_conf2(ContrDataSeq, InvDataSeq, InvDataSeq2): assert tests==5 def test_must_incr_total(InvDataSeq2, InvDataSeq2_Zero): - i = Infos() + i = InfosG3() tests = 0 for key, update in i.parse (InvDataSeq2): if key == 'total': @@ -353,7 +353,7 @@ def test_must_incr_total(InvDataSeq2, InvDataSeq2_Zero): assert json.dumps(i.db['env']) == json.dumps({"Inverter_Temp": 0, "Rated_Power": 0}) def test_must_incr_total2(InvDataSeq2, InvDataSeq2_Zero): - i = Infos() + i = InfosG3() tests = 0 for key, update in i.parse (InvDataSeq2_Zero): if key == 'total': @@ -398,7 +398,7 @@ def test_must_incr_total2(InvDataSeq2, InvDataSeq2_Zero): def test_statistic_counter(): - i = Infos() + i = InfosG3() val = i.dev_value("Test-String") assert val == "Test-String" @@ -424,7 +424,7 @@ def test_statistic_counter(): assert val == 0 def test_dep_rules(): - i = Infos() + i = InfosG3() i.static_init() # initialize counter res = i.ignore_this_device({}) @@ -456,7 +456,7 @@ def test_dep_rules(): assert res == False def test_table_definition(): - i = Infos() + i = InfosG3() i.static_init() # initialize counter val = i.dev_value(0xffffff04) # check internal error counter @@ -472,7 +472,7 @@ def test_table_definition(): assert val == 0 # test missing 'fmt' value - Infos._Infos__info_defs[0xfffffffe] = {'name':['proxy', 'Internal_Test1'], 'singleton': True, 'ha':{'dev':'proxy', 'dev_cla': None, 'stat_cla': None, 'id':'intern_test1_'}} + i.info_defs[0xfffffffe] = {'name':['proxy', 'Internal_Test1'], 'singleton': True, 'ha':{'dev':'proxy', 'dev_cla': None, 'stat_cla': None, 'id':'intern_test1_'}} tests = 0 for d_json, comp, node_id, id in i.ha_confs(ha_prfx="tsun/", node_id = 'proxy/', snr = '456', singleton=True, sug_area = 'roof'): @@ -485,7 +485,7 @@ def test_table_definition(): assert val == 1 # test missing 'dev' value - Infos._Infos__info_defs[0xfffffffe] = {'name':['proxy', 'Internal_Test2'], 'singleton': True, 'ha':{'dev_cla': None, 'stat_cla': None, 'id':'intern_test2_', 'fmt':'| int'}} + i.info_defs[0xfffffffe] = {'name':['proxy', 'Internal_Test2'], 'singleton': True, 'ha':{'dev_cla': None, 'stat_cla': None, 'id':'intern_test2_', 'fmt':'| int'}} tests = 0 for d_json, comp, node_id, id in i.ha_confs(ha_prfx="tsun/", node_id = 'proxy/', snr = '456', singleton=True, sug_area = 'roof'): if id == 'intern_test2_456': @@ -499,9 +499,9 @@ def test_table_definition(): # test invalid 'via' value - Infos._Infos__info_devs['test_dev'] = {'via':'xyz', 'name':'Module PV1'} + i.info_devs['test_dev'] = {'via':'xyz', 'name':'Module PV1'} - Infos._Infos__info_defs[0xfffffffe] = {'name':['proxy', 'Internal_Test2'], 'singleton': True, 'ha':{'dev':'test_dev', 'dev_cla': None, 'stat_cla': None, 'id':'intern_test2_', 'fmt':'| int'}} + i.info_defs[0xfffffffe] = {'name':['proxy', 'Internal_Test2'], 'singleton': True, 'ha':{'dev':'test_dev', 'dev_cla': None, 'stat_cla': None, 'id':'intern_test2_', 'fmt':'| int'}} tests = 0 for d_json, comp, node_id, id in i.ha_confs(ha_prfx="tsun/", node_id = 'proxy/', snr = '456', singleton=True, sug_area = 'roof'): if id == 'intern_test2_456': @@ -514,7 +514,7 @@ def test_table_definition(): def test_invalid_data_type(InvalidDataSeq): - i = Infos() + i = InfosG3() i.static_init() # initialize counter val = i.dev_value(0xffffff03) # check invalid data type counter