import collections
import logging
import base64
import heapq
import queue
import math
import multiprocessing as mp
# these three for debugging
import sys
import pdb
import traceback
from pathspider.base import SHUTDOWN_SENTINEL
def _flow4_ids(ip):
# Only import this when needed
import plt as libtrace
# FIXME keep map of fragment IDs to keys
icmp_with_payload = {3, 4, 5, 11, 12}
quotation_fid = False
if ip.proto == 1 and ip.icmp.type in icmp_with_payload:
ip = libtrace.ip(ip.icmp.data[8:]) # pylint: disable=no-member
quotation_fid = True
protos_with_ports = {6, 17, 132, 136}
if ip.proto in protos_with_ports:
# key includes ports
fid = ip.src_prefix.addr + ip.dst_prefix.addr + ip.data[9:10] + ip.payload[0:4]
rid = ip.dst_prefix.addr + ip.src_prefix.addr + ip.data[9:10] + ip.payload[2:4] + ip.payload[0:2]
else:
# no ports, just 3-tuple
fid = ip.src_prefix.addr + ip.dst_prefix.addr + ip.data[9:10]
rid = ip.dst_prefix.addr + ip.src_prefix.addr + ip.data[9:10]
if quotation_fid:
# If the fid is based on an ICMP quotation, need to be reversed
return (base64.b64encode(rid), base64.b64encode(fid))
else:
return (base64.b64encode(fid), base64.b64encode(rid))
def _flow6_ids(ip6):
# FIXME link ICMP by looking at payload
if ip6.proto == 6 or ip6.proto == 17 or ip6.proto == 132:
# key includes ports
fid = ip6.src_prefix.addr + ip6.dst_prefix.addr + ip6.data[6:7] + ip6.payload[0:4]
rid = ip6.dst_prefix.addr + ip6.src_prefix.addr + ip6.data[6:7] + ip6.payload[2:4] + ip6.payload[0:2]
else:
# no ports, just 3-tuple
fid = ip6.src_prefix.addr + ip6.dst_prefix.addr + ip6.data[6:7]
rid = ip6.dst_prefix.addr + ip6.src_prefix.addr + ip6.data[6:7]
return (base64.b64encode(fid), base64.b64encode(rid))
PacketClockTimer = collections.namedtuple("PacketClockTimer", ("time", "fn"))
[docs]class Observer:
"""
Wraps a packet source identified by a libtrace URI,
parses packets to divide them into flows, passing these
packets and flows onto a function chain to allow
data to be associated with each flow.
"""
[docs] def __init__(self, lturi,
new_flow_chain=[],
ip4_chain=[],
ip6_chain=[],
icmp4_chain=[],
icmp6_chain=[],
tcp_chain=[],
udp_chain=[],
l4_chain=[],
idle_timeout=30,
expiry_timeout=5):
"""
Create an Observer.
:param new_flow_chain: Array of functions to initialise new flows.
:type new_flow_chain: array(function)
:param ip4_chain: Array of functions to pass IPv4 headers to.
:type ip4_chain: array(function)
:param ip6_chain: Array of functions to pass IPv6 headers to.
:type ip6_chain: array(function)
:param icmp4_chain: Array of functions to pass IPv4 headers containing
ICMPv4 headers to.
:type icmp4_chain: array(function)
:param icmp6_chain: Array of functions to pass IPv6 headers containing
ICMPv6 headers to.
:type icmp6_chain: array(function)
:param tcp_chain: Array of functions to pass TCP headers to.
:param tcp_chain: Array of functions to pass TCP headers to.
:type tcp_chain: array(function)
:param udp_chain: Array of functions to pass UDP headers to.
:type udp_chain: array(function)
:param l4_chain: Array of functions to pass other layer 4 headers to.
:type l4_chain: array(function)
:see also: :ref:`Observer Documentation <observer>`
"""
# Only import this when needed
import plt as libtrace
# Control
self._irq = None
self._irq_fired = False
# Libtrace initialization
self._trace = libtrace.trace(lturi) # pylint: disable=no-member
self._trace.start()
self._pkt = libtrace.packet() # pylint: disable=no-member
# Chains of functions to evaluate
self._new_flow_chain = new_flow_chain
self._ip4_chain = ip4_chain
self._ip6_chain = ip6_chain
self._icmp4_chain = icmp4_chain
self._icmp6_chain = icmp6_chain
self._tcp_chain = tcp_chain
self._udp_chain = udp_chain
self._l4_chain = l4_chain
# Packet timer and bintables
self._ptq = 0 # current packet timer, quantized
self._idle_bins = {} # map bin number to set of fids
self._expiry_bins = {} # map bin number to set of fids
self._idle_timeout = idle_timeout
self._expiry_timeout = expiry_timeout
self._bin_quantum = 1
#self._tq = [] # packet timer queue (heap)
# Flow tables
self._active = {}
self._expiring = {}
self._ignored = set()
# Emitter queue
self._emitted = collections.deque()
# Statistics and logging
self._logger = logging.getLogger("observer")
self._ct_pkt = 0
self._ct_nonip = 0
self._ct_shortkey = 0
self._ct_ignored = 0
self._ct_flow = 0
def _interrupted(self):
if not self._irq_fired and self._irq is not None:
try:
self._irq.get_nowait()
self._irq_fired = True
except queue.Empty:
pass
return self._irq_fired
def _next_packet(self):
# Import only when needed
import plt as libtrace
# see if someone told us to stop
if self._interrupted():
return False
# see if we're done iterating
if not self._trace.read_packet(self._pkt):
return False
# count the packet
self._ct_pkt += 1
# advance the packet clock
self._tick(self._pkt.seconds)
# get a flow ID and associated flow record for the packet
(fid, rec, rev) = self._get_flow()
# don't dispatch if we don't have a record
# (this happens for non-IP packets and flows
# we know we want to ignore)
if not rec:
return True
keep_flow = True
# run IP header chains
if self._pkt.ip:
for fn in self._ip4_chain:
keep_flow = keep_flow and fn(rec, self._pkt.ip, rev=rev)
if self._pkt.icmp:
for fn in self._icmp4_chain:
q = libtrace.ip(self._pkt.ip.icmp.data[8:]) # pylint: disable=no-member
keep_flow = keep_flow and fn(rec, self._pkt.ip, q, rev=rev)
elif self._pkt.ip6:
for fn in self._ip6_chain:
keep_flow = keep_flow and fn(rec, self._pkt.ip6, rev=rev)
if self._pkt.icmp6:
for fn in self._icmp6_chain:
q = libtrace.ip(self._pkt.ip.icmp6.data[8:]) # pylint: disable=no-member
keep_flow = keep_flow and fn(rec, self._pkt.ip6, q, rev=rev)
# run transport header chains
if self._pkt.tcp:
for fn in self._tcp_chain:
keep_flow = keep_flow and fn(rec, self._pkt.tcp, rev=rev)
elif self._pkt.udp:
for fn in self._udp_chain:
keep_flow = keep_flow and fn(rec, self._pkt.udp, rev=rev)
else:
for fn in self._l4_chain:
keep_flow = keep_flow and fn(rec, self._pkt, rev=rev)
# complete the flow if any chain function asked us to
if not keep_flow:
self._flow_complete(fid)
# we processed a packet, keep going
return True
# def _set_timer(self, delay, fid):
# # add to queue
# heapq.heappush(self._tq, PacketClockTimer(self._pt + delay,
# self._finish_expiry_tfn(fid)))
def _get_flow(self):
"""
Get a flow record for the given packet.
Create a new basic flow record
"""
# get possible a flow IDs for the packet
try:
if self._pkt.ip:
(ffid, rfid) = _flow4_ids(self._pkt.ip)
ip = self._pkt.ip
elif self._pkt.ip6:
(ffid, rfid) = _flow6_ids(self._pkt.ip6)
ip = self._pkt.ip6
else:
# we don't care about non-IP packets
self._ct_nonip += 1
return (None, None, False)
except ValueError:
self._ct_shortkey += 1
return (None, None, False)
# now look for forward and reverse in ignored, active,
# and expiring tables.
if ffid in self._ignored:
return (None, None, False)
elif rfid in self._ignored:
return (None, None, False)
elif ffid in self._active:
(fid, rec, active) = (ffid, self._active[ffid], True)
#self._logger.debug("found forward flow for "+str(ffid))
elif ffid in self._expiring:
(fid, rec, active) = (ffid, self._expiring[ffid], False)
#self._logger.debug("found expiring forward flow for "+str(ffid))
elif rfid in self._active:
(fid, rec, active) = (rfid, self._active[rfid], True)
#self._logger.debug("found reverse flow for "+str(rfid))
elif rfid in self._expiring:
(fid, rec, active) = (rfid, self._expiring[rfid], False)
#self._logger.debug("found expiring reverse flow for "+str(rfid))
else:
# nowhere to be found. new flow.
rec = {'first': ip.seconds, '_idle_bin': 0}
for fn in self._new_flow_chain:
if not fn(rec, ip):
# self._logger.debug("ignoring "+str(ffid))
self._ignored.add(ffid)
self._ct_ignored += 1
return (None, None, False)
# wasn't vetoed. add to active table.
fid = ffid
self._active[ffid] = rec
active = True
# self._logger.debug("new flow for "+str(ffid))
self._ct_flow += 1
# update time and idle bin and return record
rec['last'] = ip.seconds
# update idle bin if we're not expiring
if active:
new_idle_bin = math.ceil((rec['last'] + self._idle_timeout) / self._bin_quantum) * self._bin_quantum
if new_idle_bin > rec["_idle_bin"] :
if rec['_idle_bin'] in self._idle_bins:
self._idle_bins[rec['_idle_bin']] -= set((fid,))
if new_idle_bin in self._idle_bins:
self._idle_bins[new_idle_bin] |= set((fid,))
else:
self._idle_bins[new_idle_bin] = set((fid,))
rec['_idle_bin'] = new_idle_bin
return (fid, rec, bool(fid == rfid))
def _flow_complete(self, fid):
"""
Mark a given flow ID as complete
"""
# skip all of this unless the flow is still in the active table
if fid not in self._active:
return
# remove flow ID from idle bin
rec = self._active[fid]
self._idle_bins[rec['_idle_bin']] -= set((fid,))
del(rec['_idle_bin'])
# move record to expiring table
self._expiring[fid] = rec
del self._active[fid]
# assign expiry bin
expiry_bin = math.ceil((self._ptq + self._expiry_timeout) / self._bin_quantum) * self._bin_quantum
#self._logger.debug("Completing flow "+str(fid)+" at "+str(self._ptq)+" to expire "+str(expiry_bin)+" (in "+str(expiry_bin-self._ptq)+"s)")
if expiry_bin in self._expiry_bins:
self._expiry_bins[expiry_bin] |= set((fid,))
else:
self._expiry_bins[expiry_bin] = set((fid,))
def _emit_flow(self, rec):
self._emitted.append(rec)
def _next_flow(self):
while len(self._emitted) == 0:
if not self._next_packet():
return None
return self._emitted.popleft()
def _tick(self, pt):
# quantize and skip if we're not advancing
next_ptq = math.ceil(pt / self._bin_quantum) * self._bin_quantum
if next_ptq <= self._ptq:
return
elif self._ptq == 0:
# handle zero case
self._ptq = next_ptq
return
# advance quantum
for bint in range(self._ptq + self._bin_quantum, next_ptq + self._bin_quantum, self._bin_quantum):
self._logger.debug("tick: "+str(bint))
# process idle
if bint in self._idle_bins:
if len(self._idle_bins[bint]) > 0:
for fid in self._idle_bins[bint].copy():
self._flow_complete(fid)
del(self._idle_bins[bint])
# process expiry
if bint in self._expiry_bins:
if len(self._expiry_bins[bint]) > 0:
for fid in self._expiry_bins[bint].copy():
self._emit_flow(self._expiring[fid])
del self._expiring[fid]
del(self._expiry_bins[bint])
self._ptq = next_ptq
# def _tick(self, pt):
# # Advance packet clock
# self._pt = pt
# # fire all timers whose time has come
# while len(self._tq) > 0 and pt > min(self._tq, key=lambda x: x.time).time:
# try:
# heapq.heappop(self._tq).fn()
# except:
# type, value, tb = sys.exc_info()
# traceback.print_exc()
# pdb.post_mortem(tb)
# def _finish_expiry_tfn(self, fid):
# """
# On expiry timer, emit the flow
# and delete it from the expiring queue
# """
# def tfn():
# if fid in self._expiring:
# self._emit_flow(self._expiring[fid])
# del self._expiring[fid]
# # self._logger.debug("emitted "+str(fid)+" on expiry")
# return tfn
# def purge_idle(self, timeout=30):
# # TODO test this, it's probably pretty slow.
# for fid in self._active:
# if self._pt - self._active['fid']['last'] > timeout:
# self._flow_complete(fid)
[docs] def flush(self):
for fid in self._expiring:
self._emit_flow(self._expiring[fid])
# self._logger.debug("emitted "+str(fid)+" expiring during flush")
self._expiring.clear()
for fid in self._active:
self._emit_flow(self._active[fid])
# self._logger.debug("emitted "+str(fid)+" active during flush")
self._active.clear()
self._ignored.clear()
[docs] def run_flow_enqueuer(self, flowqueue, irqueue=None):
if irqueue:
self._irq = irqueue
self._irq_fired = None
# Run main loop until last packet seen
# then flush active flows and run again
for i in range(2):
while True:
f = self._next_flow()
if f:
flowqueue.put(f)
else:
self.flush()
break
# log observer info on shutdown
self._logger.info(
("processed %u packets "+
"(%u dropped, %u short, %u non-ip) "+
"into %u flows (%u ignored)") % (
self._ct_pkt, self._trace.pkt_drops(),
self._ct_shortkey, self._ct_nonip,
self._ct_flow, self._ct_ignored))
flowqueue.put(SHUTDOWN_SENTINEL)
[docs]def basic_flow(rec, ip):
"""
New flow function that sets up basic flow information
"""
# Extract addresses and ports
(rec['sip'], rec['dip'], rec['proto']) = (str(ip.src_prefix), str(ip.dst_prefix), ip.proto)
(rec['sp'], rec['dp']) = extract_ports(ip)
# Initialize counters
rec['pkt_fwd'] = 0
rec['pkt_rev'] = 0
rec['oct_fwd'] = 0
rec['oct_rev'] = 0
# we want to keep this flow
return True
[docs]def basic_count(rec, ip, rev):
"""
Packet function that counts packets and octets per flow
"""
if rev:
rec["pkt_rev"] += 1
rec["oct_rev"] += ip.size
else:
rec["pkt_fwd"] += 1
rec["oct_fwd"] += ip.size
return True
[docs]def simple_observer(lturi):
return Observer(lturi,
new_flow_chain=[basic_flow],
ip4_chain=[basic_count],
ip6_chain=[basic_count])