use AsyncIfc class with FIFO

This commit is contained in:
Stefan Allius
2024-09-22 10:40:30 +02:00
parent af81aef07c
commit b7c63b5cf8
12 changed files with 380 additions and 359 deletions

View File

@@ -7,8 +7,10 @@ from typing import Self
from itertools import count
if __name__ == "app.src.async_stream":
from app.src.async_ifc import AsyncIfc
from app.src.messages import hex_dump_memory, State
else: # pragma: no cover
from async_ifc import AsyncIfc
from messages import hex_dump_memory, State
@@ -28,10 +30,12 @@ class AsyncStream():
'''maximum default time without a received msg in sec'''
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr) -> None:
addr, ifc: "AsyncIfc") -> None:
logger.debug('AsyncStream.__init__')
self.reader = reader
self.writer = writer
ifc.write.reg_trigger(self.__write_cb)
self.ifc = ifc
self._reader = reader
self._writer = writer
self.addr = addr
self.r_addr = ''
self.l_addr = ''
@@ -39,6 +43,9 @@ class AsyncStream():
self.proc_start = None # start processing start timestamp
self.proc_max = 0
def __write_cb(self):
self._writer.write(self.ifc.write.get())
def __timeout(self) -> int:
if self.state == State.init or self.state == State.received:
to = self.MAX_START_TIME
@@ -101,8 +108,8 @@ class AsyncStream():
async def loop(self) -> Self:
"""Async loop handler for precessing all received messages"""
self.r_addr = self.writer.get_extra_info('peername')
self.l_addr = self.writer.get_extra_info('sockname')
self.r_addr = self._writer.get_extra_info('peername')
self.l_addr = self._writer.get_extra_info('sockname')
self.proc_start = time.time()
while True:
try:
@@ -151,31 +158,30 @@ class AsyncStream():
async def async_write(self, headline: str = 'Transmit to ') -> None:
"""Async write handler to transmit the send_buffer"""
if self._send_buffer:
hex_dump_memory(logging.INFO, f'{headline}{self.addr}:',
self._send_buffer, len(self._send_buffer))
self.writer.write(self._send_buffer)
await self.writer.drain()
self._send_buffer = bytearray(0) # self._send_buffer[sent:]
if len(self.ifc.write) > 0:
self.ifc.write.logging(logging.INFO, f'{headline}{self.addr}:')
self._writer.write(self.ifc.write.get())
await self._writer.drain()
async def disc(self) -> None:
"""Async disc handler for graceful disconnect"""
if self.writer.is_closing():
if self._writer.is_closing():
return
logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
self.writer.close()
await self.writer.wait_closed()
self._writer.close()
await self._writer.wait_closed()
def close(self) -> None:
"""close handler for a no waiting disconnect
hint: must be called before releasing the connection instance
"""
self.reader.feed_eof() # abort awaited read
if self.writer.is_closing():
self._reader.feed_eof() # abort awaited read
if self._writer.is_closing():
return
logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
self.writer.close()
self.ifc.write.reg_trigger(None)
self._writer.close()
def healthy(self) -> bool:
elapsed = 0
@@ -194,11 +200,11 @@ class AsyncStream():
'''
async def __async_read(self) -> None:
"""Async read handler to read received data from TCP stream"""
data = await self.reader.read(4096)
data = await self._reader.read(4096)
if data:
self.proc_start = time.time()
self._recv_buffer += data
wait = self.read() # call read in parent class
self.ifc.read += data
wait = self.ifc.read() # call read in parent class
if wait > 0:
await asyncio.sleep(wait)
else:
@@ -221,8 +227,8 @@ class AsyncStream():
f'Forward to {self.remote_stream.addr}:',
self._forward_buffer,
len(self._forward_buffer))
self.remote_stream.writer.write(self._forward_buffer)
await self.remote_stream.writer.drain()
self.remote_stream._writer.write(self._forward_buffer)
await self.remote_stream._writer.drain()
self._forward_buffer = bytearray(0)
except OSError as error:

View File

@@ -2,9 +2,11 @@ import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3.connection_g3":
from app.src.async_ifc import AsyncIfc
from app.src.async_stream import AsyncStream
from app.src.gen3.talent import Talent
else: # pragma: no cover
from async_ifc import AsyncIfc
from async_stream import AsyncStream
from gen3.talent import Talent
@@ -16,8 +18,9 @@ class ConnectionG3(AsyncStream, Talent):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, remote_stream: 'ConnectionG3', server_side: bool,
id_str=b'') -> None:
AsyncStream.__init__(self, reader, writer, addr)
Talent.__init__(self, server_side, id_str)
self._ifc = AsyncIfc()
AsyncStream.__init__(self, reader, writer, addr, self._ifc)
Talent.__init__(self, server_side, self._ifc, id_str)
self.remote_stream: 'ConnectionG3' = remote_stream

View File

@@ -5,6 +5,7 @@ from datetime import datetime
from tzlocal import get_localzone
if __name__ == "app.src.gen3.talent":
from app.src.async_ifc import AsyncIfc
from app.src.messages import hex_dump_memory, Message, State
from app.src.modbus import Modbus
from app.src.my_timer import Timer
@@ -12,6 +13,7 @@ if __name__ == "app.src.gen3.talent":
from app.src.gen3.infos_g3 import InfosG3
from app.src.infos import Register
else: # pragma: no cover
from async_ifc import AsyncIfc
from messages import hex_dump_memory, Message, State
from modbus import Modbus
from my_timer import Timer
@@ -44,8 +46,10 @@ class Talent(Message):
MB_REGULAR_TIMEOUT = 60
TXT_UNKNOWN_CTRL = 'Unknown Ctrl'
def __init__(self, server_side: bool, id_str=b''):
def __init__(self, server_side: bool, ifc: "AsyncIfc", id_str=b''):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=15)
ifc.read.reg_trigger(self.read)
self.ifc = ifc
self.await_conn_resp_cnt = 0
self.id_str = id_str
self.contact_name = b''
@@ -103,6 +107,7 @@ class Talent(Message):
self.log_lvl.clear()
self.state = State.closed
self.mb_timer.close()
self.ifc.read.reg_trigger(None)
super().close()
def __set_serial_no(self, serial_no: str):
@@ -138,10 +143,10 @@ class Talent(Message):
self._read()
while True:
if not self.header_valid:
self.__parse_header(self._recv_buffer, len(self._recv_buffer))
self.__parse_header(self.ifc.read.peek(), len(self.ifc.read))
if self.header_valid and \
len(self._recv_buffer) >= (self.header_len + self.data_len):
len(self.ifc.read) >= (self.header_len + self.data_len):
if self.state == State.init:
self.state = State.received # received 1st package
@@ -149,11 +154,10 @@ class Talent(Message):
if callable(log_lvl):
log_lvl = log_lvl()
hex_dump_memory(log_lvl, f'Received from {self.addr}:'
f' BufLen: {len(self._recv_buffer)}'
f' HdrLen: {self.header_len}'
f' DtaLen: {self.data_len}',
self._recv_buffer, len(self._recv_buffer))
self.ifc.read.logging(log_lvl, f'Received from {self.addr}:'
f' BufLen: {len(self.ifc.read)}'
f' HdrLen: {self.header_len}'
f' DtaLen: {self.data_len}')
self.__set_serial_no(self.id_str.decode("utf-8"))
self.__dispatch_msg()
@@ -165,9 +169,9 @@ class Talent(Message):
'''add the actual receive msg to the forwarding queue'''
tsun = Config.get('tsun')
if tsun['enabled']:
buffer = self._recv_buffer
buflen = self.header_len+self.data_len
self._forward_buffer += buffer[:buflen]
buffer = self.ifc.read.peek(buflen)
self._forward_buffer += buffer
hex_dump_memory(logging.DEBUG, 'Store for forwarding:',
buffer, buflen)
@@ -178,21 +182,20 @@ class Talent(Message):
def forward_snd(self) -> None:
'''add the build send msg to the forwarding queue'''
tsun = Config.get('tsun')
rest = self.ifc.write.get(self.send_msg_ofs)
buffer = self.ifc.write.get(len(self.ifc.write))
if tsun['enabled']:
_len = len(self._send_buffer) - self.send_msg_ofs
struct.pack_into('!l', self._send_buffer, self.send_msg_ofs,
_len-4)
buffer = self._send_buffer[self.send_msg_ofs:]
_len = len(buffer)
struct.pack_into('!l', buffer, 0, _len-4)
buflen = _len
self._forward_buffer += buffer[:buflen]
self._forward_buffer += buffer
hex_dump_memory(logging.INFO, 'Store for forwarding:',
buffer, buflen)
fnc = self.switch.get(self.msg_id, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'forwrd') +
f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}')
self._send_buffer = self._send_buffer[:self.send_msg_ofs]
self.ifc.write += rest
def send_modbus_cb(self, modbus_pdu: bytearray, log_lvl: int, state: str):
if self.state != State.up:
@@ -201,15 +204,13 @@ class Talent(Message):
return
self.__build_header(0x70, 0x77)
self._send_buffer += b'\x00\x01\xa3\x28' # magic ?
self._send_buffer += struct.pack('!B', len(modbus_pdu))
self._send_buffer += modbus_pdu
self.ifc.write += b'\x00\x01\xa3\x28' # magic ?
self.ifc.write += struct.pack('!B', len(modbus_pdu))
self.ifc.write += modbus_pdu
self.__finish_send_msg()
hex_dump_memory(log_lvl, f'Send Modbus {state}:{self.addr}:',
self._send_buffer, len(self._send_buffer))
self.writer.write(self._send_buffer)
self._send_buffer = bytearray(0) # self._send_buffer[sent:]
self.ifc.write.logging(log_lvl, f'Send Modbus {state}:{self.addr}:')
self.ifc.write()
def _send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
if self.state != State.up:
@@ -237,9 +238,9 @@ class Talent(Message):
self.msg_id = 0
self.await_conn_resp_cnt += 1
self.__build_header(0x91)
self._send_buffer += struct.pack(f'!{len(contact_name)+1}p'
f'{len(contact_mail)+1}p',
contact_name, contact_mail)
self.ifc.write += struct.pack(f'!{len(contact_name)+1}p'
f'{len(contact_mail)+1}p',
contact_name, contact_mail)
self.__finish_send_msg()
return True
@@ -323,7 +324,7 @@ class Talent(Message):
self.inc_counter('Invalid_Msg_Format')
# erase broken recv buffer
self._recv_buffer = bytearray()
self.ifc.read.clear()
return
hdr_len = 5+id_len+2
@@ -344,16 +345,17 @@ class Talent(Message):
def __build_header(self, ctrl, msg_id=None) -> None:
if not msg_id:
msg_id = self.msg_id
self.send_msg_ofs = len(self._send_buffer)
self._send_buffer += struct.pack(f'!l{len(self.id_str)+1}pBB',
0, self.id_str, ctrl, msg_id)
self.send_msg_ofs = len(self.ifc.write)
self.ifc.write += struct.pack(f'!l{len(self.id_str)+1}pBB',
0, self.id_str, ctrl, msg_id)
fnc = self.switch.get(msg_id, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'tx') +
f' Ctl: {int(ctrl):#02x} Msg: {fnc.__name__!r}')
def __finish_send_msg(self) -> None:
_len = len(self._send_buffer) - self.send_msg_ofs
struct.pack_into('!l', self._send_buffer, self.send_msg_ofs, _len-4)
_len = len(self.ifc.write) - self.send_msg_ofs
struct.pack_into('!l', self.ifc.write.peek(), self.send_msg_ofs,
_len-4)
def __dispatch_msg(self) -> None:
fnc = self.switch.get(self.msg_id, self.msg_unknown)
@@ -367,7 +369,7 @@ class Talent(Message):
f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}')
def __flush_recv_msg(self) -> None:
self._recv_buffer = self._recv_buffer[(self.header_len+self.data_len):]
self.ifc.read.get(self.header_len+self.data_len)
self.header_valid = False
'''
@@ -377,7 +379,7 @@ class Talent(Message):
if self.ctrl.is_ind():
if self.server_side and self.__process_contact_info():
self.__build_header(0x91)
self._send_buffer += b'\x01'
self.ifc.write += b'\x01'
self.__finish_send_msg()
# don't forward this contact info here, we will build one
# when the remote connection is established
@@ -391,18 +393,19 @@ class Talent(Message):
self.forward()
def __process_contact_info(self) -> bool:
result = struct.unpack_from('!B', self._recv_buffer, self.header_len)
buf = self.ifc.read.peek()
result = struct.unpack_from('!B', buf, self.header_len)
name_len = result[0]
if self.data_len == 1: # this is a response withone status byte
return False
if self.data_len >= name_len+2:
result = struct.unpack_from(f'!{name_len+1}pB', self._recv_buffer,
result = struct.unpack_from(f'!{name_len+1}pB', buf,
self.header_len)
self.contact_name = result[0]
mail_len = result[1]
logger.info(f'name: {self.contact_name}')
result = struct.unpack_from(f'!{mail_len+1}p', self._recv_buffer,
result = struct.unpack_from(f'!{mail_len+1}p', buf,
self.header_len+name_len+1)
self.contact_mail = result[0]
logger.info(f'mail: {self.contact_mail}')
@@ -417,12 +420,12 @@ class Talent(Message):
ts = self._timestamp()
logger.debug(f'time: {ts:08x}')
self.__build_header(0x91)
self._send_buffer += struct.pack('!q', ts)
self.ifc.write += struct.pack('!q', ts)
self.__finish_send_msg()
elif self.data_len >= 8:
ts = self._timestamp()
result = struct.unpack_from('!q', self._recv_buffer,
result = struct.unpack_from('!q', self.ifc.read.peek(),
self.header_len)
self.ts_offset = result[0]-ts
if self.remote_stream:
@@ -446,10 +449,10 @@ class Talent(Message):
self.db.set_db_def_value(Register.POLLING_INTERVAL,
self.mb_timeout)
self.__build_header(0x99)
self._send_buffer += b'\x02'
self.ifc.write += b'\x02'
self.__finish_send_msg()
result = struct.unpack_from('!Bq', self._recv_buffer,
result = struct.unpack_from('!Bq', self.ifc.read.peek(),
self.header_len)
resp_code = result[0]
ts = result[1]+self.ts_offset
@@ -457,11 +460,11 @@ class Talent(Message):
f' tsun-time: {ts:08x}'
f' offset: {self.ts_offset}')
self.__build_header(0x91)
self._send_buffer += struct.pack('!Bq', resp_code, ts)
self.ifc.write += struct.pack('!Bq', resp_code, ts)
self.forward_snd()
return
elif self.ctrl.is_resp():
result = struct.unpack_from('!B', self._recv_buffer,
result = struct.unpack_from('!B', self.ifc.read.peek(),
self.header_len)
resp_code = result[0]
logging.debug(f'TimeActRespCode: {resp_code}')
@@ -473,7 +476,8 @@ class Talent(Message):
self.forward()
def parse_msg_header(self):
result = struct.unpack_from('!lB', self._recv_buffer, self.header_len)
result = struct.unpack_from('!lB', self.ifc.read.peek(),
self.header_len)
data_id = result[0] # len of complete message
id_len = result[1] # len of variable id string
@@ -481,7 +485,7 @@ class Talent(Message):
msg_hdr_len = 5+id_len+9
result = struct.unpack_from(f'!{id_len+1}pBq', self._recv_buffer,
result = struct.unpack_from(f'!{id_len+1}pBq', self.ifc.read.peek(),
self.header_len + 4)
timestamp = result[2]
@@ -494,7 +498,7 @@ class Talent(Message):
def msg_collector_data(self):
if self.ctrl.is_ind():
self.__build_header(0x99)
self._send_buffer += b'\x01'
self.ifc.write += b'\x01'
self.__finish_send_msg()
self.__process_data()
@@ -509,7 +513,7 @@ class Talent(Message):
def msg_inverter_data(self):
if self.ctrl.is_ind():
self.__build_header(0x99)
self._send_buffer += b'\x01'
self.ifc.write += b'\x01'
self.__finish_send_msg()
self.__process_data()
self.state = State.up # allow MODBUS cmds
@@ -529,7 +533,7 @@ class Talent(Message):
def __process_data(self):
msg_hdr_len, ts = self.parse_msg_header()
for key, update in self.db.parse(self._recv_buffer, self.header_len
for key, update in self.db.parse(self.ifc.read.peek(), self.header_len
+ msg_hdr_len, self.node_id):
if update:
self._set_mqtt_timestamp(key, self._utcfromts(ts))
@@ -549,7 +553,7 @@ class Talent(Message):
msg_hdr_len = 5
result = struct.unpack_from('!lBB', self._recv_buffer,
result = struct.unpack_from('!lBB', self.ifc.read.peek(),
self.header_len)
modbus_len = result[1]
return msg_hdr_len, modbus_len
@@ -558,7 +562,7 @@ class Talent(Message):
msg_hdr_len = 6
result = struct.unpack_from('!lBBB', self._recv_buffer,
result = struct.unpack_from('!lBBB', self.ifc.read.peek(),
self.header_len)
modbus_len = result[2]
return msg_hdr_len, modbus_len
@@ -579,8 +583,8 @@ class Talent(Message):
self.__msg_modbus(hdr_len)
def __msg_modbus(self, hdr_len):
data = self._recv_buffer[self.header_len:
self.header_len+self.data_len]
data = self.ifc.read.peek()[self.header_len:
self.header_len+self.data_len]
if self.ctrl.is_req():
if self.remote_stream.mb.recv_req(data[hdr_len:],

View File

@@ -2,9 +2,11 @@ import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3plus.connection_g3p":
from app.src.async_ifc import AsyncIfc
from app.src.async_stream import AsyncStream
from app.src.gen3plus.solarman_v5 import SolarmanV5
else: # pragma: no cover
from async_ifc import AsyncIfc
from async_stream import AsyncStream
from gen3plus.solarman_v5 import SolarmanV5
@@ -17,8 +19,9 @@ class ConnectionG3P(AsyncStream, SolarmanV5):
addr, remote_stream: 'ConnectionG3P',
server_side: bool,
client_mode: bool) -> None:
AsyncStream.__init__(self, reader, writer, addr)
SolarmanV5.__init__(self, server_side, client_mode)
self._ifc = AsyncIfc()
AsyncStream.__init__(self, reader, writer, addr, self._ifc)
SolarmanV5.__init__(self, server_side, client_mode, self._ifc)
self.remote_stream: 'ConnectionG3P' = remote_stream

View File

@@ -5,6 +5,7 @@ import asyncio
from datetime import datetime
if __name__ == "app.src.gen3plus.solarman_v5":
from app.src.async_ifc import AsyncIfc
from app.src.messages import hex_dump_memory, Message, State
from app.src.modbus import Modbus
from app.src.my_timer import Timer
@@ -12,6 +13,7 @@ if __name__ == "app.src.gen3plus.solarman_v5":
from app.src.gen3plus.infos_g3p import InfosG3P
from app.src.infos import Register
else: # pragma: no cover
from async_ifc import AsyncIfc
from messages import hex_dump_memory, Message, State
from config import Config
from modbus import Modbus
@@ -60,9 +62,10 @@ class SolarmanV5(Message):
HDR_FMT = '<BLLL'
'''format string for packing of the header'''
def __init__(self, server_side: bool, client_mode: bool):
def __init__(self, server_side: bool, client_mode: bool, ifc: "AsyncIfc"):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=8)
ifc.read.reg_trigger(self.read)
self.ifc = ifc
self.header_len = 11 # overwrite construcor in class Message
self.control = 0
self.seq = Sequence(server_side)
@@ -160,6 +163,7 @@ class SolarmanV5(Message):
self.log_lvl.clear()
self.state = State.closed
self.mb_timer.close()
self.ifc.read.reg_trigger(None)
super().close()
async def send_start_cmd(self, snr: int, host: str,
@@ -230,9 +234,10 @@ class SolarmanV5(Message):
self._read()
while True:
if not self.header_valid:
self.__parse_header(self._recv_buffer, len(self._recv_buffer))
self.__parse_header(self.ifc.read.peek(),
len(self.ifc.read))
if self.header_valid and len(self._recv_buffer) >= \
if self.header_valid and len(self.ifc.read) >= \
(self.header_len + self.data_len+2):
self.__process_complete_received_msg()
self.__flush_recv_msg()
@@ -243,10 +248,10 @@ class SolarmanV5(Message):
log_lvl = self.log_lvl.get(self.control, logging.WARNING)
if callable(log_lvl):
log_lvl = log_lvl()
hex_dump_memory(log_lvl, f'Received from {self.addr}:',
self._recv_buffer, self.header_len +
self.data_len+2)
if self.__trailer_is_ok(self._recv_buffer, self.header_len
self.ifc.read.logging(log_lvl, f'Received from {self.addr}:')
# self._recv_buffer, self.header_len +
# self.data_len+2)
if self.__trailer_is_ok(self.ifc.read.peek(), self.header_len
+ self.data_len + 2):
if self.state == State.init:
self.state = State.received
@@ -317,7 +322,7 @@ class SolarmanV5(Message):
self.inc_counter('Invalid_Msg_Format')
# erase broken recv buffer
self._recv_buffer = bytearray()
self.ifc.read.clear()
return
self.header_valid = True
@@ -329,11 +334,11 @@ class SolarmanV5(Message):
'Drop packet w invalid stop byte from '
f'{self.addr}:', buf, buf_len)
self.inc_counter('Invalid_Msg_Format')
if len(self._recv_buffer) > (self.data_len+13):
if len(self.ifc.read) > (self.data_len+13):
next_start = buf[self.data_len+13]
if next_start != 0xa5:
# erase broken recv buffer
self._recv_buffer = bytearray()
self.ifc.read.clear()
return False
@@ -349,9 +354,9 @@ class SolarmanV5(Message):
def __build_header(self, ctrl) -> None:
'''build header for new transmit message'''
self.send_msg_ofs = len(self._send_buffer)
self.send_msg_ofs = len(self.ifc.write)
self._send_buffer += struct.pack(
self.ifc.write += struct.pack(
'<BHHHL', 0xA5, 0, ctrl, self.seq.get_send(), self.snr)
fnc = self.switch.get(ctrl, self.msg_unknown)
logger.info(self.__flow_str(self.server_side, 'tx') +
@@ -359,11 +364,12 @@ class SolarmanV5(Message):
def __finish_send_msg(self) -> None:
'''finish the transmit message, set lenght and checksum'''
_len = len(self._send_buffer) - self.send_msg_ofs
struct.pack_into('<H', self._send_buffer, self.send_msg_ofs+1, _len-11)
check = sum(self._send_buffer[self.send_msg_ofs+1:self.send_msg_ofs +
_len]) & 0xff
self._send_buffer += struct.pack('<BB', check, 0x15) # crc & stop
_len = len(self.ifc.write) - self.send_msg_ofs
struct.pack_into('<H', self.ifc.write.peek(), self.send_msg_ofs+1,
_len-11)
check = sum(self.ifc.write.peek()[
self.send_msg_ofs+1:self.send_msg_ofs + _len]) & 0xff
self.ifc.write += struct.pack('<BB', check, 0x15) # crc & stop
def _update_header(self, _forward_buffer):
'''update header for message before forwarding,
@@ -394,15 +400,14 @@ class SolarmanV5(Message):
f' Msg: {fnc.__name__!r}')
def __flush_recv_msg(self) -> None:
self._recv_buffer = self._recv_buffer[(self.header_len +
self.data_len+2):]
self.ifc.read.get(self.header_len + self.data_len+2)
self.header_valid = False
def __send_ack_rsp(self, msgtype, ftype, ack=1):
self.__build_header(msgtype)
self._send_buffer += struct.pack('<BBLL', ftype, ack,
self._timestamp(),
self._heartbeat())
self.ifc.write += struct.pack('<BBLL', ftype, ack,
self._timestamp(),
self._heartbeat())
self.__finish_send_msg()
def send_modbus_cb(self, pdu: bytearray, log_lvl: int, state: str):
@@ -411,14 +416,12 @@ class SolarmanV5(Message):
' cause the state is not UP anymore')
return
self.__build_header(0x4510)
self._send_buffer += struct.pack('<BHLLL', self.MB_RTU_CMD,
self.sensor_list, 0, 0, 0)
self._send_buffer += pdu
self.ifc.write += struct.pack('<BHLLL', self.MB_RTU_CMD,
self.sensor_list, 0, 0, 0)
self.ifc.write += pdu
self.__finish_send_msg()
hex_dump_memory(log_lvl, f'Send Modbus {state}:{self.addr}:',
self._send_buffer, len(self._send_buffer))
self.writer.write(self._send_buffer)
self._send_buffer = bytearray(0) # self._send_buffer[sent:]
self.ifc.write.logging(log_lvl, f'Send Modbus {state}:{self.addr}:')
self.ifc.write()
def _send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
if self.state != State.up:
@@ -460,17 +463,17 @@ class SolarmanV5(Message):
self.forward_at_cmd_resp = False
self.__build_header(0x4510)
self._send_buffer += struct.pack(f'<BHLLL{len(at_cmd)}sc', self.AT_CMD,
0x0002, 0, 0, 0,
at_cmd.encode('utf-8'), b'\r')
self.ifc.write += struct.pack(f'<BHLLL{len(at_cmd)}sc', self.AT_CMD,
0x0002, 0, 0, 0,
at_cmd.encode('utf-8'), b'\r')
self.__finish_send_msg()
try:
await self.async_write('Send AT Command:')
except Exception:
self._send_buffer = bytearray(0)
self.ifc.write.clear()
def __forward_msg(self):
self.forward(self._recv_buffer, self.header_len+self.data_len+2)
self.forward(self.ifc.read.peek(), self.header_len+self.data_len+2)
def __build_model_name(self):
db = self.db
@@ -491,7 +494,7 @@ class SolarmanV5(Message):
def __process_data(self, ftype, ts):
inv_update = False
msg_type = self.control >> 8
for key, update in self.db.parse(self._recv_buffer, msg_type, ftype,
for key, update in self.db.parse(self.ifc.read.peek(), msg_type, ftype,
self.node_id):
if update:
if key == 'inverter':
@@ -510,7 +513,7 @@ class SolarmanV5(Message):
self.__forward_msg()
def msg_dev_ind(self):
data = self._recv_buffer[self.header_len:]
data = self.ifc.read.peek()[self.header_len:]
result = struct.unpack_from(self.HDR_FMT, data, 0)
ftype = result[0] # always 2
total = result[1]
@@ -531,7 +534,7 @@ class SolarmanV5(Message):
self.__send_ack_rsp(0x1110, ftype)
def msg_data_ind(self):
data = self._recv_buffer
data = self.ifc.read.peek()
result = struct.unpack_from('<BHLLLHL', data, self.header_len)
ftype = result[0] # 1 or 0x81
sensor = result[1]
@@ -559,7 +562,7 @@ class SolarmanV5(Message):
self.new_state_up()
def msg_sync_start(self):
data = self._recv_buffer[self.header_len:]
data = self.ifc.read.peek()[self.header_len:]
result = struct.unpack_from(self.HDR_FMT, data, 0)
ftype = result[0]
total = result[1]
@@ -572,8 +575,8 @@ class SolarmanV5(Message):
self.__send_ack_rsp(0x1310, ftype)
def msg_command_req(self):
data = self._recv_buffer[self.header_len:
self.header_len+self.data_len]
data = self.ifc.read.peek()[self.header_len:
self.header_len+self.data_len]
result = struct.unpack_from('<B', data, 0)
ftype = result[0]
if ftype == self.AT_CMD:
@@ -601,7 +604,7 @@ class SolarmanV5(Message):
self.mqtt.publish(key, data))
def get_cmd_rsp_log_lvl(self) -> int:
ftype = self._recv_buffer[self.header_len]
ftype = self.ifc.read.peek()[self.header_len]
if ftype == self.AT_CMD:
if self.forward_at_cmd_resp:
return logging.INFO
@@ -613,8 +616,8 @@ class SolarmanV5(Message):
return logging.WARNING
def msg_command_rsp(self):
data = self._recv_buffer[self.header_len:
self.header_len+self.data_len]
data = self.ifc.read.peek()[self.header_len:
self.header_len+self.data_len]
ftype = data[0]
if ftype == self.AT_CMD:
if not self.forward_at_cmd_resp:
@@ -650,7 +653,7 @@ class SolarmanV5(Message):
self.__build_model_name()
def msg_hbeat_ind(self):
data = self._recv_buffer[self.header_len:]
data = self.ifc.read.peek()[self.header_len:]
result = struct.unpack_from('<B', data, 0)
ftype = result[0]
@@ -659,7 +662,7 @@ class SolarmanV5(Message):
self.new_state_up()
def msg_sync_end(self):
data = self._recv_buffer[self.header_len:]
data = self.ifc.read.peek()[self.header_len:]
result = struct.unpack_from(self.HDR_FMT, data, 0)
ftype = result[0]
total = result[1]
@@ -672,7 +675,7 @@ class SolarmanV5(Message):
self.__send_ack_rsp(0x1810, ftype)
def msg_response(self):
data = self._recv_buffer[self.header_len:]
data = self.ifc.read.peek()[self.header_len:]
result = struct.unpack_from('<BBLL', data, 0)
ftype = result[0] # always 2
valid = result[1] == 1 # status

View File

@@ -33,13 +33,9 @@ def __asc_val(n, data, data_len):
return line
def hex_dump_memory(level, info, data, data_len):
def hex_dump(data, data_len) -> list:
n = 0
lines = []
lines.append(info)
tracer = logging.getLogger('tracer')
if not tracer.isEnabledFor(level):
return
for i in range(0, data_len, 16):
line = ' '
@@ -50,6 +46,23 @@ def hex_dump_memory(level, info, data, data_len):
line += __asc_val(n, data, data_len)
lines.append(line)
return lines
def hex_dump_str(data, data_len):
lines = hex_dump(data, data_len)
return '\n'.join(lines)
def hex_dump_memory(level, info, data, data_len):
lines = []
lines.append(info)
tracer = logging.getLogger('tracer')
if not tracer.isEnabledFor(level):
return
lines += hex_dump(data, data_len)
tracer.log(level, '\n'.join(lines))
@@ -94,8 +107,6 @@ class Message(metaclass=IterRegistry):
self.unique_id = 0
self.node_id = '' # will be overwritten in the child class's __init__
self.sug_area = ''
self._recv_buffer = bytearray(0)
self._send_buffer = bytearray(0)
self._forward_buffer = bytearray(0)
self.new_data = {}
self.state = State.init