[snippets] ubdated: nbl.py (tcp header)

git-svn-id: https://pykd.svn.codeplex.com/svn@64449 9b283d60-5439-405e-af05-b73fd8c4d996
This commit is contained in:
SND\kernelnet_cp 2011-04-25 06:10:11 +00:00
parent f3fbb0c9c8
commit 77300508e7

View File

@ -41,7 +41,7 @@ class UdpPacket:
self.sourcePort = getHostWord( dataPos ) self.sourcePort = getHostWord( dataPos )
self.destPort = getHostWord( dataPos ) self.destPort = getHostWord( dataPos )
self.length = getHostWord( dataPos ) self.length = getHostWord( dataPos )
self.checksum = getNetWord( dataPos ) self.checksum = getHostWord( dataPos )
self.parsed = True self.parsed = True
except StopIteration: except StopIteration:
@ -62,6 +62,49 @@ class UdpPacket:
return s return s
class TcpPacket:
def __init__( self, dataPos ):
self.parsed = False
try:
self.parsed = True
self.sourcePort = getHostWord( dataPos )
self.destPort = getHostWord( dataPos )
self.SeqNumber = getHostDWord( dataPos )
self.AckNumber = getHostDWord( dataPos )
self.dataOffset = ( dataPos.next() >> 4 )
self.flags = dataPos.next() & 0x3F
self.window = getHostWord( dataPos )
self.checksum = getHostWord( dataPos )
self.urgentPointer = getHostWord( dataPos )
except StopIteration:
pass
def __str__( self ):
s = "TCP header: "
fl = [ "FIN", "SYN","RST", "PSH", "ACK", "URG" ]
if self.parsed:
s += "OK\n"
s += "\tSrc port: %d\n" % self.sourcePort
s += "\tDest port: %d\n" % self.destPort
s += "\tSEQ: %x\n" % self.SeqNumber
s += "\tACK: %x\n" % self.AckNumber
s += "\tFlags: %x ( %s )\n" % ( self.flags, " ".join( [ fl[i] for i in xrange( len(fl) ) if ( self.flags & ( 1 << i ) ) != 0 ] ) )
s += "\tWindows: %x\n" % self.window
s += "\tChecksum: %x\n" % self.checksum
else:
s += "MALFORMED\n"
return s
class ArpPacket: class ArpPacket:
def __init__( self, dataPos ): def __init__( self, dataPos ):
@ -75,7 +118,7 @@ class IpAddress:
def __init__( self, dataPos ): def __init__( self, dataPos ):
self.addr = [ dataPos.next() for i in range(0,4) ] self.addr = [ dataPos.next() for i in xrange(4) ]
def __str__( self ): def __str__( self ):
@ -85,10 +128,10 @@ class IpAddress:
class Ip6Address: class Ip6Address:
def __init__( self, dataPos ): def __init__( self, dataPos ):
pass self.addr = [ getHostWord( dataPos ) for i in xrange(8) ]
def __str__( self ): def __str__( self ):
return "" return "%02x:%02x:%02x:%02x:%02x:%02x:%02x:%02x" % tuple( self.addr[0:8] )
class IpProtocol: class IpProtocol:
@ -109,6 +152,13 @@ class IpProtocol:
return { ICMP_PROTO: "ICMP", UDP_PROTO: "UDP", TCP_PROTO: "TCP" }.get( self.typeVal, hex(self.typeVal) ) return { ICMP_PROTO: "ICMP", UDP_PROTO: "UDP", TCP_PROTO: "TCP" }.get( self.typeVal, hex(self.typeVal) )
def getNextLayerPacket( self, dataPos ):
return {
ICMP_PROTO : lambda x : "",
UDP_PROTO : lambda x : UdpPacket(x),
TCP_PROTO : lambda x : TcpPacket(x)
}.get( self.typeVal, lambda x : "Unknown protocol" )(dataPos)
class IpPacket: class IpPacket:
@ -130,17 +180,14 @@ class IpPacket:
self.checlsum = getNetWord( dataPos ) self.checlsum = getNetWord( dataPos )
self.srcAddr = IpAddress( dataPos ) self.srcAddr = IpAddress( dataPos )
self.destAddr = IpAddress( dataPos ) self.destAddr = IpAddress( dataPos )
self.nextLayerPckt = self.protocol.getNextLayerPacket( dataPos )
if self.protocol.isUDP(): self.nextLayerPckt = UdpPacket( dataPos )
elif self.protocol.isTCP(): self.nextLayerPckt = ArpPacket( dataPos )
else: self.nextLayerPckt = "Unknown protocol"
self.parsed = True self.parsed = True
except StopIteration: except StopIteration:
pass pass
def __str__( self ): def __str__( self ):
s = "IPv4 header: " s = "IPv4 header: "