import collections
import logging
import base64
import queue
import math
from pathspider.base import SHUTDOWN_SENTINEL
def _flow4_ids(ip):
    """
    Derive (forward, reverse) flow identifiers for an IPv4 packet.

    Keys are built from source/destination address, the protocol byte of
    the IPv4 header, and — for port-bearing protocols — the transport
    ports. For ICMP errors that quote the offending packet, the key is
    built from the quotation instead (and swapped, since the quoted
    packet travelled in the opposite direction).

    :param ip: libtrace IPv4 header object
    :returns: tuple of base64-encoded (forward id, reverse id)
    """
    # FIXME keep map of fragment IDs to keys (#144)
    # ICMP types that carry a quotation of the offending packet
    quoting_icmp_types = {3, 4, 5, 11, 12}
    from_quotation = False
    if ip.proto == 1 and ip.icmp.type in quoting_icmp_types:
        quoted = ip.icmp.payload
        # only usable if a full 20-byte IPv4 header was quoted
        if quoted is not None and len(quoted.data) >= 20:
            ip = quoted
            from_quotation = True

    src = ip.src_prefix.addr
    dst = ip.dst_prefix.addr
    proto_byte = ip.data[9:10]  # protocol field of the IPv4 header

    if ip.proto in {6, 17, 132, 136}:  # TCP, UDP, SCTP, UDPLite
        # key includes ports; reverse key swaps source and destination port
        fid = src + dst + proto_byte + ip.payload[0:4]
        rid = dst + src + proto_byte + ip.payload[2:4] + ip.payload[0:2]
    else:
        # no ports, just 3-tuple
        fid = src + dst + proto_byte
        rid = dst + src + proto_byte

    if from_quotation:
        # If the fid is based on an ICMP quotation, need to be reversed
        fid, rid = rid, fid
    return (base64.b64encode(fid), base64.b64encode(rid))
def _flow6_ids(ip6):
    """
    Derive (forward, reverse) flow identifiers for an IPv6 packet.

    Keys are built from source/destination address, the next-header byte
    of the IPv6 header, and — for port-bearing protocols — the transport
    ports. For ICMPv6 errors that quote the offending packet, the key is
    built from the quotation instead (and swapped, since the quoted
    packet travelled in the opposite direction).

    :param ip6: libtrace IPv6 header object
    :returns: tuple of base64-encoded (forward id, reverse id)
    """
    # ICMPv6 types that carry a quotation of the offending packet
    quoting_icmp6_types = {1, 2, 3, 4}
    from_quotation = False
    if ip6.proto == 58 and ip6.icmp6.type in quoting_icmp6_types:
        quoted = ip6.icmp6.payload
        # only usable if a full 40-byte IPv6 header was quoted
        if quoted is not None and len(quoted.data) >= 40:
            ip6 = quoted
            from_quotation = True

    src = ip6.src_prefix.addr
    dst = ip6.dst_prefix.addr
    nh_byte = ip6.data[6:7]  # next-header field of the IPv6 header

    if ip6.proto in {6, 17, 132, 136}:  # TCP, UDP, SCTP, UDPLite
        # key includes ports; reverse key swaps source and destination port
        fid = src + dst + nh_byte + ip6.payload[0:4]
        rid = dst + src + nh_byte + ip6.payload[2:4] + ip6.payload[0:2]
    else:
        # no ports, just 3-tuple
        fid = src + dst + nh_byte
        rid = dst + src + nh_byte

    if from_quotation:
        # If the fid is based on an ICMP quotation, need to be reversed
        fid, rid = rid, fid
    return (base64.b64encode(fid), base64.b64encode(rid))
