This commit is contained in:
Stefan Allius
2024-05-20 00:48:23 +02:00
parent 177706c3e6
commit 6ef6f4cd34
2 changed files with 133 additions and 79 deletions

View File

@@ -1,7 +1,7 @@
import struct
import logging
import asyncio
from typing import Generator
from typing import Generator, Callable
if __name__ == "app.src.modbus":
from app.src.infos import Register
@@ -25,10 +25,13 @@ CRC_INIT = 0xFFFF
class Modbus():
INV_ADDR = 1
'''MODBUS slave address of the TSUN inverter'''
READ_REGS = 3
'''MODBUS function code: Read Holding Register'''
READ_INPUTS = 4
'''MODBUS function code: Read Input Register'''
WRITE_SINGLE_REG = 6
'''Modbus function codes'''
'''Modbus function code: Write Single Register'''
__crc_tab = []
map = {
@@ -66,21 +69,26 @@ class Modbus():
0x3029: {'reg': Register.PV4_TOTAL_GENERATION, 'fmt': '!L', 'ratio': 0.01}, # noqa: E501
}
def __init__(self, snd_handler, timeout: int = 1):
def __init__(self, snd_handler: Callable[[bool], None], timeout: int = 1):
if not len(self.__crc_tab):
self.__build_crc_tab(CRC_POLY)
self.que = asyncio.Queue(100)
self.snd_handler = snd_handler
'''Send handler to transmit a MODBUS request'''
self.rsp_handler = None
'''Response handler to forward the response'''
self.timeout = timeout
'''MODBUS response timeout in seconds'''
self.max_retries = 3
'''Max retransmit for MODBUS requests'''
self.retry_cnt = 0
self.last_req = b''
self.counter = {}
'''Dictenary with statistic counter'''
self.counter['timeouts'] = 0
self.counter['retries'] = {}
for i in range(0, self.max_retries):
self.counter['retries'][i] = 0
for i in range(0, self.max_retries+1):
self.counter['retries'][f'{i}'] = 0
self.last_addr = 0
self.last_fcode = 0
self.last_len = 0
@@ -94,80 +102,67 @@ class Modbus():
if type(self.counter) is not None:
logging.info(f'Modbus __del__:\n {self.counter}')
def start_timer(self):
if self.req_pend:
return
self.req_pend = True
self.tim = self.loop.call_later(self.timeout, self.timeout_cb)
# logging.debug(f'Modbus start timer {self}')
def build_msg(self, addr: int, func: int, reg: int, val: int) -> None:
"""Build MODBUS RTU message frame and add it to the tx queue
def stop_timer(self):
self.req_pend = False
# logging.debug(f'Modbus stop timer {self}')
if self.tim:
self.tim.cancel()
def timeout_cb(self):
self.req_pend = False
if self.retry_cnt < self.max_retries:
logging.debug(f'Modbus retrans {self}')
self.retry_cnt += 1
self.start_timer()
self.snd_handler(self.last_req, retrans=True)
else:
logging.info(f'Modbus timeout {self}')
self.counter['timeouts'] += 1
self.get_next_req()
def get_next_req(self) -> None:
if self.req_pend:
return
try:
item = self.que.get_nowait()
req = item['req']
self.last_req = req
self.rsp_handler = item['rsp_hdl']
self.last_addr = req[0]
self.last_fcode = req[1]
res = struct.unpack_from('>HH', req, 2)
self.last_reg = res[0]
self.last_len = res[1]
self.retry_cnt = 0
self.start_timer()
self.snd_handler(self.last_req, retrans=False)
except asyncio.QueueEmpty:
pass
def build_msg(self, addr, func, reg, val) -> None:
Keyword arguments:
addr: RTU slave address
func: MODBUS function code
reg: 16-bit register number
val: 16 bit value
"""
msg = struct.pack('>BBHH', addr, func, reg, val)
msg += struct.pack('<H', self.__calc_crc(msg))
self.que.put_nowait({'req': msg,
'rsp_hdl': None})
if self.que.qsize() == 1:
self.get_next_req()
self.__send_next_from_que()
def recv_req(self, buf: bytearray, rsp_handler=None) -> bool:
def recv_req(self, buf: bytearray,
rsp_handler: Callable[[None], None] = None) -> bool:
"""Add the received Modbus request to the tx queue
Keyword arguments:
buf: Modbus RTU pdu incl ADDR byte and trailing CRC
rsp_handler: Callback, if the received pdu is valid
Returns:
True: PDU was added to the queue
False: PDU was ignored, due to an error
"""
# logging.info(f'recv_req: first byte modbus:{buf[0]} len:{len(buf)}')
if not self.check_crc(buf):
if not self.__check_crc(buf):
self.err = 1
logging.error('Modbus recv: CRC error')
return False
self.que.put_nowait({'req': buf,
'rsp_hdl': rsp_handler})
if self.que.qsize() == 1:
self.get_next_req()
self.__send_next_from_que()
return True
def recv_resp(self, info_db, buf: bytearray, node_id: str) -> \
Generator[tuple[str, bool, any], None, None]:
Generator[tuple[str, bool, int | float | str], None, None]:
"""Generator which check and parse a received MODBUS response.
Keyword arguments:
info_db: database for info lockups
buf: received Modbus RTU response frame
node_id: string for logging which identifies the slave
Returns on error and set Self.err to:
1: CRC error
2: Wrong server address
3: Unexpected function code
4: Unexpected data length
5: No MODBUS request pending
"""
# logging.info(f'recv_resp: first byte modbus:{buf[0]} len:{len(buf)}')
if not self.req_pend:
self.err = 5
return
if not self.check_crc(buf):
if not self.__check_crc(buf):
logging.error('Modbus resp: CRC error')
self.err = 1
return
@@ -188,7 +183,7 @@ class Modbus():
self.err = 4
return
first_reg = self.last_reg # save last_reg before sending next pdu
self.stop_timer() # stop timer and send next pdu
self.__stop_timer() # stop timer and send next pdu
for i in range(0, elmlen):
addr = first_reg+i
@@ -215,23 +210,81 @@ class Modbus():
f'[\'{node_id}\']MODBUS: {name}'
f' : {result}{unit}')
else:
self.stop_timer()
self.counter['retries'][self.retry_cnt] += 1
self.__stop_timer()
self.counter['retries'][f'{self.retry_cnt}'] += 1
if self.rsp_handler:
self.rsp_handler()
self.get_next_req()
self.__send_next_from_que()
def check_crc(self, msg) -> bool:
'''
MODBUS response timer
'''
def __start_timer(self) -> None:
'''Start response timer and set `req_pend` to True'''
self.req_pend = True
self.tim = self.loop.call_later(self.timeout, self.__timeout_cb)
# logging.debug(f'Modbus start timer {self}')
def __stop_timer(self) -> None:
'''Stop response timer and set `req_pend` to False'''
self.req_pend = False
# logging.debug(f'Modbus stop timer {self}')
if self.tim:
self.tim.cancel()
def __timeout_cb(self) -> None:
'''Rsponse timeout handler retransmit pdu or send next pdu'''
self.req_pend = False
if self.retry_cnt < self.max_retries:
logging.debug(f'Modbus retrans {self}')
self.retry_cnt += 1
self.__start_timer()
self.snd_handler(self.last_req, retrans=True)
else:
logging.info(f'Modbus timeout {self}')
self.counter['timeouts'] += 1
self.__send_next_from_que()
def __send_next_from_que(self) -> None:
'''Get next MODBUS pdu from queue and transmit it'''
if self.req_pend:
return
try:
item = self.que.get_nowait()
req = item['req']
self.last_req = req
self.rsp_handler = item['rsp_hdl']
self.last_addr = req[0]
self.last_fcode = req[1]
res = struct.unpack_from('>HH', req, 2)
self.last_reg = res[0]
self.last_len = res[1]
self.retry_cnt = 0
self.__start_timer()
self.snd_handler(self.last_req, retrans=False)
except asyncio.QueueEmpty:
pass
'''
Helper function for CRC-16 handling
'''
def __check_crc(self, msg: bytearray) -> bool:
'''Check CRC-16 and returns True if valid'''
return 0 == self.__calc_crc(msg)
def __calc_crc(self, buffer: bytes) -> int:
def __calc_crc(self, buffer: bytearray) -> int:
'''Build CRC-16 for buffer and returns it'''
crc = CRC_INIT
for cur in buffer:
crc = (crc >> 8) ^ self.__crc_tab[(crc ^ cur) & 0xFF]
return crc
def __build_crc_tab(self, poly) -> None:
def __build_crc_tab(self, poly: int) -> None:
'''Build CRC-16 helper table, must be called exactly one time'''
for index in range(256):
data = index << 1
crc = 0