Sonar qube 3 (#163)

fix SonarQube warnings in modbus.py
This commit is contained in:
Stefan Allius
2024-08-13 21:11:56 +02:00
committed by GitHub
parent e34afcb523
commit 7a9b23d068
2 changed files with 73 additions and 55 deletions

View File

@@ -39,7 +39,7 @@ class Modbus():
'''Modbus function code: Write Single Register'''
__crc_tab = []
map = {
mb_reg_mapping = {
0x2007: {'reg': Register.MAX_DESIGNED_POWER, 'fmt': '!H', 'ratio': 1}, # noqa: E501
0x202c: {'reg': Register.OUTPUT_COEFFICIENT, 'fmt': '!H', 'ratio': 100/1024}, # noqa: E501
@@ -139,7 +139,7 @@ class Modbus():
if self.que.qsize() == 1:
self.__send_next_from_que()
def recv_req(self, buf: bytearray,
def recv_req(self, buf: bytes,
rsp_handler: Callable[[None], None] = None) -> bool:
"""Add the received Modbus RTU request to the tx queue
@@ -164,7 +164,7 @@ class Modbus():
return True
def recv_resp(self, info_db, buf: bytearray, node_id: str) -> \
def recv_resp(self, info_db, buf: bytes, node_id: str) -> \
Generator[tuple[str, bool, int | float | str], None, None]:
"""Generator which check and parse a received MODBUS response.
@@ -183,58 +183,18 @@ class Modbus():
# logging.info(f'recv_resp: first byte modbus:{buf[0]} len:{len(buf)}')
self.node_id = node_id
if not self.req_pend:
self.err = 5
return
if not self.__check_crc(buf):
logger.error(f'[{node_id}] Modbus resp: CRC error')
self.err = 1
return
if buf[0] != self.last_addr:
logger.info(f'[{node_id}] Modbus resp: Wrong addr {buf[0]}')
self.err = 2
return
fcode = buf[1]
if fcode != self.last_fcode:
logger.info(f'[{node_id}] Modbus: Wrong fcode {fcode}'
f' != {self.last_fcode}')
self.err = 3
data_available = self.last_addr == self.INV_ADDR and \
(fcode == 3 or fcode == 4)
if self.__resp_error_check(buf, data_available):
return
if self.last_addr == self.INV_ADDR and \
(fcode == 3 or fcode == 4):
if data_available:
elmlen = buf[2] >> 1
if elmlen != self.last_len:
logger.info(f'[{node_id}] Modbus: len error {elmlen}'
f' != {self.last_len}')
self.err = 4
return
first_reg = self.last_reg # save last_reg before sending next pdu
self.__stop_timer() # stop timer and send next pdu
for i in range(0, elmlen):
addr = first_reg+i
if addr in self.map:
row = self.map[addr]
info_id = row['reg']
fmt = row['fmt']
val = struct.unpack_from(fmt, buf, 3+2*i)
result = val[0]
if 'eval' in row:
result = eval(row['eval'])
if 'ratio' in row:
result = round(result * row['ratio'], 2)
keys, level, unit, must_incr = info_db._key_obj(info_id)
if keys:
name, update = info_db.update_db(keys, must_incr,
result)
yield keys[0], update, result
if update:
info_db.tracer.log(level,
f'[{node_id}] MODBUS: {name}'
f' : {result}{unit}')
yield from self.__process_data(info_db, buf, first_reg, elmlen)
else:
self.__stop_timer()
@@ -243,6 +203,64 @@ class Modbus():
self.rsp_handler()
self.__send_next_from_que()
def __resp_error_check(self, buf: bytes, data_available: bool) -> bool:
'''Check the MODBUS response for errors, returns True if one accure'''
if not self.req_pend:
self.err = 5
return True
if not self.__check_crc(buf):
logger.error(f'[{self.node_id}] Modbus resp: CRC error')
self.err = 1
return True
if buf[0] != self.last_addr:
logger.info(f'[{self.node_id}] Modbus resp: Wrong addr {buf[0]}')
self.err = 2
return True
fcode = buf[1]
if fcode != self.last_fcode:
logger.info(f'[{self.node_id}] Modbus: Wrong fcode {fcode}'
f' != {self.last_fcode}')
self.err = 3
return True
if data_available:
elmlen = buf[2] >> 1
if elmlen != self.last_len:
logger.info(f'[{self.node_id}] Modbus: len error {elmlen}'
f' != {self.last_len}')
self.err = 4
return True
return False
def __get_value(self, buf: bytes, idx: int, row: dict):
'''get a value from the received buffer'''
val = struct.unpack_from(row['fmt'], buf, idx)
result = val[0]
if 'eval' in row:
result = eval(row['eval'])
if 'ratio' in row:
result = round(result * row['ratio'], 2)
return result
def __process_data(self, info_db, buf: bytes, first_reg, elmlen):
'''Generator over received registers, updates the db'''
for i in range(0, elmlen):
addr = first_reg+i
if addr in self.mb_reg_mapping:
row = self.mb_reg_mapping[addr]
info_id = row['reg']
keys, level, unit, must_incr = info_db._key_obj(info_id)
if keys:
result = self.__get_value(buf, 3+2*i, row)
name, update = info_db.update_db(keys, must_incr,
result)
yield keys[0], update, result
if update:
info_db.tracer.log(level,
f'[{self.node_id}] MODBUS: {name}'
f' : {result}{unit}')
'''
MODBUS response timer
'''
@@ -302,11 +320,11 @@ class Modbus():
'''
Helper function for CRC-16 handling
'''
def __check_crc(self, msg: bytearray) -> bool:
def __check_crc(self, msg: bytes) -> bool:
'''Check CRC-16 and returns True if valid'''
return 0 == self.__calc_crc(msg)
def __calc_crc(self, buffer: bytearray) -> int:
def __calc_crc(self, buffer: bytes) -> int:
'''Build CRC-16 for buffer and returns it'''
crc = CRC_INIT

View File

@@ -366,8 +366,8 @@ async def test_timeout():
def test_recv_unknown_data():
'''Receive a response with an unknwon register'''
mb = ModbusTestHelper()
assert 0x9000 not in mb.map
mb.map[0x9000] = {'reg': Register.TEST_REG1, 'fmt': '!H', 'ratio': 1}
assert 0x9000 not in mb.mb_reg_mapping
mb.mb_reg_mapping[0x9000] = {'reg': Register.TEST_REG1, 'fmt': '!H', 'ratio': 1}
mb.build_msg(1,3,0x9000,2)
@@ -379,7 +379,7 @@ def test_recv_unknown_data():
assert 0 == call
assert not mb.req_pend
del mb.map[0x9000]
del mb.mb_reg_mapping[0x9000]
def test_close():
'''Check queue handling for build_msg() calls'''