Source code for pathspider.forge
import logging
from scapy.all import send
from pathspider.desync import DesynchronizedSpider
from pathspider.chains.basic import BasicChain
[docs]class ForgeSpider(DesynchronizedSpider):
chains = [BasicChain]
packets = 0
def __init__(self, worker_count, libtrace_uri, args, server_mode=False):
super().__init__(worker_count, libtrace_uri, args, server_mode)
self.__logger = logging.getLogger('forge')
self._config_count = self.packets
self.connections = [self.connect] * self.packets # pylint: disable=no-member
[docs] def connect(self, job, seq):
pkt = self.forge(job, seq)
send(pkt, verbose=0)
return {'sp': pkt.getlayer(1).sport}
[docs] def forge(self, job, config):
raise NotImplementedError("Cannot register an abstract plugin")
[docs] @classmethod
def register_args(cls, subparsers):
# pylint: disable=no-member
parser = subparsers.add_parser(cls.name, help=cls.description)
parser.set_defaults(spider=cls)
if hasattr(cls, "connect_supported"):
parser.add_argument(
"--connect",
type=str,
choices=cls.connect_supported,
default=cls.connect_supported[0],
metavar="[{}]".format("|".join(cls.connect_supported)),
help="Type of connection to perform (Default: {})".format(
cls.connect_supported[0]))
if hasattr(cls, "extra_args"):
cls.extra_args(parser)