import logging import struct from async_ifc import AsyncIfc from gen3plus.solarman_v5 import SolarmanBase from my_timer import Timer from infos import Register logger = logging.getLogger('msg') class SolarmanEmu(SolarmanBase): def __init__(self, inverter, addr, ifc: "AsyncIfc", server_side: bool, client_mode: bool): super().__init__(addr, ifc, server_side=False, _send_modbus_cb=None, mb_timeout=8) _ = inverter logging.debug('SolarmanEmu.init()') self.db = ifc.remote.stream.db self.snr = ifc.remote.stream.snr self.hb_timeout = 60 '''actual heatbeat timeout from the last response message''' self.data_up_inv = self.db.get_db_value(Register.DATA_UP_INTERVAL) '''time interval for getting new MQTT data messages''' self.hb_timer = Timer(self.send_heartbeat_cb, self.node_id) self.data_timer = Timer(self.send_data_cb, self.node_id) self.last_sync = self._emu_timestamp() '''timestamp when we send the last sync message (4110)''' self.pkt_cnt = 0 '''last sent packet number''' self.switch = { 0x4210: 'msg_data_ind', # real time data 0x1210: self.msg_response, # at least every 5 minutes 0x4710: 'msg_hbeat_ind', # heatbeat 0x1710: self.msg_response, # every 2 minutes 0x4110: 'msg_dev_ind', # device data, sync start 0x1110: self.msg_response, # every 3 hours } self.log_lvl = { 0x4110: logging.INFO, # device data, sync start 0x1110: logging.INFO, # every 3 hours 0x4210: logging.INFO, # real time data 0x1210: logging.INFO, # at least every 5 minutes 0x4710: logging.DEBUG, # heatbeat 0x1710: logging.DEBUG, # every 2 minutes } ''' Our puplic methods ''' def close(self) -> None: logging.info('SolarmanEmu.close()') # we have references to methods of this class in self.switch # so we have to erase self.switch, otherwise this instance can't be # deallocated by the garbage collector ==> we get a memory leak self.switch.clear() self.log_lvl.clear() self.hb_timer.close() self.data_timer.close() self.db = None super().close() def _set_serial_no(self, snr: int): logging.debug(f'SolarmanEmu._set_serial_no, snr: {snr}') self.unique_id = str(snr) def _init_new_client_conn(self) -> bool: logging.debug('SolarmanEmu.init_new()') self.data_timer.start(self.data_up_inv) return False def next_pkt_cnt(self): '''get the next packet number''' self.pkt_cnt = (self.pkt_cnt + 1) & 0xffffffff return self.pkt_cnt def seconds_since_last_sync(self): '''get seconds since last 0x4110 message was sent''' return self._emu_timestamp() - self.last_sync def send_heartbeat_cb(self, exp_cnt): '''send a heartbeat to the TSUN cloud''' self._build_header(0x4710) self.ifc.tx_add(struct.pack('