import logging
import queue
import threading
import time
import uuid
from pathspider.base import Spider
from pathspider.base import QUEUE_SLEEP
from pathspider.base import SHUTDOWN_SENTINEL
from pathspider.helpers.tcp import connect_tcp
from pathspider.helpers.http import connect_http
from pathspider.helpers.http import connect_https
from pathspider.helpers.dns import connect_dns_tcp
from pathspider.helpers.dns import connect_dns_udp
from pathspider.base import CONN_DISCARD
class SynchronizedSpider(Spider):
    """
    A spider that tests each job under every configuration in
    :attr:`configurations`, keeping all worker threads in lock-step so that
    exactly one configuration is active on the system at a time.

    Synchronization is done with pairs of :class:`SemaphoreN` objects, one
    pair per configuration: slot 0 gates workers *into* a configuration and
    slot 1 signals that they are *done* with it.
    """
    # pylint: disable=W0223

    # List of callables (or a callable returning such a list) that each put
    # the system into one test configuration when invoked with the spider.
    configurations = []

    def __init__(self, worker_count, libtrace_uri, args, server_mode=False):
        """
        :param worker_count: number of worker threads to synchronize
        :type worker_count: int
        :param libtrace_uri: capture interface URI passed to the base Spider
        :param args: parsed command-line arguments (must carry ``connect``
                     and ``timeout``; see :meth:`register_args`)
        :param server_mode: forwarded unchanged to the base Spider
        """
        super().__init__(worker_count, libtrace_uri, args, server_mode)
        self.__logger = logging.getLogger('sync')

        # A plugin may supply configurations as a factory; resolve it once.
        if callable(self.configurations):
            self.configurations = self.configurations()
        self._config_count = len(self.configurations)

        # One [entry, exit] semaphore pair per configuration, all drained so
        # that workers block until the configurator releases tokens.
        self.__semaphores = []
        for config in range(len(self.configurations)):
            self.__semaphores.append([])
            for i in range(2):
                self.__semaphores[config].append(SemaphoreN(worker_count))
                self.__semaphores[config][i].empty()

    def configurator(self):
        """
        Thread which synchronizes on a set of semaphores and alternates
        between two system states.
        """
        while self.running:
            for config in range(len(self.configurations)):
                self.__logger.debug("setting config %d", config)
                self.configurations[config](self)
                self.__logger.debug("config %d active", config)
                # Admit all workers into this configuration...
                self.__semaphores[config][0].release_n(self.worker_count)
                # ...and wait until every worker has finished with it.
                self.__semaphores[(config + 1) % len(self.configurations)][
                    1].acquire_n(self.worker_count)
        # In case the master exits the run loop before all workers have,
        # these tokens will allow all workers to run through again,
        # until the next check at the start of the loop
        for config in range(len(self.configurations)):
            self.__semaphores[config][0].release_n(self.worker_count)

    def connect(self, job, config):  # pylint: disable=unused-argument
        """
        Performs the requested connection.

        Dispatches on ``self.args.connect`` to one of the pathspider
        connection helpers and returns its record.

        :raises RuntimeError: if ``self.args.connect`` names an unknown
                              connection type.
        """
        if self.args.connect == "tcp":
            rec = connect_tcp(self.source, job, self.args.timeout)
        elif self.args.connect == "http":
            rec = connect_http(self.source, job, self.args.timeout)
        elif self.args.connect == "https":
            rec = connect_https(self.source, job, self.args.timeout)
        elif self.args.connect == "dnstcp":
            rec = connect_dns_tcp(self.source, job, self.args.timeout)
        elif self.args.connect == "dnsudp":
            rec = connect_dns_udp(self.source, job, self.args.timeout)
        else:
            raise RuntimeError("Unknown connection type requested!")
        return rec

    def worker(self, worker_number):
        """
        This function provides the logic for
        configuration-synchronized worker threads.

        :param worker_number: The unique number of the worker.
        :type worker_number: int

        The workers operate as continuous loops:

        * Fetch next job from the job queue
        * For each configuration:
          * Acquire a lock for the configuration (blocking)
          * Perform the connection with the configuration
          * Release the lock
        * Pass the result to the merger
        * Do it all again

        If the job fetched is the SHUTDOWN_SENTINEL, then the worker will
        terminate as this indicates that all the jobs have now been processed.
        """
        worker_active = True
        while self.running:
            if worker_active:
                try:
                    job = self.jobqueue.get_nowait()
                    job_id = uuid.uuid1().hex

                    # Break on shutdown sentinel
                    if job == SHUTDOWN_SENTINEL:
                        self.jobqueue.task_done()
                        self.__logger.debug(
                            "shutting down worker %d on sentinel",
                            worker_number)
                        worker_active = False
                        with self.active_worker_lock:
                            self.active_worker_count -= 1
                            self.__logger.debug("%d workers still active",
                                                self.active_worker_count)
                        continue
                    self.__logger.debug("got a job: " + repr(job))
                except queue.Empty:
                    # No job available: still take part in the semaphore
                    # handshake so the configurator is never left waiting.
                    for config in range(len(self.configurations)):
                        self.__semaphores[config][0].acquire()
                        time.sleep(QUEUE_SLEEP)
                        self.__semaphores[(config + 1) % len(
                            self.configurations)][1].release()
                else:
                    conns = []
                    should_discard = False
                    for config in range(len(self.configurations)):
                        # Wait for configuration
                        self.__semaphores[config][0].acquire()
                        # Connect in configuration
                        conn = self._connect_wrapper(job, config)
                        if 'spdr_state' in conn:
                            if conn['spdr_state'] == CONN_DISCARD:
                                should_discard = True
                        conns.append(conn)
                        # Wait for next configuration
                        self.__semaphores[(config + 1) % len(
                            self.configurations)][1].release()

                    if not should_discard:
                        # Save job record for combiner
                        self.jobtab[job_id] = job
                        # Finish connections and pass on for merging
                        self._finalise_conns(job, job_id, conns)

                    self.__logger.debug("job complete: " + repr(job))
                    self.jobqueue.task_done()
            else:  # not worker_active, spin the semaphores
                for config in range(len(self.configurations)):
                    self.__semaphores[config][0].acquire()
                    time.sleep(QUEUE_SLEEP)
                    if config == 0:
                        # Only exit once every worker has wound down, so the
                        # remaining workers are not left deadlocked.
                        with self.active_worker_lock:
                            if self.active_worker_count <= 0:
                                return
                    self.__semaphores[(config + 1) % len(self.configurations)][
                        1].release()

    @classmethod
    def register_args(cls, subparsers):
        """
        Register this spider's subcommand and its ``--connect`` /
        ``--timeout`` options on the given argparse subparsers object.
        """
        # pylint: disable=no-member
        parser = subparsers.add_parser(cls.name, help=cls.description)
        parser.set_defaults(spider=cls)
        parser.add_argument("--connect", type=str, choices=cls.connect_supported,
                            default=cls.connect_supported[0],
                            metavar="[{}]".format("|".join(cls.connect_supported)),
                            help="Type of connection to perform (Default: {})".format(
                                cls.connect_supported[0]))
        parser.add_argument("--timeout", default=5, type=int,
                            help=("The time allowed for a connection operation to "
                                  "finish before it is terminated. On slow links "
                                  "this should be increased. (Default: 5)"))
        if hasattr(cls, "extra_args"):
            cls.extra_args(parser)
