add unit tests for modbus
This commit is contained in:
@@ -6,6 +6,9 @@ from app.src.gen3plus.solarman_v5 import SolarmanV5
|
||||
from app.src.config import Config
|
||||
from app.src.infos import Infos, Register
|
||||
|
||||
|
||||
pytest_plugins = ('pytest_asyncio',)
|
||||
|
||||
# initialize the proxy statistics
|
||||
Infos.static_init()
|
||||
|
||||
@@ -54,6 +57,9 @@ class MemoryStream(SolarmanV5):
|
||||
pass
|
||||
return copied_bytes
|
||||
|
||||
async def async_write(self, headline=''):
|
||||
pass
|
||||
|
||||
def _SolarmanV5__flush_recv_msg(self) -> None:
|
||||
super()._SolarmanV5__flush_recv_msg()
|
||||
self.msg_count += 1
|
||||
@@ -725,7 +731,7 @@ def test_device_rsp(ConfigTsunInv1, DeviceRspMsg):
|
||||
assert m.data_len == 0x0a
|
||||
assert m._recv_buffer==b''
|
||||
assert m._send_buffer==b''
|
||||
assert m._forward_buffer==b'' # DeviceRspMsg
|
||||
assert m._forward_buffer==DeviceRspMsg
|
||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||
m.close()
|
||||
|
||||
@@ -743,7 +749,7 @@ def test_inverter_rsp(ConfigTsunInv1, InverterRspMsg):
|
||||
assert m.data_len == 0x0a
|
||||
assert m._recv_buffer==b''
|
||||
assert m._send_buffer==b''
|
||||
assert m._forward_buffer==b'' # InverterRspMsg
|
||||
assert m._forward_buffer==InverterRspMsg
|
||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||
m.close()
|
||||
|
||||
@@ -779,7 +785,7 @@ def test_heartbeat_rsp(ConfigTsunInv1, HeartbeatRspMsg):
|
||||
assert m.data_len == 0x0a
|
||||
assert m._recv_buffer==b''
|
||||
assert m._send_buffer==b''
|
||||
assert m._forward_buffer==b'' # HeartbeatRspMsg
|
||||
assert m._forward_buffer==HeartbeatRspMsg
|
||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||
m.close()
|
||||
|
||||
@@ -820,7 +826,7 @@ def test_sync_start_rsp(ConfigTsunInv1, SyncStartRspMsg):
|
||||
assert m.data_len == 0x0a
|
||||
assert m._recv_buffer==b''
|
||||
assert m._send_buffer==b''
|
||||
assert m._forward_buffer==b'' # HeartbeatRspMsg
|
||||
assert m._forward_buffer==SyncStartRspMsg
|
||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||
m.close()
|
||||
|
||||
@@ -856,7 +862,7 @@ def test_sync_end_rsp(ConfigTsunInv1, SyncEndRspMsg):
|
||||
assert m.data_len == 0x0a
|
||||
assert m._recv_buffer==b''
|
||||
assert m._send_buffer==b''
|
||||
assert m._forward_buffer==b'' # HeartbeatRspMsg
|
||||
assert m._forward_buffer==SyncEndRspMsg
|
||||
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
|
||||
m.close()
|
||||
|
||||
@@ -942,7 +948,8 @@ def test_build_logger_modell(ConfigTsunAllowAll, DeviceIndMsg):
|
||||
assert 'V1.1.00.0B' == m.db.get_db_value(Register.COLLECTOR_FW_VERSION, 0).rstrip('\00')
|
||||
m.close()
|
||||
|
||||
def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, AtCommandIndMsg):
|
||||
@pytest.mark.asyncio
|
||||
async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, AtCommandIndMsg):
|
||||
ConfigTsunAllowAll
|
||||
m = MemoryStream(DeviceIndMsg, (0,), True)
|
||||
m.read()
|
||||
@@ -954,7 +961,7 @@ def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, AtCommandIndMsg)
|
||||
|
||||
m._send_buffer = bytearray(0) # clear send buffer for next test
|
||||
m._forward_buffer = bytearray(0) # clear send buffer for next test
|
||||
m.send_at_cmd('AT+TIME=214028,1,60,120')
|
||||
await m.send_at_cmd('AT+TIME=214028,1,60,120')
|
||||
assert m._recv_buffer==b''
|
||||
assert m._send_buffer==AtCommandIndMsg
|
||||
assert m._forward_buffer==b''
|
||||
|
||||
Reference in New Issue
Block a user