From 7a9b23d06835387a8fe670903852c0a7d971ef42 Mon Sep 17 00:00:00 2001 From: Stefan Allius <122395479+s-allius@users.noreply.github.com> Date: Tue, 13 Aug 2024 21:11:56 +0200 Subject: [PATCH] Sonar qube 3 (#163) fix SonarQube warnings in modbus.py --- app/src/modbus.py | 122 ++++++++++++++++++++++----------------- app/tests/test_modbus.py | 6 +- 2 files changed, 73 insertions(+), 55 deletions(-) diff --git a/app/src/modbus.py b/app/src/modbus.py index f7dbc27..9a0c918 100644 --- a/app/src/modbus.py +++ b/app/src/modbus.py @@ -39,7 +39,7 @@ class Modbus(): '''Modbus function code: Write Single Register''' __crc_tab = [] - map = { + mb_reg_mapping = { 0x2007: {'reg': Register.MAX_DESIGNED_POWER, 'fmt': '!H', 'ratio': 1}, # noqa: E501 0x202c: {'reg': Register.OUTPUT_COEFFICIENT, 'fmt': '!H', 'ratio': 100/1024}, # noqa: E501 @@ -139,7 +139,7 @@ class Modbus(): if self.que.qsize() == 1: self.__send_next_from_que() - def recv_req(self, buf: bytearray, + def recv_req(self, buf: bytes, rsp_handler: Callable[[None], None] = None) -> bool: """Add the received Modbus RTU request to the tx queue @@ -164,7 +164,7 @@ class Modbus(): return True - def recv_resp(self, info_db, buf: bytearray, node_id: str) -> \ + def recv_resp(self, info_db, buf: bytes, node_id: str) -> \ Generator[tuple[str, bool, int | float | str], None, None]: """Generator which check and parse a received MODBUS response. @@ -183,58 +183,18 @@ class Modbus(): # logging.info(f'recv_resp: first byte modbus:{buf[0]} len:{len(buf)}') self.node_id = node_id - if not self.req_pend: - self.err = 5 - return - if not self.__check_crc(buf): - logger.error(f'[{node_id}] Modbus resp: CRC error') - self.err = 1 - return - if buf[0] != self.last_addr: - logger.info(f'[{node_id}] Modbus resp: Wrong addr {buf[0]}') - self.err = 2 - return fcode = buf[1] - if fcode != self.last_fcode: - logger.info(f'[{node_id}] Modbus: Wrong fcode {fcode}' - f' != {self.last_fcode}') - self.err = 3 + data_available = self.last_addr == self.INV_ADDR and \ + (fcode == 3 or fcode == 4) + + if self.__resp_error_check(buf, data_available): return - if self.last_addr == self.INV_ADDR and \ - (fcode == 3 or fcode == 4): + + if data_available: elmlen = buf[2] >> 1 - if elmlen != self.last_len: - logger.info(f'[{node_id}] Modbus: len error {elmlen}' - f' != {self.last_len}') - self.err = 4 - return first_reg = self.last_reg # save last_reg before sending next pdu self.__stop_timer() # stop timer and send next pdu - - for i in range(0, elmlen): - addr = first_reg+i - if addr in self.map: - row = self.map[addr] - info_id = row['reg'] - fmt = row['fmt'] - val = struct.unpack_from(fmt, buf, 3+2*i) - result = val[0] - - if 'eval' in row: - result = eval(row['eval']) - if 'ratio' in row: - result = round(result * row['ratio'], 2) - - keys, level, unit, must_incr = info_db._key_obj(info_id) - - if keys: - name, update = info_db.update_db(keys, must_incr, - result) - yield keys[0], update, result - if update: - info_db.tracer.log(level, - f'[{node_id}] MODBUS: {name}' - f' : {result}{unit}') + yield from self.__process_data(info_db, buf, first_reg, elmlen) else: self.__stop_timer() @@ -243,6 +203,64 @@ class Modbus(): self.rsp_handler() self.__send_next_from_que() + def __resp_error_check(self, buf: bytes, data_available: bool) -> bool: + '''Check the MODBUS response for errors, returns True if one accure''' + if not self.req_pend: + self.err = 5 + return True + if not self.__check_crc(buf): + logger.error(f'[{self.node_id}] Modbus resp: CRC error') + self.err = 1 + return True + if buf[0] != self.last_addr: + logger.info(f'[{self.node_id}] Modbus resp: Wrong addr {buf[0]}') + self.err = 2 + return True + fcode = buf[1] + if fcode != self.last_fcode: + logger.info(f'[{self.node_id}] Modbus: Wrong fcode {fcode}' + f' != {self.last_fcode}') + self.err = 3 + return True + if data_available: + elmlen = buf[2] >> 1 + if elmlen != self.last_len: + logger.info(f'[{self.node_id}] Modbus: len error {elmlen}' + f' != {self.last_len}') + self.err = 4 + return True + + return False + + def __get_value(self, buf: bytes, idx: int, row: dict): + '''get a value from the received buffer''' + val = struct.unpack_from(row['fmt'], buf, idx) + result = val[0] + + if 'eval' in row: + result = eval(row['eval']) + if 'ratio' in row: + result = round(result * row['ratio'], 2) + return result + + def __process_data(self, info_db, buf: bytes, first_reg, elmlen): + '''Generator over received registers, updates the db''' + for i in range(0, elmlen): + addr = first_reg+i + if addr in self.mb_reg_mapping: + row = self.mb_reg_mapping[addr] + info_id = row['reg'] + keys, level, unit, must_incr = info_db._key_obj(info_id) + if keys: + result = self.__get_value(buf, 3+2*i, row) + name, update = info_db.update_db(keys, must_incr, + result) + yield keys[0], update, result + if update: + info_db.tracer.log(level, + f'[{self.node_id}] MODBUS: {name}' + f' : {result}{unit}') + ''' MODBUS response timer ''' @@ -302,11 +320,11 @@ class Modbus(): ''' Helper function for CRC-16 handling ''' - def __check_crc(self, msg: bytearray) -> bool: + def __check_crc(self, msg: bytes) -> bool: '''Check CRC-16 and returns True if valid''' return 0 == self.__calc_crc(msg) - def __calc_crc(self, buffer: bytearray) -> int: + def __calc_crc(self, buffer: bytes) -> int: '''Build CRC-16 for buffer and returns it''' crc = CRC_INIT diff --git a/app/tests/test_modbus.py b/app/tests/test_modbus.py index 970a161..d0e321e 100644 --- a/app/tests/test_modbus.py +++ b/app/tests/test_modbus.py @@ -366,8 +366,8 @@ async def test_timeout(): def test_recv_unknown_data(): '''Receive a response with an unknwon register''' mb = ModbusTestHelper() - assert 0x9000 not in mb.map - mb.map[0x9000] = {'reg': Register.TEST_REG1, 'fmt': '!H', 'ratio': 1} + assert 0x9000 not in mb.mb_reg_mapping + mb.mb_reg_mapping[0x9000] = {'reg': Register.TEST_REG1, 'fmt': '!H', 'ratio': 1} mb.build_msg(1,3,0x9000,2) @@ -379,7 +379,7 @@ def test_recv_unknown_data(): assert 0 == call assert not mb.req_pend - del mb.map[0x9000] + del mb.mb_reg_mapping[0x9000] def test_close(): '''Check queue handling for build_msg() calls'''