add Modbus support

This commit is contained in:
Stefan Allius
2024-05-02 23:59:55 +02:00
parent 1d9cbf314e
commit dba3b458ba

View File

@@ -3,12 +3,15 @@ import logging
import time
from datetime import datetime
if __name__ == "app.src.gen3.talent":
from app.src.messages import hex_dump_memory, Message
from app.src.modbus import Modbus
from app.src.config import Config
from app.src.gen3.infos_g3 import InfosG3
else: # pragma: no cover
from messages import hex_dump_memory, Message
from modbus import Modbus
from config import Config
from gen3.infos_g3 import InfosG3
@@ -41,13 +44,18 @@ class Talent(Message):
self.contact_name = b''
self.contact_mail = b''
self.db = InfosG3()
self.forward_modbus_rep = False
self.switch = {
0x00: self.msg_contact_info,
0x13: self.msg_ota_update,
0x22: self.msg_get_time,
0x71: self.msg_collector_data,
# 0x76:
0x77: self.msg_modbus,
# 0x78:
0x04: self.msg_inverter_data,
}
self.mb = Modbus()
'''
Our puplic methods
@@ -115,6 +123,18 @@ class Talent(Message):
f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}')
return
async def send_modbus_cmd(self, func, addr, val) -> None:
self.forward_modbus_rep = False
self.__build_header(0x70, 0x77)
self._send_buffer += b'\x00\x01\xa3\x28'
modbus_msg = self.mb.build_msg(1, func, addr, val)
self._send_buffer += struct.pack('!B', len(modbus_msg))
self._send_buffer += modbus_msg
_len = self.__finish_send_msg()
hex_dump_memory(logging.INFO, 'Send Modbus Command:',
self._send_buffer[self.send_msg_ofs:], _len)
await self.flush_send_msg()
def _init_new_client_conn(self) -> bool:
contact_name = self.contact_name
contact_mail = self.contact_mail
@@ -190,17 +210,20 @@ class Talent(Message):
self.header_valid = True
return
def __build_header(self, ctrl) -> None:
def __build_header(self, ctrl, msg_id=None) -> None:
if not msg_id:
msg_id = self.msg_id
self.send_msg_ofs = len(self._send_buffer)
self._send_buffer += struct.pack(f'!l{len(self.id_str)+1}pBB',
0, self.id_str, ctrl, self.msg_id)
fnc = self.switch.get(self.msg_id, self.msg_unknown)
0, self.id_str, ctrl, msg_id)
fnc = self.switch.get(msg_id, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'tx') +
f' Ctl: {int(ctrl):#02x} Msg: {fnc.__name__!r}')
def __finish_send_msg(self) -> None:
def __finish_send_msg(self) -> int:
_len = len(self._send_buffer) - self.send_msg_ofs
struct.pack_into('!l', self._send_buffer, self.send_msg_ofs, _len-4)
return _len
def __dispatch_msg(self) -> None:
fnc = self.switch.get(self.msg_id, self.msg_unknown)
@@ -348,6 +371,33 @@ class Talent(Message):
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
def parse_modbus_header(self):
msg_hdr_len = 5
result = struct.unpack_from('!lB', self._recv_buffer,
self.header_len + 4)
modbus_len = result[1]
logger.debug(f'Ref: {result[0]}')
logger.debug(f'Modbus Len: {modbus_len}')
# logger.info(f'time: {datetime.utcfromtimestamp(result[2]).strftime(
# "%Y-%m-%d %H:%M:%S")}')
return msg_hdr_len, modbus_len
def msg_modbus(self):
hdr_len, modbus_len = self.parse_modbus_header()
if self.ctrl.is_req():
self.forward_modbus_rep = True
self.inc_counter('Modbus_Command')
elif self.ctrl.is_ind():
if not self.forward_modbus_rep:
return
else:
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
def msg_unknown(self):
logger.warning(f"Unknow Msg: ID:{self.msg_id}")
self.inc_counter('Unknown_Msg')