class SemaphoreN(threading.BoundedSemaphore):
    """
    An extension to the standard library's BoundedSemaphore that provides
    functions to handle n tokens at once.

    The semaphore starts *empty* (no tokens available); use
    :meth:`release_n` to make tokens available.
    """

    def __init__(self, value):
        """
        :param value: maximum number of tokens the semaphore can hold
        :type value: int
        """
        # Keep the capacity in our own attribute: BoundedSemaphore mutates
        # its internal ``_value`` to track the *current* token count, so it
        # cannot be used to report the maximum later.
        self._max_value = value
        super().__init__(value)
        self.empty()

    def __str__(self):
        return 'SemaphoreN with a maximum value of {}.'.format(self._max_value)

    def acquire_n(self, value=1, blocking=True, timeout=None):
        """
        Acquire ``value`` number of tokens at once.

        The parameters ``blocking`` and ``timeout`` have the same semantics as
        :class:`BoundedSemaphore`.

        :returns: The same value as the last call to `BoundedSemaphore`'s
                  :meth:`acquire` if :meth:`acquire` were called ``value``
                  times instead of the call to this method.
        """
        ret = None
        for _ in range(value):
            ret = self.acquire(blocking=blocking, timeout=timeout)
        return ret

    def release_n(self, value=1):
        """
        Release ``value`` number of tokens at once.
        """
        for _ in range(value):
            self.release()

    def empty(self):
        """
        Acquire all tokens of the semaphore.
        """
        while self.acquire(blocking=False):
            pass