import collections
import logging
import base64
import heapq
import queue
import multiprocessing as mp
# these three for debugging
import sys
import pdb
import traceback
SHUTDOWN_SENTINEL = None
def _flow4_ids(ip):
# FIXME keep map of fragment IDs to keys
# FIXME link ICMP by looking at payload
if ip.proto == 6 or ip.proto == 17 or ip.proto == 132:
# key includes ports
fid = ip.src_prefix.addr + ip.dst_prefix.addr + ip.data[9:10] + ip.payload[0:4]
rid = ip.dst_prefix.addr + ip.src_prefix.addr + ip.data[9:10] + ip.payload[2:4] + ip.payload[0:2]
else:
# no ports, just 3-tuple
fid = ip.src_prefix.addr + ip.dst_prefix.addr + ip.data[9:10]
rid = ip.dst_prefix.addr + ip.src_prefix.addr + ip.data[9:10]
return (base64.b64encode(fid), base64.b64encode(rid))
def _flow6_ids(ip6):
# FIXME link ICMP by looking at payload
if ip6.proto == 6 or ip6.proto == 17 or ip6.proto == 132:
# key includes ports
fid = ip6.src_prefix.addr + ip6.dst_prefix.addr + ip6.data[6:7] + ip6.payload[0:4]
rid = ip6.dst_prefix.addr + ip6.src_prefix.addr + ip6.data[6:7] + ip6.payload[2:4] + ip6.payload[0:2]
else:
# no ports, just 3-tuple
fid = ip6.src_prefix.addr + ip6.dst_prefix.addr + ip6.data[6:7]
rid = ip6.dst_prefix.addr + ip6.src_prefix.addr + ip6.data[6:7]
return (base64.b64encode(fid), base64.b64encode(rid))
PacketClockTimer = collections.namedtuple("PacketClockTimer", ("time", "fn"))
[docs]class Observer:
"""
Wraps a packet source identified by a libtrace URI,
parses packets to divide them into flows, passing these
packets and flows onto a function chain to allow
data to be associated with each flow.
"""
[docs] def __init__(self, lturi,
new_flow_chain=[],
ip4_chain=[],
ip6_chain=[],
tcp_chain=[],
udp_chain=[],
l4_chain=[]):
"""
Create an Observer.
:param new_flow_chain: Array of functions to initialise new flows.
:type new_flow_chain: array(function)
:param ip4_chain: Array of functions to pass IPv4 headers to.
:type ip4_chain: array(function)
:param ip6_chain: Array of functions to pass IPv6 headers to.
:type ip6_chain: array(function)
:param tcp_chain: Array of functions to pass TCP headers to.
:type tcp_chain: array(function)
:param udp_chain: Array of functions to pass UDP headers to.
:type udp_chain: array(function)
:param l4_chain: Array of functions to pass other layer 4 headers to.
:type l4_chain: array(function)
:see also: :ref:`Observer Documentation <observer>`
"""
# Only import this when needed
import plt as libtrace
# Control
self._irq = None
self._irq_fired = False
# Libtrace initialization
self._trace = libtrace.trace(lturi)
self._trace.start()
self._pkt = libtrace.packet()
# Chains of functions to evaluate
self._new_flow_chain = new_flow_chain
self._ip4_chain = ip4_chain
self._ip6_chain = ip6_chain
self._tcp_chain = tcp_chain
self._udp_chain = udp_chain
self._l4_chain = l4_chain
# Packet timer and timer queue
self._pt = 0 # current packet timer
self._tq = [] # packet timer queue (heap)
# Flow tables
self._active = {}
self._expiring = {}
self._ignored = set()
# Emitter queue
self._emitted = collections.deque()
# Statistics
self._ct_pkt = 0
self._ct_nonip = 0
self._ct_shortkey = 0
self._ct_ignored = 0
self._ct_flow = 0
def _interrupted(self):
try:
if not self._irq_fired and self._irq is not None:
self._irq.get_nowait()
self._irq_fired = True
except queue.Empty:
pass
return self._irq_fired
def _next_packet(self):
# see if we're done iterating
if not self._trace.read_packet(self._pkt):
return False
# see if someone told us to stop
if self._interrupted():
return False
# count the packet
self._ct_pkt += 1
# advance the packet clock
self._tick(self._pkt.seconds)
# get a flow ID and associated flow record for the packet
(fid, rec, rev) = self._get_flow()
# don't dispatch if we don't have a record
# (this happens for non-IP packets and flows
# we know we want to ignore)
if not rec:
return True
keep_flow = True
# run IP header chains
if self._pkt.ip:
for fn in self._ip4_chain:
keep_flow = keep_flow and fn(rec, self._pkt.ip, rev=rev)
elif self._pkt.ip6:
for fn in self._ip6_chain:
keep_flow = keep_flow and fn(rec, self._pkt.ip6, rev=rev)
# run transport header chains
if self._pkt.tcp:
for fn in self._tcp_chain:
keep_flow = keep_flow and fn(rec, self._pkt.tcp, rev=rev)
elif self._pkt.udp:
for fn in self._udp_chain:
keep_flow = keep_flow and fn(rec, self._pkt.udp, rev=rev)
else:
for fn in self._l4_chain:
keep_flow = keep_flow and fn(rec, self._pkt, rev=rev)
# complete the flow if any chain function asked us to
if not keep_flow:
self._flow_complete(fid)
# we processed a packet, keep going
return True
def _set_timer(self, delay, fid):
# add to queue
heapq.heappush(self._tq, PacketClockTimer(self._pt + delay,
self._finish_expiry_tfn(fid)))
def _get_flow(self):
"""
Get a flow record for the given packet.
Create a new basic flow record
"""
logger = logging.getLogger("observer")
# get possible a flow IDs for the packet
try:
if self._pkt.ip:
(ffid, rfid) = _flow4_ids(self._pkt.ip)
ip = self._pkt.ip
elif self._pkt.ip6:
(ffid, rfid) = _flow6_ids(self._pkt.ip6)
ip = self._pkt.ip6
else:
# we don't care about non-IP packets
self._ct_nonip += 1
return (None, None, False)
except ValueError:
self._ct_shortkey += 1
return (None, None, False)
# now look for forward and reverse in ignored, active,
# and expiring tables.
if ffid in self._ignored:
return (None, None, False)
elif rfid in self._ignored:
return (None, None, False)
elif ffid in self._active:
(fid, rec) = (ffid, self._active[ffid])
#logger.debug("found forward flow for "+str(ffid))
elif ffid in self._expiring:
(fid, rec) = (ffid, self._expiring[ffid])
#logger.debug("found expiring forward flow for "+str(ffid))
elif rfid in self._active:
(fid, rec) = (rfid, self._active[rfid])
#logger.debug("found reverse flow for "+str(rfid))
elif rfid in self._expiring:
(fid, rec) = (rfid, self._expiring[rfid])
#logger.debug("found expiring reverse flow for "+str(rfid))
else:
# nowhere to be found. new flow.
rec = {'first': ip.seconds}
for fn in self._new_flow_chain:
if not fn(rec, ip):
#logger.debug("ignoring "+str(ffid))
self._ignored.add(ffid)
self._ct_ignored += 1
return (None, None, False)
# wasn't vetoed. add to active table.
fid = ffid
self._active[ffid] = rec
#logger.debug("new flow for "+str(ffid))
self._ct_flow += 1
# update time and return record
rec['last'] = ip.seconds
return (fid, rec, bool(fid == rfid))
def _flow_complete(self, fid, delay=5):
"""
Mark a given flow ID as complete
"""
logger = logging.getLogger("observer")
# move flow to expiring table
# logging.debug("Moving flow " + str(fid) + " to expiring queue")
try:
self._expiring[fid] = self._active[fid]
except KeyError:
#logger.debug("Tried to expire an already expired flow")
pass
else:
del self._active[fid]
# set up a timer to fire to emit the flow after timeout
self._set_timer(delay, fid)
def _emit_flow(self, rec):
self._emitted.append(rec)
def _next_flow(self):
while len(self._emitted) == 0:
if not self._next_packet():
return None
return self._emitted.popleft()
def _tick(self, pt):
# Advance packet clock
self._pt = pt
# fire all timers whose time has come
while len(self._tq) > 0 and pt > min(self._tq, key=lambda x: x.time).time:
try:
heapq.heappop(self._tq).fn()
except:
type, value, tb = sys.exc_info()
traceback.print_exc()
pdb.post_mortem(tb)
def _finish_expiry_tfn(self, fid):
"""
On expiry timer, emit the flow
and delete it from the expiring queue
"""
def tfn():
if fid in self._expiring:
self._emit_flow(self._expiring[fid])
del self._expiring[fid]
return tfn
[docs] def purge_idle(self, timeout=30):
# TODO test this, it's probably pretty slow.
for fid in self._active:
if self._pt - self._active['fid']['last'] > timeout:
self._flow_complete(fid)
[docs] def flush(self):
for fid in self._expiring:
self._emit_flow(self._expiring[fid])
self._expiring.clear()
for fid in self._active:
self._emit_flow(self._active[fid])
self._active.clear()
self._ignored.clear()
[docs] def run_flow_enqueuer(self, flowqueue, irqueue=None):
if irqueue:
self._irq = irqueue
self._irq_fired = None
# Run main loop until last packet seen
# then flush active flows and run again
for i in range(2):
while True:
f = self._next_flow()
if f:
flowqueue.put(f)
else:
self.flush()
break
# log observer info on shutdown
logging.getLogger("observer").info(
("processed %u packets "+
"(%u dropped, %u short, %u non-ip) "+
"into %u flows (%u ignored)") % (
self._ct_pkt, self._trace.pkt_drops(),
self._ct_shortkey, self._ct_nonip,
self._ct_flow, self._ct_ignored))
flowqueue.put(SHUTDOWN_SENTINEL)
def extract_ports(ip):
if ip.udp:
return (ip.udp.src_port, ip.udp.dst_port)
elif ip.tcp:
return (ip.tcp.src_port, ip.tcp.dst_port)
else:
return (None, None)
def basic_flow(rec, ip):
"""
New flow function that sets up basic flow information
"""
# Extract addresses and ports
(rec['sip'], rec['dip'], rec['proto']) = (str(ip.src_prefix), str(ip.dst_prefix), ip.proto)
(rec['sp'], rec['dp']) = extract_ports(ip)
# Initialize counters
rec['pkt_fwd'] = 0
rec['pkt_rev'] = 0
rec['oct_fwd'] = 0
rec['oct_rev'] = 0
# we want to keep this flow
return True
def basic_count(rec, ip, rev):
"""
Packet function that counts packets and octets per flow
"""
if rev:
rec["pkt_rev"] += 1
rec["oct_rev"] += ip.size
else:
rec["pkt_fwd"] += 1
rec["oct_fwd"] += ip.size
return True
def simple_observer(lturi):
return Observer(lturi,
new_flow_chain=[basic_flow],
ip4_chain=[basic_count],
ip6_chain=[basic_count])