fix unit tests
This commit is contained in:
@@ -5,6 +5,7 @@ from datetime import datetime
|
|||||||
from app.src.gen3plus.solarman_v5 import SolarmanV5
|
from app.src.gen3plus.solarman_v5 import SolarmanV5
|
||||||
from app.src.config import Config
|
from app.src.config import Config
|
||||||
from app.src.infos import Infos, Register
|
from app.src.infos import Infos, Register
|
||||||
|
from app.src.modbus import Modbus
|
||||||
|
|
||||||
|
|
||||||
pytest_plugins = ('pytest_asyncio',)
|
pytest_plugins = ('pytest_asyncio',)
|
||||||
@@ -363,7 +364,7 @@ def SyncStartFwdMsg(): # 0x4310
|
|||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def AtCommandIndMsg(): # 0x4510
|
def AtCommandIndMsg(): # 0x4510
|
||||||
msg = b'\xa5\x27\x00\x10\x45\x02\x01' +get_sn() +b'\x01\x02\x00'
|
msg = b'\xa5\x27\x00\x10\x45\x03\x02' +get_sn() +b'\x01\x02\x00'
|
||||||
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
msg += b'AT+TIME=214028,1,60,120\r'
|
msg += b'AT+TIME=214028,1,60,120\r'
|
||||||
msg += correct_checksum(msg)
|
msg += correct_checksum(msg)
|
||||||
@@ -372,7 +373,7 @@ def AtCommandIndMsg(): # 0x4510
|
|||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def AtCommandRspMsg(): # 0x1510
|
def AtCommandRspMsg(): # 0x1510
|
||||||
msg = b'\xa5\x0a\x00\x10\x15\x02\x02' +get_sn() +b'\x01\x01'
|
msg = b'\xa5\x0a\x00\x10\x15\x03\x03' +get_sn() +b'\x01\x01'
|
||||||
msg += total()
|
msg += total()
|
||||||
msg += hb()
|
msg += hb()
|
||||||
msg += correct_checksum(msg)
|
msg += correct_checksum(msg)
|
||||||
@@ -416,6 +417,15 @@ def SyncEndRspMsg(): # 0x1810
|
|||||||
msg += b'\x15'
|
msg += b'\x15'
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def MsgModbusCmd():
|
||||||
|
msg = b'\xa5\x17\x00\x10\x45\x03\x02' +get_sn() +b'\x02\xb0\x02'
|
||||||
|
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x06\x20\x08'
|
||||||
|
msg += b'\x00\x00\x03\xc8'
|
||||||
|
msg += correct_checksum(msg)
|
||||||
|
msg += b'\x15'
|
||||||
|
return msg
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def ConfigTsunAllowAll():
|
def ConfigTsunAllowAll():
|
||||||
Config.config = {'solarman':{'enabled': True}, 'inverters':{'allow_all':True}}
|
Config.config = {'solarman':{'enabled': True}, 'inverters':{'allow_all':True}}
|
||||||
@@ -876,7 +886,7 @@ def test_at_command_ind(ConfigTsunInv1, AtCommandIndMsg, AtCommandRspMsg):
|
|||||||
assert m.snr == 2070233889
|
assert m.snr == 2070233889
|
||||||
# assert m.unique_id == '2070233889'
|
# assert m.unique_id == '2070233889'
|
||||||
assert m.control == 0x4510
|
assert m.control == 0x4510
|
||||||
assert str(m.seq) == '02:02'
|
assert str(m.seq) == '03:03'
|
||||||
assert m.data_len == 39
|
assert m.data_len == 39
|
||||||
assert m._recv_buffer==b''
|
assert m._recv_buffer==b''
|
||||||
assert m._send_buffer==AtCommandRspMsg
|
assert m._send_buffer==AtCommandRspMsg
|
||||||
@@ -949,21 +959,55 @@ def test_build_logger_modell(ConfigTsunAllowAll, DeviceIndMsg):
|
|||||||
m.close()
|
m.close()
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, AtCommandIndMsg):
|
async def test_msg_build_modbus_req(ConfigTsunInv1, DeviceIndMsg, DeviceRspMsg, InverterIndMsg, InverterRspMsg, MsgModbusCmd):
|
||||||
ConfigTsunAllowAll
|
ConfigTsunInv1
|
||||||
m = MemoryStream(DeviceIndMsg, (0,), True)
|
m = MemoryStream(DeviceIndMsg, (0,), True)
|
||||||
|
m.append_msg(InverterIndMsg)
|
||||||
m.read()
|
m.read()
|
||||||
assert m.control == 0x4110
|
assert m.control == 0x4110
|
||||||
assert str(m.seq) == '01:01'
|
assert str(m.seq) == '01:01'
|
||||||
assert m._recv_buffer==b''
|
assert m._recv_buffer==InverterIndMsg # unhandled next message
|
||||||
assert m._send_buffer==DeviceRspMsg
|
assert m._send_buffer==DeviceRspMsg
|
||||||
assert m._forward_buffer==DeviceIndMsg
|
assert m._forward_buffer==DeviceIndMsg
|
||||||
|
|
||||||
|
m._send_buffer = bytearray(0) # clear send buffer for next test
|
||||||
|
m._forward_buffer = bytearray(0) # clear send buffer for next test
|
||||||
|
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
||||||
|
assert 0 == m.send_msg_ofs
|
||||||
|
assert m._forward_buffer == b''
|
||||||
|
assert m._send_buffer == b'' # modbus command must be ignore, cause connection is still not up
|
||||||
|
|
||||||
|
m.read()
|
||||||
|
assert m.control == 0x4210
|
||||||
|
assert str(m.seq) == '02:02'
|
||||||
|
assert m._recv_buffer==b''
|
||||||
|
assert m._send_buffer==InverterRspMsg
|
||||||
|
assert m._forward_buffer==InverterIndMsg
|
||||||
|
|
||||||
|
m._send_buffer = bytearray(0) # clear send buffer for next test
|
||||||
|
m._forward_buffer = bytearray(0) # clear send buffer for next test
|
||||||
|
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0)
|
||||||
|
assert 0 == m.send_msg_ofs
|
||||||
|
assert m._forward_buffer == b''
|
||||||
|
assert m._send_buffer == MsgModbusCmd
|
||||||
|
m.close()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_AT_cmd(ConfigTsunAllowAll, InverterIndMsg, InverterRspMsg, AtCommandIndMsg):
|
||||||
|
ConfigTsunAllowAll
|
||||||
|
m = MemoryStream(InverterIndMsg, (0,), True)
|
||||||
|
m.read()
|
||||||
|
assert m.control == 0x4210
|
||||||
|
assert str(m.seq) == '02:02'
|
||||||
|
assert m._recv_buffer==b''
|
||||||
|
assert m._send_buffer==InverterRspMsg
|
||||||
|
assert m._forward_buffer==InverterIndMsg
|
||||||
|
|
||||||
m._send_buffer = bytearray(0) # clear send buffer for next test
|
m._send_buffer = bytearray(0) # clear send buffer for next test
|
||||||
m._forward_buffer = bytearray(0) # clear send buffer for next test
|
m._forward_buffer = bytearray(0) # clear send buffer for next test
|
||||||
await m.send_at_cmd('AT+TIME=214028,1,60,120')
|
await m.send_at_cmd('AT+TIME=214028,1,60,120')
|
||||||
assert m._recv_buffer==b''
|
assert m._recv_buffer==b''
|
||||||
assert m._send_buffer==AtCommandIndMsg
|
assert m._send_buffer==AtCommandIndMsg
|
||||||
assert m._forward_buffer==b''
|
assert m._forward_buffer==b''
|
||||||
assert str(m.seq) == '01:02'
|
assert str(m.seq) == '02:03'
|
||||||
m.close()
|
m.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user