SonarCloud setup (#168)

* Code Cleanup (#158)

* print coverage report

* create sonar-project property file

* install all py dependencies in one step

* code cleanup

* reduce cognitive complexity

* do not build on *.yml changes

* optimise versionstring handling (#159)

- Reading the version string from the image updates
  it even if the image is re-pulled without re-deployment

* fix linter warning

* exclude *.pyi filese

* ignore some rules for tests

* cleanup (#160)

* Sonar qube 3 (#163)

fix SonarQube warnings in modbus.py

* Sonar qube 3 (#164)


* fix SonarQube warnings

* Sonar qube 3 (#165)

* cleanup

* Add support for TSUN Titan inverter
Fixes #161


* fix SonarQube warnings

* fix error

* rename field "config"

* SonarQube reads flake8 output

* don't stop on flake8 errors

* flake8 scan only app/src for SonarQube

* update flake8 run

* ignore flake8 C901

* cleanup

* fix linter warnings

* ignore changed *.yml files

* read sensor list solarman data packets

* catch 'No route to' error and log only in debug mode

* fix unit tests

* add sensor_list configuration

* adapt unit tests

* fix SonarQube warnings

* Sonar qube 3 (#166)

* add unittests for mqtt.py

* add mock

* move test requirements into a file

* fix unit tests

* fix formating

* initial version

* fix SonarQube warning
This commit is contained in:
Stefan Allius
2024-08-23 21:24:01 +02:00
committed by GitHub
parent db1169f61f
commit 58b42f7d7c
27 changed files with 899 additions and 500 deletions

View File

@@ -39,7 +39,7 @@ class Modbus():
'''Modbus function code: Write Single Register'''
__crc_tab = []
map = {
mb_reg_mapping = {
0x2007: {'reg': Register.MAX_DESIGNED_POWER, 'fmt': '!H', 'ratio': 1}, # noqa: E501
0x202c: {'reg': Register.OUTPUT_COEFFICIENT, 'fmt': '!H', 'ratio': 100/1024}, # noqa: E501
@@ -139,7 +139,7 @@ class Modbus():
if self.que.qsize() == 1:
self.__send_next_from_que()
def recv_req(self, buf: bytearray,
def recv_req(self, buf: bytes,
rsp_handler: Callable[[None], None] = None) -> bool:
"""Add the received Modbus RTU request to the tx queue
@@ -164,7 +164,7 @@ class Modbus():
return True
def recv_resp(self, info_db, buf: bytearray, node_id: str) -> \
def recv_resp(self, info_db, buf: bytes, node_id: str) -> \
Generator[tuple[str, bool, int | float | str], None, None]:
"""Generator which check and parse a received MODBUS response.
@@ -183,58 +183,18 @@ class Modbus():
# logging.info(f'recv_resp: first byte modbus:{buf[0]} len:{len(buf)}')
self.node_id = node_id
if not self.req_pend:
self.err = 5
return
if not self.__check_crc(buf):
logger.error(f'[{node_id}] Modbus resp: CRC error')
self.err = 1
return
if buf[0] != self.last_addr:
logger.info(f'[{node_id}] Modbus resp: Wrong addr {buf[0]}')
self.err = 2
return
fcode = buf[1]
if fcode != self.last_fcode:
logger.info(f'[{node_id}] Modbus: Wrong fcode {fcode}'
f' != {self.last_fcode}')
self.err = 3
data_available = self.last_addr == self.INV_ADDR and \
(fcode == 3 or fcode == 4)
if self.__resp_error_check(buf, data_available):
return
if self.last_addr == self.INV_ADDR and \
(fcode == 3 or fcode == 4):
if data_available:
elmlen = buf[2] >> 1
if elmlen != self.last_len:
logger.info(f'[{node_id}] Modbus: len error {elmlen}'
f' != {self.last_len}')
self.err = 4
return
first_reg = self.last_reg # save last_reg before sending next pdu
self.__stop_timer() # stop timer and send next pdu
for i in range(0, elmlen):
addr = first_reg+i
if addr in self.map:
row = self.map[addr]
info_id = row['reg']
fmt = row['fmt']
val = struct.unpack_from(fmt, buf, 3+2*i)
result = val[0]
if 'eval' in row:
result = eval(row['eval'])
if 'ratio' in row:
result = round(result * row['ratio'], 2)
keys, level, unit, must_incr = info_db._key_obj(info_id)
if keys:
name, update = info_db.update_db(keys, must_incr,
result)
yield keys[0], update, result
if update:
info_db.tracer.log(level,
f'[{node_id}] MODBUS: {name}'
f' : {result}{unit}')
yield from self.__process_data(info_db, buf, first_reg, elmlen)
else:
self.__stop_timer()
@@ -243,6 +203,64 @@ class Modbus():
self.rsp_handler()
self.__send_next_from_que()
def __resp_error_check(self, buf: bytes, data_available: bool) -> bool:
'''Check the MODBUS response for errors, returns True if one accure'''
if not self.req_pend:
self.err = 5
return True
if not self.__check_crc(buf):
logger.error(f'[{self.node_id}] Modbus resp: CRC error')
self.err = 1
return True
if buf[0] != self.last_addr:
logger.info(f'[{self.node_id}] Modbus resp: Wrong addr {buf[0]}')
self.err = 2
return True
fcode = buf[1]
if fcode != self.last_fcode:
logger.info(f'[{self.node_id}] Modbus: Wrong fcode {fcode}'
f' != {self.last_fcode}')
self.err = 3
return True
if data_available:
elmlen = buf[2] >> 1
if elmlen != self.last_len:
logger.info(f'[{self.node_id}] Modbus: len error {elmlen}'
f' != {self.last_len}')
self.err = 4
return True
return False
def __get_value(self, buf: bytes, idx: int, row: dict):
'''get a value from the received buffer'''
val = struct.unpack_from(row['fmt'], buf, idx)
result = val[0]
if 'eval' in row:
result = eval(row['eval'])
if 'ratio' in row:
result = round(result * row['ratio'], 2)
return result
def __process_data(self, info_db, buf: bytes, first_reg, elmlen):
'''Generator over received registers, updates the db'''
for i in range(0, elmlen):
addr = first_reg+i
if addr in self.mb_reg_mapping:
row = self.mb_reg_mapping[addr]
info_id = row['reg']
keys, level, unit, must_incr = info_db._key_obj(info_id)
if keys:
result = self.__get_value(buf, 3+2*i, row)
name, update = info_db.update_db(keys, must_incr,
result)
yield keys[0], update, result
if update:
info_db.tracer.log(level,
f'[{self.node_id}] MODBUS: {name}'
f' : {result}{unit}')
'''
MODBUS response timer
'''
@@ -302,11 +320,11 @@ class Modbus():
'''
Helper function for CRC-16 handling
'''
def __check_crc(self, msg: bytearray) -> bool:
def __check_crc(self, msg: bytes) -> bool:
'''Check CRC-16 and returns True if valid'''
return 0 == self.__calc_crc(msg)
def __calc_crc(self, buffer: bytearray) -> int:
def __calc_crc(self, buffer: bytes) -> int:
'''Build CRC-16 for buffer and returns it'''
crc = CRC_INIT