# Timer entry for the packet clock: ``fn`` is invoked once the packet
# clock advances past ``time``.
PacketClockTimer = collections.namedtuple("PacketClockTimer", "time fn")
class Observer:
    """
    Wraps a packet source identified by a libtrace URI,
    parses packets to divide them into flows, passing these
    packets and flows onto a function chain to allow
    data to be associated with each flow.
    """

    def __init__(self, lturi, chains=None, idle_timeout=30, expiry_timeout=5,
                 aggregate=False):
        """
        Create an Observer.

        :param lturi: libtrace URI identifying the packet source
        :param chains: Array of Observer chain classes
        :param idle_timeout: seconds without traffic after which an active
            flow is considered complete
        :param expiry_timeout: seconds a completed flow lingers (to absorb
            stray packets) before being emitted
        :param aggregate: when True, all packets are keyed into a single
            flow pair instead of being keyed on address/port
        :see also: :ref:`Observer Documentation <observer>`
        """
        # Only import this when needed
        import plt as libtrace

        # Control
        self._irq = None
        self._irq_fired = False
        self._aggregate = aggregate

        # Libtrace initialization
        self._trace = libtrace.trace(lturi)  # pylint: disable=no-member
        self._trace.start()
        self._pkt = libtrace.packet()  # pylint: disable=no-member

        # Chains of functions to evaluate
        chains = chains if chains is not None else []
        self._chains = [chain() for chain in chains]

        # Packet timer and bintables
        self._ptq = 0         # current packet timer, quantized
        self._idle_bins = {}  # map bin number to set of fids
        self._expiry_bins = {}  # map bin number to set of fids
        self._idle_timeout = idle_timeout
        self._expiry_timeout = expiry_timeout
        self._bin_quantum = 1

        # Flow tables
        self._active = {}      # fid -> record for flows still seeing traffic
        self._expiring = {}    # fid -> record for completed flows
        self._ignored = set()  # fids vetoed by a new_flow chain function

        # Emitter queue
        self._emitted = collections.deque()

        # Statistics and logging
        self._logger = logging.getLogger("observer")
        self._ct_pkt = 0
        self._ct_nonip = 0
        self._ct_shortkey = 0
        self._ct_ignored = 0
        self._ct_flow = 0

    def _interrupted(self):
        """
        Return True once a shutdown signal has been read from the
        interrupt queue; the result is latched after the first firing.
        """
        if not self._irq_fired and self._irq is not None:
            try:
                self._irq.get_nowait()
                self._irq_fired = True
            except queue.Empty:
                pass
        return self._irq_fired

    def _get_chains(self, name):
        """Return the chain methods named ``name``, in chain order."""
        return [
            getattr(c, name) for c in self._chains if hasattr(c, name)
        ]

    def _next_packet(self):
        """
        Read one packet from the trace, advance the packet clock, and
        dispatch the packet through the relevant chain functions.

        :returns: True to keep iterating, False on interrupt or end of
                  trace
        """
        # see if someone told us to stop
        if self._interrupted():
            return False

        # see if we're done iterating
        if not self._trace.read_packet(self._pkt):
            return False

        # count the packet
        self._ct_pkt += 1

        # advance the packet clock
        self._tick(self._pkt.seconds)

        # get a flow ID and associated flow record for the packet
        (fid, rec, rev) = self._get_flow()

        # don't dispatch if we don't have a record
        # (this happens for non-IP packets and flows
        # we know we want to ignore)
        if not rec:
            return True

        keep_flow = True

        # run IP header chains
        if self._pkt.ip:
            for fn in self._get_chains("ip4"):
                keep_flow = keep_flow and fn(rec, self._pkt.ip, rev=rev)
            if self._pkt.icmp:
                for fn in self._get_chains("icmp4"):
                    q = self._pkt.icmp.payload  # pylint: disable=no-member
                    keep_flow = keep_flow and fn(rec, self._pkt.ip, q, rev=rev)
        elif self._pkt.ip6:
            for fn in self._get_chains("ip6"):
                keep_flow = keep_flow and fn(rec, self._pkt.ip6, rev=rev)
            if self._pkt.icmp6:
                for fn in self._get_chains("icmp6"):
                    q = self._pkt.icmp6.payload  # pylint: disable=no-member
                    keep_flow = keep_flow and fn(
                        rec, self._pkt.ip6, q, rev=rev)

        # run transport header chains
        if self._pkt.tcp:
            for fn in self._get_chains("tcp"):
                keep_flow = keep_flow and fn(rec, self._pkt.tcp, rev=rev)
        elif self._pkt.udp:
            for fn in self._get_chains("udp"):
                keep_flow = keep_flow and fn(rec, self._pkt.udp, rev=rev)

        # complete the flow if any chain function asked us to
        if not keep_flow:
            self._flow_complete(fid)

        # we processed a packet, keep going
        return True

    def _get_flow(self):
        """
        Get a flow record for the current packet, creating a new basic
        flow record (and running the new_flow chains) if none exists.

        :returns: (fid, record, is_reverse); (None, None, False) for
                  non-IP, short-key, or ignored packets
        """
        # get possible flow IDs for the packet
        try:
            if self._pkt.ip:
                (ffid, rfid) = (4, 5) if self._aggregate else _flow4_ids(
                    self._pkt.ip)
                ip = self._pkt.ip
            elif self._pkt.ip6:
                (ffid, rfid) = (4, 5) if self._aggregate else _flow6_ids(
                    self._pkt.ip6)
                ip = self._pkt.ip6
            else:
                # we don't care about non-IP packets
                self._ct_nonip += 1
                return (None, None, False)
        except ValueError:
            # the packet was too short to extract a full key from
            self._ct_shortkey += 1
            return (None, None, False)

        # now look for forward and reverse in ignored, active,
        # and expiring tables.
        if ffid in self._ignored:
            return (None, None, False)
        elif rfid in self._ignored:
            return (None, None, False)
        elif ffid in self._active:
            (fid, rec, active) = (ffid, self._active[ffid], True)
        elif ffid in self._expiring:
            (fid, rec, active) = (ffid, self._expiring[ffid], False)
        elif rfid in self._active:
            (fid, rec, active) = (rfid, self._active[rfid], True)
        elif rfid in self._expiring:
            (fid, rec, active) = (rfid, self._expiring[rfid], False)
        else:
            # nowhere to be found. new flow.
            rec = {'pkt_first': ip.seconds, '_idle_bin': 0}
            for fn in self._get_chains("new_flow"):
                if not fn(rec, ip):
                    # a chain function vetoed this flow; ignore it forever
                    self._ignored.add(ffid)
                    self._ct_ignored += 1
                    return (None, None, False)
            # wasn't vetoed. add to active table.
            fid = ffid
            self._active[ffid] = rec
            active = True
            self._ct_flow += 1

        # update time and idle bin and return record
        rec['pkt_last'] = ip.seconds

        # update idle bin if we're not expiring
        if active:
            new_idle_bin = math.ceil((rec['pkt_last'] + self._idle_timeout) /
                                     self._bin_quantum) * self._bin_quantum
            if new_idle_bin > rec["_idle_bin"]:
                # move the fid from its old idle bin to the new one
                if rec['_idle_bin'] in self._idle_bins:
                    self._idle_bins[rec['_idle_bin']] -= set((fid, ))
                if new_idle_bin in self._idle_bins:
                    self._idle_bins[new_idle_bin] |= set((fid, ))
                else:
                    self._idle_bins[new_idle_bin] = set((fid, ))
                rec['_idle_bin'] = new_idle_bin

        return (fid, rec, bool(fid == rfid))

    def _flow_complete(self, fid):
        """
        Mark a given flow ID as complete: move its record from the
        active table to the expiring table and schedule its emission.
        """
        # skip all of this unless the flow is still in the active table
        if fid not in self._active:
            return

        # remove flow ID from idle bin
        rec = self._active[fid]
        self._idle_bins[rec['_idle_bin']] -= set((fid, ))
        del rec['_idle_bin']

        # move record to expiring table
        self._expiring[fid] = rec
        del self._active[fid]

        # assign expiry bin
        expiry_bin = math.ceil((self._ptq + self._expiry_timeout) /
                               self._bin_quantum) * self._bin_quantum
        if expiry_bin in self._expiry_bins:
            self._expiry_bins[expiry_bin] |= set((fid, ))
        else:
            self._expiry_bins[expiry_bin] = set((fid, ))

    def _emit_flow(self, rec):
        """Queue a finished flow record for the flow enqueuer."""
        self._emitted.append(rec)

    def _next_flow(self):
        """
        Return the next emitted flow record, reading packets as needed;
        None when the trace is exhausted or interrupted.
        """
        while len(self._emitted) == 0:
            if not self._next_packet():
                return None
        return self._emitted.popleft()

    def _tick(self, pt):
        """
        Advance the quantized packet clock to ``pt``, completing idle
        flows and emitting expired ones for every bin passed over.
        """
        # quantize and skip if we're not advancing
        next_ptq = math.ceil(pt / self._bin_quantum) * self._bin_quantum
        if next_ptq <= self._ptq:
            return
        elif self._ptq == 0:
            # handle zero case: first packet just initializes the clock
            self._ptq = next_ptq
            return

        # advance quantum by quantum, processing each bin in order
        for bint in range(self._ptq + self._bin_quantum,
                          next_ptq + self._bin_quantum, self._bin_quantum):
            self._logger.debug("tick: %s", bint)
            # process idle: complete flows that saw no traffic in time
            if bint in self._idle_bins:
                if len(self._idle_bins[bint]) > 0:
                    # copy: _flow_complete mutates the bin's set
                    for fid in self._idle_bins[bint].copy():
                        self._flow_complete(fid)
                del self._idle_bins[bint]
            # process expiry: emit flows whose grace period has passed
            if bint in self._expiry_bins:
                if len(self._expiry_bins[bint]) > 0:
                    for fid in self._expiry_bins[bint].copy():
                        self._emit_flow(self._expiring[fid])
                        del self._expiring[fid]
                del self._expiry_bins[bint]

        self._ptq = next_ptq

    def flush(self):
        """
        Emit all remaining flows (expiring first, then active) and clear
        every flow table. Called when the trace is exhausted.
        """
        for fid in self._expiring:
            self._emit_flow(self._expiring[fid])
        self._expiring.clear()

        for fid in self._active:
            self._emit_flow(self._active[fid])
        self._active.clear()

        self._ignored.clear()

    def run_flow_enqueuer(self, flowqueue, irqueue=None):
        """
        Run the observer main loop, putting each finished flow record on
        ``flowqueue`` and finally a SHUTDOWN_SENTINEL.

        :param flowqueue: queue receiving flow records and the sentinel
        :param irqueue: optional interrupt queue; any item put on it
            stops the observer
        """
        if irqueue:
            self._irq = irqueue
            self._irq_fired = False  # reset latch (was None; keep boolean)

        # Run main loop until last packet seen,
        # then flush active flows and run again
        for _ in range(2):
            while True:
                f = self._next_flow()
                if f:
                    flowqueue.put(f)
                else:
                    self.flush()
                    break

        # log observer info on shutdown
        self._logger.info(("processed %u packets "
                           "(%u dropped, %u short, %u non-ip) "
                           "into %u flows (%u ignored)"), self._ct_pkt,
                          self._trace.pkt_drops(), self._ct_shortkey,
                          self._ct_nonip, self._ct_flow, self._ct_ignored)

        flowqueue.put(SHUTDOWN_SENTINEL)
class DummyObserver:  # pylint: disable=R0903
    """
    The dummy observer provides a class compatible with the API of the Observer
    class without actually performing any operations. This is primarily used
    for PATHspider's test suite.
    """

    def run_flow_enqueuer(self, flowqueue, irqueue=None):  # pylint: disable=R0201
        """
        When running the flow enqueuer, no network operation is performed and
        the thread will block until given a shutdown signal. When the shutdown
        signal is received it will cascade the signal onto the flowqueue
        in the same way that a real Observer instance would.

        :param flowqueue: queue onto which the shutdown sentinel is placed
        :param irqueue: optional interrupt queue; if given, block until a
            signal arrives on it
        """
        # Guard the optional queue: irqueue defaults to None, and calling
        # .get() on it unconditionally raised AttributeError.
        if irqueue is not None:
            irqueue.get()
        flowqueue.put(SHUTDOWN_SENTINEL)