add dcu_power topic and unit tests
This commit is contained in:
@@ -812,6 +812,15 @@ def dcu_data_rsp_msg(): # 0x1210
|
||||
msg += b'\x15'
|
||||
return msg
|
||||
|
||||
@pytest.fixture
|
||||
def dcu_command_ind_msg(): # 0x4510
|
||||
msg = b'\xa5\x17\x00\x10\x45\x94\x02' +get_dcu_sn() +b'\x05\x26\x30'
|
||||
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
msg += b'\x01\x01\x06\x01\x00\x01\x03\xe8'
|
||||
msg += correct_checksum(msg)
|
||||
msg += b'\x15'
|
||||
return msg
|
||||
|
||||
@pytest.fixture
|
||||
def config_tsun_allow_all():
|
||||
Config.act_config = {
|
||||
@@ -2402,3 +2411,80 @@ async def test_proxy_at_blocked(my_loop, config_tsun_inv1, patch_open_connection
|
||||
|
||||
assert Proxy.mqtt.key == 'tsun/inv1/at_resp'
|
||||
assert Proxy.mqtt.data == "+ok"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dcu_cmd(my_loop, config_tsun_allow_all, dcu_dev_ind_msg, dcu_dev_rsp_msg, dcu_data_ind_msg, dcu_data_rsp_msg, dcu_command_ind_msg, at_command_rsp_msg):
|
||||
_ = config_tsun_allow_all
|
||||
m = MemoryStream(dcu_dev_ind_msg, (0,), True)
|
||||
m.read() # read device ind
|
||||
assert m.control == 0x4110
|
||||
assert str(m.seq) == '01:92'
|
||||
assert m.ifc.tx_fifo.get()==dcu_dev_rsp_msg
|
||||
assert m.ifc.fwd_fifo.get()==dcu_dev_ind_msg
|
||||
|
||||
m.send_dcu_cmd(b'\x01\x01\x06\x01\x00\x01\x03\xe8')
|
||||
assert m.ifc.tx_fifo.get()==b''
|
||||
assert m.ifc.fwd_fifo.get()==b''
|
||||
assert m.sent_pdu == b''
|
||||
assert str(m.seq) == '01:92'
|
||||
assert Proxy.mqtt.key == ''
|
||||
assert Proxy.mqtt.data == ""
|
||||
|
||||
m.append_msg(dcu_data_ind_msg)
|
||||
m.read() # read inverter ind
|
||||
assert m.control == 0x4210
|
||||
assert str(m.seq) == '02:93'
|
||||
assert m.ifc.tx_fifo.get()==dcu_data_rsp_msg
|
||||
assert m.ifc.fwd_fifo.get()==dcu_data_ind_msg
|
||||
|
||||
m.send_dcu_cmd(b'\x01\x01\x06\x01\x00\x01\x03\xe8')
|
||||
assert m.ifc.fwd_fifo.get() == b''
|
||||
assert m.ifc.tx_fifo.get()== b''
|
||||
assert m.sent_pdu == dcu_command_ind_msg
|
||||
m.sent_pdu = bytearray()
|
||||
|
||||
assert str(m.seq) == '02:94'
|
||||
assert Proxy.mqtt.key == ''
|
||||
assert Proxy.mqtt.data == ""
|
||||
|
||||
# m.append_msg(at_command_rsp_msg)
|
||||
# m.read() # read at resp
|
||||
# assert m.control == 0x1510
|
||||
# assert str(m.seq) == '03:03'
|
||||
# assert m.ifc.rx_get()==b''
|
||||
# assert m.ifc.tx_fifo.get()==b''
|
||||
# assert m.ifc.fwd_fifo.get()==b''
|
||||
# assert Proxy.mqtt.key == 'tsun/at_resp'
|
||||
# assert Proxy.mqtt.data == "+ok"
|
||||
Proxy.mqtt.clear() # clear last test result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dcu_cmd_not_supported(my_loop, config_tsun_allow_all, device_ind_msg, device_rsp_msg, inverter_ind_msg, inverter_rsp_msg):
|
||||
_ = config_tsun_allow_all
|
||||
m = MemoryStream(device_ind_msg, (0,), True)
|
||||
m.read() # read device ind
|
||||
assert m.control == 0x4110
|
||||
assert str(m.seq) == '01:01'
|
||||
assert m.ifc.tx_fifo.get()==device_rsp_msg
|
||||
assert m.ifc.fwd_fifo.get()==device_ind_msg
|
||||
|
||||
m.send_dcu_cmd(b'\x01\x01\x06\x01\x00\x01\x03\xe8')
|
||||
assert m.ifc.tx_fifo.get()==b''
|
||||
assert m.ifc.fwd_fifo.get()==b''
|
||||
assert m.sent_pdu == b''
|
||||
assert str(m.seq) == '01:01'
|
||||
assert Proxy.mqtt.key == ''
|
||||
assert Proxy.mqtt.data == ""
|
||||
|
||||
m.append_msg(inverter_ind_msg)
|
||||
m.read() # read inverter ind
|
||||
assert m.control == 0x4210
|
||||
assert str(m.seq) == '02:02'
|
||||
assert m.ifc.tx_fifo.get()==inverter_rsp_msg
|
||||
assert m.ifc.fwd_fifo.get()==inverter_ind_msg
|
||||
|
||||
m.send_dcu_cmd(b'\x01\x01\x06\x01\x00\x01\x03\xe8')
|
||||
assert m.ifc.fwd_fifo.get() == b''
|
||||
assert m.ifc.tx_fifo.get()== b''
|
||||
assert m.sent_pdu == b''
|
||||
Proxy.mqtt.clear() # clear last test result
|
||||
|
||||
Reference in New Issue
Block a user