increase test coverage

This commit is contained in:
Stefan Allius
2024-09-07 18:04:28 +02:00
parent 865216b8d9
commit 2ab35a8257
4 changed files with 190 additions and 6 deletions

View File

@@ -1,7 +1,12 @@
import logging
from asyncio import StreamReader, StreamWriter
from async_stream import AsyncStream
from gen3.talent import Talent
if __name__ == "app.src.gen3.connection_g3":
from app.src.async_stream import AsyncStream
from app.src.gen3.talent import Talent
else: # pragma: no cover
from async_stream import AsyncStream
from gen3.talent import Talent
logger = logging.getLogger('conn')

View File

@@ -3,11 +3,17 @@ import traceback
import json
import asyncio
from asyncio import StreamReader, StreamWriter
from config import Config
from inverter import Inverter
from gen3.connection_g3 import ConnectionG3
from aiomqtt import MqttCodeError
from infos import Infos
if __name__ == "app.src.gen3.inverter_g3":
from app.src.config import Config
from app.src.inverter import Inverter
from app.src.gen3.connection_g3 import ConnectionG3
else: # pragma: no cover
from config import Config
from inverter import Inverter
from gen3.connection_g3 import ConnectionG3
from infos import Infos
logger_mqtt = logging.getLogger('mqtt')

View File

@@ -0,0 +1,84 @@
# test_with_pytest.py
import pytest
import asyncio
from mock import patch
from app.src.async_stream import AsyncStream
from app.src.gen3.connection_g3 import ConnectionG3
from app.src.gen3.talent import Talent
@pytest.fixture
def patch_async_init():
with patch.object(AsyncStream, '__init__') as conn:
yield conn
@pytest.fixture
def patch_talent_init():
with patch.object(Talent, '__init__') as conn:
yield conn
@pytest.fixture
def patch_healthy():
with patch.object(AsyncStream, 'healthy') as conn:
yield conn
@pytest.fixture
def patch_async_close():
with patch.object(AsyncStream, 'close') as conn:
yield conn
@pytest.fixture
def patch_talent_close():
with patch.object(Talent, 'close') as conn:
yield conn
class FakeReader():
def __init__(self):
self.on_recv = asyncio.Event()
async def read(self, max_len: int):
await self.on_recv.wait()
return b''
def feed_eof(self):
return
class FakeWriter():
def write(self, buf: bytes):
return
def get_extra_info(self, sel: str):
if sel == 'peername':
return 'remote.intern'
elif sel == 'sockname':
return 'sock:1234'
assert False
def is_closing(self):
return False
def close(self):
return
async def wait_closed(self):
return
def test_method_calls(patch_async_init, patch_talent_init, patch_healthy, patch_async_close, patch_talent_close):
spy1 = patch_async_init
spy2 = patch_talent_init
spy3 = patch_healthy
spy4 = patch_async_close
spy5 = patch_talent_close
reader = FakeReader()
writer = FakeWriter()
id_str = "id_string"
addr = ('proxy.local', 10000)
conn = ConnectionG3(reader, writer, addr,
remote_stream= None, server_side=True, id_str=id_str)
spy1.assert_called_once_with(conn, reader, writer, addr)
spy2.assert_called_once_with(conn, True, id_str)
conn.healthy()
spy3.assert_called_once()
conn.close()
spy4.assert_called_once()
spy5.assert_called_once()

View File

@@ -0,0 +1,89 @@
# test_with_pytest.py
import pytest
import asyncio
from mock import patch
from app.src.singleton import Singleton
from app.src.async_stream import AsyncStream
from app.src.gen3plus.connection_g3p import ConnectionG3P
from app.src.gen3plus.solarman_v5 import SolarmanV5
@pytest.fixture
def patch_async_init():
with patch.object(AsyncStream, '__init__') as conn:
yield conn
@pytest.fixture
def patch_solarman_init():
with patch.object(SolarmanV5, '__init__') as conn:
yield conn
@pytest.fixture(scope="module", autouse=True)
def module_init():
Singleton._instances.clear()
yield
@pytest.fixture
def patch_healthy():
with patch.object(AsyncStream, 'healthy') as conn:
yield conn
@pytest.fixture
def patch_async_close():
with patch.object(AsyncStream, 'close') as conn:
yield conn
@pytest.fixture
def patch_solarman_close():
with patch.object(SolarmanV5, 'close') as conn:
yield conn
class FakeReader():
def __init__(self):
self.on_recv = asyncio.Event()
async def read(self, max_len: int):
await self.on_recv.wait()
return b''
def feed_eof(self):
return
class FakeWriter():
def write(self, buf: bytes):
return
def get_extra_info(self, sel: str):
if sel == 'peername':
return 'remote.intern'
elif sel == 'sockname':
return 'sock:1234'
assert False
def is_closing(self):
return False
def close(self):
return
async def wait_closed(self):
return
def test_method_calls(patch_async_init, patch_solarman_init, patch_healthy, patch_async_close, patch_solarman_close):
spy1 = patch_async_init
spy2 = patch_solarman_init
spy3 = patch_healthy
spy4 = patch_async_close
spy5 = patch_solarman_close
reader = FakeReader()
writer = FakeWriter()
addr = ('proxy.local', 10000)
conn = ConnectionG3P(reader, writer, addr,
remote_stream= None, server_side=True, client_mode=False)
spy1.assert_called_once_with(conn, reader, writer, addr)
spy2.assert_called_once_with(conn, True, False)
conn.healthy()
spy3.assert_called_once()
conn.close()
spy4.assert_called_once()
spy5.assert_called_once()