add more test cases
This commit is contained in:
@@ -75,6 +75,7 @@ def test_read_message(MsgContactInfo):
|
||||
assert m.msg_id==0
|
||||
assert m.header_len==23
|
||||
assert m.data_len==25
|
||||
m.close()
|
||||
|
||||
|
||||
def test_read_message_long_id(MsgContactInfo_LongId):
|
||||
@@ -93,6 +94,7 @@ def test_read_message_long_id(MsgContactInfo_LongId):
|
||||
m.read() # try to read rest of message, but there is no chunk available
|
||||
assert m.header_valid # must be valid, since header is complete but not the msg
|
||||
assert m.msg_count == 0
|
||||
m.close()
|
||||
|
||||
|
||||
def test_read_message_in_chunks(MsgContactInfo):
|
||||
@@ -111,6 +113,7 @@ def test_read_message_in_chunks(MsgContactInfo):
|
||||
m.read() # read rest of message
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
assert m.msg_count == 1
|
||||
m.close()
|
||||
|
||||
def test_read_message_in_chunks2(MsgContactInfo):
|
||||
m = MemoryStream(MsgContactInfo, (4,10,0))
|
||||
@@ -132,6 +135,7 @@ def test_read_message_in_chunks2(MsgContactInfo):
|
||||
pass
|
||||
assert m.msg_count == 1
|
||||
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
|
||||
m.close()
|
||||
|
||||
def test_read_two_messages(Msg2ContactInfo):
|
||||
m = MemoryStream(Msg2ContactInfo, (0,))
|
||||
@@ -151,6 +155,7 @@ def test_read_two_messages(Msg2ContactInfo):
|
||||
assert m.msg_id==0
|
||||
assert m.header_len==23
|
||||
assert m.data_len==25
|
||||
m.close()
|
||||
|
||||
def test_ctrl_byte():
|
||||
c = Control(0x91)
|
||||
@@ -161,3 +166,36 @@ def test_ctrl_byte():
|
||||
assert c.is_resp()
|
||||
|
||||
|
||||
def test_msg_interator():
|
||||
m1 = Message()
|
||||
m2 = Message()
|
||||
m3 = Message()
|
||||
m3.close()
|
||||
del m3
|
||||
test1 = 0
|
||||
test2 = 0
|
||||
for key in Message:
|
||||
if key == m1:
|
||||
test1+=1
|
||||
elif key == m2:
|
||||
test2+=1
|
||||
else:
|
||||
assert False
|
||||
assert test1 == 1
|
||||
assert test2 == 1
|
||||
|
||||
def test_proxy_counter():
|
||||
m = Message()
|
||||
assert m.new_data == {}
|
||||
assert 'proxy' in m.db.stat
|
||||
assert 0 == m.db.stat['proxy']['Unknown_Msg']
|
||||
|
||||
m.inc_counter('Unknown_Msg')
|
||||
assert m.new_data == {'proxy': True}
|
||||
assert 1 == m.db.stat['proxy']['Unknown_Msg']
|
||||
|
||||
m.new_data['proxy'] = False
|
||||
m.dec_counter('Unknown_Msg')
|
||||
assert m.new_data == {'proxy': True}
|
||||
assert 0 == m.db.stat['proxy']['Unknown_Msg']
|
||||
m.close()
|
||||
|
||||
Reference in New Issue
Block a user