increase test coverage
This commit is contained in:
@@ -6,8 +6,8 @@ import time
|
|||||||
|
|
||||||
from app.src.infos import Infos
|
from app.src.infos import Infos
|
||||||
from app.src.inverter_base import InverterBase
|
from app.src.inverter_base import InverterBase
|
||||||
from app.src.async_stream import AsyncStreamServer, AsyncStreamClient
|
from app.src.async_stream import AsyncStreamServer, AsyncStreamClient, StreamPtr
|
||||||
|
from app.src.messages import Message
|
||||||
from app.tests.test_modbus_tcp import FakeReader, FakeWriter
|
from app.tests.test_modbus_tcp import FakeReader, FakeWriter
|
||||||
from app.tests.test_inverter_base import config_conn, patch_open_connection
|
from app.tests.test_inverter_base import config_conn, patch_open_connection
|
||||||
|
|
||||||
@@ -16,6 +16,17 @@ pytest_plugins = ('pytest_asyncio',)
|
|||||||
# initialize the proxy statistics
|
# initialize the proxy statistics
|
||||||
Infos.static_init()
|
Infos.static_init()
|
||||||
|
|
||||||
|
class FakeProto(Message):
|
||||||
|
def __init__(self, server_side):
|
||||||
|
super().__init__(server_side, None, 10)
|
||||||
|
self.conn_no = 0
|
||||||
|
|
||||||
|
def fake_reader_fwd():
|
||||||
|
reader = FakeReader()
|
||||||
|
reader.test = FakeReader.RD_TEST_13_BYTES
|
||||||
|
reader.on_recv.set()
|
||||||
|
return reader
|
||||||
|
|
||||||
def test_timeout_cb():
|
def test_timeout_cb():
|
||||||
reader = FakeReader()
|
reader = FakeReader()
|
||||||
writer = FakeWriter()
|
writer = FakeWriter()
|
||||||
@@ -290,3 +301,80 @@ async def test_os_error():
|
|||||||
print(f'InverterBase refs:{gc.get_referrers(inv)}')
|
print(f'InverterBase refs:{gc.get_referrers(inv)}')
|
||||||
cnt += 1
|
cnt += 1
|
||||||
assert cnt == 0
|
assert cnt == 0
|
||||||
|
|
||||||
|
class TestType():
|
||||||
|
FWD_NO_EXCPT = 1
|
||||||
|
FWD_SW_EXCPT = 2
|
||||||
|
FWD_TIMEOUT = 3
|
||||||
|
FWD_OS_ERROR = 4
|
||||||
|
|
||||||
|
def create_remote(remote, test_type):
|
||||||
|
def update_hdr(buf):
|
||||||
|
return
|
||||||
|
def callback():
|
||||||
|
if test_type == TestType.FWD_SW_EXCPT:
|
||||||
|
remote.unknown_var += 1
|
||||||
|
elif test_type == TestType.FWD_TIMEOUT:
|
||||||
|
raise TimeoutError
|
||||||
|
elif test_type == TestType.FWD_OS_ERROR:
|
||||||
|
raise ConnectionRefusedError
|
||||||
|
|
||||||
|
remote.ifc = AsyncStreamClient(
|
||||||
|
FakeReader(), FakeWriter(), StreamPtr(None), None)
|
||||||
|
remote.ifc.prot_set_update_header_cb(update_hdr)
|
||||||
|
remote.ifc.prot_set_init_new_client_conn_cb(callback)
|
||||||
|
remote.stream = FakeProto(False)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_forward():
|
||||||
|
assert asyncio.get_running_loop()
|
||||||
|
remote = StreamPtr(None)
|
||||||
|
cnt = 0
|
||||||
|
|
||||||
|
async def _create_remote():
|
||||||
|
nonlocal cnt, remote
|
||||||
|
create_remote(remote, TestType.FWD_NO_EXCPT)
|
||||||
|
cnt += 1
|
||||||
|
|
||||||
|
cnt = 0
|
||||||
|
ifc = AsyncStreamServer(fake_reader_fwd(), FakeWriter(), None, _create_remote, remote)
|
||||||
|
ifc.fwd_add(b'test-forward_msg')
|
||||||
|
await ifc.server_loop()
|
||||||
|
assert cnt == 1
|
||||||
|
del ifc
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_forward_sw_except():
|
||||||
|
assert asyncio.get_running_loop()
|
||||||
|
remote = StreamPtr(None)
|
||||||
|
cnt = 0
|
||||||
|
|
||||||
|
async def _create_remote():
|
||||||
|
nonlocal cnt, remote
|
||||||
|
create_remote(remote, TestType.FWD_SW_EXCPT)
|
||||||
|
cnt += 1
|
||||||
|
|
||||||
|
cnt = 0
|
||||||
|
ifc = AsyncStreamServer(fake_reader_fwd(), FakeWriter(), None, _create_remote, remote)
|
||||||
|
ifc.fwd_add(b'test-forward_msg')
|
||||||
|
await ifc.server_loop()
|
||||||
|
assert cnt == 1
|
||||||
|
del ifc
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_forward_os_error():
|
||||||
|
assert asyncio.get_running_loop()
|
||||||
|
remote = StreamPtr(None)
|
||||||
|
cnt = 0
|
||||||
|
|
||||||
|
async def _create_remote():
|
||||||
|
nonlocal cnt, remote
|
||||||
|
create_remote(remote, TestType.FWD_OS_ERROR)
|
||||||
|
cnt += 1
|
||||||
|
|
||||||
|
cnt = 0
|
||||||
|
ifc = AsyncStreamServer(fake_reader_fwd(), FakeWriter(), None, _create_remote, remote)
|
||||||
|
ifc.fwd_add(b'test-forward_msg')
|
||||||
|
await ifc.server_loop()
|
||||||
|
assert cnt == 1
|
||||||
|
del ifc
|
||||||
|
|||||||
Reference in New Issue
Block a user