S allius/issue186 (#187)
* Parse more values in Server Mode Fixes #186 * read OUTPUT_COEFFICIENT and MAC_ADDR in SrvMode * fix unit test * increase test coverage
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
# test_with_pytest.py
|
||||
import pytest
|
||||
import asyncio
|
||||
from aiomqtt import MqttCodeError
|
||||
|
||||
from mock import patch
|
||||
from enum import Enum
|
||||
@@ -132,6 +133,22 @@ def patch_no_mqtt():
|
||||
with patch.object(Mqtt, 'publish') as conn:
|
||||
yield conn
|
||||
|
||||
@pytest.fixture
|
||||
def patch_mqtt_err():
|
||||
def new_publish(self, key, data):
|
||||
raise MqttCodeError(None)
|
||||
|
||||
with patch.object(Mqtt, 'publish', new_publish) as conn:
|
||||
yield conn
|
||||
|
||||
@pytest.fixture
|
||||
def patch_mqtt_except():
|
||||
def new_publish(self, key, data):
|
||||
raise ValueError("Test")
|
||||
|
||||
with patch.object(Mqtt, 'publish', new_publish) as conn:
|
||||
yield conn
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_modbus_conn(patch_open):
|
||||
_ = patch_open
|
||||
@@ -231,3 +248,67 @@ async def test_modbus_cnf3(config_conn, patch_no_mqtt, patch_open):
|
||||
assert 2 == test
|
||||
await asyncio.sleep(0.01)
|
||||
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mqtt_err(config_conn, patch_mqtt_err, patch_open):
|
||||
_ = config_conn
|
||||
_ = patch_open
|
||||
_ = patch_mqtt_err
|
||||
global test
|
||||
assert asyncio.get_running_loop()
|
||||
Inverter.class_init()
|
||||
test = TestType.RD_TEST_0_BYTES
|
||||
|
||||
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
|
||||
ModbusTcp(asyncio.get_event_loop(), tim_restart= 0)
|
||||
await asyncio.sleep(0.01)
|
||||
test = 0
|
||||
for m in Message:
|
||||
if (m.node_id == 'inv_2'):
|
||||
assert Infos.stat['proxy']['Inverter_Cnt'] == 1
|
||||
test += 1
|
||||
if test == 1:
|
||||
m.shutdown_started = False
|
||||
m.reader.on_recv.set()
|
||||
await asyncio.sleep(0.1)
|
||||
assert m.state == State.closed
|
||||
await asyncio.sleep(0.1)
|
||||
else:
|
||||
m.shutdown_started = True
|
||||
m.reader.on_recv.set()
|
||||
del m
|
||||
|
||||
await asyncio.sleep(0.01)
|
||||
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mqtt_except(config_conn, patch_mqtt_except, patch_open):
|
||||
_ = config_conn
|
||||
_ = patch_open
|
||||
_ = patch_mqtt_except
|
||||
global test
|
||||
assert asyncio.get_running_loop()
|
||||
Inverter.class_init()
|
||||
test = TestType.RD_TEST_0_BYTES
|
||||
|
||||
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
|
||||
ModbusTcp(asyncio.get_event_loop(), tim_restart= 0)
|
||||
await asyncio.sleep(0.01)
|
||||
test = 0
|
||||
for m in Message:
|
||||
if (m.node_id == 'inv_2'):
|
||||
assert Infos.stat['proxy']['Inverter_Cnt'] == 1
|
||||
test += 1
|
||||
if test == 1:
|
||||
m.shutdown_started = False
|
||||
m.reader.on_recv.set()
|
||||
await asyncio.sleep(0.1)
|
||||
assert m.state == State.closed
|
||||
await asyncio.sleep(0.1)
|
||||
else:
|
||||
m.shutdown_started = True
|
||||
m.reader.on_recv.set()
|
||||
del m
|
||||
|
||||
await asyncio.sleep(0.01)
|
||||
assert Infos.stat['proxy']['Inverter_Cnt'] == 0
|
||||
|
||||
Reference in New Issue
Block a user