[0.2.x] updated: nbl.py

git-svn-id: https://pykd.svn.codeplex.com/svn@82760 9b283d60-5439-405e-af05-b73fd8c4d996
This commit is contained in:
SND\kernelnet_cp 2013-02-27 16:16:17 +00:00 committed by Mikhail I. Izmestev
parent f03c94608e
commit 4da6671e23

View File

@ -2,7 +2,7 @@
# #
# #
import sys from optparse import OptionParser
from pykd import * from pykd import *
IPv4 = 0x0008 IPv4 = 0x0008
@ -14,6 +14,27 @@ UDP_PROTO = 0x11
TCP_PROTO = 0x06 TCP_PROTO = 0x06
NET_BUFFER_LIST = None
MDL = None
NET_BUFFER = None
def getNdisTypesInfo():
ndis = module("ndis")
global NET_BUFFER_LIST
global MDL
global NET_BUFFER
try:
NET_BUFFER_LIST = ndis.type("_NET_BUFFER_LIST")
MDL = ndis.type("_MDL")
NET_BUFFER = ndis.type("_NET_BUFFER")
except SymbolException:
NET_BUFFER_LIST =typeInfo("_NET_BUFFER_LIST")
MDL = typeInfo("_MDL")
NET_BUFFER = typeInfo("_NET_BUFFER")
def getHostWord( dataPos ): def getHostWord( dataPos ):
return ( dataPos.next() << 8 ) + dataPos.next() return ( dataPos.next() << 8 ) + dataPos.next()
@ -29,8 +50,6 @@ def getHostDWord( dataPos ):
def getNetDWord( dataPos ): def getNetDWord( dataPos ):
return dataPos.next() + ( dataPos.next() << 8 ) + ( dataPos.next() << 16 ) + ( dataPos.next() << 24 ) return dataPos.next() + ( dataPos.next() << 8 ) + ( dataPos.next() << 16 ) + ( dataPos.next() << 24 )
class UdpPacket: class UdpPacket:
def __init__( self, dataPos ): def __init__( self, dataPos ):
@ -131,7 +150,7 @@ class Ip6Address:
self.addr = [ getHostWord( dataPos ) for i in xrange(8) ] self.addr = [ getHostWord( dataPos ) for i in xrange(8) ]
def __str__( self ): def __str__( self ):
return "%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x" % tuple( self.addr[0:8] ) return "%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x" % tuple( self.addr )
class IpProtocol: class IpProtocol:
@ -168,19 +187,26 @@ class IpPacket:
try: try:
self.version = dataPos.next() version = dataPos.next()
self.ihl = self.version & 0xF self.ihl = version & 0xF
self.version = self.version >> 4 self.version = version >> 4
self.tos = dataPos.next() self.tos = dataPos.next()
self.TotalLength = getHostWord( dataPos ) self.TotalLength = getHostWord( dataPos )
self.ident = getNetWord( dataPos ) self.ident = getHostWord( dataPos )
self.fargment = getNetWord( dataPos ) frag = getHostWord( dataPos )
self.offset = frag & 0x1FFF
self.flags = frag >> 13
self.ttl = dataPos.next() self.ttl = dataPos.next()
self.protocol = IpProtocol( dataPos ) self.protocol = IpProtocol( dataPos )
self.checlsum = getNetWord( dataPos ) self.checlsum = getNetWord( dataPos )
self.srcAddr = IpAddress( dataPos ) self.srcAddr = IpAddress( dataPos )
self.destAddr = IpAddress( dataPos ) self.destAddr = IpAddress( dataPos )
self.nextLayerPckt = self.protocol.getNextLayerPacket( dataPos )
if self.offset == 0:
self.nextLayerPckt = self.protocol.getNextLayerPacket( dataPos )
else:
self.nextLayerPckt = ""
self.parsed = True self.parsed = True
except StopIteration: except StopIteration:
@ -197,6 +223,17 @@ class IpPacket:
s += "\tversion: %x\n" % self.version s += "\tversion: %x\n" % self.version
s += "\theader length: %d bytes\n" % ( self.ihl * 4 ) s += "\theader length: %d bytes\n" % ( self.ihl * 4 )
s += "\ttotal length: %d bytes\n" % self.TotalLength s += "\ttotal length: %d bytes\n" % self.TotalLength
s += "\tID: %x\n" % self.ident
s += "\tflags: %x\n" % self.flags
s += "\toffset: %x" % ( self.offset * 8)
if ( self.offset == 0 ) and ( self.flags & 0x4 == 0 ):
s += " (not fargmented)\n"
elif self.offset == 0 :
s += " (first fragment)\n"
elif not ( self.flags & 0x4 == 0 ):
s += " (fragmented)\n"
else:
s += " (last fragment)\n"
s += "\tprotocol: " + str( self.protocol ) + "\n" s += "\tprotocol: " + str( self.protocol ) + "\n"
s += "\tTTL: %d\n" % self.ttl s += "\tTTL: %d\n" % self.ttl
s += "\tSrc addr: " + str(self.srcAddr) + "\n" s += "\tSrc addr: " + str(self.srcAddr) + "\n"
@ -217,16 +254,18 @@ class Ip6Packet():
try: try:
t = getNetDWord( dataPos ) t = getHostDWord( dataPos )
self.version = ( t >> 28 ) & 0xF self.version = ( t >> 28 ) & 0xF
self.trafficClass = ( t >> 20 ) & 0xFF self.trafficClass = ( t >> 20 ) & 0xFF
self.flowLabel = t & 0xFFF self.flowLabel = t & 0xFFF
self.payloadLength = getNetWord( dataPos ) self.payloadLength = getNetWord( dataPos )
self.nextHeader = dataPos.next() self.nextHeader = IpProtocol( dataPos )
self.hopLimit = dataPos.next() self.hopLimit = dataPos.next()
self.srcAddr = Ip6Address( dataPos ) self.srcAddr = Ip6Address( dataPos )
self.destAddr = Ip6Address( dataPos ) self.destAddr = Ip6Address( dataPos )
self.nextLayerPckt = self.nextHeader.getNextLayerPacket( dataPos )
self.parsed = True self.parsed = True
except StopIteration: except StopIteration:
@ -234,7 +273,7 @@ class Ip6Packet():
def __str__( self ): def __str__( self ):
s = "IPv6 header" s = "IPv6 header: "
if self.parsed: if self.parsed:
s += "OK\n" s += "OK\n"
@ -242,10 +281,11 @@ class Ip6Packet():
s += "\ttraffic class %x\n" % self.trafficClass s += "\ttraffic class %x\n" % self.trafficClass
s += "\tflowLabel: %x\n" % self.flowLabel s += "\tflowLabel: %x\n" % self.flowLabel
s += "\tpayloadLength: %x\n" % self.payloadLength s += "\tpayloadLength: %x\n" % self.payloadLength
s += "\tnextHeader: %x\n" % self.nextHeader s += "\tnextHeader: " + str( self.nextHeader ) + "\n"
s += "\thopLimit: %d\n" % self.hopLimit s += "\thopLimit: %d\n" % self.hopLimit
s += "\tsrcAddr: " + str(self.srcAddr) + "\n" s += "\tsrcAddr: " + str(self.srcAddr) + "\n"
s += "\tdestAddr: " + str(self.destAddr) + "\n" s += "\tdestAddr: " + str(self.destAddr) + "\n"
s += str( self.nextLayerPckt )
else: else:
s += "MALFORMED\n" s += "MALFORMED\n"
@ -367,10 +407,18 @@ class EthernetPacket:
class NetPacket: class NetPacket:
def __init__( self, rawData ): def __init__( self, rawData, startProtocol="eth", beginOffset=0 ):
self.rawData = rawData self.rawData = rawData
dataPos = iter( self.rawData ) dataPos = iter( self.rawData[ beginOffset : ] )
self.mediaParsed = EthernetPacket( dataPos )
self.mediaParsed = {
"eth" : lambda : EthernetPacket( dataPos ),
"ip4" : lambda : IpPacket( dataPos ),
"ip6" : lambda : Ip6Packet( dataPos ),
"tcp" : lambda : TcpPacket( dataPos ),
"udp" : lambda : UdpPacket( dataPos )
}[startProtocol]()
def __str__( self ): def __str__( self ):
s = "Length: %d bytes\n" % len(self.rawData) s = "Length: %d bytes\n" % len(self.rawData)
@ -382,7 +430,7 @@ def getPacketFromNb( nb ):
pcktBytes = list() pcktBytes = list()
mdl = typedVar( "ndis!_MDL", nb.CurrentMdl ) mdl = typedVar( MDL, nb.CurrentMdl )
dataLength = nb.DataLength dataLength = nb.DataLength
dataOffset = nb.CurrentMdlOffset dataOffset = nb.CurrentMdlOffset
@ -395,7 +443,7 @@ def getPacketFromNb( nb ):
dataLength -= copyData dataLength -= copyData
mdl = typedVar( "ndis!_MDL", mdl.Next ) mdl = typedVar( MDL, mdl.Next )
return pcktBytes return pcktBytes
@ -403,109 +451,100 @@ def getPacketFromNb( nb ):
def getPacketsFromNbl( nblAddr ): def getPacketsFromNbl( nblAddr ):
pcktList = list() try:
getNdisTypesInfo()
nbl = typedVar( "ndis!_NET_BUFFER_LIST", nblAddr ) pcktList = list()
nbl = typedVar( NET_BUFFER_LIST, nblAddr )
while True:
nb = typedVar( "ndis!_NET_BUFFER", nbl.FirstNetBuffer )
while True: while True:
nb = typedVar( NET_BUFFER, nbl.FirstNetBuffer )
while True:
pcktList.append( getPacketFromNb( nb ) ) pcktList.append( getPacketFromNb( nb ) )
if nb.Next == 0: if nb.Next == 0:
break
nb = typedVar( NET_BUFFER, nb.Next )
if nbl.Next == 0:
break break
nb = typedVar( "ndis!_NET_BUFFER", nb.Next ) nbl = typedVar( NET_BUFFER_LIST, nbl.Next )
if nbl.Next == 0: return pcktList
break
except TypeException:
nbl = typedVar( "ndis!_NET_BUFFER_LIST", nbl.Next ) dprintln( "the symbols ar wrong" )
return pcktList def printNblStruct( nblAddr, showNdisStruct = False, beginProtocol="eth", beginOffset=0 ):
def printNblStruct( nblAddr ):
try: try:
getNdisTypesInfo()
while nblAddr: while nblAddr:
dprintln( "NET_BUFFER_LIST %#x" % nblAddr ) if showNdisStruct:
dprintln( "NET_BUFFER_LIST %#x" % nblAddr )
nbl = typedVar( "ndis!_NET_BUFFER_LIST", nblAddr ) nbl = typedVar( NET_BUFFER_LIST, nblAddr )
nbAddr = nbl.FirstNetBuffer nbAddr = nbl.FirstNetBuffer
while nbAddr: while nbAddr:
nb = typedVar( NET_BUFFER, nbAddr )
if showNdisStruct:
dprint( "\tNET_BUFFER %#x" % nbAddr )
dprintln( " data length = %d, data offset = %#x " % ( nb.DataLength, nb.DataOffset ) )
dprint( "\tNET_BUFFER %#x" % nbAddr ) mdlAddr = nb.CurrentMdl
nb = typedVar( "ndis!_NET_BUFFER", nbAddr )
dprintln( " data length = %d, data offset = %#x " % ( nb.DataLength, nb.DataOffset ) ) while mdlAddr:
mdl = typedVar( MDL, mdlAddr )
mdlAddr = nb.CurrentMdl if showNdisStruct:
dprint( "\t\tMDL %#x" % mdlAddr )
dprintln( " byte count = %d, byte offset = %#x, mapped addr = %#x" % ( mdl.ByteCount, mdl.ByteOffset, mdl.MappedSystemVa ) )
while mdlAddr: mdlAddr = mdl.Next
dprint( "\t\tMDL %#x" % mdlAddr ) dprintln( str( NetPacket( getPacketFromNb( nb ), beginProtocol, beginOffset ) ) )
mdl = typedVar( "ndis!_MDL", mdlAddr )
dprintln( " byte count = %d, byte offset = %#x, mapped addr = %#x" % ( mdl.ByteCount, mdl.ByteOffset, mdl.MappedSystemVa ) ) nbAddr = nb.Next
mdlAddr = mdl.Next
dprintln( str( NetPacket( getPacketFromNb( nb ) ) ) )
nbAddr = nb.Next
nblAddr = nbl.Next nblAddr = nbl.Next
except MemoryException:
dprintln( "\nMemory corruption, stop analyzing" )
except TypeException: except TypeException:
dprintln( "the symbols ar wrong" ) dprintln( "the symbols ar wrong" )
def usage():
dprintln( "!py nbl addr" )
dprintln( "!py nbl /s addr" )
def main(): def main():
if len(sys.argv) < 2:
usage()
return
if not isKernelDebugging(): if not isKernelDebugging():
dprintln( "This script is for kernel debugging only" ) dprintln( "This script is for kernel debugging only" )
return return
parser = OptionParser(usage="usage: !py nbl [options] address")
parser.add_option("-s", "--struct", action="store_true", dest="showNdisStruct", default=False, help="Show NDIS structures" )
parser.add_option("-p", "--proto", action="store", type="string", dest="startProtocol", default="eth", help="Packet protocol. Can be eth, ip4, ip6, tcp, udp. By default - eth." )
parser.add_option("-o", "--offset", action="store", type="int", dest="beginOffset", default=0, help="Bytes offset from packet begining" )
(options, args) = parser.parse_args()
if len(sys.argv)==2: if len(args) < 1:
pcktList = getPacketsFromNbl( expr(sys.argv[1]) ) parser.error("you should note address of network packet")
parsedPcktList = [ NetPacket(p) for p in pcktList ]
dprintln( "Packet's count: %s " % len(parsedPcktList) ) printNblStruct( expr(args[0]), showNdisStruct = options.showNdisStruct, beginProtocol=options.startProtocol, beginOffset=options.beginOffset )
for p in parsedPcktList: print "\n", p
return
if sys.argv[1]=="/s":
printNblStruct( expr(sys.argv[2]) )
return
usage()
if __name__ == "__main__": if __name__ == "__main__":