From b7c63b5cf883dc181128f8928aea595855c8d00e Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Sun, 22 Sep 2024 10:40:30 +0200 Subject: [PATCH] use AsyncIfc class with FIFO --- app/src/async_stream.py | 50 +++--- app/src/gen3/connection_g3.py | 7 +- app/src/gen3/talent.py | 110 ++++++------- app/src/gen3plus/connection_g3p.py | 7 +- app/src/gen3plus/solarman_v5.py | 97 ++++++------ app/src/messages.py | 25 ++- app/tests/test_connection_g3.py | 4 +- app/tests/test_connection_g3p.py | 4 +- app/tests/test_modbus_tcp.py | 18 +-- app/tests/test_mqtt.py | 10 +- app/tests/test_solarman.py | 240 ++++++++++++++--------------- app/tests/test_talent.py | 167 ++++++++++---------- 12 files changed, 380 insertions(+), 359 deletions(-) diff --git a/app/src/async_stream.py b/app/src/async_stream.py index ae7e584..c30cd7c 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -7,8 +7,10 @@ from typing import Self from itertools import count if __name__ == "app.src.async_stream": + from app.src.async_ifc import AsyncIfc from app.src.messages import hex_dump_memory, State else: # pragma: no cover + from async_ifc import AsyncIfc from messages import hex_dump_memory, State @@ -28,10 +30,12 @@ class AsyncStream(): '''maximum default time without a received msg in sec''' def __init__(self, reader: StreamReader, writer: StreamWriter, - addr) -> None: + addr, ifc: "AsyncIfc") -> None: logger.debug('AsyncStream.__init__') - self.reader = reader - self.writer = writer + ifc.write.reg_trigger(self.__write_cb) + self.ifc = ifc + self._reader = reader + self._writer = writer self.addr = addr self.r_addr = '' self.l_addr = '' @@ -39,6 +43,9 @@ class AsyncStream(): self.proc_start = None # start processing start timestamp self.proc_max = 0 + def __write_cb(self): + self._writer.write(self.ifc.write.get()) + def __timeout(self) -> int: if self.state == State.init or self.state == State.received: to = self.MAX_START_TIME @@ -101,8 +108,8 @@ class AsyncStream(): async def loop(self) -> Self: """Async loop handler for precessing all received messages""" - self.r_addr = self.writer.get_extra_info('peername') - self.l_addr = self.writer.get_extra_info('sockname') + self.r_addr = self._writer.get_extra_info('peername') + self.l_addr = self._writer.get_extra_info('sockname') self.proc_start = time.time() while True: try: @@ -151,31 +158,30 @@ class AsyncStream(): async def async_write(self, headline: str = 'Transmit to ') -> None: """Async write handler to transmit the send_buffer""" - if self._send_buffer: - hex_dump_memory(logging.INFO, f'{headline}{self.addr}:', - self._send_buffer, len(self._send_buffer)) - self.writer.write(self._send_buffer) - await self.writer.drain() - self._send_buffer = bytearray(0) # self._send_buffer[sent:] + if len(self.ifc.write) > 0: + self.ifc.write.logging(logging.INFO, f'{headline}{self.addr}:') + self._writer.write(self.ifc.write.get()) + await self._writer.drain() async def disc(self) -> None: """Async disc handler for graceful disconnect""" - if self.writer.is_closing(): + if self._writer.is_closing(): return logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}') - self.writer.close() - await self.writer.wait_closed() + self._writer.close() + await self._writer.wait_closed() def close(self) -> None: """close handler for a no waiting disconnect hint: must be called before releasing the connection instance """ - self.reader.feed_eof() # abort awaited read - if self.writer.is_closing(): + self._reader.feed_eof() # abort awaited read + if self._writer.is_closing(): return logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}') - self.writer.close() + self.ifc.write.reg_trigger(None) + self._writer.close() def healthy(self) -> bool: elapsed = 0 @@ -194,11 +200,11 @@ class AsyncStream(): ''' async def __async_read(self) -> None: """Async read handler to read received data from TCP stream""" - data = await self.reader.read(4096) + data = await self._reader.read(4096) if data: self.proc_start = time.time() - self._recv_buffer += data - wait = self.read() # call read in parent class + self.ifc.read += data + wait = self.ifc.read() # call read in parent class if wait > 0: await asyncio.sleep(wait) else: @@ -221,8 +227,8 @@ class AsyncStream(): f'Forward to {self.remote_stream.addr}:', self._forward_buffer, len(self._forward_buffer)) - self.remote_stream.writer.write(self._forward_buffer) - await self.remote_stream.writer.drain() + self.remote_stream._writer.write(self._forward_buffer) + await self.remote_stream._writer.drain() self._forward_buffer = bytearray(0) except OSError as error: diff --git a/app/src/gen3/connection_g3.py b/app/src/gen3/connection_g3.py index b7e246b..c1f9914 100644 --- a/app/src/gen3/connection_g3.py +++ b/app/src/gen3/connection_g3.py @@ -2,9 +2,11 @@ import logging from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3.connection_g3": + from app.src.async_ifc import AsyncIfc from app.src.async_stream import AsyncStream from app.src.gen3.talent import Talent else: # pragma: no cover + from async_ifc import AsyncIfc from async_stream import AsyncStream from gen3.talent import Talent @@ -16,8 +18,9 @@ class ConnectionG3(AsyncStream, Talent): def __init__(self, reader: StreamReader, writer: StreamWriter, addr, remote_stream: 'ConnectionG3', server_side: bool, id_str=b'') -> None: - AsyncStream.__init__(self, reader, writer, addr) - Talent.__init__(self, server_side, id_str) + self._ifc = AsyncIfc() + AsyncStream.__init__(self, reader, writer, addr, self._ifc) + Talent.__init__(self, server_side, self._ifc, id_str) self.remote_stream: 'ConnectionG3' = remote_stream diff --git a/app/src/gen3/talent.py b/app/src/gen3/talent.py index bb86357..9c900fd 100644 --- a/app/src/gen3/talent.py +++ b/app/src/gen3/talent.py @@ -5,6 +5,7 @@ from datetime import datetime from tzlocal import get_localzone if __name__ == "app.src.gen3.talent": + from app.src.async_ifc import AsyncIfc from app.src.messages import hex_dump_memory, Message, State from app.src.modbus import Modbus from app.src.my_timer import Timer @@ -12,6 +13,7 @@ if __name__ == "app.src.gen3.talent": from app.src.gen3.infos_g3 import InfosG3 from app.src.infos import Register else: # pragma: no cover + from async_ifc import AsyncIfc from messages import hex_dump_memory, Message, State from modbus import Modbus from my_timer import Timer @@ -44,8 +46,10 @@ class Talent(Message): MB_REGULAR_TIMEOUT = 60 TXT_UNKNOWN_CTRL = 'Unknown Ctrl' - def __init__(self, server_side: bool, id_str=b''): + def __init__(self, server_side: bool, ifc: "AsyncIfc", id_str=b''): super().__init__(server_side, self.send_modbus_cb, mb_timeout=15) + ifc.read.reg_trigger(self.read) + self.ifc = ifc self.await_conn_resp_cnt = 0 self.id_str = id_str self.contact_name = b'' @@ -103,6 +107,7 @@ class Talent(Message): self.log_lvl.clear() self.state = State.closed self.mb_timer.close() + self.ifc.read.reg_trigger(None) super().close() def __set_serial_no(self, serial_no: str): @@ -138,10 +143,10 @@ class Talent(Message): self._read() while True: if not self.header_valid: - self.__parse_header(self._recv_buffer, len(self._recv_buffer)) + self.__parse_header(self.ifc.read.peek(), len(self.ifc.read)) if self.header_valid and \ - len(self._recv_buffer) >= (self.header_len + self.data_len): + len(self.ifc.read) >= (self.header_len + self.data_len): if self.state == State.init: self.state = State.received # received 1st package @@ -149,11 +154,10 @@ class Talent(Message): if callable(log_lvl): log_lvl = log_lvl() - hex_dump_memory(log_lvl, f'Received from {self.addr}:' - f' BufLen: {len(self._recv_buffer)}' - f' HdrLen: {self.header_len}' - f' DtaLen: {self.data_len}', - self._recv_buffer, len(self._recv_buffer)) + self.ifc.read.logging(log_lvl, f'Received from {self.addr}:' + f' BufLen: {len(self.ifc.read)}' + f' HdrLen: {self.header_len}' + f' DtaLen: {self.data_len}') self.__set_serial_no(self.id_str.decode("utf-8")) self.__dispatch_msg() @@ -165,9 +169,9 @@ class Talent(Message): '''add the actual receive msg to the forwarding queue''' tsun = Config.get('tsun') if tsun['enabled']: - buffer = self._recv_buffer buflen = self.header_len+self.data_len - self._forward_buffer += buffer[:buflen] + buffer = self.ifc.read.peek(buflen) + self._forward_buffer += buffer hex_dump_memory(logging.DEBUG, 'Store for forwarding:', buffer, buflen) @@ -178,21 +182,20 @@ class Talent(Message): def forward_snd(self) -> None: '''add the build send msg to the forwarding queue''' tsun = Config.get('tsun') + rest = self.ifc.write.get(self.send_msg_ofs) + buffer = self.ifc.write.get(len(self.ifc.write)) if tsun['enabled']: - _len = len(self._send_buffer) - self.send_msg_ofs - struct.pack_into('!l', self._send_buffer, self.send_msg_ofs, - _len-4) - - buffer = self._send_buffer[self.send_msg_ofs:] + _len = len(buffer) + struct.pack_into('!l', buffer, 0, _len-4) buflen = _len - self._forward_buffer += buffer[:buflen] + self._forward_buffer += buffer hex_dump_memory(logging.INFO, 'Store for forwarding:', buffer, buflen) fnc = self.switch.get(self.msg_id, self.msg_unknown) logger.info(self.__flow_str(self.server_side, 'forwrd') + f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}') - self._send_buffer = self._send_buffer[:self.send_msg_ofs] + self.ifc.write += rest def send_modbus_cb(self, modbus_pdu: bytearray, log_lvl: int, state: str): if self.state != State.up: @@ -201,15 +204,13 @@ class Talent(Message): return self.__build_header(0x70, 0x77) - self._send_buffer += b'\x00\x01\xa3\x28' # magic ? - self._send_buffer += struct.pack('!B', len(modbus_pdu)) - self._send_buffer += modbus_pdu + self.ifc.write += b'\x00\x01\xa3\x28' # magic ? + self.ifc.write += struct.pack('!B', len(modbus_pdu)) + self.ifc.write += modbus_pdu self.__finish_send_msg() - hex_dump_memory(log_lvl, f'Send Modbus {state}:{self.addr}:', - self._send_buffer, len(self._send_buffer)) - self.writer.write(self._send_buffer) - self._send_buffer = bytearray(0) # self._send_buffer[sent:] + self.ifc.write.logging(log_lvl, f'Send Modbus {state}:{self.addr}:') + self.ifc.write() def _send_modbus_cmd(self, func, addr, val, log_lvl) -> None: if self.state != State.up: @@ -237,9 +238,9 @@ class Talent(Message): self.msg_id = 0 self.await_conn_resp_cnt += 1 self.__build_header(0x91) - self._send_buffer += struct.pack(f'!{len(contact_name)+1}p' - f'{len(contact_mail)+1}p', - contact_name, contact_mail) + self.ifc.write += struct.pack(f'!{len(contact_name)+1}p' + f'{len(contact_mail)+1}p', + contact_name, contact_mail) self.__finish_send_msg() return True @@ -323,7 +324,7 @@ class Talent(Message): self.inc_counter('Invalid_Msg_Format') # erase broken recv buffer - self._recv_buffer = bytearray() + self.ifc.read.clear() return hdr_len = 5+id_len+2 @@ -344,16 +345,17 @@ class Talent(Message): def __build_header(self, ctrl, msg_id=None) -> None: if not msg_id: msg_id = self.msg_id - self.send_msg_ofs = len(self._send_buffer) - self._send_buffer += struct.pack(f'!l{len(self.id_str)+1}pBB', - 0, self.id_str, ctrl, msg_id) + self.send_msg_ofs = len(self.ifc.write) + self.ifc.write += struct.pack(f'!l{len(self.id_str)+1}pBB', + 0, self.id_str, ctrl, msg_id) fnc = self.switch.get(msg_id, self.msg_unknown) logger.info(self.__flow_str(self.server_side, 'tx') + f' Ctl: {int(ctrl):#02x} Msg: {fnc.__name__!r}') def __finish_send_msg(self) -> None: - _len = len(self._send_buffer) - self.send_msg_ofs - struct.pack_into('!l', self._send_buffer, self.send_msg_ofs, _len-4) + _len = len(self.ifc.write) - self.send_msg_ofs + struct.pack_into('!l', self.ifc.write.peek(), self.send_msg_ofs, + _len-4) def __dispatch_msg(self) -> None: fnc = self.switch.get(self.msg_id, self.msg_unknown) @@ -367,7 +369,7 @@ class Talent(Message): f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}') def __flush_recv_msg(self) -> None: - self._recv_buffer = self._recv_buffer[(self.header_len+self.data_len):] + self.ifc.read.get(self.header_len+self.data_len) self.header_valid = False ''' @@ -377,7 +379,7 @@ class Talent(Message): if self.ctrl.is_ind(): if self.server_side and self.__process_contact_info(): self.__build_header(0x91) - self._send_buffer += b'\x01' + self.ifc.write += b'\x01' self.__finish_send_msg() # don't forward this contact info here, we will build one # when the remote connection is established @@ -391,18 +393,19 @@ class Talent(Message): self.forward() def __process_contact_info(self) -> bool: - result = struct.unpack_from('!B', self._recv_buffer, self.header_len) + buf = self.ifc.read.peek() + result = struct.unpack_from('!B', buf, self.header_len) name_len = result[0] if self.data_len == 1: # this is a response withone status byte return False if self.data_len >= name_len+2: - result = struct.unpack_from(f'!{name_len+1}pB', self._recv_buffer, + result = struct.unpack_from(f'!{name_len+1}pB', buf, self.header_len) self.contact_name = result[0] mail_len = result[1] logger.info(f'name: {self.contact_name}') - result = struct.unpack_from(f'!{mail_len+1}p', self._recv_buffer, + result = struct.unpack_from(f'!{mail_len+1}p', buf, self.header_len+name_len+1) self.contact_mail = result[0] logger.info(f'mail: {self.contact_mail}') @@ -417,12 +420,12 @@ class Talent(Message): ts = self._timestamp() logger.debug(f'time: {ts:08x}') self.__build_header(0x91) - self._send_buffer += struct.pack('!q', ts) + self.ifc.write += struct.pack('!q', ts) self.__finish_send_msg() elif self.data_len >= 8: ts = self._timestamp() - result = struct.unpack_from('!q', self._recv_buffer, + result = struct.unpack_from('!q', self.ifc.read.peek(), self.header_len) self.ts_offset = result[0]-ts if self.remote_stream: @@ -446,10 +449,10 @@ class Talent(Message): self.db.set_db_def_value(Register.POLLING_INTERVAL, self.mb_timeout) self.__build_header(0x99) - self._send_buffer += b'\x02' + self.ifc.write += b'\x02' self.__finish_send_msg() - result = struct.unpack_from('!Bq', self._recv_buffer, + result = struct.unpack_from('!Bq', self.ifc.read.peek(), self.header_len) resp_code = result[0] ts = result[1]+self.ts_offset @@ -457,11 +460,11 @@ class Talent(Message): f' tsun-time: {ts:08x}' f' offset: {self.ts_offset}') self.__build_header(0x91) - self._send_buffer += struct.pack('!Bq', resp_code, ts) + self.ifc.write += struct.pack('!Bq', resp_code, ts) self.forward_snd() return elif self.ctrl.is_resp(): - result = struct.unpack_from('!B', self._recv_buffer, + result = struct.unpack_from('!B', self.ifc.read.peek(), self.header_len) resp_code = result[0] logging.debug(f'TimeActRespCode: {resp_code}') @@ -473,7 +476,8 @@ class Talent(Message): self.forward() def parse_msg_header(self): - result = struct.unpack_from('!lB', self._recv_buffer, self.header_len) + result = struct.unpack_from('!lB', self.ifc.read.peek(), + self.header_len) data_id = result[0] # len of complete message id_len = result[1] # len of variable id string @@ -481,7 +485,7 @@ class Talent(Message): msg_hdr_len = 5+id_len+9 - result = struct.unpack_from(f'!{id_len+1}pBq', self._recv_buffer, + result = struct.unpack_from(f'!{id_len+1}pBq', self.ifc.read.peek(), self.header_len + 4) timestamp = result[2] @@ -494,7 +498,7 @@ class Talent(Message): def msg_collector_data(self): if self.ctrl.is_ind(): self.__build_header(0x99) - self._send_buffer += b'\x01' + self.ifc.write += b'\x01' self.__finish_send_msg() self.__process_data() @@ -509,7 +513,7 @@ class Talent(Message): def msg_inverter_data(self): if self.ctrl.is_ind(): self.__build_header(0x99) - self._send_buffer += b'\x01' + self.ifc.write += b'\x01' self.__finish_send_msg() self.__process_data() self.state = State.up # allow MODBUS cmds @@ -529,7 +533,7 @@ class Talent(Message): def __process_data(self): msg_hdr_len, ts = self.parse_msg_header() - for key, update in self.db.parse(self._recv_buffer, self.header_len + for key, update in self.db.parse(self.ifc.read.peek(), self.header_len + msg_hdr_len, self.node_id): if update: self._set_mqtt_timestamp(key, self._utcfromts(ts)) @@ -549,7 +553,7 @@ class Talent(Message): msg_hdr_len = 5 - result = struct.unpack_from('!lBB', self._recv_buffer, + result = struct.unpack_from('!lBB', self.ifc.read.peek(), self.header_len) modbus_len = result[1] return msg_hdr_len, modbus_len @@ -558,7 +562,7 @@ class Talent(Message): msg_hdr_len = 6 - result = struct.unpack_from('!lBBB', self._recv_buffer, + result = struct.unpack_from('!lBBB', self.ifc.read.peek(), self.header_len) modbus_len = result[2] return msg_hdr_len, modbus_len @@ -579,8 +583,8 @@ class Talent(Message): self.__msg_modbus(hdr_len) def __msg_modbus(self, hdr_len): - data = self._recv_buffer[self.header_len: - self.header_len+self.data_len] + data = self.ifc.read.peek()[self.header_len: + self.header_len+self.data_len] if self.ctrl.is_req(): if self.remote_stream.mb.recv_req(data[hdr_len:], diff --git a/app/src/gen3plus/connection_g3p.py b/app/src/gen3plus/connection_g3p.py index 89dfc1a..735058d 100644 --- a/app/src/gen3plus/connection_g3p.py +++ b/app/src/gen3plus/connection_g3p.py @@ -2,9 +2,11 @@ import logging from asyncio import StreamReader, StreamWriter if __name__ == "app.src.gen3plus.connection_g3p": + from app.src.async_ifc import AsyncIfc from app.src.async_stream import AsyncStream from app.src.gen3plus.solarman_v5 import SolarmanV5 else: # pragma: no cover + from async_ifc import AsyncIfc from async_stream import AsyncStream from gen3plus.solarman_v5 import SolarmanV5 @@ -17,8 +19,9 @@ class ConnectionG3P(AsyncStream, SolarmanV5): addr, remote_stream: 'ConnectionG3P', server_side: bool, client_mode: bool) -> None: - AsyncStream.__init__(self, reader, writer, addr) - SolarmanV5.__init__(self, server_side, client_mode) + self._ifc = AsyncIfc() + AsyncStream.__init__(self, reader, writer, addr, self._ifc) + SolarmanV5.__init__(self, server_side, client_mode, self._ifc) self.remote_stream: 'ConnectionG3P' = remote_stream diff --git a/app/src/gen3plus/solarman_v5.py b/app/src/gen3plus/solarman_v5.py index e1b6e3e..031cd9d 100644 --- a/app/src/gen3plus/solarman_v5.py +++ b/app/src/gen3plus/solarman_v5.py @@ -5,6 +5,7 @@ import asyncio from datetime import datetime if __name__ == "app.src.gen3plus.solarman_v5": + from app.src.async_ifc import AsyncIfc from app.src.messages import hex_dump_memory, Message, State from app.src.modbus import Modbus from app.src.my_timer import Timer @@ -12,6 +13,7 @@ if __name__ == "app.src.gen3plus.solarman_v5": from app.src.gen3plus.infos_g3p import InfosG3P from app.src.infos import Register else: # pragma: no cover + from async_ifc import AsyncIfc from messages import hex_dump_memory, Message, State from config import Config from modbus import Modbus @@ -60,9 +62,10 @@ class SolarmanV5(Message): HDR_FMT = '= \ + if self.header_valid and len(self.ifc.read) >= \ (self.header_len + self.data_len+2): self.__process_complete_received_msg() self.__flush_recv_msg() @@ -243,10 +248,10 @@ class SolarmanV5(Message): log_lvl = self.log_lvl.get(self.control, logging.WARNING) if callable(log_lvl): log_lvl = log_lvl() - hex_dump_memory(log_lvl, f'Received from {self.addr}:', - self._recv_buffer, self.header_len + - self.data_len+2) - if self.__trailer_is_ok(self._recv_buffer, self.header_len + self.ifc.read.logging(log_lvl, f'Received from {self.addr}:') + # self._recv_buffer, self.header_len + + # self.data_len+2) + if self.__trailer_is_ok(self.ifc.read.peek(), self.header_len + self.data_len + 2): if self.state == State.init: self.state = State.received @@ -317,7 +322,7 @@ class SolarmanV5(Message): self.inc_counter('Invalid_Msg_Format') # erase broken recv buffer - self._recv_buffer = bytearray() + self.ifc.read.clear() return self.header_valid = True @@ -329,11 +334,11 @@ class SolarmanV5(Message): 'Drop packet w invalid stop byte from ' f'{self.addr}:', buf, buf_len) self.inc_counter('Invalid_Msg_Format') - if len(self._recv_buffer) > (self.data_len+13): + if len(self.ifc.read) > (self.data_len+13): next_start = buf[self.data_len+13] if next_start != 0xa5: # erase broken recv buffer - self._recv_buffer = bytearray() + self.ifc.read.clear() return False @@ -349,9 +354,9 @@ class SolarmanV5(Message): def __build_header(self, ctrl) -> None: '''build header for new transmit message''' - self.send_msg_ofs = len(self._send_buffer) + self.send_msg_ofs = len(self.ifc.write) - self._send_buffer += struct.pack( + self.ifc.write += struct.pack( ' None: '''finish the transmit message, set lenght and checksum''' - _len = len(self._send_buffer) - self.send_msg_ofs - struct.pack_into(' None: - self._recv_buffer = self._recv_buffer[(self.header_len + - self.data_len+2):] + self.ifc.read.get(self.header_len + self.data_len+2) self.header_valid = False def __send_ack_rsp(self, msgtype, ftype, ack=1): self.__build_header(msgtype) - self._send_buffer += struct.pack(' None: if self.state != State.up: @@ -460,17 +463,17 @@ class SolarmanV5(Message): self.forward_at_cmd_resp = False self.__build_header(0x4510) - self._send_buffer += struct.pack(f'> 8 - for key, update in self.db.parse(self._recv_buffer, msg_type, ftype, + for key, update in self.db.parse(self.ifc.read.peek(), msg_type, ftype, self.node_id): if update: if key == 'inverter': @@ -510,7 +513,7 @@ class SolarmanV5(Message): self.__forward_msg() def msg_dev_ind(self): - data = self._recv_buffer[self.header_len:] + data = self.ifc.read.peek()[self.header_len:] result = struct.unpack_from(self.HDR_FMT, data, 0) ftype = result[0] # always 2 total = result[1] @@ -531,7 +534,7 @@ class SolarmanV5(Message): self.__send_ack_rsp(0x1110, ftype) def msg_data_ind(self): - data = self._recv_buffer + data = self.ifc.read.peek() result = struct.unpack_from(' int: - ftype = self._recv_buffer[self.header_len] + ftype = self.ifc.read.peek()[self.header_len] if ftype == self.AT_CMD: if self.forward_at_cmd_resp: return logging.INFO @@ -613,8 +616,8 @@ class SolarmanV5(Message): return logging.WARNING def msg_command_rsp(self): - data = self._recv_buffer[self.header_len: - self.header_len+self.data_len] + data = self.ifc.read.peek()[self.header_len: + self.header_len+self.data_len] ftype = data[0] if ftype == self.AT_CMD: if not self.forward_at_cmd_resp: @@ -650,7 +653,7 @@ class SolarmanV5(Message): self.__build_model_name() def msg_hbeat_ind(self): - data = self._recv_buffer[self.header_len:] + data = self.ifc.read.peek()[self.header_len:] result = struct.unpack_from(' list: n = 0 lines = [] - lines.append(info) - tracer = logging.getLogger('tracer') - if not tracer.isEnabledFor(level): - return for i in range(0, data_len, 16): line = ' ' @@ -50,6 +46,23 @@ def hex_dump_memory(level, info, data, data_len): line += __asc_val(n, data, data_len) lines.append(line) + return lines + + +def hex_dump_str(data, data_len): + lines = hex_dump(data, data_len) + return '\n'.join(lines) + + +def hex_dump_memory(level, info, data, data_len): + lines = [] + lines.append(info) + tracer = logging.getLogger('tracer') + if not tracer.isEnabledFor(level): + return + + lines += hex_dump(data, data_len) + tracer.log(level, '\n'.join(lines)) @@ -94,8 +107,6 @@ class Message(metaclass=IterRegistry): self.unique_id = 0 self.node_id = '' # will be overwritten in the child class's __init__ self.sug_area = '' - self._recv_buffer = bytearray(0) - self._send_buffer = bytearray(0) self._forward_buffer = bytearray(0) self.new_data = {} self.state = State.init diff --git a/app/tests/test_connection_g3.py b/app/tests/test_connection_g3.py index 452bf18..bb2b025 100644 --- a/app/tests/test_connection_g3.py +++ b/app/tests/test_connection_g3.py @@ -72,8 +72,8 @@ def test_method_calls(patch_async_init, patch_talent_init, patch_healthy, patch_ addr = ('proxy.local', 10000) conn = ConnectionG3(reader, writer, addr, remote_stream= None, server_side=True, id_str=id_str) - spy1.assert_called_once_with(conn, reader, writer, addr) - spy2.assert_called_once_with(conn, True, id_str) + spy1.assert_called_once_with(conn, reader, writer, addr, conn._ifc) + spy2.assert_called_once_with(conn, True, conn._ifc, id_str) conn.healthy() spy3.assert_called_once() diff --git a/app/tests/test_connection_g3p.py b/app/tests/test_connection_g3p.py index 67607f1..0a56398 100644 --- a/app/tests/test_connection_g3p.py +++ b/app/tests/test_connection_g3p.py @@ -77,8 +77,8 @@ def test_method_calls(patch_async_init, patch_solarman_init, patch_healthy, patc addr = ('proxy.local', 10000) conn = ConnectionG3P(reader, writer, addr, remote_stream= None, server_side=True, client_mode=False) - spy1.assert_called_once_with(conn, reader, writer, addr) - spy2.assert_called_once_with(conn, True, False) + spy1.assert_called_once_with(conn, reader, writer, addr, conn._ifc) + spy2.assert_called_once_with(conn, True, False, conn._ifc) conn.healthy() spy3.assert_called_once() diff --git a/app/tests/test_modbus_tcp.py b/app/tests/test_modbus_tcp.py index f68e031..0fa7438 100644 --- a/app/tests/test_modbus_tcp.py +++ b/app/tests/test_modbus_tcp.py @@ -157,8 +157,8 @@ async def test_modbus_conn(patch_open): async with ModbusConn('test.local', 1234) as stream: assert stream.node_id == 'G3P' assert stream.addr == ('test.local', 1234) - assert type(stream.reader) is FakeReader - assert type(stream.writer) is FakeWriter + assert type(stream._reader) is FakeReader + assert type(stream._writer) is FakeWriter assert Infos.stat['proxy']['Inverter_Cnt'] == 1 assert Infos.stat['proxy']['Inverter_Cnt'] == 0 @@ -209,7 +209,7 @@ async def test_modbus_cnf2(config_conn, patch_no_mqtt, patch_open): test += 1 assert Infos.stat['proxy']['Inverter_Cnt'] == 1 m.shutdown_started = True - m.reader.on_recv.set() + m._reader.on_recv.set() del m assert 1 == test @@ -236,13 +236,13 @@ async def test_modbus_cnf3(config_conn, patch_no_mqtt, patch_open): test += 1 if test == 1: m.shutdown_started = False - m.reader.on_recv.set() + m._reader.on_recv.set() await asyncio.sleep(0.1) assert m.state == State.closed await asyncio.sleep(0.1) else: m.shutdown_started = True - m.reader.on_recv.set() + m._reader.on_recv.set() del m assert 2 == test @@ -269,13 +269,13 @@ async def test_mqtt_err(config_conn, patch_mqtt_err, patch_open): test += 1 if test == 1: m.shutdown_started = False - m.reader.on_recv.set() + m._reader.on_recv.set() await asyncio.sleep(0.1) assert m.state == State.closed await asyncio.sleep(0.1) else: m.shutdown_started = True - m.reader.on_recv.set() + m._reader.on_recv.set() del m await asyncio.sleep(0.01) @@ -301,13 +301,13 @@ async def test_mqtt_except(config_conn, patch_mqtt_except, patch_open): test += 1 if test == 1: m.shutdown_started = False - m.reader.on_recv.set() + m._reader.on_recv.set() await asyncio.sleep(0.1) assert m.state == State.closed await asyncio.sleep(0.1) else: m.shutdown_started = True - m.reader.on_recv.set() + m._reader.on_recv.set() del m await asyncio.sleep(0.01) diff --git a/app/tests/test_mqtt.py b/app/tests/test_mqtt.py index 3072627..eda7d5e 100644 --- a/app/tests/test_mqtt.py +++ b/app/tests/test_mqtt.py @@ -5,6 +5,7 @@ import aiomqtt import logging from mock import patch, Mock +from app.src.async_ifc import AsyncIfc from app.src.singleton import Singleton from app.src.mqtt import Mqtt from app.src.modbus import Modbus @@ -44,7 +45,8 @@ def config_no_conn(test_port): @pytest.fixture def spy_at_cmd(): - conn = SolarmanV5(server_side=True, client_mode= False) + ifc = AsyncIfc() + conn = SolarmanV5(server_side=True, client_mode= False, ifc=ifc) conn.node_id = 'inv_2/' with patch.object(conn, 'send_at_cmd', wraps=conn.send_at_cmd) as wrapped_conn: yield wrapped_conn @@ -52,7 +54,8 @@ def spy_at_cmd(): @pytest.fixture def spy_modbus_cmd(): - conn = SolarmanV5(server_side=True, client_mode= False) + ifc = AsyncIfc() + conn = SolarmanV5(server_side=True, client_mode= False, ifc=ifc) conn.node_id = 'inv_1/' with patch.object(conn, 'send_modbus_cmd', wraps=conn.send_modbus_cmd) as wrapped_conn: yield wrapped_conn @@ -60,7 +63,8 @@ def spy_modbus_cmd(): @pytest.fixture def spy_modbus_cmd_client(): - conn = SolarmanV5(server_side=False, client_mode= False) + ifc = AsyncIfc() + conn = SolarmanV5(server_side=False, client_mode= False, ifc=ifc) conn.node_id = 'inv_1/' with patch.object(conn, 'send_modbus_cmd', wraps=conn.send_modbus_cmd) as wrapped_conn: yield wrapped_conn diff --git a/app/tests/test_solarman.py b/app/tests/test_solarman.py index 80778a2..e5296f5 100644 --- a/app/tests/test_solarman.py +++ b/app/tests/test_solarman.py @@ -5,6 +5,7 @@ import asyncio import logging import random from math import isclose +from app.src.async_ifc import AsyncIfc from app.src.gen3plus.solarman_v5 import SolarmanV5 from app.src.config import Config from app.src.infos import Infos, Register @@ -20,13 +21,6 @@ Infos.static_init() timestamp = int(time.time()) # 1712861197 heartbeat = 60 -class Writer(): - def __init__(self): - self.sent_pdu = b'' - - def write(self, pdu: bytearray): - self.sent_pdu = pdu - class Mqtt(): def __init__(self): @@ -40,12 +34,14 @@ class Mqtt(): class MemoryStream(SolarmanV5): def __init__(self, msg, chunks = (0,), server_side: bool = True): - super().__init__(server_side, client_mode=False) + _ifc = AsyncIfc() + super().__init__(server_side, client_mode=False, ifc=_ifc) if server_side: self.mb.timeout = 0.4 # overwrite for faster testing self.mb_first_timeout = 0.5 self.mb_timeout = 0.5 - self.writer = Writer() + self.sent_pdu = b'' + self.ifc.write.reg_trigger(self.write_cb) self.mqtt = Mqtt() self.__msg = msg self.__msg_len = len(msg) @@ -64,6 +60,9 @@ class MemoryStream(SolarmanV5): self.data = '' self.msg_recvd = [] + def write_cb(self): + self.sent_pdu = self.ifc.write.get() + def _timestamp(self): return timestamp @@ -86,11 +85,11 @@ class MemoryStream(SolarmanV5): chunk_len = self.__chunks[self.__chunk_idx] self.__chunk_idx += 1 if chunk_len!=0: - self._recv_buffer += self.__msg[self.__offs:chunk_len] + self.ifc.read += self.__msg[self.__offs:chunk_len] copied_bytes = chunk_len - self.__offs self.__offs = chunk_len else: - self._recv_buffer += self.__msg[self.__offs:] + self.ifc.read += self.__msg[self.__offs:] copied_bytes = self.__msg_len - self.__offs self.__offs = self.__msg_len except Exception: @@ -690,8 +689,8 @@ def test_read_message(device_ind_msg): assert m.control == 0x4110 assert str(m.seq) == '01:00' assert m.data_len == 0xd4 - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -711,8 +710,8 @@ def test_invalid_start_byte(invalid_start_byte, device_ind_msg): assert m.control == 0x4110 assert str(m.seq) == '01:00' assert m.data_len == 0xd4 - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1 m.close() @@ -731,8 +730,8 @@ def test_invalid_stop_byte(invalid_stop_byte): assert m.control == 0x4110 assert str(m.seq) == '01:00' assert m.data_len == 0xd4 - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1 m.close() @@ -756,8 +755,8 @@ def test_invalid_stop_byte2(invalid_stop_byte, device_ind_msg): assert m.msg_recvd[1]['data_len']==0xd4 assert m.unique_id == None - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1 m.close() @@ -778,8 +777,8 @@ def test_invalid_stop_start_byte(invalid_stop_byte, invalid_start_byte): assert m.control == 0x4110 assert str(m.seq) == '01:00' assert m.data_len == 0xd4 - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1 m.close() @@ -802,8 +801,8 @@ def test_invalid_checksum(invalid_checksum, device_ind_msg): assert m.msg_recvd[1]['control']==0x4110 assert m.msg_recvd[1]['seq']=='01:00' assert m.msg_recvd[1]['data_len']==0xd4 - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1 m.close() @@ -824,7 +823,7 @@ def test_read_message_twice(config_no_tsun_inv1, device_ind_msg, device_rsp_msg) assert m.msg_recvd[1]['control']==0x4110 assert m.msg_recvd[1]['seq']=='01:01' assert m.msg_recvd[1]['data_len']==0xd4 - assert m._send_buffer==device_rsp_msg+device_rsp_msg + assert m.ifc.write.get()==device_rsp_msg+device_rsp_msg assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -897,11 +896,10 @@ def test_read_two_messages(config_tsun_allow_all, device_ind_msg, device_rsp_msg assert '02b0' == m.db.get_db_value(Register.SENSOR_LIST, None) assert 0x02b0 == m.sensor_list assert m._forward_buffer==device_ind_msg+inverter_ind_msg - assert m._send_buffer==device_rsp_msg+inverter_rsp_msg + assert m.ifc.write.get()==device_rsp_msg+inverter_rsp_msg - m._send_buffer = bytearray(0) # clear send buffer for next test m._init_new_client_conn() - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' m.close() def test_read_two_messages2(config_tsun_allow_all, inverter_ind_msg, inverter_ind_msg_81, inverter_rsp_msg, inverter_rsp_msg_81): @@ -923,11 +921,10 @@ def test_read_two_messages2(config_tsun_allow_all, inverter_ind_msg, inverter_in assert m.msg_recvd[1]['data_len']==0x199 assert m.time_ofs == 0x33e447a0 assert m._forward_buffer==inverter_ind_msg+inverter_ind_msg_81 - assert m._send_buffer==inverter_rsp_msg+inverter_rsp_msg_81 + assert m.ifc.write.get()==inverter_rsp_msg+inverter_rsp_msg_81 - m._send_buffer = bytearray(0) # clear send buffer for next test m._init_new_client_conn() - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' m.close() def test_read_two_messages3(config_tsun_allow_all, device_ind_msg2, device_rsp_msg2, inverter_ind_msg, inverter_rsp_msg): @@ -953,11 +950,10 @@ def test_read_two_messages3(config_tsun_allow_all, device_ind_msg2, device_rsp_m assert '02b0' == m.db.get_db_value(Register.SENSOR_LIST, None) assert 0x02b0 == m.sensor_list assert m._forward_buffer==inverter_ind_msg+device_ind_msg2 - assert m._send_buffer==inverter_rsp_msg+device_rsp_msg2 + assert m.ifc.write.get()==inverter_rsp_msg+device_rsp_msg2 - m._send_buffer = bytearray(0) # clear send buffer for next test m._init_new_client_conn() - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' m.close() def test_unkown_frame_code(config_tsun_inv1, inverter_ind_msg_81, inverter_rsp_msg_81): @@ -972,8 +968,8 @@ def test_unkown_frame_code(config_tsun_inv1, inverter_ind_msg_81, inverter_rsp_m assert m.control == 0x4210 assert str(m.seq) == '03:03' assert m.data_len == 0x199 - assert m._recv_buffer==b'' - assert m._send_buffer==inverter_rsp_msg_81 + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==inverter_rsp_msg_81 assert m._forward_buffer==inverter_ind_msg_81 assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -990,8 +986,8 @@ def test_unkown_message(config_tsun_inv1, unknown_msg): assert m.control == 0x5110 assert str(m.seq) == '84:10' assert m.data_len == 0x0a - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==unknown_msg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -1008,8 +1004,8 @@ def test_device_rsp(config_tsun_inv1, device_rsp_msg): assert m.control == 0x1110 assert str(m.seq) == '01:01' assert m.data_len == 0x0a - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -1026,8 +1022,8 @@ def test_inverter_rsp(config_tsun_inv1, inverter_rsp_msg): assert m.control == 0x1210 assert str(m.seq) == '02:02' assert m.data_len == 0x0a - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -1043,8 +1039,8 @@ def test_heartbeat_ind(config_tsun_inv1, heartbeat_ind_msg, heartbeat_rsp_msg): assert m.control == 0x4710 assert str(m.seq) == '84:11' # value after sending response assert m.data_len == 0x01 - assert m._recv_buffer==b'' - assert m._send_buffer==heartbeat_rsp_msg + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==heartbeat_rsp_msg assert m._forward_buffer==heartbeat_ind_msg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -1061,8 +1057,8 @@ def test_heartbeat_ind2(config_tsun_inv1, heartbeat_ind_msg, heartbeat_rsp_msg): assert m.control == 0x4710 assert str(m.seq) == '84:11' # value after sending response assert m.data_len == 0x01 - assert m._recv_buffer==b'' - assert m._send_buffer==heartbeat_rsp_msg + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==heartbeat_rsp_msg assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -1079,8 +1075,8 @@ def test_heartbeat_rsp(config_tsun_inv1, heartbeat_rsp_msg): assert m.control == 0x1710 assert str(m.seq) == '11:84' # value after sending response assert m.data_len == 0x0a - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -1096,8 +1092,8 @@ def test_sync_start_ind(config_tsun_inv1, sync_start_ind_msg, sync_start_rsp_msg assert m.control == 0x4310 assert str(m.seq) == '0d:0d' # value after sending response assert m.data_len == 47 - assert m._recv_buffer==b'' - assert m._send_buffer==sync_start_rsp_msg + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==sync_start_rsp_msg assert m._forward_buffer==sync_start_ind_msg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 @@ -1120,8 +1116,8 @@ def test_sync_start_rsp(config_tsun_inv1, sync_start_rsp_msg): assert m.control == 0x1310 assert str(m.seq) == '0d:0d' # value after sending response assert m.data_len == 0x0a - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -1137,8 +1133,8 @@ def test_sync_end_ind(config_tsun_inv1, sync_end_ind_msg, sync_end_rsp_msg): assert m.control == 0x4810 assert str(m.seq) == '07:07' # value after sending response assert m.data_len == 60 - assert m._recv_buffer==b'' - assert m._send_buffer==sync_end_rsp_msg + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==sync_end_rsp_msg assert m._forward_buffer==sync_end_ind_msg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -1155,8 +1151,8 @@ def test_sync_end_rsp(config_tsun_inv1, sync_end_rsp_msg): assert m.control == 0x1810 assert str(m.seq) == '07:07' # value after sending response assert m.data_len == 0x0a - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 m.close() @@ -1175,9 +1171,9 @@ def test_build_modell_600(config_tsun_allow_all, inverter_ind_msg): assert '02b0' == m.db.get_db_value(Register.SENSOR_LIST, None) assert 0 == m.sensor_list # must not been set by an inverter data ind - m._send_buffer = bytearray(0) # clear send buffer for next test + m.ifc.write.clear() # clear send buffer for next test m._init_new_client_conn() - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' m.close() def test_build_modell_1600(config_tsun_allow_all, inverter_ind_msg1600): @@ -1241,9 +1237,9 @@ def test_build_logger_modell(config_tsun_allow_all, device_ind_msg): def test_msg_iterator(): Message._registry.clear() - m1 = SolarmanV5(server_side=True, client_mode=False) - m2 = SolarmanV5(server_side=True, client_mode=False) - m3 = SolarmanV5(server_side=True, client_mode=False) + m1 = SolarmanV5(server_side=True, client_mode=False, ifc=AsyncIfc()) + m2 = SolarmanV5(server_side=True, client_mode=False, ifc=AsyncIfc()) + m3 = SolarmanV5(server_side=True, client_mode=False, ifc=AsyncIfc()) m3.close() del m3 test1 = 0 @@ -1261,7 +1257,7 @@ def test_msg_iterator(): assert test2 == 1 def test_proxy_counter(): - m = SolarmanV5(server_side=True, client_mode=False) + m = SolarmanV5(server_side=True, client_mode=False, ifc=AsyncIfc()) assert m.new_data == {} m.db.stat['proxy']['Unknown_Msg'] = 0 Infos.new_stat_data['proxy'] = False @@ -1285,16 +1281,16 @@ async def test_msg_build_modbus_req(config_tsun_inv1, device_ind_msg, device_rsp m.read() assert m.control == 0x4110 assert str(m.seq) == '01:01' - assert m._send_buffer==device_rsp_msg + assert m.ifc.write.get()==device_rsp_msg assert m._forward_buffer==device_ind_msg - m._send_buffer = bytearray(0) # clear send buffer for next test + m.ifc.write.clear() # clear send buffer for next test m._forward_buffer = bytearray(0) # clear send buffer for next test await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0, logging.DEBUG) assert 0 == m.send_msg_ofs assert m._forward_buffer == b'' - assert m.writer.sent_pdu == b'' # modbus command must be ignore, cause connection is still not up - assert m._send_buffer == b'' # modbus command must be ignore, cause connection is still not up + assert m.sent_pdu == b'' # modbus command must be ignore, cause connection is still not up + assert m.ifc.write.get() == b'' # modbus command must be ignore, cause connection is still not up m.append_msg(inverter_ind_msg) m.read() @@ -1304,24 +1300,22 @@ async def test_msg_build_modbus_req(config_tsun_inv1, device_ind_msg, device_rsp assert m.msg_recvd[0]['seq']=='01:01' assert m.msg_recvd[1]['control']==0x4210 assert m.msg_recvd[1]['seq']=='02:02' - assert m._recv_buffer==b'' - assert m._send_buffer==inverter_rsp_msg + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==inverter_rsp_msg assert m._forward_buffer==inverter_ind_msg - m._send_buffer = bytearray(0) # clear send buffer for next test m._forward_buffer = bytearray(0) # clear send buffer for next test await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0, logging.DEBUG) assert 0 == m.send_msg_ofs assert m._forward_buffer == b'' - assert m.writer.sent_pdu == msg_modbus_cmd - assert m._send_buffer == b'' + assert m.sent_pdu == msg_modbus_cmd + assert m.ifc.write.get()== b'' - m._send_buffer = bytearray(0) # clear send buffer for next test m.test_exception_async_write = True await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0, logging.DEBUG) assert 0 == m.send_msg_ofs assert m._forward_buffer == b'' - assert m._send_buffer == b'' + assert m.ifc.write.get() == b'' m.close() @pytest.mark.asyncio @@ -1331,13 +1325,12 @@ async def test_at_cmd(config_tsun_allow_all, device_ind_msg, device_rsp_msg, inv m.read() # read device ind assert m.control == 0x4110 assert str(m.seq) == '01:01' - assert m._send_buffer==device_rsp_msg + assert m.ifc.write.get()==device_rsp_msg assert m._forward_buffer==device_ind_msg - m._send_buffer = bytearray(0) # clear send buffer for next test m._forward_buffer = bytearray(0) # clear send buffer for next test await m.send_at_cmd('AT+TIME=214028,1,60,120') - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert str(m.seq) == '01:01' assert m.mqtt.key == '' @@ -1347,33 +1340,31 @@ async def test_at_cmd(config_tsun_allow_all, device_ind_msg, device_rsp_msg, inv m.read() # read inverter ind assert m.control == 0x4210 assert str(m.seq) == '02:02' - assert m._send_buffer==inverter_rsp_msg + assert m.ifc.write.get()==inverter_rsp_msg assert m._forward_buffer==inverter_ind_msg - m._send_buffer = bytearray(0) # clear send buffer for next test m._forward_buffer = bytearray(0) # clear send buffer for next test await m.send_at_cmd('AT+TIME=214028,1,60,120') - assert m._send_buffer==at_command_ind_msg + assert m.ifc.write.get()==at_command_ind_msg assert m._forward_buffer==b'' assert str(m.seq) == '02:03' assert m.mqtt.key == '' assert m.mqtt.data == "" - m._send_buffer = bytearray(0) # clear send buffer for next test m.append_msg(at_command_rsp_msg) m.read() # read at resp assert m.control == 0x1510 assert str(m.seq) == '03:03' - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.key == 'at_resp' assert m.data == "+ok" m.test_exception_async_write = True await m.send_at_cmd('AT+TIME=214028,1,60,120') - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert str(m.seq) == '03:04' assert m.forward_at_cmd_resp == False @@ -1388,13 +1379,12 @@ async def test_at_cmd_blocked(config_tsun_allow_all, device_ind_msg, device_rsp_ m.read() assert m.control == 0x4110 assert str(m.seq) == '01:01' - assert m._send_buffer==device_rsp_msg + assert m.ifc.write.get()==device_rsp_msg assert m._forward_buffer==device_ind_msg - m._send_buffer = bytearray(0) # clear send buffer for next test m._forward_buffer = bytearray(0) # clear send buffer for next test await m.send_at_cmd('AT+WEBU') - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert str(m.seq) == '01:01' assert m.mqtt.key == '' @@ -1404,15 +1394,14 @@ async def test_at_cmd_blocked(config_tsun_allow_all, device_ind_msg, device_rsp_ m.read() assert m.control == 0x4210 assert str(m.seq) == '02:02' - assert m._recv_buffer==b'' - assert m._send_buffer==inverter_rsp_msg + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==inverter_rsp_msg assert m._forward_buffer==inverter_ind_msg - m._send_buffer = bytearray(0) # clear send buffer for next test m._forward_buffer = bytearray(0) # clear send buffer for next test await m.send_at_cmd('AT+WEBU') - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert str(m.seq) == '02:02' assert m.forward_at_cmd_resp == False @@ -1435,8 +1424,8 @@ def test_at_cmd_ind(config_tsun_inv1, at_command_ind_msg): assert m.control == 0x4510 assert str(m.seq) == '03:02' assert m.data_len == 39 - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==at_command_ind_msg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 assert m.db.stat['proxy']['AT_Command'] == 1 @@ -1459,8 +1448,8 @@ def test_at_cmd_ind_block(config_tsun_inv1, at_command_ind_msg_block): assert m.control == 0x4510 assert str(m.seq) == '03:02' assert m.data_len == 23 - assert m._recv_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==b'' assert m._forward_buffer==b'' assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 assert m.db.stat['proxy']['AT_Command'] == 0 @@ -1482,7 +1471,7 @@ def test_msg_at_command_rsp1(config_tsun_inv1, at_command_rsp_msg): assert m.header_len==11 assert m.data_len==17 assert m._forward_buffer==at_command_rsp_msg - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 0 m.close() @@ -1501,7 +1490,7 @@ def test_msg_at_command_rsp2(config_tsun_inv1, at_command_rsp_msg): assert m.header_len==11 assert m.data_len==17 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 0 m.close() @@ -1526,8 +1515,8 @@ def test_msg_modbus_req(config_tsun_inv1, msg_modbus_cmd, msg_modbus_cmd_fwd): assert c.header_len==11 assert c.data_len==23 assert c._forward_buffer==b'' - assert c._send_buffer==b'' - assert m.writer.sent_pdu == msg_modbus_cmd_fwd + assert c.ifc.write.get()==b'' + assert m.sent_pdu == msg_modbus_cmd_fwd assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['AT_Command'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 1 @@ -1553,8 +1542,8 @@ def test_msg_modbus_req2(config_tsun_inv1, msg_modbus_cmd_crc_err): assert c.header_len==11 assert c.data_len==23 assert c._forward_buffer==b'' - assert c._send_buffer==b'' - assert m.writer.sent_pdu==b'' + assert c.ifc.write.get()==b'' + assert m.sent_pdu==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['AT_Command'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 0 @@ -1576,7 +1565,7 @@ def test_msg_unknown_cmd_req(config_tsun_inv1, msg_unknown_cmd): assert m.header_len==11 assert m.data_len==23 assert m._forward_buffer==msg_unknown_cmd - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['AT_Command'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 0 @@ -1597,7 +1586,7 @@ def test_msg_modbus_rsp1(config_tsun_inv1, msg_modbus_rsp): assert m.header_len==11 assert m.data_len==59 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 0 m.close() @@ -1621,7 +1610,7 @@ def test_msg_modbus_rsp2(config_tsun_inv1, msg_modbus_rsp): assert m.mb.err == 0 assert m.msg_count == 1 assert m._forward_buffer==msg_modbus_rsp - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.get_db_value(Register.VERSION) == 'V4.0.10' assert m.new_data['inverter'] == True m.new_data['inverter'] = False @@ -1634,7 +1623,7 @@ def test_msg_modbus_rsp2(config_tsun_inv1, msg_modbus_rsp): assert m.mb.err == 0 assert m.msg_count == 2 assert m._forward_buffer==msg_modbus_rsp - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.get_db_value(Register.VERSION) == 'V4.0.10' assert m.new_data['inverter'] == False @@ -1659,7 +1648,7 @@ def test_msg_modbus_rsp3(config_tsun_inv1, msg_modbus_rsp): assert m.mb.err == 0 assert m.msg_count == 1 assert m._forward_buffer==msg_modbus_rsp - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.get_db_value(Register.VERSION) == 'V4.0.10' assert m.new_data['inverter'] == True m.new_data['inverter'] = False @@ -1671,7 +1660,7 @@ def test_msg_modbus_rsp3(config_tsun_inv1, msg_modbus_rsp): assert m.mb.err == 5 assert m.msg_count == 2 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.get_db_value(Register.VERSION) == 'V4.0.10' assert m.new_data['inverter'] == False @@ -1690,7 +1679,7 @@ def test_msg_unknown_rsp(config_tsun_inv1, msg_unknown_cmd_rsp): assert m.header_len==11 assert m.data_len==59 assert m._forward_buffer==msg_unknown_cmd_rsp - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 0 m.close() @@ -1704,7 +1693,7 @@ def test_msg_modbus_invalid(config_tsun_inv1, msg_modbus_invalid): assert not m.header_valid # must be invalid, since msg was handled and buffer flushed assert m.msg_count == 1 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 0 m.close() @@ -1727,7 +1716,7 @@ def test_msg_modbus_fragment(config_tsun_inv1, msg_modbus_rsp): assert not m.header_valid # must be invalid, since msg was handled and buffer flushed assert m.msg_count == 1 assert m._forward_buffer==msg_modbus_rsp - assert m._send_buffer == b'' + assert m.ifc.write.get()== b'' assert m.mb.err == 0 assert m.modbus_elms == 20-1 # register 0x300d is unknown, so one value can't be mapped assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 @@ -1750,28 +1739,27 @@ async def test_modbus_polling(config_tsun_inv1, heartbeat_ind_msg, heartbeat_rsp assert m.control == 0x4710 assert str(m.seq) == '84:11' # value after sending response assert m.data_len == 0x01 - assert m._recv_buffer==b'' - assert m._send_buffer==heartbeat_rsp_msg + assert m.ifc.read.get()==b'' + assert m.ifc.write.get()==heartbeat_rsp_msg assert m._forward_buffer==heartbeat_ind_msg assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 - m._send_buffer = bytearray(0) # clear send buffer for next test assert m.state == State.up assert isclose(m.mb_timeout, 0.5) assert next(m.mb_timer.exp_count) == 0 await asyncio.sleep(0.5) - assert m.writer.sent_pdu==bytearray(b'\xa5\x17\x00\x10E\x12\x84!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x03\x30\x00\x000J\xde\x86\x15') - assert m._send_buffer==b'' + assert m.sent_pdu==bytearray(b'\xa5\x17\x00\x10E\x12\x84!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x03\x30\x00\x000J\xde\x86\x15') + assert m.ifc.write.get()==b'' await asyncio.sleep(0.5) - assert m.writer.sent_pdu==bytearray(b'\xa5\x17\x00\x10E\x13\x84!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x03\x30\x00\x000J\xde\x87\x15') - assert m._send_buffer==b'' + assert m.sent_pdu==bytearray(b'\xa5\x17\x00\x10E\x13\x84!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x03\x30\x00\x000J\xde\x87\x15') + assert m.ifc.write.get()==b'' m.state = State.closed - m.writer.sent_pdu = bytearray() + m.sent_pdu = bytearray() await asyncio.sleep(0.5) - assert m.writer.sent_pdu==bytearray(b'') - assert m._send_buffer==b'' + assert m.sent_pdu==bytearray(b'') + assert m.ifc.write.get()==b'' assert next(m.mb_timer.exp_count) == 4 m.close() @@ -1785,7 +1773,7 @@ async def test_start_client_mode(config_tsun_inv1, str_test_ip): assert m.mb_timer.tim == None assert asyncio.get_running_loop() == m.mb_timer.loop await m.send_start_cmd(get_sn_int(), str_test_ip, m.mb_first_timeout) - assert m.writer.sent_pdu==bytearray(b'\xa5\x17\x00\x10E\x01\x00!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x030\x00\x000J\xde\xf1\x15') + assert m.sent_pdu==bytearray(b'\xa5\x17\x00\x10E\x01\x00!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x030\x00\x000J\xde\xf1\x15') assert m.db.get_db_value(Register.IP_ADDRESS) == str_test_ip assert isclose(m.db.get_db_value(Register.POLLING_INTERVAL), 0.5) assert m.db.get_db_value(Register.HEARTBEAT_INTERVAL) == 120 @@ -1793,16 +1781,16 @@ async def test_start_client_mode(config_tsun_inv1, str_test_ip): assert m.state == State.up assert m.no_forwarding == True - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert isclose(m.mb_timeout, 0.5) assert next(m.mb_timer.exp_count) == 0 await asyncio.sleep(0.5) - assert m.writer.sent_pdu==bytearray(b'\xa5\x17\x00\x10E\x02\x00!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x030\x00\x000J\xde\xf2\x15') - assert m._send_buffer==b'' + assert m.sent_pdu==bytearray(b'\xa5\x17\x00\x10E\x02\x00!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x030\x00\x000J\xde\xf2\x15') + assert m.ifc.write.get()==b'' await asyncio.sleep(0.5) - assert m.writer.sent_pdu==bytearray(b'\xa5\x17\x00\x10E\x03\x00!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x030\x00\x000J\xde\xf3\x15') - assert m._send_buffer==b'' + assert m.sent_pdu==bytearray(b'\xa5\x17\x00\x10E\x03\x00!Ce{\x02\xb0\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x030\x00\x000J\xde\xf3\x15') + assert m.ifc.write.get()==b'' assert next(m.mb_timer.exp_count) == 3 m.close() diff --git a/app/tests/test_talent.py b/app/tests/test_talent.py index ce4cef4..fb2f2ca 100644 --- a/app/tests/test_talent.py +++ b/app/tests/test_talent.py @@ -1,6 +1,7 @@ # test_with_pytest.py import pytest, logging, asyncio from math import isclose +from app.src.async_ifc import AsyncIfc from app.src.gen3.talent import Talent, Control from app.src.config import Config from app.src.infos import Infos, Register @@ -16,21 +17,15 @@ Infos.static_init() tracer = logging.getLogger('tracer') -class Writer(): - def __init__(self): - self.sent_pdu = b'' - - def write(self, pdu: bytearray): - self.sent_pdu = pdu - class MemoryStream(Talent): def __init__(self, msg, chunks = (0,), server_side: bool = True): - super().__init__(server_side) + super().__init__(server_side, AsyncIfc()) if server_side: self.mb.timeout = 0.4 # overwrite for faster testing self.mb_first_timeout = 0.5 self.mb_timeout = 0.5 - self.writer = Writer() + self.sent_pdu = b'' + self.ifc.write.reg_trigger(self.write_cb) self.__msg = msg self.__msg_len = len(msg) self.__chunks = chunks @@ -43,6 +38,10 @@ class MemoryStream(Talent): self.msg_recvd = [] self.remote_stream = None + def write_cb(self): + self.sent_pdu = self.ifc.write.get() + + def append_msg(self, msg): self.__msg += msg self.__msg_len += len(msg) @@ -54,11 +53,11 @@ class MemoryStream(Talent): chunk_len = self.__chunks[self.__chunk_idx] self.__chunk_idx += 1 if chunk_len!=0: - self._recv_buffer += self.__msg[self.__offs:chunk_len] + self.ifc.read += self.__msg[self.__offs:chunk_len] copied_bytes = chunk_len - self.__offs self.__offs = chunk_len else: - self._recv_buffer += self.__msg[self.__offs:] + self.ifc.read += self.__msg[self.__offs:] copied_bytes = self.__msg_len - self.__offs self.__offs = self.__msg_len except Exception: @@ -853,14 +852,14 @@ def test_read_two_messages(config_tsun_allow_all, msg2_contact_info,msg_contact_ assert m.msg_recvd[1]['header_len']==23 assert m.msg_recvd[1]['data_len']==25 assert m._forward_buffer==b'' - assert m._send_buffer==msg_contact_rsp + msg_contact_rsp2 + assert m.ifc.write.get()==msg_contact_rsp + msg_contact_rsp2 assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 - m._send_buffer = bytearray(0) # clear send buffer for next test + m.ifc.write.clear() # clear send buffer for next test m.contact_name = b'solarhub' m.contact_mail = b'solarhub@123456' m._init_new_client_conn() - assert m._send_buffer==b'\x00\x00\x00,\x10R170000000000002\x91\x00\x08solarhub\x0fsolarhub@123456' + assert m.ifc.write.get()==b'\x00\x00\x00,\x10R170000000000002\x91\x00\x08solarhub\x0fsolarhub@123456' m.close() def test_conttact_req(config_tsun_allow_all, msg_contact_info, msg_contact_rsp): @@ -878,7 +877,7 @@ def test_conttact_req(config_tsun_allow_all, msg_contact_info, msg_contact_rsp): assert m.header_len==23 assert m.data_len==25 assert m._forward_buffer==b'' - assert m._send_buffer==msg_contact_rsp + assert m.ifc.write.get()==msg_contact_rsp m.close() def test_contact_broken_req(config_tsun_allow_all, msg_contact_info_broken, msg_contact_rsp): @@ -896,7 +895,7 @@ def test_contact_broken_req(config_tsun_allow_all, msg_contact_info_broken, msg_ assert m.header_len==23 assert m.data_len==23 assert m._forward_buffer==b'' - assert m._send_buffer==msg_contact_rsp + assert m.ifc.write.get()==msg_contact_rsp m.close() def test_msg_contact_resp(config_tsun_inv1, msg_contact_rsp): @@ -915,7 +914,7 @@ def test_msg_contact_resp(config_tsun_inv1, msg_contact_rsp): assert m.header_len==23 assert m.data_len==1 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -935,7 +934,7 @@ def test_msg_contact_resp_2(config_tsun_inv1, msg_contact_rsp): assert m.header_len==23 assert m.data_len==1 assert m._forward_buffer==msg_contact_rsp - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -955,7 +954,7 @@ def test_msg_contact_resp_3(config_tsun_inv1, msg_contact_rsp): assert m.header_len==23 assert m.data_len==1 assert m._forward_buffer==msg_contact_rsp - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -973,7 +972,7 @@ def test_msg_contact_invalid(config_tsun_inv1, msg_contact_invalid): assert m.header_len==23 assert m.data_len==1 assert m._forward_buffer==msg_contact_invalid - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 m.close() @@ -994,7 +993,7 @@ def test_msg_get_time(config_tsun_inv1, msg_get_time): assert m.data_len==0 assert m.state==State.pend assert m._forward_buffer==msg_get_time - assert m._send_buffer==b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x89\xc6,_\x00' + assert m.ifc.write.get()==b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x89\xc6,_\x00' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1015,7 +1014,7 @@ def test_msg_get_time_autark(config_no_tsun_inv1, msg_get_time): assert m.data_len==0 assert m.state==State.received assert m._forward_buffer==b'' - assert m._send_buffer==bytearray(b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x89\xc6,_\x00') + assert m.ifc.write.get()==bytearray(b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x89\xc6,_\x00') assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1039,7 +1038,7 @@ def test_msg_time_resp(config_tsun_inv1, msg_time_rsp): assert s.ts_offset==3600000 assert m.data_len==8 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.remote_stream = None s.close() @@ -1060,7 +1059,7 @@ def test_msg_time_resp_autark(config_no_tsun_inv1, msg_time_rsp): assert m.ts_offset==3600000 assert m.data_len==8 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1079,7 +1078,7 @@ def test_msg_time_inv_resp(config_tsun_inv1, msg_time_rsp_inv): assert m.ts_offset==0 assert m.data_len==4 assert m._forward_buffer==msg_time_rsp_inv - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1098,7 +1097,7 @@ def test_msg_time_invalid(config_tsun_inv1, msg_time_invalid): assert m.ts_offset==0 assert m.data_len==0 assert m._forward_buffer==msg_time_invalid - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 m.close() @@ -1117,7 +1116,7 @@ def test_msg_time_invalid_autark(config_no_tsun_inv1, msg_time_invalid): assert m.header_len==23 assert m.data_len==0 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 m.close() @@ -1141,7 +1140,7 @@ def test_msg_act_time(config_no_modbus_poll, msg_act_time, msg_act_time_ack): assert m.data_len==9 assert m.state == State.up assert m._forward_buffer==msg_act_time - assert m._send_buffer==msg_act_time_ack + assert m.ifc.write.get()==msg_act_time_ack assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert 125 == m.db.get_db_value(Register.POLLING_INTERVAL, 0) m.close() @@ -1165,7 +1164,7 @@ def test_msg_act_time2(config_tsun_inv1, msg_act_time, msg_act_time_ack): assert m.header_len==23 assert m.data_len==9 assert m._forward_buffer==msg_act_time - assert m._send_buffer==msg_act_time_ack + assert m.ifc.write.get()==msg_act_time_ack assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert 123 == m.db.get_db_value(Register.POLLING_INTERVAL, 0) m.close() @@ -1186,7 +1185,7 @@ def test_msg_act_time_ofs(config_tsun_inv1, msg_act_time, msg_act_time_ofs, msg_ assert m.header_len==23 assert m.data_len==9 assert m._forward_buffer==msg_act_time_ofs - assert m._send_buffer==msg_act_time_ack + assert m.ifc.write.get()==msg_act_time_ack assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1206,7 +1205,7 @@ def test_msg_act_time_ofs2(config_tsun_inv1, msg_act_time, msg_act_time_ofs, msg assert m.header_len==23 assert m.data_len==9 assert m._forward_buffer==msg_act_time - assert m._send_buffer==msg_act_time_ack + assert m.ifc.write.get()==msg_act_time_ack assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1226,7 +1225,7 @@ def test_msg_act_time_autark(config_no_tsun_inv1, msg_act_time, msg_act_time_ack assert m.header_len==23 assert m.data_len==9 assert m._forward_buffer==b'' - assert m._send_buffer==msg_act_time_ack + assert m.ifc.write.get()==msg_act_time_ack assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1244,7 +1243,7 @@ def test_msg_act_time_ack(config_tsun_inv1, msg_act_time_ack): assert m.header_len==23 assert m.data_len==1 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1262,7 +1261,7 @@ def test_msg_act_time_cmd(config_tsun_inv1, msg_act_time_cmd): assert m.header_len==23 assert m.data_len==1 assert m._forward_buffer==msg_act_time_cmd - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 m.close() @@ -1280,7 +1279,7 @@ def test_msg_act_time_inv(config_tsun_inv1, msg_act_time_inv): assert m.header_len==23 assert m.data_len==8 assert m._forward_buffer==msg_act_time_inv - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1303,7 +1302,7 @@ def test_msg_cntrl_ind(config_tsun_inv1, msg_controller_ind, msg_controller_ind_ m.ts_offset = -4096 m._update_header(m._forward_buffer) assert m._forward_buffer==msg_controller_ind_ts_offs - assert m._send_buffer==msg_controller_ack + assert m.ifc.write.get()==msg_controller_ack assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1321,7 +1320,7 @@ def test_msg_cntrl_ack(config_tsun_inv1, msg_controller_ack): assert m.header_len==23 assert m.data_len==1 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1344,7 +1343,7 @@ def test_msg_cntrl_invalid(config_tsun_inv1, msg_controller_invalid): m.ts_offset = -4096 m._update_header(m._forward_buffer) assert m._forward_buffer==msg_controller_invalid - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 m.close() @@ -1368,7 +1367,7 @@ def test_msg_inv_ind(config_tsun_inv1, msg_inverter_ind, msg_inverter_ind_ts_off m.ts_offset = +256 m._update_header(m._forward_buffer) assert m._forward_buffer==msg_inverter_ind_ts_offs - assert m._send_buffer==msg_inverter_ack + assert m.ifc.write.get()==msg_inverter_ack assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1392,7 +1391,7 @@ def test_msg_inv_ind1(config_tsun_inv1, msg_inverter_ind2, msg_inverter_ind_ts_o m.ts_offset = 0 m._update_header(m._forward_buffer) assert m._forward_buffer==msg_inverter_ind2 - assert m._send_buffer==msg_inverter_ack + assert m.ifc.write.get()==msg_inverter_ack assert m.db.get_db_value(Register.TS_GRID) == 1691243349 m.close() @@ -1416,7 +1415,7 @@ def test_msg_inv_ind2(config_tsun_inv1, msg_inverter_ind_new, msg_inverter_ind_t m.ts_offset = 0 m._update_header(m._forward_buffer) assert m._forward_buffer==msg_inverter_ind_new - assert m._send_buffer==msg_inverter_ack + assert m.ifc.write.get()==msg_inverter_ack assert m.db.get_db_value(Register.INVERTER_STATUS) == None assert m.db.get_db_value(Register.TS_GRID) == None m.db.db['grid'] = {'Output_Power': 100} @@ -1444,7 +1443,7 @@ def test_msg_inv_ind3(config_tsun_inv1, msg_inverter_ind_0w, msg_inverter_ack): m.ts_offset = 0 m._update_header(m._forward_buffer) assert m._forward_buffer==msg_inverter_ind_0w - assert m._send_buffer==msg_inverter_ack + assert m.ifc.write.get()==msg_inverter_ack assert m.db.get_db_value(Register.INVERTER_STATUS) == 1 assert isclose(m.db.db['grid']['Output_Power'], 0.5) m.close() @@ -1467,7 +1466,7 @@ def test_msg_inv_ack(config_tsun_inv1, msg_inverter_ack): assert m.header_len==23 assert m.data_len==1 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -1490,7 +1489,7 @@ def test_msg_inv_invalid(config_tsun_inv1, msg_inverter_invalid): m.ts_offset = 256 m._update_header(m._forward_buffer) assert m._forward_buffer==msg_inverter_invalid - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 m.close() @@ -1514,7 +1513,7 @@ def test_msg_ota_req(config_tsun_inv1, msg_ota_req): m.ts_offset = 4096 m._update_header(m._forward_buffer) assert m._forward_buffer==msg_ota_req - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['OTA_Start_Msg'] == 1 m.close() @@ -1541,7 +1540,7 @@ def test_msg_ota_ack(config_tsun_inv1, msg_ota_ack): m.ts_offset = 256 m._update_header(m._forward_buffer) assert m._forward_buffer==msg_ota_ack - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['OTA_Start_Msg'] == 0 m.close() @@ -1566,7 +1565,7 @@ def test_msg_ota_invalid(config_tsun_inv1, msg_ota_invalid): m.ts_offset = 4096 assert m._forward_buffer==msg_ota_invalid m._update_header(m._forward_buffer) - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 assert m.db.stat['proxy']['OTA_Start_Msg'] == 0 m.close() @@ -1585,7 +1584,7 @@ def test_msg_unknown(config_tsun_inv1, msg_unknown): assert m.header_len==23 assert m.data_len==4 assert m._forward_buffer==msg_unknown - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert 1 == m.db.stat['proxy']['Unknown_Msg'] m.close() @@ -1605,9 +1604,9 @@ def test_ctrl_byte(): def test_msg_iterator(): - m1 = Talent(server_side=True) - m2 = Talent(server_side=True) - m3 = Talent(server_side=True) + m1 = Talent(server_side=True, ifc=AsyncIfc()) + m2 = Talent(server_side=True, ifc=AsyncIfc()) + m3 = Talent(server_side=True, ifc=AsyncIfc()) m3.close() del m3 test1 = 0 @@ -1710,11 +1709,11 @@ def test_msg_modbus_req(config_tsun_inv1, msg_modbus_cmd): assert c.header_len==23 assert c.data_len==13 assert c._forward_buffer==b'' - assert c._send_buffer==b'' + assert c.ifc.write.get()==b'' assert m.id_str == b"R170000000000001" assert m._forward_buffer==b'' - assert m._send_buffer==b'' - assert m.writer.sent_pdu == msg_modbus_cmd + assert m.ifc.write.get()==b'' + assert m.sent_pdu == msg_modbus_cmd assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 1 assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 @@ -1740,11 +1739,11 @@ def test_msg_modbus_req2(config_tsun_inv1, msg_modbus_cmd): assert c.header_len==23 assert c.data_len==13 assert c._forward_buffer==b'' - assert c._send_buffer==b'' + assert c.ifc.write.get()==b'' assert m.id_str == b"R170000000000001" assert m._forward_buffer==b'' - assert m._send_buffer==b'' - assert m.writer.sent_pdu == b'' + assert m.ifc.write.get()==b'' + assert m.sent_pdu == b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 1 assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 @@ -1769,10 +1768,10 @@ def test_msg_modbus_req3(config_tsun_inv1, msg_modbus_cmd_crc_err): assert c.header_len==23 assert c.data_len==13 assert c._forward_buffer==b'' - assert c._send_buffer==b'' + assert c.ifc.write.get()==b'' assert m._forward_buffer==b'' - assert m._send_buffer==b'' - assert m.writer.sent_pdu ==b'' + assert m.ifc.write.get()==b'' + assert m.sent_pdu ==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 0 assert m.db.stat['proxy']['Invalid_Msg_Format'] == 1 @@ -1794,7 +1793,7 @@ def test_msg_modbus_rsp1(config_tsun_inv1, msg_modbus_rsp): assert m.header_len==23 assert m.data_len==13 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 0 m.close() @@ -1816,7 +1815,7 @@ def test_msg_modbus_cloud_rsp(config_tsun_inv1, msg_modbus_rsp): assert m.header_len==23 assert m.data_len==13 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Msg'] == 1 assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 0 @@ -1844,7 +1843,7 @@ def test_msg_modbus_rsp2(config_tsun_inv1, msg_modbus_rsp20): assert m.mb.err == 5 assert m.msg_count == 2 assert m._forward_buffer==msg_modbus_rsp20 - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.db == {'collector': {'Serial_Number': 'R170000000000001'}, 'inverter': {'Version': 'V5.1.09', 'Rated_Power': 300}, 'grid': {'Timestamp': m._utc(), 'Voltage': 225.9, 'Current': 0.41, 'Frequency': 49.99, 'Output_Power': 94.8}, 'env': {'Inverter_Temp': 22}, 'input': {'Timestamp': m._utc(), 'pv1': {'Voltage': 0.8, 'Current': 0.0, 'Power': 0.0}, 'pv2': {'Voltage': 34.5, 'Current': 2.89, 'Power': 99.8}, 'pv3': {'Voltage': 0.0, 'Current': 0.0, 'Power': 0.0}, 'pv4': {'Voltage': 0.0, 'Current': 0.0, 'Power': 0.0}}} assert m.db.get_db_value(Register.VERSION) == 'V5.1.09' assert m.db.get_db_value(Register.TS_GRID) == m._utc() @@ -1874,7 +1873,7 @@ def test_msg_modbus_rsp3(config_tsun_inv1, msg_modbus_rsp21): assert m.mb.err == 5 assert m.msg_count == 2 assert m._forward_buffer==msg_modbus_rsp21 - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.db == {'collector': {'Serial_Number': 'R170000000000001'}, 'inverter': {'Version': 'V5.1.0E', 'Rated_Power': 300}, 'grid': {'Timestamp': m._utc(), 'Voltage': 225.9, 'Current': 0.41, 'Frequency': 49.99, 'Output_Power': 94.8}, 'env': {'Inverter_Temp': 22}, 'input': {'Timestamp': m._utc(), 'pv1': {'Voltage': 0.8, 'Current': 0.0, 'Power': 0.0}, 'pv2': {'Voltage': 34.5, 'Current': 2.89, 'Power': 99.8}, 'pv3': {'Voltage': 0.0, 'Current': 0.0, 'Power': 0.0}, 'pv4': {'Voltage': 0.0, 'Current': 0.0, 'Power': 0.0}}} assert m.db.get_db_value(Register.VERSION) == 'V5.1.0E' assert m.db.get_db_value(Register.TS_GRID) == m._utc() @@ -1904,7 +1903,7 @@ def test_msg_modbus_rsp4(config_tsun_inv1, msg_modbus_rsp21): assert m.msg_count == 1 assert m._forward_buffer==msg_modbus_rsp21 assert m.modbus_elms == 19 - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.db == db_values assert m.db.get_db_value(Register.VERSION) == 'V5.1.0E' assert m.db.get_db_value(Register.TS_GRID) == m._utc() @@ -1928,7 +1927,7 @@ def test_msg_modbus_rsp_new(config_tsun_inv1, msg_modbus_rsp20_new): assert m.header_len==23 assert m.data_len==107 assert m._forward_buffer==b'' - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Modbus_Command'] == 0 m.close() @@ -1948,7 +1947,7 @@ def test_msg_modbus_invalid(config_tsun_inv1, msg_modbus_inv): assert m.header_len==23 assert m.data_len==13 assert m._forward_buffer==msg_modbus_inv - assert m._send_buffer==b'' + assert m.ifc.write.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 assert m.db.stat['proxy']['Modbus_Command'] == 0 m.close() @@ -1977,7 +1976,7 @@ def test_msg_modbus_fragment(config_tsun_inv1, msg_modbus_rsp20): assert m.header_len == 23 assert m.data_len == 50 assert m._forward_buffer==msg_modbus_rsp20 - assert m._send_buffer == b'' + assert m.ifc.write.get() == b'' assert m.mb.err == 0 assert m.modbus_elms == 20-1 # register 0x300d is unknown, so one value can't be mapped assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 @@ -1992,23 +1991,23 @@ async def test_msg_build_modbus_req(config_tsun_inv1, msg_modbus_cmd): await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0, logging.DEBUG) assert 0 == m.send_msg_ofs assert m._forward_buffer == b'' - assert m._send_buffer == b'' - assert m.writer.sent_pdu == b'' + assert m.ifc.write.get() == b'' + assert m.sent_pdu == b'' m.state = State.up await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0, logging.DEBUG) assert 0 == m.send_msg_ofs assert m._forward_buffer == b'' - assert m._send_buffer == b'' - assert m.writer.sent_pdu == msg_modbus_cmd + assert m.ifc.write.get() == b'' + assert m.sent_pdu == msg_modbus_cmd - m.writer.sent_pdu = bytearray(0) # clear send buffer for next test + m.sent_pdu = bytearray(0) # clear send buffer for next test m.test_exception_async_write = True await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0, logging.DEBUG) assert 0 == m.send_msg_ofs assert m._forward_buffer == b'' - assert m._send_buffer == b'' - assert m.writer.sent_pdu == b'' + assert m.ifc.write.get() == b'' + assert m.sent_pdu == b'' m.close() def test_modbus_no_polling(config_no_modbus_poll, msg_get_time): @@ -2027,7 +2026,7 @@ def test_modbus_no_polling(config_no_modbus_poll, msg_get_time): assert m.ts_offset==0 assert m.data_len==0 assert m._forward_buffer==msg_get_time - assert m._send_buffer==b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x89\xc6,_\x00' + assert m.ifc.write.get()==b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x89\xc6,_\x00' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -2051,24 +2050,24 @@ async def test_modbus_polling(config_tsun_inv1, msg_inverter_ind): assert m.ts_offset==0 assert m.data_len==120 assert m._forward_buffer==msg_inverter_ind - assert m._send_buffer==b'\x00\x00\x00\x14\x10R170000000000001\x99\x04\x01' + assert m.ifc.write.get()==b'\x00\x00\x00\x14\x10R170000000000001\x99\x04\x01' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 - m._send_buffer = bytearray(0) # clear send buffer for next test + m.ifc.write.clear() # clear send buffer for next test assert isclose(m.mb_timeout, 0.5) assert next(m.mb_timer.exp_count) == 0 await asyncio.sleep(0.5) - assert m.writer.sent_pdu==b'\x00\x00\x00 \x10R170000000000001pw\x00\x01\xa3(\x08\x01\x030\x00\x000J\xde' - assert m._send_buffer==b'' + assert m.sent_pdu==b'\x00\x00\x00 \x10R170000000000001pw\x00\x01\xa3(\x08\x01\x030\x00\x000J\xde' + assert m.ifc.write.get()==b'' await asyncio.sleep(0.5) - assert m.writer.sent_pdu==b'\x00\x00\x00 \x10R170000000000001pw\x00\x01\xa3(\x08\x01\x030\x00\x000J\xde' - assert m._send_buffer==b'' + assert m.sent_pdu==b'\x00\x00\x00 \x10R170000000000001pw\x00\x01\xa3(\x08\x01\x030\x00\x000J\xde' + assert m.ifc.write.get()==b'' await asyncio.sleep(0.5) - assert m.writer.sent_pdu==b'\x00\x00\x00 \x10R170000000000001pw\x00\x01\xa3(\x08\x01\x03\x20\x00\x00`N"' - assert m._send_buffer==b'' + assert m.sent_pdu==b'\x00\x00\x00 \x10R170000000000001pw\x00\x01\xa3(\x08\x01\x03\x20\x00\x00`N"' + assert m.ifc.write.get()==b'' assert next(m.mb_timer.exp_count) == 4 m.close()