initial checkin

This commit is contained in:
Stefan Allius
2024-05-02 23:55:59 +02:00
parent 530687039d
commit 58c3333fcc
3 changed files with 86 additions and 0 deletions

56
app/src/modbus.py Normal file
View File

@@ -0,0 +1,56 @@
import struct
if __name__ == "app.src.modbus":
from app.src.singleton import Singleton
else: # pragma: no cover
from singleton import Singleton
#######
# TSUN uses the Modbus in the RTU transmission mode.
# see: https://modbus.org/docs/Modbus_over_serial_line_V1_02.pdf
#
# A Modbus PDU consists of: 'Function-Code' + 'Data'
# A Modbus RTU message consists of: 'Addr' + 'Modbus-PDU' + 'CRC-16'
#
# The 16-bit CRC is known as CRC-16-ANSI(reverse)
# see: https://en.wikipedia.org/wiki/Computation_of_cyclic_redundancy_checks
#######
CRC_POLY = 0xA001 # (LSBF/reverse)
CRC_INIT = 0xFFFF
class Modbus(metaclass=Singleton):
MB_WRITE_SINGLE_REG = 6
MB_READ_SINGLE_REG = 3
__crc_tab = []
def __init__(self):
self.__build_crc_tab(CRC_POLY)
def build_msg(self, addr, func, reg, val):
msg = struct.pack('>BBHH', addr, func, reg, val)
msg += struct.pack('<H', self.__calc_crc(msg))
return msg
def check_crc(self, msg) -> bool:
return 0 == self.__calc_crc(msg)
def __calc_crc(self, buffer: bytes) -> int:
crc = CRC_INIT
for cur in buffer:
crc = (crc >> 8) ^ self.__crc_tab[(crc ^ cur) & 0xFF]
return crc
def __build_crc_tab(self, poly) -> None:
for index in range(256):
data = index << 1
crc = 0
for _ in range(8, 0, -1):
data >>= 1
if (data ^ crc) & 1:
crc = (crc >> 1) ^ poly
else:
crc >>= 1
self.__crc_tab.append(crc)

9
app/src/singleton.py Normal file
View File

@@ -0,0 +1,9 @@
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
# logger_mqtt.debug('singleton: __call__')
if cls not in cls._instances:
cls._instances[cls] = super(Singleton,
cls).__call__(*args, **kwargs)
return cls._instances[cls]

21
app/tests/test_modbus.py Normal file
View File

@@ -0,0 +1,21 @@
# test_with_pytest.py
# import pytest, logging
from app.src.modbus import Modbus
def test_modbus_crc():
mb = Modbus()
assert 0x0b02 == mb._Modbus__calc_crc(b'\x01\x06\x20\x08\x00\x04')
assert 0 == mb._Modbus__calc_crc(b'\x01\x06\x20\x08\x00\x04\x02\x0b')
assert mb.check_crc(b'\x01\x06\x20\x08\x00\x04\x02\x0b')
assert 0xc803 == mb._Modbus__calc_crc(b'\x01\x06\x20\x08\x00\x00')
assert 0 == mb._Modbus__calc_crc(b'\x01\x06\x20\x08\x00\x00\x03\xc8')
assert mb.check_crc(b'\x01\x06\x20\x08\x00\x00\x03\xc8')
def test_build_modbus_pdu():
mb = Modbus()
pdu = mb.build_msg(1,6,0x2000,0x12)
assert pdu == b'\x01\x06\x20\x00\x00\x12\x02\x07'
assert mb.check_crc(pdu)