Make test code more clean (#149)

* cleanup
This commit is contained in:
Stefan Allius
2024-08-09 00:02:01 +02:00
committed by GitHub
parent 24ece4fece
commit 155b6d4e67
14 changed files with 551 additions and 683 deletions

View File

@@ -1,6 +1,5 @@
import struct
import logging
import time
import pytz
from datetime import datetime
from tzlocal import get_localzone
@@ -169,7 +168,6 @@ class Talent(Message):
fnc = self.switch.get(self.msg_id, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'forwrd') +
f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}')
return
def send_modbus_cb(self, modbus_pdu: bytearray, log_lvl: int, state: str):
if self.state != State.up:
@@ -242,12 +240,8 @@ class Talent(Message):
def _timestamp(self): # pragma: no cover
'''returns timestamp fo the inverter as localtime
since 1.1.1970 in msec'''
if False:
# utc as epoche
ts = time.time()
else:
# convert localtime in epoche
ts = (datetime.now() - datetime(1970, 1, 1)).total_seconds()
# convert localtime in epoche
ts = (datetime.now() - datetime(1970, 1, 1)).total_seconds()
return round(ts*1000)
def _utcfromts(self, ts: float):
@@ -297,7 +291,7 @@ class Talent(Message):
if (buf_len < 5): # enough bytes to read len and id_len?
return
result = struct.unpack_from('!lB', buf, 0)
len = result[0] # len of complete message
msg_len = result[0] # len of complete message
id_len = result[1] # len of variable id string
hdr_len = 5+id_len+2
@@ -311,10 +305,9 @@ class Talent(Message):
self.id_str = result[0]
self.ctrl = Control(result[1])
self.msg_id = result[2]
self.data_len = len-id_len-3
self.data_len = msg_len-id_len-3
self.header_len = hdr_len
self.header_valid = True
return
def __build_header(self, ctrl, msg_id=None) -> None:
if not msg_id:
@@ -360,7 +353,6 @@ class Talent(Message):
self.await_conn_resp_cnt -= 1
else:
self.forward()
return
else:
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
@@ -477,7 +469,7 @@ class Talent(Message):
if self.ctrl.is_req():
self.inc_counter('OTA_Start_Msg')
elif self.ctrl.is_ind():
pass
pass # Ok, nothing to do
else:
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
@@ -490,27 +482,24 @@ class Talent(Message):
result = struct.unpack_from('!lBB', self._recv_buffer,
self.header_len)
modbus_len = result[1]
# logger.debug(f'Ref: {result[0]}')
# logger.debug(f'Modbus MsgLen: {modbus_len} Func:{result[2]}')
return msg_hdr_len, modbus_len
def get_modbus_log_lvl(self) -> int:
if self.ctrl.is_req():
return logging.INFO
elif self.ctrl.is_ind():
if self.server_side:
return self.mb.last_log_lvl
elif self.ctrl.is_ind() and self.server_side:
return self.mb.last_log_lvl
return logging.WARNING
def msg_modbus(self):
hdr_len, modbus_len = self.parse_modbus_header()
hdr_len, _ = self.parse_modbus_header()
data = self._recv_buffer[self.header_len:
self.header_len+self.data_len]
if self.ctrl.is_req():
if self.remoteStream.mb.recv_req(data[hdr_len:],
self.remoteStream.
msg_forward):
if self.remote_stream.mb.recv_req(data[hdr_len:],
self.remote_stream.
msg_forward):
self.inc_counter('Modbus_Command')
else:
self.inc_counter('Invalid_Msg_Format')