Source code for pathspider.base

Basic framework for Pathspider: coordinate active measurements on large target
lists with both system-level network stack state (sysctls, iptables rules, etc)
as well as information derived from flow-level passive observation of traffic at
the sender.

.. moduleauthor:: Brian Trammell <>

Derived and generalized from ECN Spider
(c) 2014 Damiano Boppart <>

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along
    with this program; if not, write to the Free Software Foundation, Inc.,
    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.


import sys
import time
import logging
import socket
import collections
import threading
import multiprocessing as mp
import queue
from datetime import datetime
from enum import Enum

from ipaddress import ip_address

### Utility Classes

[docs]class SemaphoreN(threading.BoundedSemaphore): """ An extension to the standard library's BoundedSemaphore that provides functions to handle n tokens at once. """ def __init__(self, value): self._value = value super().__init__(self._value) self.empty() def __str__(self): return 'SemaphoreN with a maximum value of {}.'.format(self._value)
[docs] def acquire_n(self, value=1, blocking=True, timeout=None): """ Acquire ``value`` number of tokens at once. The parameters ``blocking`` and ``timeout`` have the same semantics as :class:`BoundedSemaphore`. :returns: The same value as the last call to `BoundedSemaphore`'s :meth:`acquire` if :meth:`acquire` were called ``value`` times instead of the call to this method. """ ret = None for _ in range(value): ret = self.acquire(blocking=blocking, timeout=timeout) return ret
[docs] def release_n(self, value=1): """ Release ``value`` number of tokens at once. :returns: The same value as the last call to `BoundedSemaphore`'s :meth:`release` if :meth:`release` were called ``value`` times instead of the call to this method. """ ret = None for _ in range(value): ret = self.release() return ret
[docs] def empty(self): """ Acquire all tokens of the semaphore. """ while self.acquire(blocking=False): pass
[docs]class Conn(Enum): OK = 0 FAILED = 1 TIMEOUT = 2 SKIPPED = 3
Connection = collections.namedtuple("Connection", ["client", "port", "state", "tstart"]) QUEUE_SIZE = 1000 QUEUE_SLEEP = 0.5 SHUTDOWN_SENTINEL = "SHUTDOWN_SENTINEL" NO_RESULT = None NO_FLOW = None
[docs]class Spider: """ A spider consists of a configurator (which alternates between two system configurations), a large number of workers (for performing some network action for each configuration), an Observer which derives information from passively observed traffic, and a thread that merges results from the workers with flow records from the collector. """ def __init__(self, worker_count, libtrace_uri, args): """ The initialisation of a pathspider plugin. :param worker_count: The number of workers to use. :type worker_count: int :param libtrace_uri: The URI to pass to the Observer to describe the interface on which packets should be captured. :type libtrace_uri: str It is expected that this function will be overloaded by plugins, though the plugin should always make a call to the __init__() function of the abstract Spider class as this initialises all of the base functionality: .. code-block:: python super().__init__(worker_count=worker_count, libtrace_uri=libtrace_uri, args=args) This can be used to initialise any variables which may be required in the object. """ self.args = args self.activated = True self.running = False self.stopping = False self.terminating = False self.worker_count = worker_count self.active_worker_count = 0 self.active_worker_lock = threading.Lock() self.libtrace_uri = libtrace_uri self.jobqueue = queue.Queue(QUEUE_SIZE) self.resqueue = queue.Queue(QUEUE_SIZE) self.flowqueue = mp.Queue(QUEUE_SIZE) self.observer_shutdown_queue = mp.Queue(QUEUE_SIZE) self.restab = {} self.flowtab = {} self.flowreap = collections.deque() self.flowreap_size = min(self.worker_count * 100, 10000) self.outqueue = queue.Queue(QUEUE_SIZE) = None self.worker_threads = [] self.configurator_thread = None self.merger_thread = None self.observer_process = None # self._worker_state = [ "not_started" ] * self.worker_count self.lock = threading.Lock() self.exception = None self.conn_timeout = None
[docs] def config_zero(self): """ Changes the global state or system configuration for the baseline measurements. """ raise NotImplementedError("Cannot instantiate an abstract Spider")
[docs] def config_one(self): """ Changes the global state or system configuration for the experimental measurements. """ raise NotImplementedError("Cannot instantiate an abstract Spider")
[docs] def configurator(self): raise NotImplementedError("Cannot instantiate an abstract Spider")
[docs] def worker(self): raise NotImplementedError("Cannot instantiate an abstract Spider")
[docs] def pre_connect(self, job): """ Performs pre-connection operations. :param job: The job record. :type job: dict :returns: dict -- Result of the pre-connection operation(s). The pre_connect function can be used to perform any operations that must be performed before each connection. It will be run only once per job, with the same result passed to both the A and B connect calls. This function is not synchronised with the configurator. Plugins to PATHspider can optionally implement this function. If this function is not overloaded, it will be a noop. """ pass
[docs] def connect(self, job, pcs, config): """ Performs the connection. :param job: The job record. :type job: dict :param pcs: The result of the pre-connection operations(s). :type pcs: dict :param config: The current state of the configurator (0 or 1). :type config: int :returns: object -- Any result of the connect operation to be passed to :func:`pathspider.base.Spider.post_connect`. The connect function is used to perform the connection operation and is run for both the A and B test. This method is not implemented in the abstract :class:`pathspider.base.Spider` class and must be implemented by any plugin. Sockets created during this operation can be returned by the function for use in the post-connection phase, to minimise the time that the configurator is blocked from moving to the next configuration. """ raise NotImplementedError("Cannot instantiate an abstract Pathspider")
[docs] def post_connect(self, job, conn, pcs, config): """ Performs post-connection operations. :param job: The job record. :type job: dict :param conn: The result of the connection operation(s). :type conn: object :param pcs: The result of the pre-connection operations(s). :type pcs: dict :param config: The state of the configurator during :func:`pathspider.base.Spider.connect`. :type config: int :returns: dict -- Result of the pre-connection operation(s). The post_connect function can be used to perform any operations that must be performed after each connection. It will be run for both the A and the B configuration, and is not synchronised with the configurator. Plugins to PATHspider can optionally implement this function. If this function is not overloaded, it will be a noop. Any sockets or other file handles that were opened during :func:`pathspider.base.Spider.connect` should be closed in this function if they have not been already. """ raise NotImplementedError("Cannot instantiate an abstract Pathspider")
[docs] def create_observer(self): """ Create a flow observer. This function is called by the base Spider logic to get an instance of :class:`` configured with the function chains that are requried by the plugin. This method is not implemented in the abstract :class:`pathspider.base.Spider` class and must be implemented by any plugin. For more information on how to use the flow observer, see :ref:`Observer <observer>`. """ raise NotImplementedError("Cannot instantiate an abstract Pathspider")
[docs] def merger(self): """ Thread to merge results from the workers and the observer. """ logger = logging.getLogger('pathspider') merging_flows = True merging_results = True # merge_cycles = 0 while self.running and (merging_results or merging_flows): # if merge_cycles % 20 == 0: # for wn in range(0, self.worker_count): # logger.debug("worker %3u: %s" % (wn, self._worker_state[wn])) # merge_cycles += 1 if merging_flows and self.flowqueue.qsize() >= self.resqueue.qsize(): try: flow = self.flowqueue.get_nowait() except queue.Empty: time.sleep(QUEUE_SLEEP) else: if flow == SHUTDOWN_SENTINEL: logger.debug("stopping flow merging on sentinel") merging_flows = False continue flowkey = (flow['dip'], flow['sp']) logger.debug("got a flow (" + str(flow['sip']) + ", " + str(flow['sp']) + ")") if flowkey in self.restab: logger.debug("merging flow") self.merge(flow, self.restab[flowkey]) del self.restab[flowkey] elif flowkey in self.flowtab: logger.debug("won't merge duplicate flow") else: # Create a new flow self.flowtab[flowkey] = flow # And reap the oldest, if the reap queue is full self.flowreap.append(flowkey) if len(self.flowreap) > self.flowreap_size: try: del self.flowtab[self.flowreap.popleft()] except KeyError: pass elif merging_results: try: res = self.resqueue.get_nowait() except queue.Empty: time.sleep(QUEUE_SLEEP) logger.debug("result queue is empty") else: if res == NO_RESULT: # handle skipped results continue if res == SHUTDOWN_SENTINEL: merging_results = False logger.debug("stopping result merging on sentinel") continue reskey = (res.ip, res.port) logger.debug("got a result (" + str(res.ip) + ", " + str(res.port) + ")") if reskey in self.flowtab: logger.debug("merging result") self.merge(self.flowtab[reskey], res) del self.flowtab[reskey] elif reskey in self.restab: logger.debug("won't merge duplicate result") else: self.restab[reskey] = res self.resqueue.task_done() # Both shutdown markers received. # Call merge on all remaining entries in the results table # with null flows. # Commented out for now; see for res_item in self.restab.items(): res = res_item[1] self.merge(NO_FLOW, res)
[docs] def merge(self, flow, res): """ Merge a job record with a flow record. :param flow: The flow record. :type flow: dict :param res: The job record. :type res: dict :return: tuple -- Final record for job. In order to create a final record for reporting on a job, the final job record must be merged with the flow record. This function should be implemented by any plugin to provide the logic for this merge as the keys used in these records cannot be known by PATHspider in advance. This method is not implemented in the abstract :class:`pathspider.base.Spider` class and must be implemented by any plugin. """ raise NotImplementedError("Cannot instantiate an abstract Pathspider")
[docs] def exception_wrapper(self, target, *args, **kwargs): try: target(*args, **kwargs) except: #FIXME: What exceptions do we expect? logger = logging.getLogger('pathspider') logger.exception("exception occurred. terminating.") if self.exception is None: self.exception = sys.exc_info()[1] self.terminate()
[docs] def start(self): """ This function starts a PATHspider plugin. In order to run, the plugin must have first been activated by calling its :func:`activate` method. This function causes the following to happen: * Set the running flag * Create an :class:`` and start its process * Start the merger thread * Start the configurator thread * Start the worker threads The number of worker threads to start was given when activating the plugin. """ logger = logging.getLogger('pathspider') if self.activated == False: logger.exception("tried to run plugin without activating first") sys.exit(1)"starting pathspider") with self.lock: # set the running flag self.running = True # create an observer and start its process = self.create_observer() self.observer_process = mp.Process( args=(, self.flowqueue, self.observer_shutdown_queue), target=self.exception_wrapper, name='observer', daemon=True) self.observer_process.start() logger.debug("observer forked") # now start up ecnspider, backwards self.merger_thread = threading.Thread( args=(self.merger,), target=self.exception_wrapper, name="merger", daemon=True) self.merger_thread.start() logger.debug("merger up") self.configurator_thread = threading.Thread( args=(self.configurator,), target=self.exception_wrapper, name="configurator", daemon=True) self.configurator_thread.start() logger.debug("configurator up") # threading.Thread( # target = self.worker_status_reporter, # name = "status_reporter", # daemon = True).start() # logger.debug("status reporter up") self.worker_threads = [] with self.active_worker_lock: self.active_worker_count = self.worker_count for i in range(self.worker_count): worker_thread = threading.Thread( args=(self.worker, i), target=self.exception_wrapper, name='worker_{}'.format(i), daemon=True) self.worker_threads.append(worker_thread) worker_thread.start() logger.debug("workers up")
[docs] def shutdown(self): """ Shut down PathSpider in an orderly fashion, ensuring that all queued jobs complete, and all available results are merged. """ logger = logging.getLogger('pathspider')"beginning shutdown") with self.lock: # Set stopping flag self.stopping = True # Put a bunch of shutdown signals in the job queue for i in range(self.worker_count * 2): self.jobqueue.put(SHUTDOWN_SENTINEL) # Wait for worker threads to shut down for worker in self.worker_threads: if threading.current_thread() != worker: logger.debug("joining worker: " + repr(worker)) worker.join() logger.debug("all workers joined") # Tell observer to shut down self.observer_shutdown_queue.put(True) self.observer_process.join() logger.debug("observer shutdown") # Tell merger to shut down self.resqueue.put(SHUTDOWN_SENTINEL) self.merger_thread.join() logger.debug("merger shutdown") # Wait for merged results to be written self.outqueue.join() logger.debug("all results retrieved") # Propagate shutdown sentinel and tell threads to stop self.outqueue.put(SHUTDOWN_SENTINEL) # Tell threads we've stopped self.running = False # Join configurator # if threading.current_thread() != self.configurator_thread: # self.configurator_thread.join() self.stopping = False"shutdown complete")
[docs] def terminate(self): """ Shut down PathSpider as quickly as possible, without any regard to completeness of results. """ logger = logging.getLogger('pathspider')"terminating pathspider") # tell threads to stop self.stopping = True self.running = False # terminate observer self.observer_shutdown_queue.put(True) # drain queues try: while True: self.jobqueue.task_done() except ValueError: pass try: while True: self.resqueue.task_done() except ValueError: pass try: while True: self.flowqueue.get_nowait() except queue.Empty: pass # Join remaining threads for worker in self.worker_threads: if threading.current_thread() != worker: logger.debug("joining worker: " + repr(worker)) worker.join() logger.debug("all workers joined") if self.configurator_thread and \ (threading.current_thread() != self.configurator_thread): self.configurator_thread.join() logger.debug("configurator joined") if threading.current_thread() != self.merger_thread: self.merger_thread.join() logger.debug("merger joined") self.observer_process.join() logger.debug("observer joined") self.outqueue.put(SHUTDOWN_SENTINEL)"termination complete")
[docs] def add_job(self, job): """ Adds a job to the job queue. If PATHspider is currently stopping, the job will not be added to the queue. """ if self.stopping: return self.jobqueue.put(job)
[docs]class SynchronizedSpider(Spider): def __init__(self, worker_count, libtrace_uri, args): super().__init__(worker_count, libtrace_uri, args) # create semaphores for synchronizing configurations self.sem_config_zero = SemaphoreN(worker_count) self.sem_config_zero.empty() self.sem_config_zero_rdy = SemaphoreN(worker_count) self.sem_config_zero_rdy.empty() self.sem_config_one = SemaphoreN(worker_count) self.sem_config_one.empty() self.sem_config_one_rdy = SemaphoreN(worker_count) self.sem_config_one_rdy.empty()
[docs] def configurator(self): """ Thread which synchronizes on a set of semaphores and alternates between two system states. """ logger = logging.getLogger('pathspider') while self.running: logger.debug("setting config zero") self.config_zero() logger.debug("config zero active") self.sem_config_zero.release_n(self.worker_count) self.sem_config_one_rdy.acquire_n(self.worker_count) logger.debug("setting config one") self.config_one() logger.debug("config one active") self.sem_config_one.release_n(self.worker_count) self.sem_config_zero_rdy.acquire_n(self.worker_count) # In case the master exits the run loop before all workers have, # these tokens will allow all workers to run through again, # until the next check at the start of the loop self.sem_config_zero.release_n(self.worker_count) self.sem_config_one.release_n(self.worker_count)
[docs] def worker(self, worker_number): """ This function provides the logic for configuration-synchronized worker threads. :param worker_number: The unique number of the worker. :type worker_number: int The workers operate as continuous loops: * Fetch next job from the job queue * Perform pre-connection operations * Acquire a lock for "config_zero" * Perform the "config_zero" connection * Release "config_zero" * Acquire a lock for "config_one" * Perform the "config_one" connection * Release "config_one" * Perform post-connection operations for config_zero and pass the result to the merger * Perform post-connection operations for config_one and pass the result to the merger * Do it all again If the job fetched is the SHUTDOWN_SENTINEL, then the worker will terminate as this indicates that all the jobs have now been processed. """ logger = logging.getLogger('pathspider') worker_active = True while self.running: if worker_active: try: job = self.jobqueue.get_nowait() # Break on shutdown sentinel if job == SHUTDOWN_SENTINEL: self.jobqueue.task_done() logger.debug("shutting down worker "+str(worker_number)+" on sentinel") # self._worker_state[worker_number] = "shutdown_sentinel" worker_active = False with self.active_worker_lock: self.active_worker_count -= 1 logger.debug(str(self.active_worker_count)+" workers still active") continue logger.debug("got a job: "+repr(job)) except queue.Empty: #logger.debug("no job available, sleeping") # spin the semaphores self.sem_config_zero.acquire() # self._worker_state[worker_number] = "sleep_0" time.sleep(QUEUE_SLEEP) self.sem_config_one_rdy.release() self.sem_config_one.acquire() # self._worker_state[worker_number] = "sleep_1" time.sleep(QUEUE_SLEEP) self.sem_config_zero_rdy.release() else: # Hook for preconnection # self._worker_state[worker_number] = "preconn" pcs = self.pre_connect(job) # Wait for configuration zero # self._worker_state[worker_number] = "wait_0" self.sem_config_zero.acquire() # Connect in configuration zero # self._worker_state[worker_number] = "conn_0" conn0 = self.connect(job, pcs, 0) # Wait for configuration one # self._worker_state[worker_number] = "wait_1" self.sem_config_one_rdy.release() self.sem_config_one.acquire() # Connect in configuration one # self._worker_state[worker_number] = "conn_1" conn1 = self.connect(job, pcs, 1) # Signal okay to go to configuration zero self.sem_config_zero_rdy.release() # Pass results on for merge # self._worker_state[worker_number] = "postconn_0" self.resqueue.put(self.post_connect(job, conn0, pcs, 0)) # self._worker_state[worker_number] = "postconn_1" self.resqueue.put(self.post_connect(job, conn1, pcs, 1)) # self._worker_state[worker_number] = "done" logger.debug("job complete: "+repr(job)) self.jobqueue.task_done() else: # not worker_active, spin the semaphores self.sem_config_zero.acquire() # self._worker_state[worker_number] = "shutdown_0" time.sleep(QUEUE_SLEEP) with self.active_worker_lock: if self.active_worker_count <= 0: # self._worker_state[worker_number] = "shutdown_complete" break self.sem_config_one_rdy.release() self.sem_config_one.acquire() # self._worker_state[worker_number] = "shutdown_1" time.sleep(QUEUE_SLEEP) self.sem_config_zero_rdy.release()
[docs] def tcp_connect(self, job): """ This helper function will perform a TCP connection. It will not perform any special action in the event that this is the experimental flow, it only performs a TCP connection. This function expects that self.conn_timeout has been set to a sensible value. """ if self.conn_timeout is None: raise RuntimeError("Plugin did not set TCP connect timeout.") tstart = str(datetime.utcnow()) if ":" in job[0]: sock = socket.socket(socket.AF_INET6) else: sock = socket.socket(socket.AF_INET) try: sock.settimeout(self.conn_timeout) sock.connect((job[0], job[1])) return Connection(sock, sock.getsockname()[1], Conn.OK, tstart) except TimeoutError: return Connection(sock, sock.getsockname()[1], Conn.TIMEOUT, tstart) except OSError: return Connection(sock, sock.getsockname()[1], Conn.FAILED, tstart)
[docs]class DesynchronizedSpider(Spider): def __init__(self, worker_count, libtrace_uri, args): super().__init__(worker_count, libtrace_uri, args)
[docs] def config_zero(self): pass
[docs] def config_one(self): pass
[docs] def configurator(self): """ Since there is no need for a configurator thread in a desynchronized spider, this thread is a no-op """ logger = logging.getLogger('pathspider')"configurations are not synchronized")
[docs] def worker(self, worker_number): """ This function provides the logic for configuration-synchronized worker threads. :param worker_number: The unique number of the worker. :type worker_number: int The workers operate as continuous loops: * Fetch next job from the job queue * Perform pre-connection operations * Acquire a lock for "config_zero" * Perform the "config_zero" connection * Release "config_zero" * Acquire a lock for "config_one" * Perform the "config_one" connection * Release "config_one" * Perform post-connection operations for config_zero and pass the result to the merger * Perform post-connection operations for config_one and pass the result to the merger * Do it all again If the job fetched is the SHUTDOWN_SENTINEL, then the worker will terminate as this indicates that all the jobs have now been processed. """ logger = logging.getLogger('pathspider') worker_active = True while self.running: if worker_active: try: job = self.jobqueue.get_nowait() # Break on shutdown sentinel if job == SHUTDOWN_SENTINEL: self.jobqueue.task_done() logger.debug("shutting down worker "+str(worker_number)+" on sentinel") # self._worker_state[worker_number] = "shutdown_sentinel" worker_active = False with self.active_worker_lock: self.active_worker_count -= 1 logger.debug(str(self.active_worker_count)+" workers still active") continue logger.debug("got a job: "+repr(job)) except queue.Empty: time.sleep(QUEUE_SLEEP) else: # Hook for preconnection # self._worker_state[worker_number] = "preconn" pcs = self.pre_connect(job) # Connect in configuration zero # self._worker_state[worker_number] = "conn_0" conn0 = self.connect(job, pcs, 0) # Connect in configuration one # self._worker_state[worker_number] = "conn_1" conn1 = self.connect(job, pcs, 1) # Pass results on for merge # self._worker_state[worker_number] = "postconn_0" self.resqueue.put(self.post_connect(job, conn0, pcs, 0)) # self._worker_state[worker_number] = "postconn_1" self.resqueue.put(self.post_connect(job, conn1, pcs, 1)) # self._worker_state[worker_number] = "done" logger.debug("job complete: "+repr(job)) self.jobqueue.task_done() elif not self.stopping: time.sleep(QUEUE_SLEEP) else: # self._worker_state[worker_number] = "shutdown_complete" break
[docs]class PluggableSpider: @staticmethod
[docs] def register_args(subparsers): raise NotImplementedError("Cannot register an abstract plugin")