From 83b2fd2c5d6f1937dc364090ac034f6df824e5b2 Mon Sep 17 00:00:00 2001 From: "SND\\kernelnet_cp" Date: Tue, 19 Apr 2011 07:48:01 +0000 Subject: [PATCH] [snippets] added: !py nbl for parsing ndis _NET_BUFFER_LIST structure git-svn-id: https://pykd.svn.codeplex.com/svn@64207 9b283d60-5439-405e-af05-b73fd8c4d996 --- snippets/nbl.py | 305 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 305 insertions(+) create mode 100644 snippets/nbl.py diff --git a/snippets/nbl.py b/snippets/nbl.py new file mode 100644 index 0000000..a914ae9 --- /dev/null +++ b/snippets/nbl.py @@ -0,0 +1,305 @@ +# +# +# + +import sys +from pykd import * + +IPv4 = 0x0008 +ARP = 0x0608 +IPv6 = 0xdd86 + + +def getHostWord( dataPos ): + return ( dataPos.next() << 8 ) + dataPos.next() + + +def getNetWord( dataPos ): + return dataPos.next() + ( dataPos.next() << 8 ) + + + +class UdpPacket: + + def __init__( self, dataPos ): + + self.parsed = False + + try: + self.sourcePort = getHostWord( dataPos ) + self.destPort = getHostWord( dataPos ) + self.length = getHostWord( dataPos ) + self.checksum = getNetWord( dataPos ) + self.parsed = True + + except StopIteration: + pass + + def __str__( self ): + s = "UDP header: " + if self.parsed: + s += "OK\n" + s += "\tSrc port: %d\n" % self.sourcePort + s += "\tDest port: %d\n" % self.destPort + s += "\tLength: %d\n" % self.length + s += "\tChecksum: %#x\n" % self.checksum + s += "\n" + else: + s += "FAILED\n" + + return s + + +class ArpPacket: + + def __init__( self, dataPos ): + pass + + def __str__( self ): + return "" + + +class IpAddress: + + def __init__( self, dataPos ): + + self.addr = [ dataPos.next() for i in range(0,4) ] + + def __str__( self ): + + return "%d.%d.%d.%d" % tuple( self.addr[0:4] ) + + +class IpProtocol: + + def __init__( self, dataPos ): + self.typeVal = dataPos.next() + + def isUDP( self ): + return self.typeVal==0x11 + + def isTCP( self ): + return self.typeVal==0x06 + + def __str__( self ): + if self.isUDP() : return "UDP" + if self.isTCP() : return "TCP" + else: return "%x" % self.typeVal + + + +class IpPacket: + + def __init__( self, dataPos ): + + self.parsed = False + + try: + + self.version = dataPos.next() + self.ihl = self.version & 0xF + self.version = self.version >> 4 + self.tos = dataPos.next() + self.TotalLength = getHostWord( dataPos ) + self.ident = getNetWord( dataPos ) + self.fargment = getNetWord( dataPos ) + self.ttl = dataPos.next() + self.protocol = IpProtocol( dataPos ) + self.checlsum = getNetWord( dataPos ) + self.srcAddr = IpAddress( dataPos ) + self.destAddr = IpAddress( dataPos ) + + if self.protocol.isUDP(): self.nextLayerPckt = UdpPacket( dataPos ) + elif self.protocol.isTCP(): self.nextLayerPckt = ArpPacket( dataPos ) + else: self.nextLayerPckt = "Unknown protocol" + + self.parsed = True + + except StopIteration: + pass + + + def __str__( self ): + + s = "IPv4 header: " + + if self.parsed: + s += "OK\n" + s += "\tversion: %x\n" % self.version + s += "\theader length: %d bytes\n" % ( self.ihl * 4 ) + s += "\ttotal length: %d bytes\n" % self.TotalLength + s += "\tprotocol: " + str( self.protocol ) + "\n" + s += "\tTTL: %d\n" % self.ttl + s += "\tSrc addr: " + str(self.srcAddr) + "\n" + s += "\tDest addr: " + str(self.destAddr) + "\n" + s += str( self.nextLayerPckt ) + + else: + s += "FAILED\n" + + return s + + +class Ip6Packet(): + + def __init__( self, dataPos ): + pass + + def __str__( self ): + return "" + + +class ARPPacket(): + + def __init__( self, dataPos ): + pass + + def __str__( self ): + return "" + + +class EthernetType: + + def __init__( self, dataPos ): + self.typeVal = getNetWord( dataPos ) + + def isIPv4( self ): + return self.typeVal == IPv4 + + def isARP( self ): + return self.typeVal == ARP + + def isIPv6( self ): + return self.typeVal == IPv6 + + def __str__( self ): + return { IPv4 : "IPv4", ARP : "ARP", IPv6 : "IPv6" }.get( self.typeVal, self.typeVal ) + + def getNextLayerPacket( self, dataPos ): + return { + IPv4 : lambda x : IpPacket(x), + ARP : lambda x : Ip6Packet(x), + IPv6 : lambda x : ARPPacket(x), + }.get( self.typeVal, lambda x : "" )( dataPos ) + + +class EthernetAddress: + + def __init__( self, dataPos ): + self.addr = [ dataPos.next() for i in range(0,6) ] + + def __str__( self ): + return "%02x-%02x-%02x-%02x-%02x-%02x" % tuple( self.addr[0:6] ) + + + +class EthernetPacket: + + def __init__( self, dataPos ): + + self.parsed = False + + try: + + self.destAddress = EthernetAddress( dataPos) + self.srcAddress = EthernetAddress( dataPos) + self.frametype = EthernetType( dataPos ) + self.nextLayerPckt = self.frametype.getNextLayerPacket( dataPos ) + self.parsed = True + + except StopIteration: + pass + + + def __str__( self): + + s = "Ethernet header: " + + if self.parsed: + + s += "OK\n" + s += "\tDest MAC: " + str(self.destAddress) + "\n" + s += "\tSrc MAC: " + str(self.srcAddress) + "\n" + s += "\tType: " + str( self.frametype) + "\n" + s += str( self.nextLayerPckt ) + + else: + s += "FAILED\n" + + return s + + +class NetPacket: + + def __init__( self, rawData ): + self.rawData = rawData + dataPos = iter( self.rawData ) + self.mediaParsed = EthernetPacket( dataPos ) + + def __str__( self ): + s = "Length: %d bytes\n" % len(self.rawData) + s += str( self.mediaParsed ) + return s + + +def getPacketsFromNbl( nblAddr ): + + pcktList = list() + + nbl = typedVar( "ndis", "_NET_BUFFER_LIST", nblAddr ) + + while nbl: + + nb = typedVar( "ndis", "_NET_BUFFER", nbl.FirstNetBuffer ) + + while nb: + + pcktBytes = list() + + mdl = typedVar( "ndis", "_MDL", nb.CurrentMdl ) + dataLength = nb.DataLength + dataOffset = nb.CurrentMdlOffset + + while dataLength > 0: + + copyData = mdl.ByteCount - dataOffset + if copyData > dataLength: copyData = dataLength + + pcktBytes.extend( loadBytes( mdl.MappedSystemVa + dataOffset, copyData ) ) + + dataLength -= copyData + + mdl = typedVar( "ndis", "_MDL", mdl.Next ) + + pcktList.append( pcktBytes ) + + nb = typedVar( "ndis", "_NET_BUFFER", nb.Next ) + + nbl = typedVar( "ndis", "_NET_BUFFER_LIST", nbl.Next ) + + return pcktList + + +def main(): + + if len(sys.argv) < 2: + return + + if not isKernelDebugging(): + dprintln( "This script is for kernel debugging only" ) + return + + + pcktList = getPacketsFromNbl( expr(sys.argv[1]) ) + + parsedPcktList = [ NetPacket(p) for p in pcktList ] + + + print "Packet's count: ", len(parsedPcktList) + + for p in parsedPcktList: print "\n", p + + + +if __name__ == "__main__": + main() \ No newline at end of file