improve test coverage und reduce test delays

This commit is contained in:
Stefan Allius
2024-09-03 17:23:09 +02:00
parent 215dcd98e6
commit a76c0ac440
4 changed files with 24 additions and 32 deletions

View File

@@ -25,10 +25,10 @@ class ConnectionG3(AsyncStream, Talent):
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
async def async_create_remote(self) -> None:
pass # virtual interface
pass # virtual interface # pragma: no cover
async def async_publ_mqtt(self) -> None:
pass # virtual interface
pass # virtual interface # pragma: no cover
def healthy(self) -> bool:
logger.debug('ConnectionG3 healthy()')

View File

@@ -31,10 +31,10 @@ class ConnectionG3P(AsyncStream, SolarmanV5):
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
async def async_create_remote(self) -> None:
pass # virtual interface
pass # virtual interface # pragma: no cover
async def async_publ_mqtt(self) -> None:
pass # virtual interface
pass # virtual interface # pragma: no cover
def healthy(self) -> bool:
logger.debug('ConnectionG3P healthy()')

View File

@@ -110,7 +110,7 @@ class FakeWriter():
@pytest.fixture
def patch_open():
async def new_conn(conn):
await asyncio.sleep(0.01)
await asyncio.sleep(0)
return FakeReader(), FakeWriter()
def new_open(host: str, port: int):
@@ -161,12 +161,12 @@ async def test_modbus_cnf1(config_conn, patch_open):
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
loop = asyncio.get_event_loop()
ModbusTcp(loop)
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
for m in Message:
if (m.node_id == 'inv_2'):
assert False
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
@pytest.mark.asyncio
@@ -181,7 +181,7 @@ async def test_modbus_cnf2(config_conn, patch_no_mqtt, patch_open):
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
ModbusTcp(asyncio.get_event_loop())
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
test = 0
for m in Message:
if (m.node_id == 'inv_2'):
@@ -192,7 +192,7 @@ async def test_modbus_cnf2(config_conn, patch_no_mqtt, patch_open):
del m
assert 1 == test
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
# check that the connection is released
for m in Message:
@@ -211,32 +211,24 @@ async def test_modbus_cnf3(config_conn, patch_no_mqtt, patch_open):
test = TestType.RD_TEST_0_BYTES
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
ModbusTcp(asyncio.get_event_loop(), tim_restart= 0.1)
await asyncio.sleep(0.1)
ModbusTcp(asyncio.get_event_loop(), tim_restart= 0)
await asyncio.sleep(0.01)
test = 0
for m in Message:
if (m.node_id == 'inv_2'):
assert Infos.stat['proxy']['Inverter_Cnt'] == 1
m.shutdown_started = False
m.reader.on_recv.set()
test += 1
await asyncio.sleep(0.1)
assert m.state == State.closed
assert 1 == test
await asyncio.sleep(0.1)
assert Infos.stat['proxy']['Inverter_Cnt'] == 1
# check that the connection is released
for m in Message:
if (m.node_id == 'inv_2'):
test += 1
m.shutdown_started = True
m.reader.on_recv.set()
del m
if test == 1:
m.shutdown_started = False
m.reader.on_recv.set()
await asyncio.sleep(0.1)
assert m.state == State.closed
await asyncio.sleep(0.1)
else:
m.shutdown_started = True
m.reader.on_recv.set()
del m
assert 3 == test
await asyncio.sleep(0.1)
assert 2 == test
await asyncio.sleep(0.01)
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
for m in Message:
if (m.node_id == 'inv_2'):
assert False

View File

@@ -97,7 +97,7 @@ async def test_mqtt_no_config(config_no_conn):
try:
m = Mqtt(cb)
assert m.task
await asyncio.sleep(1)
await asyncio.sleep(0)
assert not on_connect.is_set()
try:
await m.publish('homeassistant/status', 'online')