Files
tsun-gen3-proxy/app/tests/test_inverter_g3p.py
Stefan Allius 48965ffda9 S allius/issue398 (#406)
* setup logger for hypercorn and dashboard

* use logger.ini to setup dashboard logger

* workaround: restore the hypercorn logger config

- quart/hyercorn overwrites the logger config.
  as a workaround we restore the config at the
  beginning of a request

* fix the hypercorn log handler only once

* change proxy into a ASGI application

- move Quart init from server.py into app.py
- create Server class for config and loggin setup
- restore hypercorn logging configuration after
  start of Quart/Hypercorn

* move get_log_level into Server class

* define config in test_emu_init_close

* remove Web() instance from the testcase

- with importing app.py The blueprint Web() will
  automatically created and a second call in test-
  cases must avoided

* add unit tests

* move code from app.py into server.py

* test the init_logging_system() method

* add HypercornLogHndl tests

* fix deprecated pytest async warning

- Cleanup pending async tasks
- fix deprecated warning about event_loop

* add unit test for error handling in build_config()

* coverage: ignore quart template files

* check print output in test_save_and_restore

* update changelog
2025-05-10 19:32:13 +02:00

202 lines
5.7 KiB
Python

# test_with_pytest.py
import pytest
import asyncio
from mock import patch
from enum import Enum
from infos import Infos
from cnf.config import Config
from proxy import Proxy
from inverter_base import InverterBase
from singleton import Singleton
from gen3plus.inverter_g3p import InverterG3P
from test_modbus_tcp import patch_mqtt_err, patch_mqtt_except, test_port, test_hostname
pytest_plugins = ('pytest_asyncio',)
# initialize the proxy statistics
Infos.static_init()
@pytest.fixture
def config_conn():
Config.act_config = {
'mqtt':{
'host': test_hostname,
'port': test_port,
'user': '',
'passwd': ''
},
'ha':{
'auto_conf_prefix': 'homeassistant',
'discovery_prefix': 'homeassistant',
'entity_prefix': 'tsun',
'proxy_node_id': 'test_1',
'proxy_unique_id': ''
},
'solarman':{'enabled': True, 'host': 'test_cloud.local', 'port': 1234}, 'inverters':{'allow_all':True}
}
Config.log_path='app/tests/log/'
@pytest.fixture(scope="module", autouse=True)
def module_init():
Singleton._instances.clear()
yield
class FakeReader():
def __init__(self):
self.on_recv = asyncio.Event()
async def read(self, max_len: int):
await self.on_recv.wait()
return b''
def feed_eof(self):
return
class FakeWriter():
def write(self, buf: bytes):
return
def get_extra_info(self, sel: str):
if sel == 'peername':
return ('47.1.2.3', 10000)
elif sel == 'sockname':
return 'sock:1234'
assert False
def is_closing(self):
return False
def close(self):
return
async def wait_closed(self):
return
class MockType(Enum):
RD_TEST_0_BYTES = 1
RD_TEST_TIMEOUT = 2
RD_TEST_EXCEPT = 3
test = MockType.RD_TEST_0_BYTES
@pytest.fixture
def patch_open_connection():
async def new_conn(conn):
await asyncio.sleep(0)
return FakeReader(), FakeWriter()
def new_open(host: str, port: int):
if test == MockType.RD_TEST_TIMEOUT:
raise ConnectionRefusedError
elif test == MockType.RD_TEST_EXCEPT:
raise ValueError("Value cannot be negative") # Compliant
return new_conn(None)
with patch.object(asyncio, 'open_connection', new_open) as conn:
yield conn
@pytest.mark.asyncio
async def test_method_calls(my_loop, config_conn):
_ = config_conn
reader = FakeReader()
writer = FakeWriter()
InverterBase._registry.clear()
with InverterG3P(reader, writer, client_mode=False) as inverter:
assert inverter.local.stream
assert inverter.local.ifc
@pytest.mark.asyncio
async def test_remote_conn(my_loop, config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter:
await inverter.create_remote()
await asyncio.sleep(0)
assert inverter.remote.stream
@pytest.mark.asyncio
async def test_remote_except(my_loop, config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
global test
test = MockType.RD_TEST_TIMEOUT
with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter:
await inverter.create_remote()
await asyncio.sleep(0)
assert inverter.remote.stream==None
test = MockType.RD_TEST_EXCEPT
await inverter.create_remote()
await asyncio.sleep(0)
assert inverter.remote.stream==None
test = MockType.RD_TEST_0_BYTES
@pytest.mark.asyncio
async def test_mqtt_publish(my_loop, config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
Proxy.class_init()
with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter:
stream = inverter.local.stream
await inverter.async_publ_mqtt() # check call with invalid unique_id
stream._set_serial_no(snr= 123344)
stream.new_data['inverter'] = True
stream.db.db['inverter'] = {}
await inverter.async_publ_mqtt()
assert stream.new_data['inverter'] == False
stream.new_data['env'] = True
stream.db.db['env'] = {}
await inverter.async_publ_mqtt()
assert stream.new_data['env'] == False
Infos.new_stat_data['proxy'] = True
await inverter.async_publ_mqtt()
assert Infos.new_stat_data['proxy'] == False
@pytest.mark.asyncio
async def test_mqtt_err(my_loop, config_conn, patch_open_connection, patch_mqtt_err):
_ = config_conn
_ = patch_open_connection
_ = patch_mqtt_err
assert asyncio.get_running_loop()
Proxy.class_init()
with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter:
stream = inverter.local.stream
stream._set_serial_no(snr= 123344)
stream.new_data['inverter'] = True
stream.db.db['inverter'] = {}
await inverter.async_publ_mqtt()
assert stream.new_data['inverter'] == True
@pytest.mark.asyncio
async def test_mqtt_except(my_loop, config_conn, patch_open_connection, patch_mqtt_except):
_ = config_conn
_ = patch_open_connection
_ = patch_mqtt_except
assert asyncio.get_running_loop()
Proxy.class_init()
with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter:
stream = inverter.local.stream
stream._set_serial_no(snr= 123344)
stream.new_data['inverter'] = True
stream.db.db['inverter'] = {}
await inverter.async_publ_mqtt()
assert stream.new_data['inverter'] == True