From 58c3333fcc85a1686594306c97e697998015d339 Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Thu, 2 May 2024 23:55:59 +0200 Subject: [PATCH] initial checkin --- app/src/modbus.py | 56 ++++++++++++++++++++++++++++++++++++++++ app/src/singleton.py | 9 +++++++ app/tests/test_modbus.py | 21 +++++++++++++++ 3 files changed, 86 insertions(+) create mode 100644 app/src/modbus.py create mode 100644 app/src/singleton.py create mode 100644 app/tests/test_modbus.py diff --git a/app/src/modbus.py b/app/src/modbus.py new file mode 100644 index 0000000..9745f23 --- /dev/null +++ b/app/src/modbus.py @@ -0,0 +1,56 @@ +import struct + +if __name__ == "app.src.modbus": + from app.src.singleton import Singleton +else: # pragma: no cover + from singleton import Singleton + +####### +# TSUN uses the Modbus in the RTU transmission mode. +# see: https://modbus.org/docs/Modbus_over_serial_line_V1_02.pdf +# +# A Modbus PDU consists of: 'Function-Code' + 'Data' +# A Modbus RTU message consists of: 'Addr' + 'Modbus-PDU' + 'CRC-16' +# +# The 16-bit CRC is known as CRC-16-ANSI(reverse) +# see: https://en.wikipedia.org/wiki/Computation_of_cyclic_redundancy_checks +####### + +CRC_POLY = 0xA001 # (LSBF/reverse) +CRC_INIT = 0xFFFF + + +class Modbus(metaclass=Singleton): + MB_WRITE_SINGLE_REG = 6 + MB_READ_SINGLE_REG = 3 + __crc_tab = [] + + def __init__(self): + self.__build_crc_tab(CRC_POLY) + + def build_msg(self, addr, func, reg, val): + msg = struct.pack('>BBHH', addr, func, reg, val) + msg += struct.pack(' bool: + return 0 == self.__calc_crc(msg) + + def __calc_crc(self, buffer: bytes) -> int: + crc = CRC_INIT + + for cur in buffer: + crc = (crc >> 8) ^ self.__crc_tab[(crc ^ cur) & 0xFF] + return crc + + def __build_crc_tab(self, poly) -> None: + for index in range(256): + data = index << 1 + crc = 0 + for _ in range(8, 0, -1): + data >>= 1 + if (data ^ crc) & 1: + crc = (crc >> 1) ^ poly + else: + crc >>= 1 + self.__crc_tab.append(crc) diff --git a/app/src/singleton.py b/app/src/singleton.py new file mode 100644 index 0000000..48778b9 --- /dev/null +++ b/app/src/singleton.py @@ -0,0 +1,9 @@ +class Singleton(type): + _instances = {} + + def __call__(cls, *args, **kwargs): + # logger_mqtt.debug('singleton: __call__') + if cls not in cls._instances: + cls._instances[cls] = super(Singleton, + cls).__call__(*args, **kwargs) + return cls._instances[cls] diff --git a/app/tests/test_modbus.py b/app/tests/test_modbus.py new file mode 100644 index 0000000..0e9cf5b --- /dev/null +++ b/app/tests/test_modbus.py @@ -0,0 +1,21 @@ +# test_with_pytest.py +# import pytest, logging +from app.src.modbus import Modbus + + +def test_modbus_crc(): + mb = Modbus() + assert 0x0b02 == mb._Modbus__calc_crc(b'\x01\x06\x20\x08\x00\x04') + assert 0 == mb._Modbus__calc_crc(b'\x01\x06\x20\x08\x00\x04\x02\x0b') + assert mb.check_crc(b'\x01\x06\x20\x08\x00\x04\x02\x0b') + + assert 0xc803 == mb._Modbus__calc_crc(b'\x01\x06\x20\x08\x00\x00') + assert 0 == mb._Modbus__calc_crc(b'\x01\x06\x20\x08\x00\x00\x03\xc8') + assert mb.check_crc(b'\x01\x06\x20\x08\x00\x00\x03\xc8') + +def test_build_modbus_pdu(): + mb = Modbus() + pdu = mb.build_msg(1,6,0x2000,0x12) + assert pdu == b'\x01\x06\x20\x00\x00\x12\x02\x07' + assert mb.check_crc(pdu) +