diff --git a/app/src/modbus_tcp.py b/app/src/modbus_tcp.py index f6e8245..724b25f 100644 --- a/app/src/modbus_tcp.py +++ b/app/src/modbus_tcp.py @@ -77,7 +77,7 @@ class ModbusTcp(): logging.debug(f'Inv-conn:{error}') except OSError as error: - if error.errno == 113: + if error.errno == 113: # pragma: no cover logging.debug(f'os-error:{error}') else: logging.info(f'os-error: {error}') diff --git a/app/tests/test_modbus_tcp.py b/app/tests/test_modbus_tcp.py index a39c2cd..da065fd 100644 --- a/app/tests/test_modbus_tcp.py +++ b/app/tests/test_modbus_tcp.py @@ -146,6 +146,22 @@ def patch_open_timeout(): with patch.object(asyncio, 'open_connection', new_open) as conn: yield conn +@pytest.fixture +def patch_open_value_error(): + def new_open(host: str, port: int): + raise ValueError + + with patch.object(asyncio, 'open_connection', new_open) as conn: + yield conn + +@pytest.fixture +def patch_open_conn_abort(): + def new_open(host: str, port: int): + raise ConnectionAbortedError + + with patch.object(asyncio, 'open_connection', new_open) as conn: + yield conn + @pytest.fixture def patch_no_mqtt(): with patch.object(Mqtt, 'publish') as conn: @@ -190,7 +206,7 @@ async def test_modbus_no_cnf(): assert Infos.stat['proxy']['Inverter_Cnt'] == 0 @pytest.mark.asyncio -async def test_modbus_cnf1(config_conn, patch_open_timeout): +async def test_modbus_timeout(config_conn, patch_open_timeout): _ = config_conn _ = patch_open_timeout assert asyncio.get_running_loop() @@ -207,6 +223,42 @@ async def test_modbus_cnf1(config_conn, patch_open_timeout): await asyncio.sleep(0.01) assert Infos.stat['proxy']['Inverter_Cnt'] == 0 +@pytest.mark.asyncio +async def test_modbus_value_err(config_conn, patch_open_value_error): + _ = config_conn + _ = patch_open_value_error + assert asyncio.get_running_loop() + Proxy.class_init() + + assert Infos.stat['proxy']['Inverter_Cnt'] == 0 + loop = asyncio.get_event_loop() + ModbusTcp(loop) + await asyncio.sleep(0.01) + for m in Message: + if (m.node_id == 'inv_2'): + assert False + + await asyncio.sleep(0.01) + assert Infos.stat['proxy']['Inverter_Cnt'] == 0 + +@pytest.mark.asyncio +async def test_modbus_conn_abort(config_conn, patch_open_conn_abort): + _ = config_conn + _ = patch_open_conn_abort + assert asyncio.get_running_loop() + Proxy.class_init() + + assert Infos.stat['proxy']['Inverter_Cnt'] == 0 + loop = asyncio.get_event_loop() + ModbusTcp(loop) + await asyncio.sleep(0.01) + for m in Message: + if (m.node_id == 'inv_2'): + assert False + + await asyncio.sleep(0.01) + assert Infos.stat['proxy']['Inverter_Cnt'] == 0 + @pytest.mark.asyncio async def test_modbus_cnf2(config_conn, patch_no_mqtt, patch_open): _ = config_conn