From 4da6671e233ccc5e4e3b5a775400f41728a7e15a Mon Sep 17 00:00:00 2001 From: "SND\\kernelnet_cp" Date: Wed, 27 Feb 2013 16:16:17 +0000 Subject: [PATCH] [0.2.x] updated: nbl.py git-svn-id: https://pykd.svn.codeplex.com/svn@82760 9b283d60-5439-405e-af05-b73fd8c4d996 --- snippets/nbl.py | 207 ++++++++++++++++++++++++++++-------------------- 1 file changed, 123 insertions(+), 84 deletions(-) diff --git a/snippets/nbl.py b/snippets/nbl.py index c3eb67e..15b0a60 100644 --- a/snippets/nbl.py +++ b/snippets/nbl.py @@ -2,7 +2,7 @@ # # -import sys +from optparse import OptionParser from pykd import * IPv4 = 0x0008 @@ -14,6 +14,27 @@ UDP_PROTO = 0x11 TCP_PROTO = 0x06 +NET_BUFFER_LIST = None +MDL = None +NET_BUFFER = None + +def getNdisTypesInfo(): + ndis = module("ndis") + + global NET_BUFFER_LIST + global MDL + global NET_BUFFER + + try: + NET_BUFFER_LIST = ndis.type("_NET_BUFFER_LIST") + MDL = ndis.type("_MDL") + NET_BUFFER = ndis.type("_NET_BUFFER") + except SymbolException: + NET_BUFFER_LIST =typeInfo("_NET_BUFFER_LIST") + MDL = typeInfo("_MDL") + NET_BUFFER = typeInfo("_NET_BUFFER") + + def getHostWord( dataPos ): return ( dataPos.next() << 8 ) + dataPos.next() @@ -29,8 +50,6 @@ def getHostDWord( dataPos ): def getNetDWord( dataPos ): return dataPos.next() + ( dataPos.next() << 8 ) + ( dataPos.next() << 16 ) + ( dataPos.next() << 24 ) - - class UdpPacket: def __init__( self, dataPos ): @@ -131,7 +150,7 @@ class Ip6Address: self.addr = [ getHostWord( dataPos ) for i in xrange(8) ] def __str__( self ): - return "%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x" % tuple( self.addr[0:8] ) + return "%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x" % tuple( self.addr ) class IpProtocol: @@ -168,19 +187,26 @@ class IpPacket: try: - self.version = dataPos.next() - self.ihl = self.version & 0xF - self.version = self.version >> 4 + version = dataPos.next() + self.ihl = version & 0xF + self.version = version >> 4 self.tos = dataPos.next() - self.TotalLength = getHostWord( dataPos ) - self.ident = getNetWord( dataPos ) - self.fargment = getNetWord( dataPos ) + self.TotalLength = getHostWord( dataPos ) + self.ident = getHostWord( dataPos ) + frag = getHostWord( dataPos ) + self.offset = frag & 0x1FFF + self.flags = frag >> 13 self.ttl = dataPos.next() self.protocol = IpProtocol( dataPos ) self.checlsum = getNetWord( dataPos ) self.srcAddr = IpAddress( dataPos ) self.destAddr = IpAddress( dataPos ) - self.nextLayerPckt = self.protocol.getNextLayerPacket( dataPos ) + + if self.offset == 0: + self.nextLayerPckt = self.protocol.getNextLayerPacket( dataPos ) + else: + self.nextLayerPckt = "" + self.parsed = True except StopIteration: @@ -197,6 +223,17 @@ class IpPacket: s += "\tversion: %x\n" % self.version s += "\theader length: %d bytes\n" % ( self.ihl * 4 ) s += "\ttotal length: %d bytes\n" % self.TotalLength + s += "\tID: %x\n" % self.ident + s += "\tflags: %x\n" % self.flags + s += "\toffset: %x" % ( self.offset * 8) + if ( self.offset == 0 ) and ( self.flags & 0x4 == 0 ): + s += " (not fargmented)\n" + elif self.offset == 0 : + s += " (first fragment)\n" + elif not ( self.flags & 0x4 == 0 ): + s += " (fragmented)\n" + else: + s += " (last fragment)\n" s += "\tprotocol: " + str( self.protocol ) + "\n" s += "\tTTL: %d\n" % self.ttl s += "\tSrc addr: " + str(self.srcAddr) + "\n" @@ -217,16 +254,18 @@ class Ip6Packet(): try: - t = getNetDWord( dataPos ) + t = getHostDWord( dataPos ) self.version = ( t >> 28 ) & 0xF self.trafficClass = ( t >> 20 ) & 0xFF self.flowLabel = t & 0xFFF self.payloadLength = getNetWord( dataPos ) - self.nextHeader = dataPos.next() + self.nextHeader = IpProtocol( dataPos ) self.hopLimit = dataPos.next() self.srcAddr = Ip6Address( dataPos ) self.destAddr = Ip6Address( dataPos ) + self.nextLayerPckt = self.nextHeader.getNextLayerPacket( dataPos ) + self.parsed = True except StopIteration: @@ -234,7 +273,7 @@ class Ip6Packet(): def __str__( self ): - s = "IPv6 header" + s = "IPv6 header: " if self.parsed: s += "OK\n" @@ -242,10 +281,11 @@ class Ip6Packet(): s += "\ttraffic class %x\n" % self.trafficClass s += "\tflowLabel: %x\n" % self.flowLabel s += "\tpayloadLength: %x\n" % self.payloadLength - s += "\tnextHeader: %x\n" % self.nextHeader + s += "\tnextHeader: " + str( self.nextHeader ) + "\n" s += "\thopLimit: %d\n" % self.hopLimit s += "\tsrcAddr: " + str(self.srcAddr) + "\n" s += "\tdestAddr: " + str(self.destAddr) + "\n" + s += str( self.nextLayerPckt ) else: s += "MALFORMED\n" @@ -367,10 +407,18 @@ class EthernetPacket: class NetPacket: - def __init__( self, rawData ): + def __init__( self, rawData, startProtocol="eth", beginOffset=0 ): self.rawData = rawData - dataPos = iter( self.rawData ) - self.mediaParsed = EthernetPacket( dataPos ) + dataPos = iter( self.rawData[ beginOffset : ] ) + + + self.mediaParsed = { + "eth" : lambda : EthernetPacket( dataPos ), + "ip4" : lambda : IpPacket( dataPos ), + "ip6" : lambda : Ip6Packet( dataPos ), + "tcp" : lambda : TcpPacket( dataPos ), + "udp" : lambda : UdpPacket( dataPos ) + }[startProtocol]() def __str__( self ): s = "Length: %d bytes\n" % len(self.rawData) @@ -382,7 +430,7 @@ def getPacketFromNb( nb ): pcktBytes = list() - mdl = typedVar( "ndis!_MDL", nb.CurrentMdl ) + mdl = typedVar( MDL, nb.CurrentMdl ) dataLength = nb.DataLength dataOffset = nb.CurrentMdlOffset @@ -395,7 +443,7 @@ def getPacketFromNb( nb ): dataLength -= copyData - mdl = typedVar( "ndis!_MDL", mdl.Next ) + mdl = typedVar( MDL, mdl.Next ) return pcktBytes @@ -403,109 +451,100 @@ def getPacketFromNb( nb ): def getPacketsFromNbl( nblAddr ): - pcktList = list() + try: + + getNdisTypesInfo() - nbl = typedVar( "ndis!_NET_BUFFER_LIST", nblAddr ) + pcktList = list() + + nbl = typedVar( NET_BUFFER_LIST, nblAddr ) - while True: - - nb = typedVar( "ndis!_NET_BUFFER", nbl.FirstNetBuffer ) - while True: + + nb = typedVar( NET_BUFFER, nbl.FirstNetBuffer ) + + while True: - pcktList.append( getPacketFromNb( nb ) ) + pcktList.append( getPacketFromNb( nb ) ) - if nb.Next == 0: + if nb.Next == 0: + break + + nb = typedVar( NET_BUFFER, nb.Next ) + + if nbl.Next == 0: break - nb = typedVar( "ndis!_NET_BUFFER", nb.Next ) + nbl = typedVar( NET_BUFFER_LIST, nbl.Next ) - if nbl.Next == 0: - break + return pcktList + + except TypeException: - nbl = typedVar( "ndis!_NET_BUFFER_LIST", nbl.Next ) + dprintln( "the symbols ar wrong" ) - return pcktList - - -def printNblStruct( nblAddr ): - +def printNblStruct( nblAddr, showNdisStruct = False, beginProtocol="eth", beginOffset=0 ): try: + + getNdisTypesInfo() while nblAddr: - dprintln( "NET_BUFFER_LIST %#x" % nblAddr ) + if showNdisStruct: + dprintln( "NET_BUFFER_LIST %#x" % nblAddr ) - nbl = typedVar( "ndis!_NET_BUFFER_LIST", nblAddr ) + nbl = typedVar( NET_BUFFER_LIST, nblAddr ) nbAddr = nbl.FirstNetBuffer while nbAddr: + + nb = typedVar( NET_BUFFER, nbAddr ) + + if showNdisStruct: + dprint( "\tNET_BUFFER %#x" % nbAddr ) + dprintln( " data length = %d, data offset = %#x " % ( nb.DataLength, nb.DataOffset ) ) - dprint( "\tNET_BUFFER %#x" % nbAddr ) - - nb = typedVar( "ndis!_NET_BUFFER", nbAddr ) + mdlAddr = nb.CurrentMdl - dprintln( " data length = %d, data offset = %#x " % ( nb.DataLength, nb.DataOffset ) ) + while mdlAddr: + + mdl = typedVar( MDL, mdlAddr ) - mdlAddr = nb.CurrentMdl + if showNdisStruct: + dprint( "\t\tMDL %#x" % mdlAddr ) + dprintln( " byte count = %d, byte offset = %#x, mapped addr = %#x" % ( mdl.ByteCount, mdl.ByteOffset, mdl.MappedSystemVa ) ) - while mdlAddr: + mdlAddr = mdl.Next - dprint( "\t\tMDL %#x" % mdlAddr ) - - mdl = typedVar( "ndis!_MDL", mdlAddr ) + dprintln( str( NetPacket( getPacketFromNb( nb ), beginProtocol, beginOffset ) ) ) - dprintln( " byte count = %d, byte offset = %#x, mapped addr = %#x" % ( mdl.ByteCount, mdl.ByteOffset, mdl.MappedSystemVa ) ) - - mdlAddr = mdl.Next - - dprintln( str( NetPacket( getPacketFromNb( nb ) ) ) ) - - nbAddr = nb.Next + nbAddr = nb.Next nblAddr = nbl.Next - - except MemoryException: - - dprintln( "\nMemory corruption, stop analyzing" ) - except TypeException: dprintln( "the symbols ar wrong" ) - - - -def usage(): - dprintln( "!py nbl addr" ) - dprintln( "!py nbl /s addr" ) - - def main(): - if len(sys.argv) < 2: - usage() - return - if not isKernelDebugging(): dprintln( "This script is for kernel debugging only" ) return + + parser = OptionParser(usage="usage: !py nbl [options] address") + parser.add_option("-s", "--struct", action="store_true", dest="showNdisStruct", default=False, help="Show NDIS structures" ) + parser.add_option("-p", "--proto", action="store", type="string", dest="startProtocol", default="eth", help="Packet protocol. Can be eth, ip4, ip6, tcp, udp. By default - eth." ) + parser.add_option("-o", "--offset", action="store", type="int", dest="beginOffset", default=0, help="Bytes offset from packet begining" ) + + (options, args) = parser.parse_args() - if len(sys.argv)==2: - pcktList = getPacketsFromNbl( expr(sys.argv[1]) ) - parsedPcktList = [ NetPacket(p) for p in pcktList ] - dprintln( "Packet's count: %s " % len(parsedPcktList) ) - for p in parsedPcktList: print "\n", p - return - - if sys.argv[1]=="/s": - printNblStruct( expr(sys.argv[2]) ) - return - - usage() + if len(args) < 1: + parser.error("you should note address of network packet") + + printNblStruct( expr(args[0]), showNdisStruct = options.showNdisStruct, beginProtocol=options.startProtocol, beginOffset=options.beginOffset ) if __name__ == "__main__":