implent register mapping

This commit is contained in:
Stefan Allius
2024-03-29 10:48:09 +01:00
parent 21e46ae456
commit 37cb7cc1a1
3 changed files with 385 additions and 307 deletions

View File

@@ -1,13 +1,18 @@
import struct
import json
import logging
from typing import Generator
if __name__ == "app.src.gen3plus.infos_g3p":
from app.src.infos import Infos
from app.src.infos import Infos, Register
else: # pragma: no cover
from infos import Infos
from infos import Infos, Register
class RegisterMap:
map = {
0x00092ba8: Register.COLLECTOR_FW_VERSION,
}
class InfosG3P(Infos):
@@ -21,119 +26,14 @@ class InfosG3P(Infos):
snr:str ==> serial number of the inverter, used to build unique
entity strings
sug_area:str ==> suggested area string from the config file'''
tab = self.info_defs
for key in tab:
row = tab[key]
if 'singleton' in row:
if singleton != row['singleton']:
continue
elif singleton:
# iterate over RegisterMap.map and get the register values
for key, reg in RegisterMap.map.items():
if reg not in self.info_defs:
continue
prfx = ha_prfx + node_id
# check if we have details for home assistant
if 'ha' in row:
ha = row['ha']
if 'comp' in ha:
component = ha['comp']
else:
component = 'sensor'
attr = {}
if 'name' in ha:
attr['name'] = ha['name']
else:
attr['name'] = row['name'][-1]
attr['stat_t'] = prfx + row['name'][0]
attr['dev_cla'] = ha['dev_cla']
attr['stat_cla'] = ha['stat_cla']
attr['uniq_id'] = ha['id']+snr
if 'val_tpl' in ha:
attr['val_tpl'] = ha['val_tpl']
elif 'fmt' in ha:
attr['val_tpl'] = '{{value_json' + f"['{row['name'][-1]}'] {ha['fmt']}" + '}}' # eg. 'val_tpl': "{{ value_json['Output_Power']|float }} # noqa: E501
else:
self.inc_counter('Internal_Error')
logging.error(f"Infos._info_defs: the row for {key} do"
" not have a 'val_tpl' nor a 'fmt' value")
# add unit_of_meas only, if status_class isn't none. If
# status_cla is None we want a number format and not line
# graph in home assistant. A unit will change the number
# format to a line graph
if 'unit' in row and attr['stat_cla'] is not None:
attr['unit_of_meas'] = row['unit'] # 'unit_of_meas'
if 'icon' in ha:
attr['ic'] = ha['icon'] # icon for the entity
if 'nat_prc' in ha:
attr['sug_dsp_prc'] = ha['nat_prc'] # precison of floats
if 'ent_cat' in ha:
attr['ent_cat'] = ha['ent_cat'] # diagnostic, config
# enabled_by_default is deactivated, since it avoid the via
# setup of the devices. It seems, that there is a bug in home
# assistant. tested with 'Home Assistant 2023.10.4'
# if 'en' in ha: # enabled_by_default
# attr['en'] = ha['en']
if 'dev' in ha:
device = self.info_devs[ha['dev']]
if 'dep' in device and self.ignore_this_device(device['dep']): # noqa: E501
continue
dev = {}
# the same name for 'name' and 'suggested area', so we get
# dedicated devices in home assistant with short value
# name and headline
if (sug_area == '' or
('singleton' in device and device['singleton'])):
dev['name'] = device['name']
dev['sa'] = device['name']
else:
dev['name'] = device['name']+' - '+sug_area
dev['sa'] = device['name']+' - '+sug_area
if 'via' in device: # add the link to the parent device
via = device['via']
if via in self.info_devs:
via_dev = self.info_devs[via]
if 'singleton' in via_dev and via_dev['singleton']:
dev['via_device'] = via
else:
dev['via_device'] = f"{via}_{snr}"
else:
self.inc_counter('Internal_Error')
logging.error(f"Infos._info_defs: the row for "
f"{key} has an invalid via value: "
f"{via}")
for key in ('mdl', 'mf', 'sw', 'hw'): # add optional
# values fpr 'modell', 'manufacturer', 'sw version' and
# 'hw version'
if key in device:
data = self.dev_value(device[key])
if data is not None:
dev[key] = data
if 'singleton' in device and device['singleton']:
dev['ids'] = [f"{ha['dev']}"]
else:
dev['ids'] = [f"{ha['dev']}_{snr}"]
attr['dev'] = dev
origin = {}
origin['name'] = self.app_name
origin['sw'] = self.version
attr['o'] = origin
else:
self.inc_counter('Internal_Error')
logging.error(f"Infos._info_defs: the row for {key} "
"missing 'dev' value for ha register")
yield json.dumps(attr), component, node_id, attr['uniq_id']
row = self.info_defs[reg]
res = self.ha_conf(row, reg, ha_prfx, node_id, snr, singleton, sug_area) # noqa: E501
if res:
yield res
def parse(self, buf, ind=0) -> Generator[tuple[str, bool], None, None]:
'''parse a data sequence received from the inverter and