Files
tsun-gen3-proxy/app/tests/test_inverter_g3p.py
renovate[bot] 2707582a45 Update dependency flake8 to v7.2.0 (#330)
* Update dependency flake8 to v7.2.0

* Flake8: ignore F821 errors, due of False Positives

# cleanup some unit tests

---------

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: Stefan Allius <stefan.allius@t-online.de>
2025-04-04 14:29:00 +02:00

197 lines
5.6 KiB
Python

# test_with_pytest.py
import pytest
import asyncio
from mock import patch
from enum import Enum
from infos import Infos
from cnf.config import Config
from proxy import Proxy
from inverter_base import InverterBase
from singleton import Singleton
from gen3plus.inverter_g3p import InverterG3P
from test_modbus_tcp import patch_mqtt_err, patch_mqtt_except, test_port, test_hostname
pytest_plugins = ('pytest_asyncio',)
# initialize the proxy statistics
Infos.static_init()
@pytest.fixture
def config_conn():
Config.act_config = {
'mqtt':{
'host': test_hostname,
'port': test_port,
'user': '',
'passwd': ''
},
'ha':{
'auto_conf_prefix': 'homeassistant',
'discovery_prefix': 'homeassistant',
'entity_prefix': 'tsun',
'proxy_node_id': 'test_1',
'proxy_unique_id': ''
},
'solarman':{'enabled': True, 'host': 'test_cloud.local', 'port': 1234}, 'inverters':{'allow_all':True}
}
@pytest.fixture(scope="module", autouse=True)
def module_init():
Singleton._instances.clear()
yield
class FakeReader():
def __init__(self):
self.on_recv = asyncio.Event()
async def read(self, max_len: int):
await self.on_recv.wait()
return b''
def feed_eof(self):
return
class FakeWriter():
def write(self, buf: bytes):
return
def get_extra_info(self, sel: str):
if sel == 'peername':
return ('47.1.2.3', 10000)
elif sel == 'sockname':
return 'sock:1234'
assert False
def is_closing(self):
return False
def close(self):
return
async def wait_closed(self):
return
class MockType(Enum):
RD_TEST_0_BYTES = 1
RD_TEST_TIMEOUT = 2
RD_TEST_EXCEPT = 3
test = MockType.RD_TEST_0_BYTES
@pytest.fixture
def patch_open_connection():
async def new_conn(conn):
await asyncio.sleep(0)
return FakeReader(), FakeWriter()
def new_open(host: str, port: int):
if test == MockType.RD_TEST_TIMEOUT:
raise ConnectionRefusedError
elif test == MockType.RD_TEST_EXCEPT:
raise ValueError("Value cannot be negative") # Compliant
return new_conn(None)
with patch.object(asyncio, 'open_connection', new_open) as conn:
yield conn
def test_method_calls(config_conn):
_ = config_conn
reader = FakeReader()
writer = FakeWriter()
InverterBase._registry.clear()
with InverterG3P(reader, writer, client_mode=False) as inverter:
assert inverter.local.stream
assert inverter.local.ifc
@pytest.mark.asyncio
async def test_remote_conn(config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter:
await inverter.create_remote()
await asyncio.sleep(0)
assert inverter.remote.stream
@pytest.mark.asyncio
async def test_remote_except(config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
global test
test = MockType.RD_TEST_TIMEOUT
with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter:
await inverter.create_remote()
await asyncio.sleep(0)
assert inverter.remote.stream==None
test = MockType.RD_TEST_EXCEPT
await inverter.create_remote()
await asyncio.sleep(0)
assert inverter.remote.stream==None
@pytest.mark.asyncio
async def test_mqtt_publish(config_conn, patch_open_connection):
_ = config_conn
_ = patch_open_connection
assert asyncio.get_running_loop()
Proxy.class_init()
with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter:
stream = inverter.local.stream
await inverter.async_publ_mqtt() # check call with invalid unique_id
stream._set_serial_no(snr= 123344)
stream.new_data['inverter'] = True
stream.db.db['inverter'] = {}
await inverter.async_publ_mqtt()
assert stream.new_data['inverter'] == False
stream.new_data['env'] = True
stream.db.db['env'] = {}
await inverter.async_publ_mqtt()
assert stream.new_data['env'] == False
Infos.new_stat_data['proxy'] = True
await inverter.async_publ_mqtt()
assert Infos.new_stat_data['proxy'] == False
@pytest.mark.asyncio
async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err):
_ = config_conn
_ = patch_open_connection
_ = patch_mqtt_err
assert asyncio.get_running_loop()
Proxy.class_init()
with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter:
stream = inverter.local.stream
stream._set_serial_no(snr= 123344)
stream.new_data['inverter'] = True
stream.db.db['inverter'] = {}
await inverter.async_publ_mqtt()
assert stream.new_data['inverter'] == True
@pytest.mark.asyncio
async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except):
_ = config_conn
_ = patch_open_connection
_ = patch_mqtt_except
assert asyncio.get_running_loop()
Proxy.class_init()
with InverterG3P(FakeReader(), FakeWriter(), client_mode=False) as inverter:
stream = inverter.local.stream
stream._set_serial_no(snr= 123344)
stream.new_data['inverter'] = True
stream.db.db['inverter'] = {}
await inverter.async_publ_mqtt()
assert stream.new_data['inverter'] == True