import struct
import logging
import asyncio
from typing import Generator, Union

if __name__ == "app.src.modbus":
    from app.src.infos import Register
else:  # pragma: no cover
    from infos import Register

#######
# TSUN uses the Modbus in the RTU transmission mode.
# see: https://modbus.org/docs/Modbus_over_serial_line_V1_02.pdf
#
# A Modbus PDU consists of: 'Function-Code' + 'Data'
# A Modbus RTU message consists of: 'Addr' + 'Modbus-PDU' + 'CRC-16'
#
# The 16-bit CRC is known as CRC-16-ANSI(reverse)
# see: https://en.wikipedia.org/wiki/Computation_of_cyclic_redundancy_checks
#######

CRC_POLY = 0xA001  # (LSBF/reverse)
CRC_INIT = 0xFFFF


class Modbus():
    """Queue Modbus RTU requests and decode the matching responses.

    Requests are serialised: only one request is outstanding at a time.
    Further requests wait in `que` until the pending one is answered
    (`recv_resp`) or its timer expires (`timeout_cb`).

    The attribute `err` holds the code of the last detected problem:
    1 = CRC error, 2 = wrong address, 3 = wrong function code,
    4 = register count mismatch, 5 = response without pending request.
    """

    INV_ADDR = 1
    # Modbus function codes
    READ_REGS = 3
    READ_INPUTS = 4
    WRITE_SINGLE_REG = 6

    # CRC lookup table, shared by all instances and built on first use
    __crc_tab = []

    # Maps a register address to the info register it feeds.
    #   'fmt'   struct format used to decode the raw value
    #   'ratio' optional factor applied to the decoded value
    #   'eval'  optional expression evaluated with the decoded value bound
    #           to the name `result`
    map = {
        0x2007: {'reg': Register.MAX_DESIGNED_POWER,   'fmt': '!H', 'ratio':  1},  # noqa: E501
        # 0x????: {'reg': Register.INVERTER_STATUS,    'fmt': '!H'},                 # noqa: E501
        0x3008: {'reg': Register.VERSION,              'fmt': '!H', 'eval': "f'V{(result>>12)}.{(result>>8)&0xf}.{(result>>4)&0xf}{result&0xf}'"},  # noqa: E501
        0x3009: {'reg': Register.GRID_VOLTAGE,         'fmt': '!H', 'ratio':  0.1},  # noqa: E501
        0x300a: {'reg': Register.GRID_CURRENT,         'fmt': '!H', 'ratio': 0.01},  # noqa: E501
        0x300b: {'reg': Register.GRID_FREQUENCY,       'fmt': '!H', 'ratio': 0.01},  # noqa: E501
        0x300c: {'reg': Register.INVERTER_TEMP,        'fmt': '!H', 'eval': 'result-40'},  # noqa: E501
        # 0x300d
        0x300e: {'reg': Register.RATED_POWER,          'fmt': '!H', 'ratio':    1},  # noqa: E501
        0x300f: {'reg': Register.OUTPUT_POWER,         'fmt': '!H', 'ratio':  0.1},  # noqa: E501
        0x3010: {'reg': Register.PV1_VOLTAGE,          'fmt': '!H', 'ratio':  0.1},  # noqa: E501
        0x3011: {'reg': Register.PV1_CURRENT,          'fmt': '!H', 'ratio': 0.01},  # noqa: E501
        0x3012: {'reg': Register.PV1_POWER,            'fmt': '!H', 'ratio':  0.1},  # noqa: E501
        0x3013: {'reg': Register.PV2_VOLTAGE,          'fmt': '!H', 'ratio':  0.1},  # noqa: E501
        0x3014: {'reg': Register.PV2_CURRENT,          'fmt': '!H', 'ratio': 0.01},  # noqa: E501
        0x3015: {'reg': Register.PV2_POWER,            'fmt': '!H', 'ratio':  0.1},  # noqa: E501
        0x3016: {'reg': Register.PV3_VOLTAGE,          'fmt': '!H', 'ratio':  0.1},  # noqa: E501
        0x3017: {'reg': Register.PV3_CURRENT,          'fmt': '!H', 'ratio': 0.01},  # noqa: E501
        0x3018: {'reg': Register.PV3_POWER,            'fmt': '!H', 'ratio':  0.1},  # noqa: E501
        0x3019: {'reg': Register.PV4_VOLTAGE,          'fmt': '!H', 'ratio':  0.1},  # noqa: E501
        0x301a: {'reg': Register.PV4_CURRENT,          'fmt': '!H', 'ratio': 0.01},  # noqa: E501
        0x301b: {'reg': Register.PV4_POWER,            'fmt': '!H', 'ratio':  0.1},  # noqa: E501
        0x301c: {'reg': Register.DAILY_GENERATION,     'fmt': '!H', 'ratio': 0.01},  # noqa: E501
        0x301d: {'reg': Register.TOTAL_GENERATION,     'fmt': '!L', 'ratio': 0.01},  # noqa: E501
        0x301f: {'reg': Register.PV1_DAILY_GENERATION, 'fmt': '!H', 'ratio': 0.01},  # noqa: E501
        0x3020: {'reg': Register.PV1_TOTAL_GENERATION, 'fmt': '!L', 'ratio': 0.01},  # noqa: E501
        0x3022: {'reg': Register.PV2_DAILY_GENERATION, 'fmt': '!H', 'ratio': 0.01},  # noqa: E501
        0x3023: {'reg': Register.PV2_TOTAL_GENERATION, 'fmt': '!L', 'ratio': 0.01},  # noqa: E501
        0x3025: {'reg': Register.PV3_DAILY_GENERATION, 'fmt': '!H', 'ratio': 0.01},  # noqa: E501
        0x3026: {'reg': Register.PV3_TOTAL_GENERATION, 'fmt': '!L', 'ratio': 0.01},  # noqa: E501
        0x3028: {'reg': Register.PV4_DAILY_GENERATION, 'fmt': '!H', 'ratio': 0.01},  # noqa: E501
        0x3029: {'reg': Register.PV4_TOTAL_GENERATION, 'fmt': '!L', 'ratio': 0.01},  # noqa: E501
    }

    def __init__(self, snd_handler, timeout: int = 1):
        """Create an idle Modbus handler.

        Args:
            snd_handler: callable invoked with the raw request bytes
                whenever the next queued request is sent.
            timeout: delay in seconds (passed to `loop.call_later`) after
                which a pending request is given up.
        """
        if not self.__crc_tab:
            self.__build_crc_tab(CRC_POLY)
        self.que = asyncio.Queue(100)
        self.snd_handler = snd_handler
        # handler stored with the request currently in flight
        self.rsp_handler = None
        self.timeout = timeout
        # header fields of the last sent request; a response is matched
        # against them
        self.last_addr = 0
        self.last_fcode = 0
        self.last_len = 0
        self.last_reg = 0
        self.err = 0
        self.loop = asyncio.get_event_loop()
        self.req_pend = False
        self.tim = None

    def start_timer(self):
        """Mark a request as pending and arm the response timeout."""
        if self.req_pend:
            return
        self.req_pend = True
        self.tim = self.loop.call_later(self.timeout, self.timeout_cb)
        # logging.debug(f'Modbus start timer {self}')

    def stop_timer(self):
        """Clear the pending state, cancel the timer, send the next request."""
        self.req_pend = False
        # logging.debug(f'Modbus stop timer {self}')
        if self.tim:
            self.tim.cancel()
        self.get_next_req()

    def timeout_cb(self):
        """Timer callback: drop the unanswered request and send the next."""
        self.req_pend = False
        logging.info(f'Modbus timeout {self}')
        self.get_next_req()

    def get_next_req(self) -> None:
        """Send the next queued request unless one is still pending.

        Remembers address, function code, first register and register
        count of the request so that `recv_resp` can validate the answer.
        """
        if self.req_pend:
            return
        try:
            item = self.que.get_nowait()
        except asyncio.QueueEmpty:
            return
        req = item['req']
        self.rsp_handler = item['rsp_hdl']
        self.last_addr = req[0]
        self.last_fcode = req[1]
        self.last_reg, self.last_len = struct.unpack_from('>HH', req, 2)
        self.start_timer()
        self.snd_handler(req)

    def build_msg(self, addr, func, reg, val) -> None:
        """Build a request frame, append its CRC and queue it for sending.

        Args:
            addr: Modbus device address (one byte).
            func: Modbus function code (one byte).
            reg: register address (16 bit).
            val: 16-bit value field following the register address.
        """
        msg = struct.pack('>BBHH', addr, func, reg, val)
        # NOTE(review): everything in this method after "struct.pack('" was
        # unreadable in the reviewed copy. It has been restored as
        # "append CRC, enqueue, kick the queue", mirroring recv_req.
        # Confirm against version control.
        msg += struct.pack('<H', self.__calc_crc(msg))  # CRC, low byte first
        self.que.put_nowait({'req': msg,
                             'rsp_hdl': None})
        if self.que.qsize() == 1:
            self.get_next_req()

    # NOTE(review): the signature of recv_req was unreadable in the reviewed
    # copy; it has been restored from the names used in its body. Confirm
    # the default of `rsp_handler` against version control.
    def recv_req(self, buf: bytearray, rsp_handler=None) -> bool:
        """Queue a complete request frame that was built elsewhere.

        Args:
            buf: the request frame including its CRC.
            rsp_handler: stored with the request; it becomes
                `self.rsp_handler` once the request is sent.

        Returns:
            False (with `err` set to 1) if the CRC check fails, else True.
        """
        # logging.info(f'recv_req: first byte modbus:{buf[0]} len:{len(buf)}')
        if not self.check_crc(buf):
            self.err = 1
            logging.error('Modbus recv: CRC error')
            return False
        self.que.put_nowait({'req': buf,
                             'rsp_hdl': rsp_handler})
        if self.que.qsize() == 1:
            self.get_next_req()
        return True

    def recv_resp(self, info_db, buf: bytearray, node_id: str) -> \
            Generator[tuple[str, bool, Union[int, float, str]], None, None]:
        """Validate a response frame and decode the registers listed in `map`.

        This is a generator: nothing happens until it is iterated. On a
        validation failure `err` is set and the generator ends without
        yielding and without stopping the timer.

        Args:
            info_db: object providing `_key_obj()`, `update_db()` and
                `tracer`, used to store the decoded values.
            buf: the response frame including its CRC.
            node_id: identifier that is only used in the trace output.

        Yields:
            `(key, update, result)` for every decoded register for which
            `info_db._key_obj()` returned keys.
        """
        # logging.info(f'recv_resp: first byte modbus:{buf[0]} len:{len(buf)}')
        if not self.req_pend:
            self.err = 5
            return
        if not self.check_crc(buf):
            logging.error('Modbus resp: CRC error')
            self.err = 1
            return
        if buf[0] != self.last_addr:
            logging.info(f'Modbus resp: Wrong addr {buf[0]}')
            self.err = 2
            return
        fcode = buf[1]
        if fcode != self.last_fcode:
            logging.info(f'Modbus: Wrong fcode {fcode} != {self.last_fcode}')
            self.err = 3
            return

        is_read_resp = (self.last_addr == self.INV_ADDR and
                        fcode in (self.READ_REGS, self.READ_INPUTS))
        if not is_read_resp:
            self.stop_timer()
            return

        elmlen = buf[2] >> 1  # byte count -> number of 16-bit registers
        if elmlen != self.last_len:
            logging.info(f'Modbus: len error {elmlen} != {self.last_len}')
            self.err = 4
            return
        first_reg = self.last_reg  # save last_reg before sending next pdu
        self.stop_timer()  # stop timer and send next pdu

        # The register data starts at offset 3 and must end before the two
        # CRC bytes; never decode beyond the bytes that are really present.
        payload_end = min(3 + 2 * elmlen, len(buf) - 2)
        for i in range(elmlen):
            addr = first_reg + i
            row = self.map.get(addr)
            if row is None:
                continue
            fmt = row['fmt']
            offset = 3 + 2 * i
            if offset + struct.calcsize(fmt) > payload_end:
                # e.g. a 32-bit value whose second register was not part of
                # the request: decoding it would pull in the CRC bytes
                logging.info(f'Modbus: reg 0x{addr:04x} exceeds the response')
                continue
            result = struct.unpack_from(fmt, buf, offset)[0]
            if 'eval' in row:
                # SECURITY: eval() is only acceptable because the
                # expressions come from the class attribute `map`; never
                # fill `map` from external input. The expression reads the
                # local name `result`.
                result = eval(row['eval'])
            if 'ratio' in row:
                result = round(result * row['ratio'], 2)

            keys, level, unit, must_incr = info_db._key_obj(row['reg'])
            if not keys:
                continue
            name, update = info_db.update_db(keys, must_incr, result)
            yield keys[0], update, result
            if update:
                info_db.tracer.log(level, f'[\'{node_id}\']MODBUS: {name}'
                                          f' : {result}{unit}')

    def check_crc(self, msg) -> bool:
        """Return True if `msg` (payload followed by its CRC) is intact."""
        # the CRC over a frame including its own CRC is zero
        return 0 == self.__calc_crc(msg)

    def __calc_crc(self, buffer: bytes) -> int:
        """Return the table-driven CRC-16 of `buffer`, seeded with CRC_INIT."""
        crc = CRC_INIT
        for cur in buffer:
            crc = (crc >> 8) ^ self.__crc_tab[(crc ^ cur) & 0xFF]
        return crc

    def __build_crc_tab(self, poly) -> None:
        """Append the 256 CRC table entries for `poly` to the shared table."""
        for index in range(256):
            data = index << 1
            crc = 0
            for _ in range(8, 0, -1):
                data >>= 1
                if (data ^ crc) & 1:
                    crc = (crc >> 1) ^ poly
                else:
                    crc >>= 1
            self.__crc_tab.append(crc)