# test_with_pytest.py
import asyncio
import gc
import sys
import weakref
from enum import Enum

import pytest
from mock import patch

from app.src.infos import Infos
from app.src.config import Config
from app.src.inverter import Inverter
from app.src.singleton import Singleton
from app.src.protocol_ifc import ProtocolIfc
from app.src.inverter_base import InverterBase
from app.src.messages import Message
from app.src.async_stream import AsyncStream
from app.tests.test_modbus_tcp import (
    patch_mqtt_err,
    patch_mqtt_except,
    test_port,
    test_hostname,
)

pytest_plugins = ('pytest_asyncio',)

# initialize the proxy statistics
Infos.static_init()


@pytest.fixture
def config_conn():
    """Install the configuration dict used by the tests in Config.act_config."""
    Config.act_config = {
        'mqtt': {
            'host': test_hostname,
            'port': test_port,
            'user': '',
            'passwd': '',
        },
        'ha': {
            'auto_conf_prefix': 'homeassistant',
            'discovery_prefix': 'homeassistant',
            'entity_prefix': 'tsun',
            'proxy_node_id': 'test_1',
            'proxy_unique_id': '',
        },
        'tsun': {'enabled': True, 'host': 'test_cloud.local', 'port': 1234},
        'inverters': {'allow_all': True},
    }


@pytest.fixture(scope="module", autouse=True)
def module_init():
    """Clear the Singleton instance cache once before this module's tests."""
    Singleton._instances.clear()
    yield


class FakeProtocol(ProtocolIfc):
    """Do-nothing ProtocolIfc stand-in that the fixtures hand to InverterBase."""

    def __init__(self, addr, ifc, server_side: bool,
                 client_mode: bool = False, id_str=b''):
        # self._registry.append(weakref.ref(self))
        pass  # empty mockup

    def close(self):
        pass  # empty mockup


class FakeReader():
    """Stream-reader stub whose read() blocks until `on_recv` is set."""

    def __init__(self):
        self.on_recv = asyncio.Event()

    async def read(self, max_len: int):
        """Wait for `on_recv`, then return an empty bytes object."""
        await self.on_recv.wait()
        return b''

    def feed_eof(self):
        return


class FakeWriter():
    """Stream-writer stub that discards data and reports fixed addresses."""

    def write(self, buf: bytes):
        return

    def get_extra_info(self, sel: str):
        """Return a canned value for 'peername' or 'sockname'.

        Raises:
            AssertionError: for any other selector, so an unexpected query
                fails the test instead of silently returning None.
        """
        if sel == 'peername':
            return 'remote.intern'
        elif sel == 'sockname':
            return 'sock:1234'
        # explicit raise instead of `assert False`: not stripped under -O
        raise AssertionError(f'unexpected get_extra_info selector: {sel!r}')

    def is_closing(self):
        return False

    def close(self):
        return

    async def wait_closed(self):
        return


class TestType(Enum):
    """Selects how the patched asyncio.open_connection behaves."""
    RD_TEST_0_BYTES = 1   # connection succeeds with FakeReader/FakeWriter
    RD_TEST_TIMEOUT = 2   # open_connection raises ConnectionRefusedError
    RD_TEST_EXCEPT = 3    # open_connection raises ValueError


# current mode; read by the patched open_connection at call time
test = TestType.RD_TEST_0_BYTES


@pytest.fixture
def patch_open_connection():
    """Replace asyncio.open_connection by a fake driven by the global `test`.

    The mode that was active when the fixture was set up is restored on
    teardown, so a test that switches the mode cannot influence later tests.
    """
    global test
    initial_mode = test

    async def new_conn(conn):
        await asyncio.sleep(0)
        return FakeReader(), FakeWriter()

    def new_open(host: str, port: int):
        # `test` is looked up on every call, so a test may switch the mode
        # while the patch is active
        if test is TestType.RD_TEST_TIMEOUT:
            raise ConnectionRefusedError
        elif test is TestType.RD_TEST_EXCEPT:
            raise ValueError("Value cannot be negative")  # Compliant
        return new_conn(None)

    try:
        with patch.object(asyncio, 'open_connection', new_open) as conn:
            yield conn
    finally:
        test = initial_mode


@pytest.fixture
def get_test_inverter():
    """Yield an InverterBase built from the fake reader, writer and protocol."""
    reader = FakeReader()
    writer = FakeWriter()

    with InverterBase(reader, writer, 'tsun', FakeProtocol) as inverter:
        yield inverter


@pytest.fixture
def patch_healthy():
    """Patch AsyncStream.healthy and yield the mock so calls can be counted."""
    with patch.object(AsyncStream, 'healthy') as conn:
        yield conn


def test_method_calls(get_test_inverter, patch_healthy):
    """healthy() is forwarded exactly once; afterwards no Inverter is left."""
    spy = patch_healthy

    with get_test_inverter as inverter:
        assert inverter.local.stream
        assert inverter.local.ifc
        for inv in Inverter:
            inv.healthy()
        del inv
        spy.assert_called_once()

    del inverter
    cnt = 0
    for inv in Inverter:
        # print who still references the leftover instance
        print(f'inv:{gc.get_referrers(inv)}')
        cnt += 1
    assert cnt == 0


