refactoring

This commit is contained in:
Stefan Allius
2024-09-24 21:12:51 +02:00
parent 89c2c11ed9
commit d5c369e5fe
23 changed files with 1257 additions and 942 deletions

View File

@@ -48,7 +48,8 @@ class Talent(Message):
def __init__(self, server_side: bool, ifc: "AsyncIfc", id_str=b''):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=15)
ifc.read.reg_trigger(self.read)
ifc.rx_set_cb(self.read)
ifc.prot_set_timeout_cb(self._timeout)
self.ifc = ifc
self.await_conn_resp_cnt = 0
self.id_str = id_str
@@ -107,7 +108,8 @@ class Talent(Message):
self.log_lvl.clear()
self.state = State.closed
self.mb_timer.close()
self.ifc.read.reg_trigger(None)
self.ifc.rx_set_cb(None)
self.ifc.prot_set_timeout_cb(None)
super().close()
def __set_serial_no(self, serial_no: str):
@@ -143,10 +145,10 @@ class Talent(Message):
self._read()
while True:
if not self.header_valid:
self.__parse_header(self.ifc.read.peek(), len(self.ifc.read))
self.__parse_header(self.ifc.rx_peek(), self.ifc.rx_len())
if self.header_valid and \
len(self.ifc.read) >= (self.header_len + self.data_len):
self.ifc.rx_len() >= (self.header_len + self.data_len):
if self.state == State.init:
self.state = State.received # received 1st package
@@ -154,10 +156,10 @@ class Talent(Message):
if callable(log_lvl):
log_lvl = log_lvl()
self.ifc.read.logging(log_lvl, f'Received from {self.addr}:'
f' BufLen: {len(self.ifc.read)}'
f' HdrLen: {self.header_len}'
f' DtaLen: {self.data_len}')
self.ifc.rx_log(log_lvl, f'Received from {self.addr}:'
f' BufLen: {self.ifc.rx_len()}'
f' HdrLen: {self.header_len}'
f' DtaLen: {self.data_len}')
self.__set_serial_no(self.id_str.decode("utf-8"))
self.__dispatch_msg()
@@ -170,9 +172,9 @@ class Talent(Message):
tsun = Config.get('tsun')
if tsun['enabled']:
buflen = self.header_len+self.data_len
buffer = self.ifc.read.peek(buflen)
self.ifc.forward += buffer
self.ifc.forward.logging(logging.DEBUG, 'Store for forwarding:')
buffer = self.ifc.rx_peek(buflen)
self.ifc.fwd_add(buffer)
self.ifc.fwd_log(logging.DEBUG, 'Store for forwarding:')
fnc = self.switch.get(self.msg_id, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'forwrd') +
@@ -181,18 +183,18 @@ class Talent(Message):
def forward_snd(self) -> None:
'''add the build send msg to the forwarding queue'''
tsun = Config.get('tsun')
rest = self.ifc.write.get(self.send_msg_ofs)
buffer = self.ifc.write.get(len(self.ifc.write))
rest = self.ifc.tx_get(self.send_msg_ofs)
buffer = self.ifc.tx_get()
if tsun['enabled']:
_len = len(buffer)
struct.pack_into('!l', buffer, 0, _len-4)
self.ifc.forward += buffer
self.ifc.forward.logging(logging.INFO, 'Store for forwarding:')
self.ifc.fwd_add(buffer)
self.ifc.fwd_log(logging.INFO, 'Store for forwarding:')
fnc = self.switch.get(self.msg_id, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'forwrd') +
f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}')
self.ifc.write += rest
self.ifc.tx_add(rest)
def send_modbus_cb(self, modbus_pdu: bytearray, log_lvl: int, state: str):
if self.state != State.up:
@@ -201,13 +203,13 @@ class Talent(Message):
return
self.__build_header(0x70, 0x77)
self.ifc.write += b'\x00\x01\xa3\x28' # magic ?
self.ifc.write += struct.pack('!B', len(modbus_pdu))
self.ifc.write += modbus_pdu
self.ifc.tx_add(b'\x00\x01\xa3\x28') # magic ?
self.ifc.tx_add(struct.pack('!B', len(modbus_pdu)))
self.ifc.tx_add(modbus_pdu)
self.__finish_send_msg()
self.ifc.write.logging(log_lvl, f'Send Modbus {state}:{self.addr}:')
self.ifc.write()
self.ifc.tx_log(log_lvl, f'Send Modbus {state}:{self.addr}:')
self.ifc.tx_flush()
def _send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
if self.state != State.up:
@@ -235,9 +237,9 @@ class Talent(Message):
self.msg_id = 0
self.await_conn_resp_cnt += 1
self.__build_header(0x91)
self.ifc.write += struct.pack(f'!{len(contact_name)+1}p'
f'{len(contact_mail)+1}p',
contact_name, contact_mail)
self.ifc.tx_add(struct.pack(f'!{len(contact_name)+1}p'
f'{len(contact_mail)+1}p',
contact_name, contact_mail))
self.__finish_send_msg()
return True
@@ -321,7 +323,7 @@ class Talent(Message):
self.inc_counter('Invalid_Msg_Format')
# erase broken recv buffer
self.ifc.read.clear()
self.ifc.rx_clear()
return
hdr_len = 5+id_len+2
@@ -342,16 +344,16 @@ class Talent(Message):
def __build_header(self, ctrl, msg_id=None) -> None:
if not msg_id:
msg_id = self.msg_id
self.send_msg_ofs = len(self.ifc.write)
self.ifc.write += struct.pack(f'!l{len(self.id_str)+1}pBB',
0, self.id_str, ctrl, msg_id)
self.send_msg_ofs = self.ifc.tx_len()
self.ifc.tx_add(struct.pack(f'!l{len(self.id_str)+1}pBB',
0, self.id_str, ctrl, msg_id))
fnc = self.switch.get(msg_id, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'tx') +
f' Ctl: {int(ctrl):#02x} Msg: {fnc.__name__!r}')
def __finish_send_msg(self) -> None:
_len = len(self.ifc.write) - self.send_msg_ofs
struct.pack_into('!l', self.ifc.write.peek(), self.send_msg_ofs,
_len = self.ifc.tx_len() - self.send_msg_ofs
struct.pack_into('!l', self.ifc.tx_peek(), self.send_msg_ofs,
_len-4)
def __dispatch_msg(self) -> None:
@@ -366,7 +368,7 @@ class Talent(Message):
f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}')
def __flush_recv_msg(self) -> None:
self.ifc.read.get(self.header_len+self.data_len)
self.ifc.rx_get(self.header_len+self.data_len)
self.header_valid = False
'''
@@ -376,7 +378,7 @@ class Talent(Message):
if self.ctrl.is_ind():
if self.server_side and self.__process_contact_info():
self.__build_header(0x91)
self.ifc.write += b'\x01'
self.ifc.tx_add(b'\x01')
self.__finish_send_msg()
# don't forward this contact info here, we will build one
# when the remote connection is established
@@ -390,7 +392,7 @@ class Talent(Message):
self.forward()
def __process_contact_info(self) -> bool:
buf = self.ifc.read.peek()
buf = self.ifc.rx_peek()
result = struct.unpack_from('!B', buf, self.header_len)
name_len = result[0]
if self.data_len == 1: # this is a response withone status byte
@@ -417,16 +419,16 @@ class Talent(Message):
ts = self._timestamp()
logger.debug(f'time: {ts:08x}')
self.__build_header(0x91)
self.ifc.write += struct.pack('!q', ts)
self.ifc.tx_add(struct.pack('!q', ts))
self.__finish_send_msg()
elif self.data_len >= 8:
ts = self._timestamp()
result = struct.unpack_from('!q', self.ifc.read.peek(),
result = struct.unpack_from('!q', self.ifc.rx_peek(),
self.header_len)
self.ts_offset = result[0]-ts
if self.remote_stream:
self.remote_stream.ts_offset = self.ts_offset
if self.remote.stream:
self.remote.stream.ts_offset = self.ts_offset
logger.debug(f'tsun-time: {int(result[0]):08x}'
f' proxy-time: {ts:08x}'
f' offset: {self.ts_offset}')
@@ -446,10 +448,10 @@ class Talent(Message):
self.db.set_db_def_value(Register.POLLING_INTERVAL,
self.mb_timeout)
self.__build_header(0x99)
self.ifc.write += b'\x02'
self.ifc.tx_add(b'\x02')
self.__finish_send_msg()
result = struct.unpack_from('!Bq', self.ifc.read.peek(),
result = struct.unpack_from('!Bq', self.ifc.rx_peek(),
self.header_len)
resp_code = result[0]
ts = result[1]+self.ts_offset
@@ -457,11 +459,11 @@ class Talent(Message):
f' tsun-time: {ts:08x}'
f' offset: {self.ts_offset}')
self.__build_header(0x91)
self.ifc.write += struct.pack('!Bq', resp_code, ts)
self.ifc.tx_add(struct.pack('!Bq', resp_code, ts))
self.forward_snd()
return
elif self.ctrl.is_resp():
result = struct.unpack_from('!B', self.ifc.read.peek(),
result = struct.unpack_from('!B', self.ifc.rx_peek(),
self.header_len)
resp_code = result[0]
logging.debug(f'TimeActRespCode: {resp_code}')
@@ -473,7 +475,7 @@ class Talent(Message):
self.forward()
def parse_msg_header(self):
result = struct.unpack_from('!lB', self.ifc.read.peek(),
result = struct.unpack_from('!lB', self.ifc.rx_peek(),
self.header_len)
data_id = result[0] # len of complete message
@@ -482,7 +484,7 @@ class Talent(Message):
msg_hdr_len = 5+id_len+9
result = struct.unpack_from(f'!{id_len+1}pBq', self.ifc.read.peek(),
result = struct.unpack_from(f'!{id_len+1}pBq', self.ifc.rx_peek(),
self.header_len + 4)
timestamp = result[2]
@@ -495,7 +497,7 @@ class Talent(Message):
def msg_collector_data(self):
if self.ctrl.is_ind():
self.__build_header(0x99)
self.ifc.write += b'\x01'
self.ifc.tx_add(b'\x01')
self.__finish_send_msg()
self.__process_data()
@@ -510,7 +512,7 @@ class Talent(Message):
def msg_inverter_data(self):
if self.ctrl.is_ind():
self.__build_header(0x99)
self.ifc.write += b'\x01'
self.ifc.tx_add(b'\x01')
self.__finish_send_msg()
self.__process_data()
self.state = State.up # allow MODBUS cmds
@@ -530,7 +532,7 @@ class Talent(Message):
def __process_data(self):
msg_hdr_len, ts = self.parse_msg_header()
for key, update in self.db.parse(self.ifc.read.peek(), self.header_len
for key, update in self.db.parse(self.ifc.rx_peek(), self.header_len
+ msg_hdr_len, self.node_id):
if update:
self._set_mqtt_timestamp(key, self._utcfromts(ts))
@@ -550,7 +552,7 @@ class Talent(Message):
msg_hdr_len = 5
result = struct.unpack_from('!lBB', self.ifc.read.peek(),
result = struct.unpack_from('!lBB', self.ifc.rx_peek(),
self.header_len)
modbus_len = result[1]
return msg_hdr_len, modbus_len
@@ -559,7 +561,7 @@ class Talent(Message):
msg_hdr_len = 6
result = struct.unpack_from('!lBBB', self.ifc.read.peek(),
result = struct.unpack_from('!lBBB', self.ifc.rx_peek(),
self.header_len)
modbus_len = result[2]
return msg_hdr_len, modbus_len
@@ -580,12 +582,12 @@ class Talent(Message):
self.__msg_modbus(hdr_len)
def __msg_modbus(self, hdr_len):
data = self.ifc.read.peek()[self.header_len:
self.header_len+self.data_len]
data = self.ifc.rx_peek()[self.header_len:
self.header_len+self.data_len]
if self.ctrl.is_req():
if self.remote_stream.mb.recv_req(data[hdr_len:],
self.remote_stream.
if self.remote.stream.mb.recv_req(data[hdr_len:],
self.remote.stream.
msg_forward):
self.inc_counter('Modbus_Command')
else: