# Source code for pathspider.forge


import logging

from scapy.all import send

from pathspider.desync import DesynchronizedSpider

from pathspider.chains.basic import BasicChain

class ForgeSpider(DesynchronizedSpider):
    """Base class for spiders that build and send forged packets.

    Subclasses are expected to set ``packets`` to the number of packets to
    send per job and to override :meth:`forge` to build each packet.
    :meth:`setup` may optionally be overridden to run per-job preparation.
    """

    # Chains used by this spider.
    chains = [BasicChain]

    # Number of packets sent per job; also the number of entries placed in
    # ``self.connections``. Subclasses override this.
    packets = 0

    def __init__(self, worker_count, libtrace_uri, args):
        """Initialise the spider and register one connect call per packet.

        The three arguments are passed unchanged to the parent constructor.
        """
        super().__init__(worker_count, libtrace_uri, args)

        self.__logger = logging.getLogger('forge')
        self._config_count = self.packets
        # The same bound method is repeated once per packet.
        self.connections = [self.connect] * self.packets # pylint: disable=no-member

    def pre_connect(self, job):
        """Run :meth:`setup` for ``job``."""
        self.setup(job)

    def setup(self, job):
        """Per-job preparation hook; does nothing by default."""
        pass

    def connect(self, job, seq):
        """Forge a packet for ``job`` and send it.

        ``seq`` is forwarded to :meth:`forge` as its ``config`` argument.

        Returns a dict whose ``'sp'`` entry is the ``sport`` field of the
        packet layer at index 1.
        """
        pkt = self.forge(job, seq)
        send(pkt, verbose=0)
        # NOTE(review): layer index 1 is presumably the transport layer
        # sitting on top of the network layer -- confirm against forge().
        return {'sp': pkt.getlayer(1).sport}

    def forge(self, job, config):
        """Build the packet to send; must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("Cannot register an abstract plugin")

    @classmethod
    def register_args(cls, subparsers):
        """Add this spider's sub-command to ``subparsers``.

        A ``--connect`` option is added only when the class defines
        ``connect_supported``; its first entry is used as the default.
        If the class defines ``extra_args``, it is called with the new
        parser so it can add further options.
        """
        # pylint: disable=no-member
        parser = subparsers.add_parser(cls.name, help=cls.description)
        parser.set_defaults(spider=cls)
        if hasattr(cls, "connect_supported"):
            parser.add_argument(
                "--connect",
                type=str,
                choices=cls.connect_supported,
                default=cls.connect_supported[0],
                metavar="[{}]".format("|".join(cls.connect_supported)),
                help="Type of connection to perform (Default: {})".format(
                    cls.connect_supported[0]))
        if hasattr(cls, "extra_args"):
            cls.extra_args(parser)