refactor info class
This commit is contained in:
236
app/src/gen3/infos_g3.py
Normal file
236
app/src/gen3/infos_g3.py
Normal file
@@ -0,0 +1,236 @@
|
||||
|
||||
import struct
|
||||
import json
|
||||
import logging
|
||||
from typing import Generator
|
||||
|
||||
if __name__ == "app.src.gen3.infos_g3":
|
||||
from app.src.infos import Infos
|
||||
else: # pragma: no cover
|
||||
from infos import Infos
|
||||
|
||||
|
||||
class InfosG3(Infos):
|
||||
def ha_confs(self, ha_prfx, node_id, snr, singleton: bool, sug_area='') \
|
||||
-> Generator[tuple[dict, str], None, None]:
|
||||
'''Generator function yields a json register struct for home-assistant
|
||||
auto configuration and a unique entity string
|
||||
|
||||
arguments:
|
||||
prfx:str ==> MQTT prefix for the home assistant 'stat_t string
|
||||
snr:str ==> serial number of the inverter, used to build unique
|
||||
entity strings
|
||||
sug_area:str ==> suggested area string from the config file'''
|
||||
tab = self.info_defs
|
||||
for key in tab:
|
||||
row = tab[key]
|
||||
if 'singleton' in row:
|
||||
if singleton != row['singleton']:
|
||||
continue
|
||||
elif singleton:
|
||||
continue
|
||||
prfx = ha_prfx + node_id
|
||||
|
||||
# check if we have details for home assistant
|
||||
if 'ha' in row:
|
||||
ha = row['ha']
|
||||
if 'comp' in ha:
|
||||
component = ha['comp']
|
||||
else:
|
||||
component = 'sensor'
|
||||
attr = {}
|
||||
if 'name' in ha:
|
||||
attr['name'] = ha['name']
|
||||
else:
|
||||
attr['name'] = row['name'][-1]
|
||||
|
||||
attr['stat_t'] = prfx + row['name'][0]
|
||||
attr['dev_cla'] = ha['dev_cla']
|
||||
attr['stat_cla'] = ha['stat_cla']
|
||||
attr['uniq_id'] = ha['id']+snr
|
||||
if 'val_tpl' in ha:
|
||||
attr['val_tpl'] = ha['val_tpl']
|
||||
elif 'fmt' in ha:
|
||||
attr['val_tpl'] = '{{value_json' + f"['{row['name'][-1]}'] {ha['fmt']}" + '}}' # eg. 'val_tpl': "{{ value_json['Output_Power']|float }} # noqa: E501
|
||||
else:
|
||||
self.inc_counter('Internal_Error')
|
||||
logging.error(f"Infos._info_defs: the row for {key} do"
|
||||
" not have a 'val_tpl' nor a 'fmt' value")
|
||||
|
||||
# add unit_of_meas only, if status_class isn't none. If
|
||||
# status_cla is None we want a number format and not line
|
||||
# graph in home assistant. A unit will change the number
|
||||
# format to a line graph
|
||||
if 'unit' in row and attr['stat_cla'] is not None:
|
||||
attr['unit_of_meas'] = row['unit'] # 'unit_of_meas'
|
||||
if 'icon' in ha:
|
||||
attr['ic'] = ha['icon'] # icon for the entity
|
||||
if 'nat_prc' in ha:
|
||||
attr['sug_dsp_prc'] = ha['nat_prc'] # precison of floats
|
||||
if 'ent_cat' in ha:
|
||||
attr['ent_cat'] = ha['ent_cat'] # diagnostic, config
|
||||
|
||||
# enabled_by_default is deactivated, since it avoid the via
|
||||
# setup of the devices. It seems, that there is a bug in home
|
||||
# assistant. tested with 'Home Assistant 2023.10.4'
|
||||
# if 'en' in ha: # enabled_by_default
|
||||
# attr['en'] = ha['en']
|
||||
|
||||
if 'dev' in ha:
|
||||
device = self.info_devs[ha['dev']]
|
||||
|
||||
if 'dep' in device and self.ignore_this_device(device['dep']): # noqa: E501
|
||||
continue
|
||||
|
||||
dev = {}
|
||||
|
||||
# the same name for 'name' and 'suggested area', so we get
|
||||
# dedicated devices in home assistant with short value
|
||||
# name and headline
|
||||
if (sug_area == '' or
|
||||
('singleton' in device and device['singleton'])):
|
||||
dev['name'] = device['name']
|
||||
dev['sa'] = device['name']
|
||||
else:
|
||||
dev['name'] = device['name']+' - '+sug_area
|
||||
dev['sa'] = device['name']+' - '+sug_area
|
||||
|
||||
if 'via' in device: # add the link to the parent device
|
||||
via = device['via']
|
||||
if via in self.info_devs:
|
||||
via_dev = self.info_devs[via]
|
||||
if 'singleton' in via_dev and via_dev['singleton']:
|
||||
dev['via_device'] = via
|
||||
else:
|
||||
dev['via_device'] = f"{via}_{snr}"
|
||||
else:
|
||||
self.inc_counter('Internal_Error')
|
||||
logging.error(f"Infos._info_defs: the row for "
|
||||
f"{key} has an invalid via value: "
|
||||
f"{via}")
|
||||
|
||||
for key in ('mdl', 'mf', 'sw', 'hw'): # add optional
|
||||
# values fpr 'modell', 'manufacturer', 'sw version' and
|
||||
# 'hw version'
|
||||
if key in device:
|
||||
data = self.dev_value(device[key])
|
||||
if data is not None:
|
||||
dev[key] = data
|
||||
|
||||
if 'singleton' in device and device['singleton']:
|
||||
dev['ids'] = [f"{ha['dev']}"]
|
||||
else:
|
||||
dev['ids'] = [f"{ha['dev']}_{snr}"]
|
||||
|
||||
attr['dev'] = dev
|
||||
|
||||
origin = {}
|
||||
origin['name'] = self.app_name
|
||||
origin['sw'] = self.version
|
||||
attr['o'] = origin
|
||||
else:
|
||||
self.inc_counter('Internal_Error')
|
||||
logging.error(f"Infos._info_defs: the row for {key} "
|
||||
"missing 'dev' value for ha register")
|
||||
|
||||
yield json.dumps(attr), component, node_id, attr['uniq_id']
|
||||
|
||||
def parse(self, buf, ind=0) -> Generator[tuple[str, bool], None, None]:
|
||||
'''parse a data sequence received from the inverter and
|
||||
stores the values in Infos.db
|
||||
|
||||
buf: buffer of the sequence to parse'''
|
||||
result = struct.unpack_from('!l', buf, ind)
|
||||
elms = result[0]
|
||||
i = 0
|
||||
ind += 4
|
||||
while i < elms:
|
||||
result = struct.unpack_from('!lB', buf, ind)
|
||||
info_id = result[0]
|
||||
data_type = result[1]
|
||||
ind += 5
|
||||
keys, level, unit, must_incr, new_val = self._key_obj(info_id)
|
||||
|
||||
if data_type == 0x54: # 'T' -> Pascal-String
|
||||
str_len = buf[ind]
|
||||
result = struct.unpack_from(f'!{str_len+1}p', buf,
|
||||
ind)[0].decode(encoding='ascii',
|
||||
errors='replace')
|
||||
ind += str_len+1
|
||||
|
||||
elif data_type == 0x49: # 'I' -> int32
|
||||
# if new_val:
|
||||
# struct.pack_into('!l', buf, ind, new_val)
|
||||
result = struct.unpack_from('!l', buf, ind)[0]
|
||||
ind += 4
|
||||
|
||||
elif data_type == 0x53: # 'S' -> short
|
||||
# if new_val:
|
||||
# struct.pack_into('!h', buf, ind, new_val)
|
||||
result = struct.unpack_from('!h', buf, ind)[0]
|
||||
ind += 2
|
||||
|
||||
elif data_type == 0x46: # 'F' -> float32
|
||||
# if new_val:
|
||||
# struct.pack_into('!f', buf, ind, new_val)
|
||||
result = round(struct.unpack_from('!f', buf, ind)[0], 2)
|
||||
ind += 4
|
||||
|
||||
elif data_type == 0x4c: # 'L' -> int64
|
||||
# if new_val:
|
||||
# struct.pack_into('!q', buf, ind, new_val)
|
||||
result = struct.unpack_from('!q', buf, ind)[0]
|
||||
ind += 8
|
||||
|
||||
else:
|
||||
self.inc_counter('Invalid_Data_Type')
|
||||
logging.error(f"Infos.parse: data_type: {data_type}"
|
||||
" not supported")
|
||||
return
|
||||
|
||||
if keys:
|
||||
dict = self.db
|
||||
name = ''
|
||||
|
||||
for key in keys[:-1]:
|
||||
if key not in dict:
|
||||
dict[key] = {}
|
||||
dict = dict[key]
|
||||
name += key + '.'
|
||||
|
||||
if keys[-1] not in dict:
|
||||
update = (not must_incr or result > 0)
|
||||
else:
|
||||
if must_incr:
|
||||
update = dict[keys[-1]] < result
|
||||
else:
|
||||
update = dict[keys[-1]] != result
|
||||
|
||||
if update:
|
||||
dict[keys[-1]] = result
|
||||
name += keys[-1]
|
||||
yield keys[0], update
|
||||
else:
|
||||
update = False
|
||||
name = str(f'info-id.0x{info_id:x}')
|
||||
|
||||
self.tracer.log(level, f'{name} : {result}{unit}'
|
||||
f' update: {update}')
|
||||
|
||||
i += 1
|
||||
|
||||
def ignore_this_device(self, dep: dict) -> bool:
|
||||
'''Checks the equation in the dep dict
|
||||
|
||||
returns 'False' only if the equation is valid;
|
||||
'True' in any other case'''
|
||||
if 'reg' in dep:
|
||||
value = self.dev_value(dep['reg'])
|
||||
if not value:
|
||||
return True
|
||||
|
||||
if 'gte' in dep:
|
||||
return not value >= dep['gte']
|
||||
elif 'less_eq' in dep:
|
||||
return not value <= dep['less_eq']
|
||||
return True
|
||||
@@ -6,9 +6,11 @@ from datetime import datetime
|
||||
if __name__ == "app.src.gen3.talent":
|
||||
from app.src.messages import hex_dump_memory, Message
|
||||
from app.src.config import Config
|
||||
from app.src.gen3.infos_g3 import InfosG3
|
||||
else: # pragma: no cover
|
||||
from messages import hex_dump_memory, Message
|
||||
from config import Config
|
||||
from gen3.infos_g3 import InfosG3
|
||||
|
||||
logger = logging.getLogger('msg')
|
||||
|
||||
@@ -38,6 +40,7 @@ class Talent(Message):
|
||||
self.id_str = id_str
|
||||
self.contact_name = b''
|
||||
self.contact_mail = b''
|
||||
self.db = InfosG3()
|
||||
self.switch = {
|
||||
0x00: self.msg_contact_info,
|
||||
0x13: self.msg_ota_update,
|
||||
|
||||
236
app/src/gen3plus/infos_g3p.py
Normal file
236
app/src/gen3plus/infos_g3p.py
Normal file
@@ -0,0 +1,236 @@
|
||||
|
||||
import struct
|
||||
import json
|
||||
import logging
|
||||
from typing import Generator
|
||||
|
||||
if __name__ == "app.src.gen3plus.infos_g3p":
|
||||
from app.src.infos import Infos
|
||||
else: # pragma: no cover
|
||||
from infos import Infos
|
||||
|
||||
|
||||
class InfosG3P(Infos):
|
||||
def ha_confs(self, ha_prfx, node_id, snr, singleton: bool, sug_area='') \
|
||||
-> Generator[tuple[dict, str], None, None]:
|
||||
'''Generator function yields a json register struct for home-assistant
|
||||
auto configuration and a unique entity string
|
||||
|
||||
arguments:
|
||||
prfx:str ==> MQTT prefix for the home assistant 'stat_t string
|
||||
snr:str ==> serial number of the inverter, used to build unique
|
||||
entity strings
|
||||
sug_area:str ==> suggested area string from the config file'''
|
||||
tab = self.info_defs
|
||||
for key in tab:
|
||||
row = tab[key]
|
||||
if 'singleton' in row:
|
||||
if singleton != row['singleton']:
|
||||
continue
|
||||
elif singleton:
|
||||
continue
|
||||
prfx = ha_prfx + node_id
|
||||
|
||||
# check if we have details for home assistant
|
||||
if 'ha' in row:
|
||||
ha = row['ha']
|
||||
if 'comp' in ha:
|
||||
component = ha['comp']
|
||||
else:
|
||||
component = 'sensor'
|
||||
attr = {}
|
||||
if 'name' in ha:
|
||||
attr['name'] = ha['name']
|
||||
else:
|
||||
attr['name'] = row['name'][-1]
|
||||
|
||||
attr['stat_t'] = prfx + row['name'][0]
|
||||
attr['dev_cla'] = ha['dev_cla']
|
||||
attr['stat_cla'] = ha['stat_cla']
|
||||
attr['uniq_id'] = ha['id']+snr
|
||||
if 'val_tpl' in ha:
|
||||
attr['val_tpl'] = ha['val_tpl']
|
||||
elif 'fmt' in ha:
|
||||
attr['val_tpl'] = '{{value_json' + f"['{row['name'][-1]}'] {ha['fmt']}" + '}}' # eg. 'val_tpl': "{{ value_json['Output_Power']|float }} # noqa: E501
|
||||
else:
|
||||
self.inc_counter('Internal_Error')
|
||||
logging.error(f"Infos._info_defs: the row for {key} do"
|
||||
" not have a 'val_tpl' nor a 'fmt' value")
|
||||
|
||||
# add unit_of_meas only, if status_class isn't none. If
|
||||
# status_cla is None we want a number format and not line
|
||||
# graph in home assistant. A unit will change the number
|
||||
# format to a line graph
|
||||
if 'unit' in row and attr['stat_cla'] is not None:
|
||||
attr['unit_of_meas'] = row['unit'] # 'unit_of_meas'
|
||||
if 'icon' in ha:
|
||||
attr['ic'] = ha['icon'] # icon for the entity
|
||||
if 'nat_prc' in ha:
|
||||
attr['sug_dsp_prc'] = ha['nat_prc'] # precison of floats
|
||||
if 'ent_cat' in ha:
|
||||
attr['ent_cat'] = ha['ent_cat'] # diagnostic, config
|
||||
|
||||
# enabled_by_default is deactivated, since it avoid the via
|
||||
# setup of the devices. It seems, that there is a bug in home
|
||||
# assistant. tested with 'Home Assistant 2023.10.4'
|
||||
# if 'en' in ha: # enabled_by_default
|
||||
# attr['en'] = ha['en']
|
||||
|
||||
if 'dev' in ha:
|
||||
device = self.info_devs[ha['dev']]
|
||||
|
||||
if 'dep' in device and self.ignore_this_device(device['dep']): # noqa: E501
|
||||
continue
|
||||
|
||||
dev = {}
|
||||
|
||||
# the same name for 'name' and 'suggested area', so we get
|
||||
# dedicated devices in home assistant with short value
|
||||
# name and headline
|
||||
if (sug_area == '' or
|
||||
('singleton' in device and device['singleton'])):
|
||||
dev['name'] = device['name']
|
||||
dev['sa'] = device['name']
|
||||
else:
|
||||
dev['name'] = device['name']+' - '+sug_area
|
||||
dev['sa'] = device['name']+' - '+sug_area
|
||||
|
||||
if 'via' in device: # add the link to the parent device
|
||||
via = device['via']
|
||||
if via in self.info_devs:
|
||||
via_dev = self.info_devs[via]
|
||||
if 'singleton' in via_dev and via_dev['singleton']:
|
||||
dev['via_device'] = via
|
||||
else:
|
||||
dev['via_device'] = f"{via}_{snr}"
|
||||
else:
|
||||
self.inc_counter('Internal_Error')
|
||||
logging.error(f"Infos._info_defs: the row for "
|
||||
f"{key} has an invalid via value: "
|
||||
f"{via}")
|
||||
|
||||
for key in ('mdl', 'mf', 'sw', 'hw'): # add optional
|
||||
# values fpr 'modell', 'manufacturer', 'sw version' and
|
||||
# 'hw version'
|
||||
if key in device:
|
||||
data = self.dev_value(device[key])
|
||||
if data is not None:
|
||||
dev[key] = data
|
||||
|
||||
if 'singleton' in device and device['singleton']:
|
||||
dev['ids'] = [f"{ha['dev']}"]
|
||||
else:
|
||||
dev['ids'] = [f"{ha['dev']}_{snr}"]
|
||||
|
||||
attr['dev'] = dev
|
||||
|
||||
origin = {}
|
||||
origin['name'] = self.app_name
|
||||
origin['sw'] = self.version
|
||||
attr['o'] = origin
|
||||
else:
|
||||
self.inc_counter('Internal_Error')
|
||||
logging.error(f"Infos._info_defs: the row for {key} "
|
||||
"missing 'dev' value for ha register")
|
||||
|
||||
yield json.dumps(attr), component, node_id, attr['uniq_id']
|
||||
|
||||
def parse(self, buf, ind=0) -> Generator[tuple[str, bool], None, None]:
|
||||
'''parse a data sequence received from the inverter and
|
||||
stores the values in Infos.db
|
||||
|
||||
buf: buffer of the sequence to parse'''
|
||||
result = struct.unpack_from('!l', buf, ind)
|
||||
elms = result[0]
|
||||
i = 0
|
||||
ind += 4
|
||||
while i < elms:
|
||||
result = struct.unpack_from('!lB', buf, ind)
|
||||
info_id = result[0]
|
||||
data_type = result[1]
|
||||
ind += 5
|
||||
keys, level, unit, must_incr, new_val = self._key_obj(info_id)
|
||||
|
||||
if data_type == 0x54: # 'T' -> Pascal-String
|
||||
str_len = buf[ind]
|
||||
result = struct.unpack_from(f'!{str_len+1}p', buf,
|
||||
ind)[0].decode(encoding='ascii',
|
||||
errors='replace')
|
||||
ind += str_len+1
|
||||
|
||||
elif data_type == 0x49: # 'I' -> int32
|
||||
# if new_val:
|
||||
# struct.pack_into('!l', buf, ind, new_val)
|
||||
result = struct.unpack_from('!l', buf, ind)[0]
|
||||
ind += 4
|
||||
|
||||
elif data_type == 0x53: # 'S' -> short
|
||||
# if new_val:
|
||||
# struct.pack_into('!h', buf, ind, new_val)
|
||||
result = struct.unpack_from('!h', buf, ind)[0]
|
||||
ind += 2
|
||||
|
||||
elif data_type == 0x46: # 'F' -> float32
|
||||
# if new_val:
|
||||
# struct.pack_into('!f', buf, ind, new_val)
|
||||
result = round(struct.unpack_from('!f', buf, ind)[0], 2)
|
||||
ind += 4
|
||||
|
||||
elif data_type == 0x4c: # 'L' -> int64
|
||||
# if new_val:
|
||||
# struct.pack_into('!q', buf, ind, new_val)
|
||||
result = struct.unpack_from('!q', buf, ind)[0]
|
||||
ind += 8
|
||||
|
||||
else:
|
||||
self.inc_counter('Invalid_Data_Type')
|
||||
logging.error(f"Infos.parse: data_type: {data_type}"
|
||||
" not supported")
|
||||
return
|
||||
|
||||
if keys:
|
||||
dict = self.db
|
||||
name = ''
|
||||
|
||||
for key in keys[:-1]:
|
||||
if key not in dict:
|
||||
dict[key] = {}
|
||||
dict = dict[key]
|
||||
name += key + '.'
|
||||
|
||||
if keys[-1] not in dict:
|
||||
update = (not must_incr or result > 0)
|
||||
else:
|
||||
if must_incr:
|
||||
update = dict[keys[-1]] < result
|
||||
else:
|
||||
update = dict[keys[-1]] != result
|
||||
|
||||
if update:
|
||||
dict[keys[-1]] = result
|
||||
name += keys[-1]
|
||||
yield keys[0], update
|
||||
else:
|
||||
update = False
|
||||
name = str(f'info-id.0x{info_id:x}')
|
||||
|
||||
self.tracer.log(level, f'{name} : {result}{unit}'
|
||||
f' update: {update}')
|
||||
|
||||
i += 1
|
||||
|
||||
def ignore_this_device(self, dep: dict) -> bool:
|
||||
'''Checks the equation in the dep dict
|
||||
|
||||
returns 'False' only if the equation is valid;
|
||||
'True' in any other case'''
|
||||
if 'reg' in dep:
|
||||
value = self.dev_value(dep['reg'])
|
||||
if not value:
|
||||
return True
|
||||
|
||||
if 'gte' in dep:
|
||||
return not value >= dep['gte']
|
||||
elif 'less_eq' in dep:
|
||||
return not value <= dep['less_eq']
|
||||
return True
|
||||
@@ -6,9 +6,11 @@ from datetime import datetime
|
||||
if __name__ == "app.src.gen3plus.solarman_v5":
|
||||
from app.src.messages import hex_dump_memory, Message
|
||||
from app.src.config import Config
|
||||
from app.src.gen3plus.infos_g3p import InfosG3P
|
||||
else: # pragma: no cover
|
||||
from messages import hex_dump_memory, Message
|
||||
from config import Config
|
||||
from gen3plus.infos_g3p import InfosG3P
|
||||
# import traceback
|
||||
|
||||
logger = logging.getLogger('msg')
|
||||
@@ -25,6 +27,7 @@ class SolarmanV5(Message):
|
||||
self.snr = 0
|
||||
# self.await_conn_resp_cnt = 0
|
||||
# self.id_str = id_str
|
||||
self.db = InfosG3P()
|
||||
self.switch = {
|
||||
0x4110: self.msg_dev_ind, # hello
|
||||
0x1110: self.msg_dev_rsp,
|
||||
|
||||
261
app/src/infos.py
261
app/src/infos.py
@@ -1,5 +1,3 @@
|
||||
import struct
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
@@ -16,13 +14,13 @@ class Infos:
|
||||
logging.info('Initialize proxy statistics')
|
||||
# init proxy counter in the class.stat dictionary
|
||||
cls.stat['proxy'] = {}
|
||||
for key in cls.__info_defs:
|
||||
name = cls.__info_defs[key]['name']
|
||||
for key in cls._info_defs:
|
||||
name = cls._info_defs[key]['name']
|
||||
if name[0] == 'proxy':
|
||||
cls.stat['proxy'][name[1]] = 0
|
||||
|
||||
# add values from the environment to the device definition table
|
||||
prxy = cls.__info_devs['proxy']
|
||||
prxy = cls._info_devs['proxy']
|
||||
prxy['sw'] = cls.version
|
||||
prxy['mdl'] = cls.app_name
|
||||
|
||||
@@ -30,7 +28,7 @@ class Infos:
|
||||
self.db = {}
|
||||
self.tracer = logging.getLogger('data')
|
||||
|
||||
__info_devs = {
|
||||
_info_devs = {
|
||||
'proxy': {'singleton': True, 'name': 'Proxy', 'mf': 'Stefan Allius'}, # noqa: E501
|
||||
'controller': {'via': 'proxy', 'name': 'Controller', 'mdl': 0x00092f90, 'mf': 0x000927c0, 'sw': 0x00092ba8}, # noqa: E501
|
||||
'inverter': {'via': 'controller', 'name': 'Micro Inverter', 'mdl': 0x00000032, 'mf': 0x00000014, 'sw': 0x0000001e}, # noqa: E501
|
||||
@@ -42,7 +40,7 @@ class Infos:
|
||||
|
||||
__comm_type_val_tpl = "{%set com_types = ['n/a','Wi-Fi', 'G4', 'G5', 'GPRS'] %}{{com_types[value_json['Communication_Type']|int(0)]|default(value_json['Communication_Type'])}}" # noqa: E501
|
||||
|
||||
__info_defs = {
|
||||
_info_defs = {
|
||||
# collector values used for device registration:
|
||||
0x00092ba8: {'name': ['collector', 'Collector_Fw_Version'], 'level': logging.INFO, 'unit': ''}, # noqa: E501
|
||||
0x000927c0: {'name': ['collector', 'Chip_Type'], 'level': logging.DEBUG, 'unit': ''}, # noqa: E501
|
||||
@@ -131,6 +129,22 @@ class Infos:
|
||||
|
||||
}
|
||||
|
||||
@property
|
||||
def info_devs(self) -> dict:
|
||||
return self._info_devs
|
||||
|
||||
@info_devs.setter
|
||||
def info_devs(self, value: dict) -> None:
|
||||
self._info_devs = value
|
||||
|
||||
@property
|
||||
def info_defs(self) -> dict:
|
||||
return self._info_defs
|
||||
|
||||
@info_defs.setter
|
||||
def info_defs(self, value: dict) -> None:
|
||||
self._info_defs = value
|
||||
|
||||
def dev_value(self, idx: str | int) -> str | int | float | None:
|
||||
'''returns the stored device value from our database
|
||||
|
||||
@@ -141,8 +155,8 @@ class Infos:
|
||||
'''
|
||||
if type(idx) is str:
|
||||
return idx # return idx as a fixed value
|
||||
elif idx in self.__info_defs:
|
||||
row = self.__info_defs[idx]
|
||||
elif idx in self._info_defs:
|
||||
row = self._info_defs[idx]
|
||||
if 'singleton' in row and row['singleton']:
|
||||
dict = self.stat
|
||||
else:
|
||||
@@ -158,145 +172,6 @@ class Infos:
|
||||
|
||||
return None # unknwon idx, not in __info_defs
|
||||
|
||||
def ignore_this_device(self, dep: dict) -> bool:
|
||||
'''Checks the equation in the dep dict
|
||||
|
||||
returns 'False' only if the equation is valid;
|
||||
'True' in any other case'''
|
||||
if 'reg' in dep:
|
||||
value = self.dev_value(dep['reg'])
|
||||
if not value:
|
||||
return True
|
||||
|
||||
if 'gte' in dep:
|
||||
return not value >= dep['gte']
|
||||
elif 'less_eq' in dep:
|
||||
return not value <= dep['less_eq']
|
||||
return True
|
||||
|
||||
def ha_confs(self, ha_prfx, node_id, snr, singleton: bool, sug_area=''):
|
||||
'''Generator function yields a json register struct for home-assistant
|
||||
auto configuration and a unique entity string
|
||||
|
||||
arguments:
|
||||
prfx:str ==> MQTT prefix for the home assistant 'stat_t string
|
||||
snr:str ==> serial number of the inverter, used to build unique
|
||||
entity strings
|
||||
sug_area:str ==> suggested area string from the config file'''
|
||||
tab = self.__info_defs
|
||||
for key in tab:
|
||||
row = tab[key]
|
||||
if 'singleton' in row:
|
||||
if singleton != row['singleton']:
|
||||
continue
|
||||
elif singleton:
|
||||
continue
|
||||
prfx = ha_prfx + node_id
|
||||
|
||||
# check if we have details for home assistant
|
||||
if 'ha' in row:
|
||||
ha = row['ha']
|
||||
if 'comp' in ha:
|
||||
component = ha['comp']
|
||||
else:
|
||||
component = 'sensor'
|
||||
attr = {}
|
||||
if 'name' in ha:
|
||||
attr['name'] = ha['name']
|
||||
else:
|
||||
attr['name'] = row['name'][-1]
|
||||
|
||||
attr['stat_t'] = prfx + row['name'][0]
|
||||
attr['dev_cla'] = ha['dev_cla']
|
||||
attr['stat_cla'] = ha['stat_cla']
|
||||
attr['uniq_id'] = ha['id']+snr
|
||||
if 'val_tpl' in ha:
|
||||
attr['val_tpl'] = ha['val_tpl']
|
||||
elif 'fmt' in ha:
|
||||
attr['val_tpl'] = '{{value_json' + f"['{row['name'][-1]}'] {ha['fmt']}" + '}}' # eg. 'val_tpl': "{{ value_json['Output_Power']|float }} # noqa: E501
|
||||
else:
|
||||
self.inc_counter('Internal_Error')
|
||||
logging.error(f"Infos.__info_defs: the row for {key} do"
|
||||
" not have a 'val_tpl' nor a 'fmt' value")
|
||||
|
||||
# add unit_of_meas only, if status_class isn't none. If
|
||||
# status_cla is None we want a number format and not line
|
||||
# graph in home assistant. A unit will change the number
|
||||
# format to a line graph
|
||||
if 'unit' in row and attr['stat_cla'] is not None:
|
||||
attr['unit_of_meas'] = row['unit'] # 'unit_of_meas'
|
||||
if 'icon' in ha:
|
||||
attr['ic'] = ha['icon'] # icon for the entity
|
||||
if 'nat_prc' in ha:
|
||||
attr['sug_dsp_prc'] = ha['nat_prc'] # precison of floats
|
||||
if 'ent_cat' in ha:
|
||||
attr['ent_cat'] = ha['ent_cat'] # diagnostic, config
|
||||
|
||||
# enabled_by_default is deactivated, since it avoid the via
|
||||
# setup of the devices. It seems, that there is a bug in home
|
||||
# assistant. tested with 'Home Assistant 2023.10.4'
|
||||
# if 'en' in ha: # enabled_by_default
|
||||
# attr['en'] = ha['en']
|
||||
|
||||
if 'dev' in ha:
|
||||
device = self.__info_devs[ha['dev']]
|
||||
|
||||
if 'dep' in device and self.ignore_this_device(device['dep']): # noqa: E501
|
||||
continue
|
||||
|
||||
dev = {}
|
||||
|
||||
# the same name for 'name' and 'suggested area', so we get
|
||||
# dedicated devices in home assistant with short value
|
||||
# name and headline
|
||||
if (sug_area == '' or
|
||||
('singleton' in device and device['singleton'])):
|
||||
dev['name'] = device['name']
|
||||
dev['sa'] = device['name']
|
||||
else:
|
||||
dev['name'] = device['name']+' - '+sug_area
|
||||
dev['sa'] = device['name']+' - '+sug_area
|
||||
|
||||
if 'via' in device: # add the link to the parent device
|
||||
via = device['via']
|
||||
if via in self.__info_devs:
|
||||
via_dev = self.__info_devs[via]
|
||||
if 'singleton' in via_dev and via_dev['singleton']:
|
||||
dev['via_device'] = via
|
||||
else:
|
||||
dev['via_device'] = f"{via}_{snr}"
|
||||
else:
|
||||
self.inc_counter('Internal_Error')
|
||||
logging.error(f"Infos.__info_defs: the row for "
|
||||
f"{key} has an invalid via value: "
|
||||
f"{via}")
|
||||
|
||||
for key in ('mdl', 'mf', 'sw', 'hw'): # add optional
|
||||
# values fpr 'modell', 'manufacturer', 'sw version' and
|
||||
# 'hw version'
|
||||
if key in device:
|
||||
data = self.dev_value(device[key])
|
||||
if data is not None:
|
||||
dev[key] = data
|
||||
|
||||
if 'singleton' in device and device['singleton']:
|
||||
dev['ids'] = [f"{ha['dev']}"]
|
||||
else:
|
||||
dev['ids'] = [f"{ha['dev']}_{snr}"]
|
||||
|
||||
attr['dev'] = dev
|
||||
|
||||
origin = {}
|
||||
origin['name'] = self.app_name
|
||||
origin['sw'] = self.version
|
||||
attr['o'] = origin
|
||||
else:
|
||||
self.inc_counter('Internal_Error')
|
||||
logging.error(f"Infos.__info_defs: the row for {key} "
|
||||
"missing 'dev' value for ha register")
|
||||
|
||||
yield json.dumps(attr), component, node_id, attr['uniq_id']
|
||||
|
||||
def inc_counter(self, counter: str) -> None:
|
||||
'''inc proxy statistic counter'''
|
||||
dict = self.stat['proxy']
|
||||
@@ -307,9 +182,9 @@ class Infos:
|
||||
dict = self.stat['proxy']
|
||||
dict[counter] -= 1
|
||||
|
||||
def __key_obj(self, id) -> list:
|
||||
d = self.__info_defs.get(id, {'name': None, 'level': logging.DEBUG,
|
||||
'unit': ''})
|
||||
def _key_obj(self, id) -> list:
|
||||
d = self._info_defs.get(id, {'name': None, 'level': logging.DEBUG,
|
||||
'unit': ''})
|
||||
if 'ha' in d and 'must_incr' in d['ha']:
|
||||
must_incr = d['ha']['must_incr']
|
||||
else:
|
||||
@@ -319,87 +194,3 @@ class Infos:
|
||||
# new_val = d['new_value']
|
||||
|
||||
return d['name'], d['level'], d['unit'], must_incr, new_val
|
||||
|
||||
def parse(self, buf, ind=0): # -> None | tuple[str,bool]:
|
||||
'''parse a data sequence received from the inverter and
|
||||
stores the values in Infos.db
|
||||
|
||||
buf: buffer of the sequence to parse'''
|
||||
result = struct.unpack_from('!l', buf, ind)
|
||||
elms = result[0]
|
||||
i = 0
|
||||
ind += 4
|
||||
while i < elms:
|
||||
result = struct.unpack_from('!lB', buf, ind)
|
||||
info_id = result[0]
|
||||
data_type = result[1]
|
||||
ind += 5
|
||||
keys, level, unit, must_incr, new_val = self.__key_obj(info_id)
|
||||
|
||||
if data_type == 0x54: # 'T' -> Pascal-String
|
||||
str_len = buf[ind]
|
||||
result = struct.unpack_from(f'!{str_len+1}p', buf,
|
||||
ind)[0].decode(encoding='ascii',
|
||||
errors='replace')
|
||||
ind += str_len+1
|
||||
|
||||
elif data_type == 0x49: # 'I' -> int32
|
||||
# if new_val:
|
||||
# struct.pack_into('!l', buf, ind, new_val)
|
||||
result = struct.unpack_from('!l', buf, ind)[0]
|
||||
ind += 4
|
||||
|
||||
elif data_type == 0x53: # 'S' -> short
|
||||
# if new_val:
|
||||
# struct.pack_into('!h', buf, ind, new_val)
|
||||
result = struct.unpack_from('!h', buf, ind)[0]
|
||||
ind += 2
|
||||
|
||||
elif data_type == 0x46: # 'F' -> float32
|
||||
# if new_val:
|
||||
# struct.pack_into('!f', buf, ind, new_val)
|
||||
result = round(struct.unpack_from('!f', buf, ind)[0], 2)
|
||||
ind += 4
|
||||
|
||||
elif data_type == 0x4c: # 'L' -> int64
|
||||
# if new_val:
|
||||
# struct.pack_into('!q', buf, ind, new_val)
|
||||
result = struct.unpack_from('!q', buf, ind)[0]
|
||||
ind += 8
|
||||
|
||||
else:
|
||||
self.inc_counter('Invalid_Data_Type')
|
||||
logging.error(f"Infos.parse: data_type: {data_type}"
|
||||
" not supported")
|
||||
return
|
||||
|
||||
if keys:
|
||||
dict = self.db
|
||||
name = ''
|
||||
|
||||
for key in keys[:-1]:
|
||||
if key not in dict:
|
||||
dict[key] = {}
|
||||
dict = dict[key]
|
||||
name += key + '.'
|
||||
|
||||
if keys[-1] not in dict:
|
||||
update = (not must_incr or result > 0)
|
||||
else:
|
||||
if must_incr:
|
||||
update = dict[keys[-1]] < result
|
||||
else:
|
||||
update = dict[keys[-1]] != result
|
||||
|
||||
if update:
|
||||
dict[keys[-1]] = result
|
||||
name += keys[-1]
|
||||
yield keys[0], update
|
||||
else:
|
||||
update = False
|
||||
name = str(f'info-id.0x{info_id:x}')
|
||||
|
||||
self.tracer.log(level, f'{name} : {result}{unit}'
|
||||
f' update: {update}')
|
||||
|
||||
i += 1
|
||||
|
||||
@@ -64,7 +64,6 @@ class Message(metaclass=IterRegistry):
|
||||
self._recv_buffer = bytearray(0)
|
||||
self._send_buffer = bytearray(0)
|
||||
self._forward_buffer = bytearray(0)
|
||||
self.db = Infos()
|
||||
self.new_data = {}
|
||||
|
||||
'''
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# test_with_pytest.py
|
||||
import pytest, json
|
||||
from app.src.infos import Infos
|
||||
from app.src.gen3.infos_g3 import InfosG3
|
||||
|
||||
@pytest.fixture
|
||||
def ContrDataSeq(): # Get Time Request message
|
||||
@@ -176,7 +176,7 @@ def InvDataSeq2_Zero(): # Data indication from the controller
|
||||
|
||||
|
||||
def test_parse_control(ContrDataSeq):
|
||||
i = Infos()
|
||||
i = InfosG3()
|
||||
for key, result in i.parse (ContrDataSeq):
|
||||
pass
|
||||
|
||||
@@ -184,7 +184,7 @@ def test_parse_control(ContrDataSeq):
|
||||
{"collector": {"Collector_Fw_Version": "RSW_400_V1.00.06", "Chip_Type": "Raymon", "Chip_Model": "RSW-1-10001", "Trace_URL": "t.raymoniot.com", "Logger_URL": "logger.talent-monitoring.com"}, "controller": {"Collect_Interval": 1, "Signal_Strength": 100, "Power_On_Time": 29, "Communication_Type": 1, "Connect_Count": 1, "Data_Up_Interval": 300}})
|
||||
|
||||
def test_parse_control2(Contr2DataSeq):
|
||||
i = Infos()
|
||||
i = InfosG3()
|
||||
for key, result in i.parse (Contr2DataSeq):
|
||||
pass
|
||||
|
||||
@@ -192,7 +192,7 @@ def test_parse_control2(Contr2DataSeq):
|
||||
{"collector": {"Collector_Fw_Version": "RSW_400_V1.00.20", "Chip_Type": "Raymon", "Chip_Model": "RSW-1-10001", "Trace_URL": "t.raymoniot.com", "Logger_URL": "logger.talent-monitoring.com"}, "controller": {"Collect_Interval": 1, "Signal_Strength": 16, "Power_On_Time": 334, "Communication_Type": 1, "Connect_Count": 1, "Data_Up_Interval": 300}})
|
||||
|
||||
def test_parse_inverter(InvDataSeq):
|
||||
i = Infos()
|
||||
i = InfosG3()
|
||||
for key, result in i.parse (InvDataSeq):
|
||||
pass
|
||||
|
||||
@@ -200,7 +200,7 @@ def test_parse_inverter(InvDataSeq):
|
||||
{"inverter": {"Product_Name": "Microinv", "Manufacturer": "TSUN", "Version": "V5.0.11", "Serial_Number": "T170000000000001", "Equipment_Model": "TSOL-MS600"}})
|
||||
|
||||
def test_parse_cont_and_invert(ContrDataSeq, InvDataSeq):
|
||||
i = Infos()
|
||||
i = InfosG3()
|
||||
for key, result in i.parse (ContrDataSeq):
|
||||
pass
|
||||
|
||||
@@ -214,7 +214,7 @@ def test_parse_cont_and_invert(ContrDataSeq, InvDataSeq):
|
||||
|
||||
|
||||
def test_build_ha_conf1(ContrDataSeq):
|
||||
i = Infos()
|
||||
i = InfosG3()
|
||||
i.static_init() # initialize counter
|
||||
|
||||
tests = 0
|
||||
@@ -270,7 +270,7 @@ def test_build_ha_conf1(ContrDataSeq):
|
||||
assert tests==5
|
||||
|
||||
def test_build_ha_conf2(ContrDataSeq, InvDataSeq, InvDataSeq2):
|
||||
i = Infos()
|
||||
i = InfosG3()
|
||||
for key, result in i.parse (ContrDataSeq):
|
||||
pass
|
||||
for key, result in i.parse (InvDataSeq):
|
||||
@@ -308,7 +308,7 @@ def test_build_ha_conf2(ContrDataSeq, InvDataSeq, InvDataSeq2):
|
||||
assert tests==5
|
||||
|
||||
def test_must_incr_total(InvDataSeq2, InvDataSeq2_Zero):
|
||||
i = Infos()
|
||||
i = InfosG3()
|
||||
tests = 0
|
||||
for key, update in i.parse (InvDataSeq2):
|
||||
if key == 'total':
|
||||
@@ -353,7 +353,7 @@ def test_must_incr_total(InvDataSeq2, InvDataSeq2_Zero):
|
||||
assert json.dumps(i.db['env']) == json.dumps({"Inverter_Temp": 0, "Rated_Power": 0})
|
||||
|
||||
def test_must_incr_total2(InvDataSeq2, InvDataSeq2_Zero):
|
||||
i = Infos()
|
||||
i = InfosG3()
|
||||
tests = 0
|
||||
for key, update in i.parse (InvDataSeq2_Zero):
|
||||
if key == 'total':
|
||||
@@ -398,7 +398,7 @@ def test_must_incr_total2(InvDataSeq2, InvDataSeq2_Zero):
|
||||
|
||||
|
||||
def test_statistic_counter():
|
||||
i = Infos()
|
||||
i = InfosG3()
|
||||
val = i.dev_value("Test-String")
|
||||
assert val == "Test-String"
|
||||
|
||||
@@ -424,7 +424,7 @@ def test_statistic_counter():
|
||||
assert val == 0
|
||||
|
||||
def test_dep_rules():
|
||||
i = Infos()
|
||||
i = InfosG3()
|
||||
i.static_init() # initialize counter
|
||||
|
||||
res = i.ignore_this_device({})
|
||||
@@ -456,7 +456,7 @@ def test_dep_rules():
|
||||
assert res == False
|
||||
|
||||
def test_table_definition():
|
||||
i = Infos()
|
||||
i = InfosG3()
|
||||
i.static_init() # initialize counter
|
||||
|
||||
val = i.dev_value(0xffffff04) # check internal error counter
|
||||
@@ -472,7 +472,7 @@ def test_table_definition():
|
||||
assert val == 0
|
||||
|
||||
# test missing 'fmt' value
|
||||
Infos._Infos__info_defs[0xfffffffe] = {'name':['proxy', 'Internal_Test1'], 'singleton': True, 'ha':{'dev':'proxy', 'dev_cla': None, 'stat_cla': None, 'id':'intern_test1_'}}
|
||||
i.info_defs[0xfffffffe] = {'name':['proxy', 'Internal_Test1'], 'singleton': True, 'ha':{'dev':'proxy', 'dev_cla': None, 'stat_cla': None, 'id':'intern_test1_'}}
|
||||
|
||||
tests = 0
|
||||
for d_json, comp, node_id, id in i.ha_confs(ha_prfx="tsun/", node_id = 'proxy/', snr = '456', singleton=True, sug_area = 'roof'):
|
||||
@@ -485,7 +485,7 @@ def test_table_definition():
|
||||
assert val == 1
|
||||
|
||||
# test missing 'dev' value
|
||||
Infos._Infos__info_defs[0xfffffffe] = {'name':['proxy', 'Internal_Test2'], 'singleton': True, 'ha':{'dev_cla': None, 'stat_cla': None, 'id':'intern_test2_', 'fmt':'| int'}}
|
||||
i.info_defs[0xfffffffe] = {'name':['proxy', 'Internal_Test2'], 'singleton': True, 'ha':{'dev_cla': None, 'stat_cla': None, 'id':'intern_test2_', 'fmt':'| int'}}
|
||||
tests = 0
|
||||
for d_json, comp, node_id, id in i.ha_confs(ha_prfx="tsun/", node_id = 'proxy/', snr = '456', singleton=True, sug_area = 'roof'):
|
||||
if id == 'intern_test2_456':
|
||||
@@ -499,9 +499,9 @@ def test_table_definition():
|
||||
|
||||
|
||||
# test invalid 'via' value
|
||||
Infos._Infos__info_devs['test_dev'] = {'via':'xyz', 'name':'Module PV1'}
|
||||
i.info_devs['test_dev'] = {'via':'xyz', 'name':'Module PV1'}
|
||||
|
||||
Infos._Infos__info_defs[0xfffffffe] = {'name':['proxy', 'Internal_Test2'], 'singleton': True, 'ha':{'dev':'test_dev', 'dev_cla': None, 'stat_cla': None, 'id':'intern_test2_', 'fmt':'| int'}}
|
||||
i.info_defs[0xfffffffe] = {'name':['proxy', 'Internal_Test2'], 'singleton': True, 'ha':{'dev':'test_dev', 'dev_cla': None, 'stat_cla': None, 'id':'intern_test2_', 'fmt':'| int'}}
|
||||
tests = 0
|
||||
for d_json, comp, node_id, id in i.ha_confs(ha_prfx="tsun/", node_id = 'proxy/', snr = '456', singleton=True, sug_area = 'roof'):
|
||||
if id == 'intern_test2_456':
|
||||
@@ -514,7 +514,7 @@ def test_table_definition():
|
||||
|
||||
|
||||
def test_invalid_data_type(InvalidDataSeq):
|
||||
i = Infos()
|
||||
i = InfosG3()
|
||||
i.static_init() # initialize counter
|
||||
|
||||
val = i.dev_value(0xffffff03) # check invalid data type counter
|
||||
|
||||
Reference in New Issue
Block a user