MQTT timestamps and protocol improvements (#140)

* add TS_INPUT, TS_GRID and TS_TOTAL

* prepare MQTT timestamps

- add _set_mqtt_timestamp method
- fix hexdump printing

* push dev and debug images to docker.io

* add unix epoche timestamp for MQTT pakets

* set timezone for unit tests

* set name für setting timezone step

* trigger new action

* GEN3 and GEN3PLUS: handle multiple message

- read: iterate over the receive buffer
- forward: append messages to the forward buffer
- _update_header: iterate over the forward buffer

* GEN3: optimize timeout handling

- longer timeout in state init and reveived
- got to state pending only from state up

* update changelog

* cleanup
This commit is contained in:
Stefan Allius
2024-08-04 19:43:09 +02:00
committed by GitHub
parent b8e44b7379
commit 47f7580184
10 changed files with 475 additions and 297 deletions

View File

@@ -35,7 +35,7 @@ class AsyncStream():
self.proc_max = 0
def __timeout(self) -> int:
if self.state == State.init:
if self.state == State.init or self.state == State.received:
to = self.MAX_START_TIME
else:
if self.server_side and self.modbus_polling:

View File

@@ -1,7 +1,9 @@
import struct
import logging
import time
import pytz
from datetime import datetime
from tzlocal import get_localzone
if __name__ == "app.src.gen3.talent":
from app.src.messages import hex_dump_memory, Message, State
@@ -127,37 +129,43 @@ class Talent(Message):
self.unique_id = serial_no
def read(self) -> float:
'''process all received messages in the _recv_buffer'''
self._read()
while True:
if not self.header_valid:
self.__parse_header(self._recv_buffer, len(self._recv_buffer))
if not self.header_valid:
self.__parse_header(self._recv_buffer, len(self._recv_buffer))
if self.header_valid and \
len(self._recv_buffer) >= (self.header_len + self.data_len):
if self.state == State.init:
self.state = State.received # received 1st package
if self.header_valid and len(self._recv_buffer) >= (self.header_len +
self.data_len):
if self.state == State.init:
self.state = State.received # received 1st package
log_lvl = self.log_lvl.get(self.msg_id, logging.WARNING)
if callable(log_lvl):
log_lvl = log_lvl()
log_lvl = self.log_lvl.get(self.msg_id, logging.WARNING)
if callable(log_lvl):
log_lvl = log_lvl()
hex_dump_memory(log_lvl, f'Received from {self.addr}:'
f' BufLen: {len(self._recv_buffer)}'
f' HdrLen: {self.header_len}'
f' DtaLen: {self.data_len}',
self._recv_buffer, len(self._recv_buffer))
hex_dump_memory(log_lvl, f'Received from {self.addr}:',
self._recv_buffer, self.header_len+self.data_len)
self.__set_serial_no(self.id_str.decode("utf-8"))
self.__dispatch_msg()
self.__flush_recv_msg()
else:
return 0 # don not wait before sending a response
self.__set_serial_no(self.id_str.decode("utf-8"))
self.__dispatch_msg()
self.__flush_recv_msg()
return 0.5 # wait 500ms before sending a response
def forward(self, buffer, buflen) -> None:
def forward(self) -> None:
'''add the actual receive msg to the forwarding queue'''
tsun = Config.get('tsun')
if tsun['enabled']:
self._forward_buffer = buffer[:buflen]
buffer = self._recv_buffer
buflen = self.header_len+self.data_len
self._forward_buffer += buffer[:buflen]
hex_dump_memory(logging.DEBUG, 'Store for forwarding:',
buffer, buflen)
self.__parse_header(self._forward_buffer,
len(self._forward_buffer))
fnc = self.switch.get(self.msg_id, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'forwrd') +
f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}')
@@ -232,6 +240,8 @@ class Talent(Message):
return switch.get(type, '???')
def _timestamp(self): # pragma: no cover
'''returns timestamp fo the inverter as localtime
since 1.1.1970 in msec'''
if False:
# utc as epoche
ts = time.time()
@@ -240,23 +250,37 @@ class Talent(Message):
ts = (datetime.now() - datetime(1970, 1, 1)).total_seconds()
return round(ts*1000)
def _utcfromts(self, ts: float):
'''converts inverter timestamp into unix time (epoche)'''
dt = datetime.fromtimestamp(ts/1000, pytz.UTC). \
replace(tzinfo=get_localzone())
return dt.timestamp()
def _utc(self): # pragma: no cover
'''returns unix time (epoche)'''
return datetime.now().timestamp()
def _update_header(self, _forward_buffer):
'''update header for message before forwarding,
add time offset to timestamp'''
_len = len(_forward_buffer)
result = struct.unpack_from('!lB', _forward_buffer, 0)
id_len = result[1] # len of variable id string
if _len < 2*id_len + 21:
return
ofs = 0
while ofs < _len:
result = struct.unpack_from('!lB', _forward_buffer, 0)
msg_len = 4 + result[0]
id_len = result[1] # len of variable id string
if _len < 2*id_len + 21:
return
result = struct.unpack_from('!B', _forward_buffer, id_len+6)
msg_code = result[0]
if msg_code == 0x71 or msg_code == 0x04:
result = struct.unpack_from('!q', _forward_buffer, 13+2*id_len)
ts = result[0] + self.ts_offset
logger.debug(f'offset: {self.ts_offset:08x}'
f' proxy-time: {ts:08x}')
struct.pack_into('!q', _forward_buffer, 13+2*id_len, ts)
result = struct.unpack_from('!B', _forward_buffer, id_len+6)
msg_code = result[0]
if msg_code == 0x71 or msg_code == 0x04:
result = struct.unpack_from('!q', _forward_buffer, 13+2*id_len)
ts = result[0] + self.ts_offset
logger.debug(f'offset: {self.ts_offset:08x}'
f' proxy-time: {ts:08x}')
struct.pack_into('!q', _forward_buffer, 13+2*id_len, ts)
ofs += msg_len
# check if there is a complete header in the buffer, parse it
# and set
@@ -335,12 +359,12 @@ class Talent(Message):
elif self.await_conn_resp_cnt > 0:
self.await_conn_resp_cnt -= 1
else:
self.forward(self._recv_buffer, self.header_len+self.data_len)
self.forward()
return
else:
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
self.forward()
def __process_contact_info(self) -> bool:
result = struct.unpack_from('!B', self._recv_buffer, self.header_len)
@@ -362,7 +386,8 @@ class Talent(Message):
def msg_get_time(self):
if self.ctrl.is_ind():
if self.data_len == 0:
self.state = State.pend # block MODBUS cmds
if self.state == State.up:
self.state = State.pend # block MODBUS cmds
if (self.modbus_polling):
self.mb_timer.start(self.mb_start_timeout)
self.db.set_db_def_value(Register.POLLING_INTERVAL,
@@ -387,7 +412,7 @@ class Talent(Message):
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
self.forward()
def parse_msg_header(self):
result = struct.unpack_from('!lB', self._recv_buffer, self.header_len)
@@ -401,11 +426,12 @@ class Talent(Message):
result = struct.unpack_from(f'!{id_len+1}pBq', self._recv_buffer,
self.header_len + 4)
timestamp = result[2]
logger.debug(f'ID: {result[0]} B: {result[1]}')
logger.debug(f'time: {result[2]:08x}')
logger.debug(f'time: {timestamp:08x}')
# logger.info(f'time: {datetime.utcfromtimestamp(result[2]).strftime(
# "%Y-%m-%d %H:%M:%S")}')
return msg_hdr_len
return msg_hdr_len, timestamp
def msg_collector_data(self):
if self.ctrl.is_ind():
@@ -420,7 +446,7 @@ class Talent(Message):
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
self.forward()
def msg_inverter_data(self):
if self.ctrl.is_ind():
@@ -436,14 +462,15 @@ class Talent(Message):
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
self.forward()
def __process_data(self):
msg_hdr_len = self.parse_msg_header()
msg_hdr_len, ts = self.parse_msg_header()
for key, update in self.db.parse(self._recv_buffer, self.header_len
+ msg_hdr_len, self.node_id):
if update:
self._set_mqtt_timestamp(key, self._utcfromts(ts))
self.new_data[key] = True
def msg_ota_update(self):
@@ -454,7 +481,7 @@ class Talent(Message):
else:
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
self.forward()
def parse_modbus_header(self):
@@ -499,17 +526,18 @@ class Talent(Message):
hdr_len:],
self.node_id):
if update:
self._set_mqtt_timestamp(key, self._utc())
self.new_data[key] = True
self.modbus_elms += 1 # count for unit tests
else:
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
self.forward()
def msg_forward(self):
self.forward(self._recv_buffer, self.header_len+self.data_len)
self.forward()
def msg_unknown(self):
logger.warning(f"Unknow Msg: ID:{self.msg_id}")
self.inc_counter('Unknown_Msg')
self.forward(self._recv_buffer, self.header_len+self.data_len)
self.forward()

