import struct import logging import time import asyncio from itertools import chain from datetime import datetime from proxy import Proxy from async_ifc import AsyncIfc from messages import hex_dump_memory, Message, State from cnf.config import Config from modbus import Modbus from gen3plus.infos_g3p import InfosG3P from infos import Register, Fmt logger = logging.getLogger('msg') class Sequence(): def __init__(self, server_side: bool): self.rcv_idx = 0 self.snd_idx = 0 self.server_side = server_side def set_recv(self, val: int): if self.server_side: self.rcv_idx = val >> 8 self.snd_idx = val & 0xff else: self.rcv_idx = val & 0xff self.snd_idx = val >> 8 def get_send(self): self.snd_idx += 1 self.snd_idx &= 0xff if self.server_side: return (self.rcv_idx << 8) | self.snd_idx else: return (self.snd_idx << 8) | self.rcv_idx def __str__(self): return f'{self.rcv_idx:02x}:{self.snd_idx:02x}' class SolarmanBase(Message): def __init__(self, addr, ifc: "AsyncIfc", server_side: bool, _send_modbus_cb, mb_timeout: int): super().__init__('G3P', ifc, server_side, _send_modbus_cb, mb_timeout) ifc.rx_set_cb(self.read) ifc.prot_set_timeout_cb(self._timeout) ifc.prot_set_init_new_client_conn_cb(self._init_new_client_conn) ifc.prot_set_update_header_cb(self.__update_header) self.addr = addr self.conn_no = ifc.get_conn_no() self.header_len = 11 # overwrite construcor in class Message self.control = 0 self.seq = Sequence(server_side) self.snr = 0 self.time_ofs = 0 def read(self) -> float: '''process all received messages in the _recv_buffer''' self._read() while True: if not self.header_valid: self.__parse_header(self.ifc.rx_peek(), self.ifc.rx_len()) if self.header_valid and self.ifc.rx_len() >= \ (self.header_len + self.data_len+2): self.__process_complete_received_msg() self.__flush_recv_msg() else: return 0 # wait 0s before sending a response ''' Our public methods ''' def _flow_str(self, server_side: bool, type: str): # noqa: F821 switch = { 'rx': ' <', 'tx': ' >', 'forwrd': '<< ', 'drop': ' xx', 'rxS': '> ', 'txS': '< ', 'forwrdS': ' >>', 'dropS': 'xx ', } if server_side: type += 'S' return switch.get(type, '???') def get_fnc_handler(self, ctrl): fnc = self.switch.get(ctrl, self.msg_unknown) if callable(fnc): return fnc, repr(fnc.__name__) else: return self.msg_unknown, repr(fnc) def _build_header(self, ctrl) -> None: '''build header for new transmit message''' self.send_msg_ofs = self.ifc.tx_len() self.ifc.tx_add(struct.pack( ' None: '''finish the transmit message, set lenght and checksum''' _len = self.ifc.tx_len() - self.send_msg_ofs struct.pack_into(' None: if (buf_len < self.header_len): # enough bytes for complete header? return result = struct.unpack_from(' bool: crc = buf[self.data_len+11] stop = buf[self.data_len+12] if stop != 0x15: hex_dump_memory(logging.ERROR, 'Drop packet w invalid stop byte from ' f'{self.addr}:', buf, buf_len) self.inc_counter('Invalid_Msg_Format') if self.ifc.rx_len() > (self.data_len+13): next_start = buf[self.data_len+13] if next_start != 0xa5: # erase broken recv buffer self.ifc.rx_clear() return False check = sum(buf[1:buf_len-2]) & 0xff if check != crc: self.inc_counter('Invalid_Msg_Format') logger.debug(f'CRC {int(crc):#02x} {int(check):#08x}' f' Stop:{int(stop):#02x}') # start & stop byte are valid, discard only this message return False return True def __flush_recv_msg(self) -> None: self.ifc.rx_get(self.header_len + self.data_len+2) self.header_valid = False def __dispatch_msg(self) -> None: _fnc, _str = self.get_fnc_handler(self.control) if self.unique_id: logger.info(self._flow_str(self.server_side, 'rx') + f' Ctl: {int(self.control):#04x}' + f' Msg: {_str}') _fnc() else: logger.info(self._flow_str(self.server_side, 'drop') + f' Ctl: {int(self.control):#04x}' + f' Msg: {_str}') ''' Message handler methods ''' def msg_response(self): data = self.ifc.rx_peek()[self.header_len:] result = struct.unpack_from(' None: logging.debug('Solarman.close()') # we have references to methods of this class in self.switch # so we have to erase self.switch, otherwise this instance can't be # deallocated by the garbage collector ==> we get a memory leak self.switch.clear() self.log_lvl.clear() super().close() async def send_start_cmd(self, snr: int, host: str, forward: bool, start_timeout=MB_CLIENT_DATA_UP): self.no_forwarding = True self.establish_inv_emu = forward self.snr = snr self._set_serial_no(snr) self.mb_timeout = start_timeout self.db.set_db_def_value(Register.IP_ADDRESS, host) self.db.set_db_def_value(Register.POLLING_INTERVAL, self.mb_timeout) self.db.set_db_def_value(Register.DATA_UP_INTERVAL, 300) self.db.set_db_def_value(Register.COLLECT_INTERVAL, 1) self.db.set_db_def_value(Register.HEARTBEAT_INTERVAL, 120) self.db.set_db_def_value(Register.SENSOR_LIST, Fmt.hex4((self.sensor_list, ))) self.new_data['controller'] = True self.state = State.up if self.mb_scan: self._send_modbus_cmd(self.mb_inv_no, Modbus.READ_REGS, self.mb_start_reg, self.mb_bytes, logging.INFO) else: self._send_modbus_cmd(Modbus.INV_ADDR, Modbus.READ_REGS, self.mb_regs[0]['addr'], self.mb_regs[0]['len'], logging.DEBUG) self.mb_timer.start(self.mb_timeout) def new_state_up(self): if self.state is not State.up: self.state = State.up if (self.modbus_polling): self.mb_timer.start(self.mb_first_timeout) self.db.set_db_def_value(Register.POLLING_INTERVAL, self.mb_timeout) def establish_emu(self): _len = 223 build_msg = self.db.build(_len, 0x41, 2) struct.pack_into( ' {inv}') if (type(inv) is dict and 'monitor_sn' in inv and inv['monitor_sn'] == snr): self._set_config_parms(inv, key) self.db.set_pv_module_details(inv) logger.debug(f'SerialNo {serial_no} allowed! area:{self.sug_area}') # noqa: E501 self.db.set_db_def_value(Register.COLLECTOR_SNR, snr) self.db.set_db_def_value(Register.SERIAL_NUMBER, key) break else: self.node_id = '' self.sug_area = '' if 'allow_all' not in inverters or not inverters['allow_all']: self.inc_counter('Unknown_SNR') self.unique_id = None logging.error(f"Ignore message from unknow inverter with Monitoring-SN: {serial_no})!\n" # noqa: E501 " !!Check the 'monitor_sn' setting in your configuration!!") # noqa: E501 return logging.warning(f"Monitoring-SN: {serial_no} not configured but accepted!" # noqa: E501 " !!Check the 'monitor_sn' setting in your configuration!!") # noqa: E501 self.unique_id = serial_no def forward(self, buffer, buflen) -> None: '''add the actual receive msg to the forwarding queue''' if self.no_forwarding: return tsun = Config.get('solarman') if tsun['enabled']: self.ifc.fwd_add(buffer[:buflen]) self.ifc.fwd_log(logging.DEBUG, 'Store for forwarding:') _, _str = self.get_fnc_handler(self.control) logger.info(self._flow_str(self.server_side, 'forwrd') + f' Ctl: {int(self.control):#04x}' f' Msg: {_str}') def _init_new_client_conn(self) -> bool: return False def _heartbeat(self) -> int: return 60 # pragma: no cover def __send_ack_rsp(self, msgtype, ftype, ack=1): self._build_header(msgtype) self.ifc.tx_add(struct.pack(' 1: # logging.info("Regular Modbus Status request") self._send_modbus_cmd(Modbus.INV_ADDR, Modbus.READ_REGS, self.mb_regs[1]['addr'], self.mb_regs[1]['len'], logging.INFO) def at_cmd_forbidden(self, cmd: str, connection: str) -> bool: return not cmd.startswith(tuple(self.at_acl[connection]['allow'])) or \ cmd.startswith(tuple(self.at_acl[connection]['block'])) async def send_at_cmd(self, at_cmd: str) -> None: if self.state != State.up: logger.warning(f'[{self.node_id}] ignore AT+ cmd,' ' as the state is not UP') return at_cmd = at_cmd.strip() if self.at_cmd_forbidden(cmd=at_cmd, connection='mqtt'): data_json = f'\'{at_cmd}\' is forbidden' node_id = self.node_id key = 'at_resp' logger.info(f'{key}: {data_json}') await Proxy.mqtt.publish(f'{Proxy.entity_prfx}{node_id}{key}', data_json) # noqa: E501 return self.forward_at_cmd_resp = False self._build_header(0x4510) self.ifc.tx_add(struct.pack(f'> 8 for key, update in self.db.parse(self.ifc.rx_peek(), msg_type, ftype, sensor, self.node_id): if update: if key == 'inverter': inv_update = True self._set_mqtt_timestamp(key, ts) self.new_data[key] = True if inv_update: self.__build_model_name() ''' Message handler methods ''' def msg_unknown(self): logger.warning(f"Unknow Msg: ID:{int(self.control):#04x}") self.inc_counter('Unknown_Msg') self.__forward_msg() def msg_dev_ind(self): data = self.ifc.rx_peek()[self.header_len:] result = struct.unpack_from(self.HDR_FMT, data, 0) ftype = result[0] # always 2 total = result[1] tim = result[2] res = result[3] # always zero logger.info(f'frame type:{ftype:02x}' f' timer:{tim:08x}s null:{res}') if self.time_ofs: # dt = datetime.fromtimestamp(total + self.time_ofs) # logger.info(f'ts: {dt.strftime("%Y-%m-%d %H:%M:%S")}') ts = total + self.time_ofs else: ts = None self.__process_data(ftype, ts) self.sensor_list = int(self.db.get_db_value(Register.SENSOR_LIST, 0), 16) self.__forward_msg() self.__send_ack_rsp(0x1110, ftype) def msg_data_ind(self): data = self.ifc.rx_peek() result = struct.unpack_from(' int: ftype = self.ifc.rx_peek()[self.header_len] if ftype == self.AT_CMD: if self.forward_at_cmd_resp: return logging.INFO return logging.DEBUG elif ftype == self.MB_RTU_CMD \ and self.server_side: return self.mb.last_log_lvl return logging.WARNING def msg_command_rsp(self): data = self.ifc.rx_peek()[self.header_len: self.header_len+self.data_len] ftype = data[0] if ftype == self.AT_CMD or \ ftype == self.AT_CMD_RSP: if not self.forward_at_cmd_resp: data_json = data[14:].decode("utf-8") node_id = self.node_id key = 'at_resp' logger.info(f'{key}: {data_json}') self.publish_mqtt(f'{Proxy.entity_prfx}{node_id}{key}', data_json) # noqa: E501 return elif ftype == self.MB_RTU_CMD: self.__modbus_command_rsp(data) return self.__forward_msg() def __parse_modbus_rsp(self, data, modbus_msg_len): inv_update = False self.modbus_elms = 0 if (self.mb_scan): self._dump_modbus_scan(data, 14, modbus_msg_len) ts = self._timestamp() for key, update, _ in self.mb.recv_resp(self.db, data[14:]): self.modbus_elms += 1 if update: if key == 'inverter': inv_update = True self._set_mqtt_timestamp(key, ts) self.new_data[key] = True for key, update in self.db.calc(self.sensor_list, self.node_id): if update: self._set_mqtt_timestamp(key, ts) self.new_data[key] = True return inv_update def __modbus_command_rsp(self, data): '''precess MODBUS RTU response''' valid = data[1] modbus_msg_len = self.data_len - 14 # logger.debug(f'modbus_len:{modbus_msg_len} accepted:{valid}') if valid == 1 and modbus_msg_len > 4: # logger.info(f'first byte modbus:{data[14]}') inv_update = self.__parse_modbus_rsp(data, modbus_msg_len) if inv_update: self.__build_model_name() if self.establish_inv_emu and not self.ifc.remote.stream: self.establish_emu() def msg_hbeat_ind(self): data = self.ifc.rx_peek()[self.header_len:] result = struct.unpack_from('