decode: updates all around
Rewrite packet handling so packets can span multiple USB URBs (which sometimes happens with WMC) and also add a bunch more WMC decoding stuff.
This commit is contained in:
118
decode/packet.py
118
decode/packet.py
@@ -53,8 +53,9 @@ def get_urb_info(l):
|
||||
if idx >= 0:
|
||||
direction = defs.TO_HOST
|
||||
else:
|
||||
raise Exception("Invalid packet start line")
|
||||
return (defs.TO_UNKNOWN, -1)
|
||||
|
||||
# Yay, valid packet, grab URB number
|
||||
numstr = ""
|
||||
for c in l[idx + 9:]:
|
||||
if c.isdigit():
|
||||
@@ -68,66 +69,90 @@ def get_urb_info(l):
|
||||
return (direction, int(numstr))
|
||||
|
||||
class Packet:
|
||||
def __init__(self, lines, control_prot, transfer_prot):
|
||||
def __init__(self, line, control_prot, transfer_prot):
|
||||
self.direction = defs.TO_UNKNOWN
|
||||
self.func = URBF_UNKNOWN
|
||||
self.control_prot = control_prot
|
||||
self.transfer_prot = transfer_prot
|
||||
self.extra = []
|
||||
self.data = None
|
||||
self.urbnum = 0
|
||||
self.protocol = None
|
||||
self.has_data = False
|
||||
self.typecode = None
|
||||
self.lines = []
|
||||
self.in_data = False
|
||||
self.data_complete = False
|
||||
self.tmpdata = ""
|
||||
self.fcomplete = None
|
||||
self.funpack = None
|
||||
self.fshow = None
|
||||
|
||||
# Parse the packet
|
||||
|
||||
(self.direction, self.urbnum) = get_urb_info(lines[0])
|
||||
# Check if this is actually a packet
|
||||
self.lines.append(line)
|
||||
(self.direction, self.urbnum) = get_urb_info(line)
|
||||
|
||||
try:
|
||||
(self.func, self.has_data, self.typecode) = funcs[lines[1].strip()]
|
||||
except KeyError:
|
||||
raise KeyError("URB function %s not handled" % lines[1].strip())
|
||||
def add_line(self, line):
|
||||
line = line.strip()
|
||||
if not len(line):
|
||||
return
|
||||
self.lines.append(line)
|
||||
|
||||
if self.func == URBF_TRANSFER:
|
||||
self.protocol = transfer_prot
|
||||
elif self.func == URBF_CONTROL:
|
||||
self.protocol = control_prot
|
||||
if line[0] == '[':
|
||||
# Usually the end of a packet, but if we need data from the next
|
||||
# packet keep going
|
||||
if self.has_data and not self.data_complete:
|
||||
return False
|
||||
return True
|
||||
|
||||
# Parse transfer buffer data
|
||||
in_data = False
|
||||
data = ""
|
||||
for i in range(2, len(lines)):
|
||||
l = lines[i].strip()
|
||||
if self.has_data:
|
||||
if l.startswith("TransferBufferMDL"):
|
||||
if in_data == True:
|
||||
raise Exception("Already in data")
|
||||
in_data = True
|
||||
elif l.startswith("UrbLink"):
|
||||
in_data = False
|
||||
elif in_data and len(l) and not "no data supplied" in l:
|
||||
d = l[l.index(": ") + 2:] # get data alone
|
||||
data += d.replace(" ", "")
|
||||
if not self.typecode:
|
||||
# haven't gotten our "-- URB_FUNCTION_xxxx" line yet
|
||||
if line.find("-- URB_FUNCTION_") >= 0:
|
||||
try:
|
||||
(self.func, self.has_data, self.typecode) = funcs[line]
|
||||
except KeyError:
|
||||
raise KeyError("URB function %s not handled" % line)
|
||||
|
||||
if self.func == URBF_TRANSFER:
|
||||
self.protocol = self.transfer_prot
|
||||
elif self.func == URBF_CONTROL:
|
||||
self.protocol = self.control_prot
|
||||
|
||||
if self.protocol:
|
||||
exec "from %s import get_funcs" % self.protocol
|
||||
(self.fcomplete, self.funpack, self.fshow) = get_funcs()
|
||||
else:
|
||||
self.extra.append(l)
|
||||
return False # not done; need more lines
|
||||
|
||||
if len(data) > 0:
|
||||
self.parse_data(data)
|
||||
if line.find("TransferBufferMDL = ") >= 0 and self.has_data:
|
||||
self.in_data = True
|
||||
return False # not done; need more lines
|
||||
|
||||
def get_funcs(self):
|
||||
if self.protocol:
|
||||
exec "from %s import get_funcs" % self.protocol
|
||||
return get_funcs()
|
||||
return (None, None)
|
||||
if line.find("UrbLink = ") >= 0:
|
||||
if self.in_data:
|
||||
self.in_data = False
|
||||
|
||||
def parse_data(self, data):
|
||||
if not self.has_data:
|
||||
raise Exception("Data only valid for URBF_TRANSFER or URBF_CONTROL")
|
||||
# special case: zero-length data means complete
|
||||
if len(self.tmpdata) == 0:
|
||||
self.data_complete = True
|
||||
return True
|
||||
|
||||
(unpack, show) = self.get_funcs()
|
||||
if unpack:
|
||||
self.data = unpack(data, self.direction)
|
||||
else:
|
||||
self.data = binascii.unhexlify(data)
|
||||
if self.fcomplete:
|
||||
self.data_complete = self.fcomplete(self.tmpdata, self.direction)
|
||||
if self.data_complete:
|
||||
self.data = self.funpack(self.tmpdata, self.direction)
|
||||
return self.data_complete
|
||||
else:
|
||||
self.data = binascii.unhexlify(self.tmpdata)
|
||||
self.data_complete = True
|
||||
return False # not done; need more lines
|
||||
|
||||
if self.in_data:
|
||||
if len(line) and not "no data supplied" in line:
|
||||
d = line[line.index(": ") + 2:] # get data alone
|
||||
self.tmpdata += d.replace(" ", "")
|
||||
|
||||
return False # not done; need more lines
|
||||
|
||||
def add_ascii(self, line, items):
|
||||
if len(line) < 53:
|
||||
@@ -188,7 +213,6 @@ class Packet:
|
||||
print prefix + output
|
||||
print ""
|
||||
|
||||
(unpack, show) = self.get_funcs()
|
||||
if show:
|
||||
show(self.data, " " * 8, self.direction)
|
||||
if self.fshow:
|
||||
self.fshow(self.data, " " * 8, self.direction)
|
||||
|
||||
|
Reference in New Issue
Block a user