View File

@@ -219,39 +219,41 @@ class SolarmanV5(Message):
self.unique_id = serial_no
def read(self) -> float:
'''process all received messages in the _recv_buffer'''
self._read()
while True:
if not self.header_valid:
self.__parse_header(self._recv_buffer, len(self._recv_buffer))
if not self.header_valid:
self.__parse_header(self._recv_buffer, len(self._recv_buffer))
if self.header_valid and len(self._recv_buffer) >= \
(self.header_len + self.data_len+2):
log_lvl = self.log_lvl.get(self.control, logging.WARNING)
if callable(log_lvl):
log_lvl = log_lvl()
hex_dump_memory(log_lvl, f'Received from {self.addr}:',
self._recv_buffer, self.header_len +
self.data_len+2)
if self.__trailer_is_ok(self._recv_buffer, self.header_len
+ self.data_len + 2):
if self.state == State.init:
self.state = State.received
if self.header_valid and len(self._recv_buffer) >= (self.header_len +
self.data_len+2):
log_lvl = self.log_lvl.get(self.control, logging.WARNING)
if callable(log_lvl):
log_lvl = log_lvl()
hex_dump_memory(log_lvl, f'Received from {self.addr}:',
self._recv_buffer, self.header_len+self.data_len+2)
if self.__trailer_is_ok(self._recv_buffer, self.header_len
+ self.data_len + 2):
if self.state == State.init:
self.state = State.received
self.__set_serial_no(self.snr)
self.__dispatch_msg()
self.__flush_recv_msg()
return 0 # wait 0s before sending a response
self.__set_serial_no(self.snr)
self.__dispatch_msg()
self.__flush_recv_msg()
else:
return 0 # wait 0s before sending a response
def forward(self, buffer, buflen) -> None:
'''add the actual receive msg to the forwarding queue'''
if self.no_forwarding:
return
tsun = Config.get('solarman')
if tsun['enabled']:
self._forward_buffer = buffer[:buflen]
self._forward_buffer += buffer[:buflen]
hex_dump_memory(logging.DEBUG, 'Store for forwarding:',
buffer, buflen)
self.__parse_header(self._forward_buffer,
len(self._forward_buffer))
fnc = self.switch.get(self.control, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'forwrd') +
f' Ctl: {int(self.control):#04x}'
@@ -259,12 +261,6 @@ class SolarmanV5(Message):
return
def _init_new_client_conn(self) -> bool:
# self.__build_header(0x91)
# self._send_buffer += struct.pack(f'!{len(contact_name)+1}p'
# f'{len(contact_mail)+1}p',
# contact_name, contact_mail)
# self.__finish_send_msg()
return False
'''
@@ -366,13 +362,17 @@ class SolarmanV5(Message):
'''update header for message before forwarding,
set sequence and checksum'''
_len = len(_forward_buffer)
struct.pack_into('<H', _forward_buffer, 1,
_len-13)
struct.pack_into('<H', _forward_buffer, 5,
self.seq.get_send())
ofs = 0
while ofs < _len:
result = struct.unpack_from('<BH', _forward_buffer, ofs)
data_len = result[1] # len of variable id string
check = sum(_forward_buffer[1:_len-2]) & 0xff
struct.pack_into('<B', _forward_buffer, _len-2, check)
struct.pack_into('<H', _forward_buffer, ofs+5,
self.seq.get_send())
check = sum(_forward_buffer[ofs+1:ofs+data_len+11]) & 0xff
struct.pack_into('<B', _forward_buffer, ofs+data_len+11, check)
ofs += (13 + data_len)
def __dispatch_msg(self) -> None:
fnc = self.switch.get(self.control, self.msg_unknown)
@@ -481,7 +481,7 @@ class SolarmanV5(Message):
logger.info(f'Model: {Model}')
self.db.set_db_def_value(Register.EQUIPMENT_MODEL, Model)
def __process_data(self, ftype):
def __process_data(self, ftype, ts):
inv_update = False
msg_type = self.control >> 8
for key, update in self.db.parse(self._recv_buffer, msg_type, ftype,
@@ -489,6 +489,7 @@ class SolarmanV5(Message):
if update:
if key == 'inverter':
inv_update = True
self._set_mqtt_timestamp(key, ts)
self.new_data[key] = True
if inv_update:
@@ -505,16 +506,18 @@ class SolarmanV5(Message):
data = self._recv_buffer[self.header_len:]
result = struct.unpack_from('<BLLL', data, 0)
ftype = result[0] # always 2
# total = result[1]
total = result[1]
tim = result[2]
res = result[3] # always zero
logger.info(f'frame type:{ftype:02x}'
f' timer:{tim:08x}s null:{res}')
# if self.time_ofs:
# dt = datetime.fromtimestamp(total + self.time_ofs)
# logger.info(f'ts: {dt.strftime("%Y-%m-%d %H:%M:%S")}')
self.__process_data(ftype)
if self.time_ofs:
# dt = datetime.fromtimestamp(total + self.time_ofs)
# logger.info(f'ts: {dt.strftime("%Y-%m-%d %H:%M:%S")}')
ts = total + self.time_ofs
else:
ts = None
self.__process_data(ftype, ts)
self.__forward_msg()
self.__send_ack_rsp(0x1110, ftype)
@@ -522,7 +525,7 @@ class SolarmanV5(Message):
data = self._recv_buffer
result = struct.unpack_from('<BHLLLHL', data, self.header_len)
ftype = result[0] # 1 or 0x81
# total = result[2]
total = result[2]
tim = result[3]
if 1 == ftype:
self.time_ofs = result[4]
@@ -530,11 +533,14 @@ class SolarmanV5(Message):
cnt = result[6]
logger.info(f'ftype:{ftype:02x} timer:{tim:08x}s'
f' ??: {unkn:04x} cnt:{cnt}')
# if self.time_ofs:
# dt = datetime.fromtimestamp(total + self.time_ofs)
# logger.info(f'ts: {dt.strftime("%Y-%m-%d %H:%M:%S")}')
if self.time_ofs:
# dt = datetime.fromtimestamp(total + self.time_ofs)
# logger.info(f'ts: {dt.strftime("%Y-%m-%d %H:%M:%S")}')
ts = total + self.time_ofs
else:
ts = None
self.__process_data(ftype)
self.__process_data(ftype, ts)
self.__forward_msg()
self.__send_ack_rsp(0x1210, ftype)
self.new_state_up()
@@ -620,6 +626,7 @@ class SolarmanV5(Message):
if update:
if key == 'inverter':
inv_update = True
self._set_mqtt_timestamp(key, self._timestamp())
self.new_data[key] = True
if inv_update:

View File

@@ -112,6 +112,9 @@ class Register(Enum):
EVENT_414 = 513
EVENT_415 = 514
EVENT_416 = 515
TS_INPUT = 600
TS_GRID = 601
TS_TOTAL = 602
VALUE_1 = 9000
TEST_REG1 = 10000
TEST_REG2 = 10001
@@ -265,6 +268,7 @@ class Infos:
Register.EVENT_416: {'name': ['events', '416_'], 'level': logging.DEBUG, 'unit': ''}, # noqa: E501
# grid measures:
Register.TS_GRID: {'name': ['grid', 'Timestamp'], 'level': logging.INFO, 'unit': ''}, # noqa: E501
Register.GRID_VOLTAGE: {'name': ['grid', 'Voltage'], 'level': logging.DEBUG, 'unit': 'V', 'ha': {'dev': 'inverter', 'dev_cla': 'voltage', 'stat_cla': 'measurement', 'id': 'out_volt_', 'fmt': '| float', 'name': 'Grid Voltage', 'ent_cat': 'diagnostic'}}, # noqa: E501
Register.GRID_CURRENT: {'name': ['grid', 'Current'], 'level': logging.DEBUG, 'unit': 'A', 'ha': {'dev': 'inverter', 'dev_cla': 'current', 'stat_cla': 'measurement', 'id': 'out_cur_', 'fmt': '| float', 'name': 'Grid Current', 'ent_cat': 'diagnostic'}}, # noqa: E501
Register.GRID_FREQUENCY: {'name': ['grid', 'Frequency'], 'level': logging.DEBUG, 'unit': 'Hz', 'ha': {'dev': 'inverter', 'dev_cla': 'frequency', 'stat_cla': 'measurement', 'id': 'out_freq_', 'fmt': '| float', 'name': 'Grid Frequency', 'ent_cat': 'diagnostic'}}, # noqa: E501
@@ -273,6 +277,7 @@ class Infos:
Register.INVERTER_STATUS: {'name': ['env', 'Inverter_Status'], 'level': logging.INFO, 'unit': '', 'ha': {'dev': 'inverter', 'comp': 'sensor', 'dev_cla': None, 'stat_cla': None, 'id': 'inv_status_', 'name': 'Inverter Status', 'val_tpl': __status_type_val_tpl, 'icon': 'mdi:power'}}, # noqa: E501
# input measures:
Register.TS_INPUT: {'name': ['input', 'Timestamp'], 'level': logging.INFO, 'unit': ''}, # noqa: E501
Register.PV1_VOLTAGE: {'name': ['input', 'pv1', 'Voltage'], 'level': logging.DEBUG, 'unit': 'V', 'ha': {'dev': 'input_pv1', 'dev_cla': 'voltage', 'stat_cla': 'measurement', 'id': 'volt_pv1_', 'val_tpl': "{{ (value_json['pv1']['Voltage'] | float)}}", 'icon': 'mdi:gauge', 'ent_cat': 'diagnostic'}}, # noqa: E501
Register.PV1_CURRENT: {'name': ['input', 'pv1', 'Current'], 'level': logging.DEBUG, 'unit': 'A', 'ha': {'dev': 'input_pv1', 'dev_cla': 'current', 'stat_cla': 'measurement', 'id': 'cur_pv1_', 'val_tpl': "{{ (value_json['pv1']['Current'] | float)}}", 'icon': 'mdi:gauge', 'ent_cat': 'diagnostic'}}, # noqa: E501
Register.PV1_POWER: {'name': ['input', 'pv1', 'Power'], 'level': logging.DEBUG, 'unit': 'W', 'ha': {'dev': 'input_pv1', 'dev_cla': 'power', 'stat_cla': 'measurement', 'id': 'power_pv1_', 'val_tpl': "{{ (value_json['pv1']['Power'] | float)}}"}}, # noqa: E501
@@ -304,6 +309,7 @@ class Infos:
Register.PV6_DAILY_GENERATION: {'name': ['input', 'pv6', 'Daily_Generation'], 'level': logging.DEBUG, 'unit': 'kWh', 'ha': {'dev': 'input_pv6', 'dev_cla': 'energy', 'stat_cla': 'total_increasing', 'id': 'daily_gen_pv6_', 'name': 'Daily Generation', 'val_tpl': "{{ (value_json['pv6']['Daily_Generation'] | float)}}", 'icon': 'mdi:solar-power-variant', 'must_incr': True}}, # noqa: E501
Register.PV6_TOTAL_GENERATION: {'name': ['input', 'pv6', 'Total_Generation'], 'level': logging.DEBUG, 'unit': 'kWh', 'ha': {'dev': 'input_pv6', 'dev_cla': 'energy', 'stat_cla': 'total', 'id': 'total_gen_pv6_', 'name': 'Total Generation', 'val_tpl': "{{ (value_json['pv6']['Total_Generation'] | float)}}", 'icon': 'mdi:solar-power', 'must_incr': True}}, # noqa: E501
# total:
Register.TS_TOTAL: {'name': ['total', 'Timestamp'], 'level': logging.INFO, 'unit': ''}, # noqa: E501
Register.DAILY_GENERATION: {'name': ['total', 'Daily_Generation'], 'level': logging.INFO, 'unit': 'kWh', 'ha': {'dev': 'inverter', 'dev_cla': 'energy', 'stat_cla': 'total_increasing', 'id': 'daily_gen_', 'fmt': '| float', 'name': 'Daily Generation', 'icon': 'mdi:solar-power-variant', 'must_incr': True}}, # noqa: E501
Register.TOTAL_GENERATION: {'name': ['total', 'Total_Generation'], 'level': logging.INFO, 'unit': 'kWh', 'ha': {'dev': 'inverter', 'dev_cla': 'energy', 'stat_cla': 'total', 'id': 'total_gen_', 'fmt': '| float', 'name': 'Total Generation', 'icon': 'mdi:solar-power', 'must_incr': True}}, # noqa: E501

View File

@@ -5,16 +5,16 @@ from enum import Enum
if __name__ == "app.src.messages":
from app.src.infos import Infos
from app.src.infos import Infos, Register
from app.src.modbus import Modbus
else: # pragma: no cover
from infos import Infos
from infos import Infos, Register
from modbus import Modbus
logger = logging.getLogger('msg')
def hex_dump_memory(level, info, data, num):
def hex_dump_memory(level, info, data, data_len):
n = 0
lines = []
lines.append(info)
@@ -22,20 +22,20 @@ def hex_dump_memory(level, info, data, num):
if not tracer.isEnabledFor(level):
return
for i in range(0, num, 16):
for i in range(0, data_len, 16):
line = ' '
line += '%04x | ' % (i)
n += 16
for j in range(n-16, n):
if j >= len(data):
if j >= data_len:
break
line += '%02x ' % abs(data[j])
line += ' ' * (3 * 16 + 9 - len(line)) + ' | '
for j in range(n-16, n):
if j >= len(data):
if j >= data_len:
break
c = data[j] if not (data[j] < 0x20 or data[j] > 0x7e) else '.'
line += '%c' % c
@@ -105,6 +105,22 @@ class Message(metaclass=IterRegistry):
'''callback for updating the header of the forward buffer'''
return # pragma: no cover
def _set_mqtt_timestamp(self, key, ts: float | None):
if type(ts) is not None and \
key not in self.new_data or \
not self.new_data[key]:
if key == 'grid':
info_id = Register.TS_GRID
elif key == 'input':
info_id = Register.TS_INPUT
elif key == 'total':
info_id = Register.TS_TOTAL
else:
return
# tstr = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(ts))
# logger.info(f'update: key: {key} ts:{tstr}'
self.db.set_db_def_value(info_id, round(ts))
'''
Our puplic methods
'''