@pytest.mark.asyncio
async def test_remote_conn(config_conn, patch_open_connection):
    """async_create_remote() sets up a remote stream via the patched open."""
    _ = config_conn
    _ = patch_open_connection
    assert asyncio.get_running_loop()

    # NOTE(review): get_test_inverter also passes 'tsun' and FakeProtocol;
    # confirm that InverterBase accepts this two-argument form.
    with InverterBase(FakeReader(), FakeWriter()) as inverter:
        await inverter.async_create_remote()
        await asyncio.sleep(0)
        assert inverter.remote.stream

    del inverter
    cnt = 0
    for inv in Inverter:
        print(f'Inverter refs:{gc.get_referrers(inv)}')
        cnt += 1
    assert cnt == 0


@pytest.mark.asyncio
async def test_remote_except(config_conn, patch_open_connection):
    """No remote stream is created when open_connection raises.

    Covers both failure modes of the patched open_connection:
    ConnectionRefusedError and ValueError. The changed mode is reset by the
    teardown of patch_open_connection.
    """
    _ = config_conn
    _ = patch_open_connection
    assert asyncio.get_running_loop()

    global test
    test = TestType.RD_TEST_TIMEOUT

    with InverterBase(FakeReader(), FakeWriter()) as inverter:
        await inverter.async_create_remote()
        await asyncio.sleep(0)
        assert inverter.remote.stream is None

        test = TestType.RD_TEST_EXCEPT
        await inverter.async_create_remote()
        await asyncio.sleep(0)
        assert inverter.remote.stream is None

    del inverter
    cnt = 0
    for inv in Inverter:
        print(f'Inverter refs:{gc.get_referrers(inv)}')
        cnt += 1
    assert cnt == 0


@pytest.mark.asyncio
async def test_mqtt_publish(config_conn, patch_open_connection):
    """async_publ_mqtt() clears the 'new data' flags of what it published.

    Checked for the stream's 'inverter' and 'env' entries and for the
    'proxy' entry of Infos.new_stat_data.
    """
    _ = config_conn
    _ = patch_open_connection
    assert asyncio.get_running_loop()

    Inverter.class_init()

    # NOTE(review): the get_test_inverter fixture also passes 'tsun' and
    # FakeProtocol; confirm that InverterBase accepts this two-argument form.
    with InverterBase(FakeReader(), FakeWriter()) as inverter:
        stream = inverter.local.stream
        # first call runs before a serial number is set:
        # check call with invalid unique_id
        await inverter.async_publ_mqtt()
        stream._Talent__set_serial_no(serial_no="123344")

        stream.new_data['inverter'] = True
        stream.db.db['inverter'] = {}
        await inverter.async_publ_mqtt()
        assert not stream.new_data['inverter']

        stream.new_data['env'] = True
        stream.db.db['env'] = {}
        await inverter.async_publ_mqtt()
        assert not stream.new_data['env']

        Infos.new_stat_data['proxy'] = True
        await inverter.async_publ_mqtt()
        assert not Infos.new_stat_data['proxy']


@pytest.mark.asyncio
async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err):
    """With patch_mqtt_err active the 'inverter' flag must stay set.

    NOTE(review): patch_mqtt_err is imported from test_modbus_tcp; it
    presumably makes the MQTT publish fail - confirm there.
    """
    _ = config_conn
    _ = patch_open_connection
    _ = patch_mqtt_err
    assert asyncio.get_running_loop()

    Inverter.class_init()

    with InverterBase(FakeReader(), FakeWriter()) as inverter:
        stream = inverter.local.stream
        stream._Talent__set_serial_no(serial_no="123344")
        stream.new_data['inverter'] = True
        stream.db.db['inverter'] = {}
        await inverter.async_publ_mqtt()
        # flag untouched: the data still counts as not published
        assert stream.new_data['inverter']


@pytest.mark.asyncio
async def test_mqtt_except(config_conn, patch_open_connection,
                           patch_mqtt_except):
    """With patch_mqtt_except active the 'inverter' flag must stay set.

    NOTE(review): patch_mqtt_except is imported from test_modbus_tcp; it
    presumably makes the MQTT publish raise - confirm there.
    """
    _ = config_conn
    _ = patch_open_connection
    _ = patch_mqtt_except
    assert asyncio.get_running_loop()

    Inverter.class_init()

    with InverterBase(FakeReader(), FakeWriter()) as inverter:
        stream = inverter.local.stream
        stream._Talent__set_serial_no(serial_no="123344")

        stream.new_data['inverter'] = True
        stream.db.db['inverter'] = {}
        await inverter.async_publ_mqtt()
        # flag untouched: the data still counts as not published
        assert stream.new_data['inverter']