import sys
import logging
import subprocess
import traceback
from datetime import datetime
import socket
import collections
from pathspider.base import SynchronizedSpider
from pathspider.base import PluggableSpider
from pathspider.base import Conn
from pathspider.base import NO_FLOW
from pathspider.observer import Observer
from pathspider.observer import basic_flow
from pathspider.observer import basic_count
from pathspider.observer.tcp import tcp_setup
from pathspider.observer.tcp import tcp_handshake
from pathspider.observer.tcp import tcp_complete
from pathspider.observer.tcp import TCP_SAE
from pathspider.observer.tcp import TCP_SAEC
SpiderRecord = collections.namedtuple("SpiderRecord", ["ip", "rport", "port",
"rank", "host", "config",
"connstate", "tstart", "tstop"])
USER_AGENT = "pathspider"
## Chain functions
def ecn_setup(rec, ip):
fields = ['fwd_ez', 'fwd_eo', 'fwd_ce', 'rev_ez', 'rev_eo', 'rev_ce']
for field in fields:
rec[field] = False
return True
def ecn_code(rec, ip, rev):
EZ = 0x02
EO = 0x01
CE = 0x03
if ip.traffic_class & CE == EZ:
rec['rev_ez' if rev else 'fwd_ez'] = True
if ip.traffic_class & CE == EO:
rec['rev_eo' if rev else 'fwd_eo'] = True
if ip.traffic_class & CE == CE:
rec['rev_ce' if rev else 'fwd_ce'] = True
return True
## ECN main class
class ECN(SynchronizedSpider, PluggableSpider):
def __init__(self, worker_count, libtrace_uri, args):
super().__init__(worker_count=worker_count,
libtrace_uri=libtrace_uri,
args=args)
self.conn_timeout = args.timeout
[docs] def config_zero(self):
"""
Disables ECN negotiation via sysctl.
"""
logger = logging.getLogger('ecn')
subprocess.check_call(['/sbin/sysctl', '-w', 'net.ipv4.tcp_ecn=2'],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
logger.debug("Configurator disabled ECN")
[docs] def config_one(self):
"""
Enables ECN negotiation via sysctl.
"""
logger = logging.getLogger('ecn')
subprocess.check_call(['/sbin/sysctl', '-w', 'net.ipv4.tcp_ecn=1'],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
logger.debug("Configurator enabled ECN")
[docs] def connect(self, job, pcs, config):
"""
Performs a TCP connection.
"""
return self.tcp_connect(job)
[docs] def post_connect(self, job, conn, pcs, config):
"""
Close the socket gracefully.
"""
job_ip, job_port, job_host, job_rank = job
tstop = str(datetime.utcnow())
if conn.state == Conn.OK:
rec = SpiderRecord(job_ip, job_port, conn.port, job_rank, job_host,
config, True, conn.tstart, tstop)
else:
rec = SpiderRecord(job_ip, job_port, conn.port, job_rank, job_host,
config, False, conn.tstart, tstop)
try:
conn.client.shutdown(socket.SHUT_RDWR)
except: # FIXME: What are we catching?
pass
try:
conn.client.close()
except: # FIXME: What are we catching?
pass
return rec
def create_observer(self):
"""
Creates an observer with ECN-related chain functions.
"""
logger = logging.getLogger('ecn')
logger.info("Creating observer")
try:
return Observer(self.libtrace_uri,
new_flow_chain=[basic_flow, tcp_setup, ecn_setup],
ip4_chain=[basic_count, ecn_code],
ip6_chain=[basic_count, ecn_code],
tcp_chain=[tcp_handshake, tcp_complete])
except:
logger.error("Observer not cooperating, abandon ship")
traceback.print_exc()
sys.exit(-1)
[docs] def merge(self, flow, res):
"""
Merge flow records.
Includes the configuration and connection success or failure of the
socket connection with the flow record.
"""
logger = logging.getLogger('ecn')
if flow == NO_FLOW:
flow = {
"dip": res.ip,
"sp": res.port,
"dp": res.rport,
"observed": False,
}
else:
flow['observed'] = True
flow['rank'] = res.rank
flow['host'] = res.host
flow['connstate'] = res.connstate
flow['config'] = res.config
flow['tstart'] = res.tstart
flow['tstop'] = res.tstop
logger.debug("Result: " + str(flow))
self.outqueue.put(flow)
@staticmethod
def register_args(subparsers):
parser = subparsers.add_parser('ecn', help="Explicit Congestion Notification")
parser.add_argument("--timeout", default=5, type=int, help="The timeout to use for attempted connections in seconds (Default: 5)")
parser.set_defaults(spider=ECN)