import pytest import struct import time import asyncio import logging from app.src.gen3plus.solarman_v5 import SolarmanV5 from app.src.config import Config from app.src.infos import Infos, Register from app.src.modbus import Modbus from app.src.messages import State pytest_plugins = ('pytest_asyncio',) # initialize the proxy statistics Infos.static_init() timestamp = int(time.time()) # 1712861197 heartbeat = 60 class Writer(): def __init__(self): self.sent_pdu = b'' def write(self, pdu: bytearray): self.sent_pdu = pdu class Mqtt(): def __init__(self): self.key = '' self.data = '' async def publish(self, key, data): self.key = key self.data = data class MemoryStream(SolarmanV5): def __init__(self, msg, chunks = (0,), server_side: bool = True): super().__init__(server_side, client_mode=False) if server_side: self.mb.timeout = 0.4 # overwrite for faster testing self.mb_start_timeout = 0.5 self.mb_timeout = 0.5 self.writer = Writer() self.mqtt = Mqtt() self.__msg = msg self.__msg_len = len(msg) self.__chunks = chunks self.__offs = 0 self.__chunk_idx = 0 self.msg_count = 0 self.addr = 'Test: SrvSide' self.db.stat['proxy']['Invalid_Msg_Format'] = 0 self.db.stat['proxy']['AT_Command'] = 0 self.db.stat['proxy']['AT_Command_Blocked'] = 0 self.test_exception_async_write = False self.entity_prfx = '' self.at_acl = {'mqtt': {'allow': ['AT+'], 'block': ['AT+WEBU']}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE', 'AT+TIME'], 'block': ['AT+WEBU']}} self.key = '' self.data = '' self.msg_recvd = [] def _timestamp(self): return timestamp def _heartbeat(self) -> int: return heartbeat def append_msg(self, msg): self.__msg += msg self.__msg_len += len(msg) self.__chunk_idx = 0 def publish_mqtt(self, key, data): self.key = key self.data = data def _read(self) -> int: copied_bytes = 0 try: if (self.__offs < self.__msg_len): chunk_len = self.__chunks[self.__chunk_idx] self.__chunk_idx += 1 if chunk_len!=0: self._recv_buffer += self.__msg[self.__offs:chunk_len] copied_bytes = chunk_len - self.__offs self.__offs = chunk_len else: self._recv_buffer += self.__msg[self.__offs:] copied_bytes = self.__msg_len - self.__offs self.__offs = self.__msg_len except Exception: pass # ignore exceptions here return copied_bytes async def async_write(self, headline=''): if self.test_exception_async_write: raise RuntimeError("Peer closed.") def createClientStream(self, msg, chunks = (0,)): c = MemoryStream(msg, chunks, False) self.remote_stream = c c. remote_stream = self return c def _SolarmanV5__flush_recv_msg(self) -> None: self.msg_recvd.append( { 'control': self.control, 'seq': str(self.seq), 'data_len': self.data_len } ) super()._SolarmanV5__flush_recv_msg() self.msg_count += 1 def get_sn() -> bytes: return b'\x21\x43\x65\x7b' def get_sn_int() -> int: return 2070233889 def get_inv_no() -> bytes: return b'T170000000000001' def get_invalid_sn(): return b'R170000000000002' def total(): ts = timestamp # convert int to little-endian bytes return struct.pack('