Source code for pathspider.base

"""
Basic framework for Pathspider: coordinate active measurements on large target
lists with both system-level network stack state (sysctls, iptables rules, etc)
as well as information derived from flow-level passive observation of traffic at
the sender.

.. moduleauthor:: Brian Trammell <brian@trammell.ch>

Derived and generalized from ECN Spider
(c) 2014 Damiano Boppart <hat.guy.repo@gmail.com>

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along
    with this program; if not, write to the Free Software Foundation, Inc.,
    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""

import sys
import time
import logging
import socket
import collections
import threading
import multiprocessing as mp
import queue

from ipaddress import ip_address
from zope.interface import Interface

###
### Utility Classes
###

class SemaphoreN(threading.BoundedSemaphore):
    """
    An extension to the standard library's BoundedSemaphore that provides
    functions to handle n tokens at once.
    """
    def __init__(self, value):
        self._value = value
        super().__init__(self._value)
        self.empty()

    def __str__(self):
        return 'SemaphoreN with a maximum value of {}.'.format(self._value)

    def acquire_n(self, value=1, blocking=True, timeout=None):
        """
        Acquire ``value`` number of tokens at once.

        The parameters ``blocking`` and ``timeout`` have the same semantics as
        :class:`BoundedSemaphore`.

        :returns: The same value as the last call to `BoundedSemaphore`'s
        :meth:`acquire` if :meth:`acquire` were called ``value`` times instead
        of the call to this method.
        """
        ret = None
        for _ in range(value):
            ret = self.acquire(blocking=blocking, timeout=timeout)
        return ret

    def release_n(self, value=1):
        """
        Release ``value`` number of tokens at once.

        :returns: The same value as the last call to `BoundedSemaphore`'s
        :meth:`release` if :meth:`release` were called ``value`` times instead
        of the call to this method.
        """
        ret = None
        for _ in range(value):
            ret = self.release()
        return ret

    def empty(self):
        """
        Acquire all tokens of the semaphore.
        """
        while self.acquire(blocking=False):
            pass

QUEUE_SIZE = 1000
QUEUE_SLEEP = 0.5

SHUTDOWN_SENTINEL = None
NO_FLOW = None

[docs]class Spider: """ A spider consists of a configurator (which alternates between two system configurations), a large number of workers (for performing some network action for each configuration), an Observer which derives information from passively observed traffic, and a thread that merges results from the workers with flow records from the collector. """ def __init__(self): """ Bare minimum initalisation for a pathspider plugin. .. warning:: This function should not be overloaded by any plugin. Its purpose here is only to set the "activated" flag to false, to prevent the plugin functions being used before it has been activated. """ self.activated = False
[docs] def activate(self, worker_count, libtrace_uri): """ The activate function performs initialisation of a pathspider plugin. :param worker_count: The number of workers to use. :type worker_count: int :param libtrace_uri: The URI to pass to the Observer to describe the interface on which packets should be captured. :type libtrace_uri: str :see also: :func:`pathspider.base.ISpider.activate() <ISpider.activate>` It is expected that this function will be overloaded by plugins, though the plugin should always make a call to the activate() function of the abstract Spider class as this initialises all of the base functionality: .. code-block:: python super().activate(worker_count=worker_count, libtrace_uri=libtrace_uri, check_interrupt=check_interrupt) This can be used to initialise any variables which may be required in the object. Do not initialise any variables in the __init__ method, or perform any other operations there as all plugins must be instantiated in order to be loaded and this will cause unnecessary delays in the starting of pathspider. """ self.activated = True self.running = False self.stopping = False self.terminating = False self.worker_count = worker_count self.active_worker_count = 0 self.active_worker_lock = threading.Lock() self.libtrace_uri = libtrace_uri # self.check_interrupt = check_interrupt self.sem_config_zero = SemaphoreN(worker_count) self.sem_config_zero.empty() self.sem_config_zero_rdy = SemaphoreN(worker_count) self.sem_config_zero_rdy.empty() self.sem_config_one = SemaphoreN(worker_count) self.sem_config_one.empty() self.sem_config_one_rdy = SemaphoreN(worker_count) self.sem_config_one_rdy.empty() self.jobqueue = queue.Queue(QUEUE_SIZE) self.resqueue = queue.Queue(QUEUE_SIZE) self.flowqueue = mp.Queue(QUEUE_SIZE) self.observer_shutdown_queue = mp.Queue(QUEUE_SIZE) self.restab = {} self.flowtab = {} self.outqueue = queue.Queue(QUEUE_SIZE) self.observer = None self.worker_threads = [] self.configurator_thread = None # self.interrupter_thread = None self.merger_thread = None self.observer_process = None # self._worker_state = [ "not_started" ] * self.worker_count self.lock = threading.Lock() self.exception = None
[docs] def configurator(self): """ Thread which synchronizes on a set of semaphores and alternates between two system states. """ logger = logging.getLogger('pathspider') while self.running: logger.debug("setting config zero") self.config_zero() logger.debug("config zero active") self.sem_config_zero.release_n(self.worker_count) self.sem_config_one_rdy.acquire_n(self.worker_count) logger.debug("setting config one") self.config_one() logger.debug("config one active") self.sem_config_one.release_n(self.worker_count) self.sem_config_zero_rdy.acquire_n(self.worker_count) # In case the master exits the run loop before all workers have, # these tokens will allow all workers to run through again, # until the next check at the start of the loop self.sem_config_zero.release_n(self.worker_count) self.sem_config_one.release_n(self.worker_count)
[docs] def config_zero(self): """ Changes the global state or system configuration for the baseline measurements. """ raise NotImplementedError("Cannot instantiate an abstract Pathspider")
[docs] def config_one(self): """ Changes the global state or system configuration for the experimental measurements. """ raise NotImplementedError("Cannot instantiate an abstract Pathspider")
# def interrupter(self): # if self.check_interrupt is None: # return # logger = logging.getLogger('pathspider') # while self.running: # if self.check_interrupt(): # logger.warning("spider interrupted") # logger.warning("trying to abort %d jobs", self.jobqueue.qsize()) # while not self.jobqueue.empty(): # self.jobqueue.get() # self.jobqueue.task_done() # self.stop() # break # time.sleep(5)
[docs] def worker(self, worker_number): """ This function provides the logic for the worker threads. :param worker_number: The unique number of the worker. :type worker_number: int The workers operate as continuous loops: * Fetch next job from the job queue * Perform pre-connection operations * Acquire a lock for "config_zero" * Perform the "config_zero" connection * Release "config_zero" * Acquire a lock for "config_one" * Perform the "config_one" connection * Release "config_one" * Perform post-connection operations for config_zero and pass the result to the merger * Perform post-connection operations for config_one and pass the result to the merger * Do it all again If the job fetched is the SHUTDOWN_SENTINEL, then the worker will terminate as this indicates that all the jobs have now been processed. """ logger = logging.getLogger('pathspider') worker_active = True while self.running: if worker_active: try: job = self.jobqueue.get_nowait() # Break on shutdown sentinel if job == SHUTDOWN_SENTINEL: self.jobqueue.task_done() logger.debug("shutting down worker "+str(worker_number)+" on sentinel") #self._worker_state[worker_number] = "shutdown_sentinel" worker_active = False with self.active_worker_lock: self.active_worker_count -= 1 logger.debug(str(self.active_worker_count)+" workers still active") continue logger.debug("got a job: "+repr(job)) except queue.Empty: #logger.debug("no job available, sleeping") # spin the semaphores self.sem_config_zero.acquire() #self._worker_state[worker_number] = "sleep_0" time.sleep(QUEUE_SLEEP) self.sem_config_one_rdy.release() self.sem_config_one.acquire() #self._worker_state[worker_number] = "sleep_1" time.sleep(QUEUE_SLEEP) self.sem_config_zero_rdy.release() else: # Hook for preconnection #self._worker_state[worker_number] = "preconn" pcs = self.pre_connect(job) # Wait for configuration zero #self._worker_state[worker_number] = "wait_0" self.sem_config_zero.acquire() # Connect in configuration zero #self._worker_state[worker_number] = "conn_0" conn0 = self.connect(job, pcs, 0) # Wait for configuration one #self._worker_state[worker_number] = "wait_1" self.sem_config_one_rdy.release() self.sem_config_one.acquire() # Connect in configuration one #self._worker_state[worker_number] = "conn_1" conn1 = self.connect(job, pcs, 1) # Signal okay to go to configuration zero self.sem_config_zero_rdy.release() # Pass results on for merge #self._worker_state[worker_number] = "postconn_0" self.resqueue.put(self.post_connect(job, conn0, pcs, 0)) #self._worker_state[worker_number] = "postconn_1" self.resqueue.put(self.post_connect(job, conn1, pcs, 1)) #self._worker_state[worker_number] = "done" logger.debug("job complete: "+repr(job)) self.jobqueue.task_done() else: # not worker_active, spin the semaphores self.sem_config_zero.acquire() #self._worker_state[worker_number] = "shutdown_0" time.sleep(QUEUE_SLEEP) with self.active_worker_lock: if self.active_worker_count <= 0: #self._worker_state[worker_number] = "shutdown_complete" break self.sem_config_one_rdy.release() self.sem_config_one.acquire() #self._worker_state[worker_number] = "shutdown_1" time.sleep(QUEUE_SLEEP) self.sem_config_zero_rdy.release()
[docs] def pre_connect(self, job): """ Performs pre-connection operations. :param job: The job record. :type job: dict :returns: dict -- Result of the pre-connection operation(s). The pre_connect function can be used to perform any operations that must be performed before each connection. It will be run only once per job, with the same result passed to both the A and B connect calls. This function is not synchronised with the configurator. Plugins to PATHspider can optionally implement this function. If this function is not overloaded, it will be a noop. """ pass
[docs] def connect(self, job, pcs, config): """ Performs the connection. :param job: The job record. :type job: dict :param pcs: The result of the pre-connection operations(s). :type pcs: dict :param config: The current state of the configurator (0 or 1). :type config: int :returns: object -- Any result of the connect operation to be passed to :func:`pathspider.base.Spider.post_connect`. The connect function is used to perform the connection operation and is run for both the A and B test. This method is not implemented in the abstract :class:`pathspider.base.Spider` class and must be implemented by any plugin. Sockets created during this operation can be returned by the function for use in the post-connection phase, to minimise the time that the configurator is blocked from moving to the next configuration. """ raise NotImplementedError("Cannot instantiate an abstract Pathspider")
[docs] def post_connect(self, job, conn, pcs, config): """ Performs post-connection operations. :param job: The job record. :type job: dict :param conn: The result of the connection operation(s). :type conn: object :param pcs: The result of the pre-connection operations(s). :type pcs: dict :param config: The state of the configurator during :func:`pathspider.base.Spider.connect`. :type config: int :returns: dict -- Result of the pre-connection operation(s). The post_connect function can be used to perform any operations that must be performed after each connection. It will be run for both the A and the B configuration, and is not synchronised with the configurator. Plugins to PATHspider can optionally implement this function. If this function is not overloaded, it will be a noop. Any sockets or other file handles that were opened during :func:`pathspider.base.Spider.connect` should be closed in this function if they have not been already. """ raise NotImplementedError("Cannot instantiate an abstract Pathspider")
[docs] def create_observer(self): """ Create a flow observer. This function is called by the base Spider logic to get an instance of :class:`pathspider.observer.Observer` configured with the function chains that are requried by the plugin. This method is not implemented in the abstract :class:`pathspider.base.Spider` class and must be implemented by any plugin. For more information on how to use the flow observer, see :ref:`Observer <observer>`. """ raise NotImplementedError("Cannot instantiate an abstract Pathspider")
[docs] def merger(self): """ Thread to merge results from the workers and the observer. """ logger = logging.getLogger('pathspider') merging_flows = True merging_results = True while self.running and merging_results: if self.flowqueue.qsize() >= self.resqueue.qsize(): try: flow = self.flowqueue.get_nowait() except queue.Empty: time.sleep(QUEUE_SLEEP) else: if flow == SHUTDOWN_SENTINEL: logger.debug("stopping flow merging on sentinel") merging_flows = False continue flowkey = (flow['dip'], flow['sp']) logger.debug("got a flow (" + str(flow['sip']) + ", " + str(flow['sp']) + ")") if flowkey in self.restab: logger.debug("merging flow") self.merge(flow, self.restab[flowkey]) del self.restab[flowkey] elif flowkey in self.flowtab: logger.debug("won't merge duplicate flow") else: # FIXME: How to keep flowtab from # exploding with unrelated flows? # We need a timer queue for flow expiry. # See Issue #30 self.flowtab[flowkey] = flow else: try: res = self.resqueue.get_nowait() except queue.Empty: time.sleep(QUEUE_SLEEP) logger.debug("result queue is empty") else: if res == SHUTDOWN_SENTINEL: merging_results = False logger.debug("stopping result merging on sentinel") continue reskey = (res.ip, res.port) logger.debug("got a result (" + str(res.ip) + ", " + str(res.port) + ")") if reskey in self.flowtab: logger.debug("merging result") self.merge(self.flowtab[reskey], res) del self.flowtab[reskey] elif reskey in self.restab: logger.debug("won't merge duplicate result") else: self.restab[reskey] = res self.resqueue.task_done() # Both shutdown markers received. # Call merge on all remaining entries in the results table # with null flows. # Commented out for now; see https://github.com/mami-project/pathspider/issues/29 for res_item in self.restab.items(): res = res_item[1] self.merge(NO_FLOW, res)
[docs] def merge(self, flow, res): """ Merge a job record with a flow record. :param flow: The flow record. :type flow: dict :param res: The job record. :type res: dict :return: tuple -- Final record for job. In order to create a final record for reporting on a job, the final job record must be merged with the flow record. This function should be implemented by any plugin to provide the logic for this merge as the keys used in these records cannot be known by PATHspider in advance. This method is not implemented in the abstract :class:`pathspider.base.Spider` class and must be implemented by any plugin. """ raise NotImplementedError("Cannot instantiate an abstract Pathspider")
def exception_wrapper(self, target, *args, **kwargs): try: target(*args, **kwargs) except: #FIXME: What exceptions do we expect? logger = logging.getLogger('pathspider') logger.exception("exception occurred. terminating.") if self.exception is None: self.exception = sys.exc_info()[1] self.terminate()
[docs] def start(self): """ This function starts a PATHspider plugin. In order to run, the plugin must have first been activated by calling its :func:`activate` method. This function causes the following to happen: * Set the running flag * Create an :class:`pathspider.observer.Observer` and start its process * Start the merger thread * Start the configurator thread * Start the worker threads The number of worker threads to start was given when activating the plugin. """ logger = logging.getLogger('pathspider') if self.activated == False: logger.exception("tried to run plugin without activating first") sys.exit(1) logger.info("starting pathspider") with self.lock: # set the running flag self.running = True # create an observer and start its process self.observer = self.create_observer() self.observer_process = mp.Process( args=(self.observer.run_flow_enqueuer, self.flowqueue, self.observer_shutdown_queue), target=self.exception_wrapper, name='observer', daemon=True) self.observer_process.start() logger.debug("observer forked") # now start up ecnspider, backwards self.merger_thread = threading.Thread( args=(self.merger,), target=self.exception_wrapper, name="merger", daemon=True) self.merger_thread.start() logger.debug("merger up") self.configurator_thread = threading.Thread( args=(self.configurator,), target=self.exception_wrapper, name="configurator", daemon=True) self.configurator_thread.start() logger.debug("configurator up") # threading.Thread( # target = self.worker_status_reporter, # name = "status_reporter", # daemon = True).start() # logger.debug("status reporter up") self.worker_threads = [] with self.active_worker_lock: self.active_worker_count = self.worker_count for i in range(self.worker_count): worker_thread = threading.Thread( args=(self.worker, i), target=self.exception_wrapper, name='worker_{}'.format(i), daemon=True) self.worker_threads.append(worker_thread) worker_thread.start() logger.debug("workers up")
# if self.check_interrupt is not None: # self.interrupter_thread = threading.Thread( # args=(self.interrupter,), # target=self.exception_wrapper, # name="interrupter", # daemon=True) # self.interrupter_thread.start() # logger.debug("interrupter up")
[docs] def shutdown(self): """ Shut down PathSpider in an orderly fashion, ensuring that all queued jobs complete, and all available results are merged. """ logger = logging.getLogger('pathspider') logger.info("shutting down pathspider") with self.lock: # Set stopping flag self.stopping = True # Place two shutdown sentinels per worker # in the job queue FIXME HACK for i in range(self.worker_count): self.jobqueue.put(SHUTDOWN_SENTINEL) # Wait for worker threads to shut down for worker in self.worker_threads: if threading.current_thread() != worker: logger.debug("joining worker: " + repr(worker)) worker.join() logger.debug("all workers joined") # Tell observer to shut down self.observer_shutdown_queue.put(True) self.observer_process.join() logger.debug("observer shutdown") # Tell merger to shut down self.resqueue.put(SHUTDOWN_SENTINEL) self.merger_thread.join() logger.debug("merger shutdown") # Wait for merged results to be written self.outqueue.join() logger.debug("all results retrieved") # Propagate shutdown sentinel and tell threads to stop self.outqueue.put(SHUTDOWN_SENTINEL) # Tell threads we've stopped self.running = False # Join configurator # if threading.current_thread() != self.configurator_thread: # self.configurator_thread.join() self.stopping = False logger.info("shutdown complete")
[docs] def terminate(self): """ Shut down PathSpider as quickly as possible, without any regard to completeness of results. """ logger = logging.getLogger('pathspider') logger.info("terminating pathspider") # tell threads to stop self.stopping = True self.running = False # terminate observer self.observer_shutdown_queue.put(True) # drain queues try: while True: self.jobqueue.task_done() except ValueError: pass try: while True: self.resqueue.task_done() except ValueError: pass try: while True: self.flowqueue.get_nowait() except queue.Empty: pass # Join remaining threads for worker in self.worker_threads: if threading.current_thread() != worker: logger.debug("joining worker: " + repr(worker)) worker.join() logger.debug("all workers joined") if threading.current_thread() != self.configurator_thread: self.configurator_thread.join() logger.debug("configurator joined") if threading.current_thread() != self.merger_thread: self.merger_thread.join() logger.debug("merger joined") self.observer_process.join() logger.debug("observer joined") self.outqueue.put(SHUTDOWN_SENTINEL) logger.info("termination complete")
[docs] def add_job(self, job): """ Adds a job to the job queue. If PATHspider is currently stopping, the job will not be added to the queue. """ if self.stopping: return self.jobqueue.put(job)
# def local_address(ipv=4, target="path-ams.corvid.ch", port=53): # if ipv == 4: # addrfamily = socket.AF_INET # elif ipv == 6: # addrfamily = socket.AF_INET6 # else: # assert False # try: # sock = socket.socket(addrfamily, socket.SOCK_DGRAM) # sock.connect((target, port)) # return ip_address(sock.getsockname()[0]) # except: # #FIXME: What exceptions do we expect? # return None
[docs]class ISpider(Interface): # pylint: disable=E0239 """ The ISpider interface defines the expected interface for PATHspider plugins. """ def activate(self, worker_count, libtrace_uri): pass def configurator(self): pass def config_zero(self): pass def config_one(self): pass def worker(self, worker_number): pass def pre_connect(self, job): pass def connect(self, job, pcs, config): pass def post_connect(self, job, conn, pcs, config): pass def create_observer(self): pass def merger(self): pass def merge(self, flow, res): pass def exception_wrapper(self, target, *args, **kwargs): pass def start(self): pass def shutdown(self): pass def terminate(self): pass def add_job(self, job): pass