import pytest import struct import time from datetime import datetime from app.src.gen3plus.solarman_v5 import SolarmanV5 from app.src.config import Config from app.src.infos import Infos, Register from app.src.modbus import Modbus pytest_plugins = ('pytest_asyncio',) # initialize the proxy statistics Infos.static_init() timestamp = int(time.time()) # 1712861197 heartbeat = 60 class Writer(): def __init__(self): self.sent_pdu = b'' def write(self, pdu: bytearray): self.sent_pdu = pdu class MemoryStream(SolarmanV5): def __init__(self, msg, chunks = (0,), server_side: bool = True): super().__init__(server_side) if server_side: self.mb.timeout = 1 # overwrite for faster testing self.writer = Writer() self.__msg = msg self.__msg_len = len(msg) self.__chunks = chunks self.__offs = 0 self.__chunk_idx = 0 self.msg_count = 0 self.addr = 'Test: SrvSide' self.db.stat['proxy']['Invalid_Msg_Format'] = 0 self.db.stat['proxy']['AT_Command'] = 0 self.test_exception_async_write = False self.entity_prfx = '' def _timestamp(self): return timestamp def _heartbeat(self) -> int: return heartbeat def append_msg(self, msg): self.__msg += msg self.__msg_len += len(msg) def _read(self) -> int: copied_bytes = 0 try: if (self.__offs < self.__msg_len): len = self.__chunks[self.__chunk_idx] self.__chunk_idx += 1 if len!=0: self._recv_buffer += self.__msg[self.__offs:len] copied_bytes = len - self.__offs self.__offs = len else: self._recv_buffer += self.__msg[self.__offs:] copied_bytes = self.__msg_len - self.__offs self.__offs = self.__msg_len except: pass return copied_bytes async def async_write(self, headline=''): if self.test_exception_async_write: raise RuntimeError("Peer closed.") def createClientStream(self, msg, chunks = (0,)): c = MemoryStream(msg, chunks, False) self.remoteStream = c c. remoteStream = self return c def _SolarmanV5__flush_recv_msg(self) -> None: super()._SolarmanV5__flush_recv_msg() self.msg_count += 1 return def get_sn() -> bytes: return b'\x21\x43\x65\x7b' def get_inv_no() -> bytes: return b'T170000000000001' def get_invalid_sn(): return b'R170000000000002' def total(): ts = timestamp # convert int to little-endian bytes return struct.pack('