@@ -99,14 +99,22 @@ class MemoryStream(Talent):
|
||||
|
||||
@pytest.fixture
|
||||
def msg_contact_info(): # Contact Info message
|
||||
Config.act_config = {'tsun':{'enabled': True}}
|
||||
return b'\x00\x00\x00\x2c\x10R170000000000001\x91\x00\x08solarhub\x0fsolarhub\x40123456'
|
||||
|
||||
@pytest.fixture
|
||||
def msg_contact_info_empty(): # Contact Info message with empty string
|
||||
return b'\x00\x00\x00\x15\x10R170000000000001\x91\x00\x00\x00'
|
||||
|
||||
@pytest.fixture
|
||||
def msg_contact_info_long_id(): # Contact Info message with longer ID
|
||||
Config.act_config = {'tsun':{'enabled': True}}
|
||||
return b'\x00\x00\x00\x2d\x11R1700000000000011\x91\x00\x08solarhub\x0fsolarhub\x40123456'
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def msg_contact_info_broken(): # Contact Info message with invalid string coding
|
||||
return b'\x00\x00\x00\x2a\x10R170000000000001\x91\x00solarhubsolarhub\x40123456'
|
||||
|
||||
@pytest.fixture
|
||||
def msg2_contact_info(): # two Contact Info messages
|
||||
return b'\x00\x00\x00\x2c\x10R170000000000001\x91\x00\x08solarhub\x0fsolarhub\x40123456\x00\x00\x00\x2c\x10R170000000000002\x91\x00\x08solarhub\x0fsolarhub\x40123456'
|
||||
@@ -728,6 +736,7 @@ def multiple_recv_buf(): # There are three message in the buffer, but the second
|
||||
return msg
|
||||
|
||||
def test_read_message(msg_contact_info):
|
||||
Config.act_config = {'tsun':{'enabled': True}}
|
||||
m = MemoryStream(msg_contact_info, (0,))
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
@@ -782,6 +791,7 @@ def test_read_message_long_id(msg_contact_info_long_id):
|
||||
|
||||
|
||||
def test_read_message_in_chunks(msg_contact_info):
|
||||
Config.act_config = {'tsun':{'enabled': True}}
|
||||
m = MemoryStream(msg_contact_info, (4,23,0))
|
||||
m.read() # read 4 bytes, header incomplere
|
||||
assert not m.header_valid # must be invalid, since header not complete
|
||||
@@ -801,6 +811,7 @@ def test_read_message_in_chunks(msg_contact_info):
|
||||
m.close()
|
||||
|
||||
def test_read_message_in_chunks2(msg_contact_info):
|
||||
Config.act_config = {'tsun':{'enabled': True}}
|
||||
m = MemoryStream(msg_contact_info, (4,10,0))
|
||||
m.read() # read 4 bytes, header incomplere
|
||||
assert not m.header_valid
|
||||
@@ -852,6 +863,42 @@ def test_read_two_messages(config_tsun_allow_all, msg2_contact_info,msg_contact_
|
||||
assert m._send_buffer==b'\x00\x00\x00,\x10R170000000000002\x91\x00\x08solarhub\x0fsolarhub@123456'
|
||||
m.close()
|
||||
|
||||
def test_conttact_req(config_tsun_allow_all, msg_contact_info, msg_contact_rsp):
|
||||
_ = config_tsun_allow_all
|
||||
m = MemoryStream(msg_contact_info, (0,))
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
assert m.msg_count == 1
|
||||
assert m.id_str == b"R170000000000001"
|
||||
assert m.contact_name == b'solarhub'
|
||||
assert m.contact_mail == b'solarhub@123456'
|
||||
assert m.unique_id == 'R170000000000001'
|
||||
assert int(m.ctrl)==145
|
||||
assert m.msg_id==0
|
||||
assert m.header_len==23
|
||||
assert m.data_len==25
|
||||
assert m._forward_buffer==b''
|
||||
assert m._send_buffer==msg_contact_rsp
|
||||
m.close()
|
||||
|
||||
def test_contact_broken_req(config_tsun_allow_all, msg_contact_info_broken, msg_contact_rsp):
|
||||
_ = config_tsun_allow_all
|
||||
m = MemoryStream(msg_contact_info_broken, (0,))
|
||||
m.read() # read complete msg, and dispatch msg
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
assert m.msg_count == 1
|
||||
assert m.id_str == b"R170000000000001"
|
||||
assert m.contact_name == b''
|
||||
assert m.contact_mail == b''
|
||||
assert m.unique_id == 'R170000000000001'
|
||||
assert int(m.ctrl)==145
|
||||
assert m.msg_id==0
|
||||
assert m.header_len==23
|
||||
assert m.data_len==23
|
||||
assert m._forward_buffer==b''
|
||||
assert m._send_buffer==msg_contact_rsp
|
||||
m.close()
|
||||
|
||||
def test_msg_contact_resp(config_tsun_inv1, msg_contact_rsp):
|
||||
_ = config_tsun_inv1
|
||||
m = MemoryStream(msg_contact_rsp, (0,), False)
|
||||
|
||||
Reference in New Issue
Block a user