diff --git a/pcapscanner/analyzers/__init__.py b/pcapscanner/analyzers/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pcapscanner/analyzers/conversations.py b/pcapscanner/analyzers/conversations.py deleted file mode 100644 index d7e5cae..0000000 --- a/pcapscanner/analyzers/conversations.py +++ /dev/null @@ -1,66 +0,0 @@ -from multiprocessing import Manager -import csv -import os - -CSVFN = "conversations.csv" - -manager = Manager() - - -def __add_protocol(storage, pkt): - protocol = str(pkt.protocol) - - if protocol in storage.keys(): - storage[protocol] += 1 - else: - storage[protocol] = 1 - - -def __add_port(storage, pkt): - port = str(pkt.port_dst) - - if port not in storage.keys(): - storage[port] = manager.dict() - __add_protocol(storage[port], pkt) - - -def __add_dst_addr(storage, pkt): - dst_addr = str(pkt.ip_dst) - - if dst_addr not in storage.keys(): - storage[dst_addr] = manager.dict() - __add_port(storage[dst_addr], pkt) - - -def init(): - setattr(analyze, 'storage', manager.dict()) - - -def log(outputdir): - fn = os.path.join(outputdir, CSVFN) - with open(fn, 'w') as f: - w = csv.writer(f) - - for src_addr, conversation in analyze.storage.items(): - for dst_addr, ports in conversation.items(): - for port, protocols in ports.items(): - for protocol, counter in protocols.items(): - w.writerow( - [src_addr, dst_addr, port, protocol, counter] - ) - - -def analyze(pkt): - """ Count conversations between hosts. """ - - conversations = analyze.storage - try: - src_addr = str(pkt.ip_src) - - if src_addr not in conversations.keys(): - conversations[src_addr] = manager.dict() - __add_dst_addr(conversations[src_addr], pkt) - - except AttributeError as e: - # ignore packets that aren't TCP/UDP or IPv4 - pass diff --git a/pcapscanner/analyzers/hosts.py b/pcapscanner/analyzers/hosts.py deleted file mode 100644 index e70b3e6..0000000 --- a/pcapscanner/analyzers/hosts.py +++ /dev/null @@ -1,41 +0,0 @@ -from multiprocessing import Manager -import csv -import os - -CSVFN = "hostcounter.csv" - -manager = Manager() - - -def init(): - setattr(analyze, 'storage', manager.dict()) - - -def log(outputdir): - fn = os.path.join(outputdir, CSVFN) - with open(fn, 'w') as f: - w = csv.writer(f) - w.writerows(analyze.storage.items()) - - -def analyze(pkt): - """ Count the occurences of all host either as src or dest. 
""" - - hosts = analyze.storage - try: - src_addr = str(pkt.ip_src) - dst_addr = str(pkt.ip_dst) - - if src_addr in hosts.keys(): - hosts[src_addr] += 1 - else: - hosts[src_addr] = 1 - - if dst_addr in hosts.keys(): - hosts[dst_addr] += 1 - else: - hosts[dst_addr] = 1 - - except AttributeError as e: - # ignore packets that aren't TCP/UDP or IPv4 - pass diff --git a/pcapscanner/main.py b/pcapscanner/main.py index 6cf9570..5954498 100755 --- a/pcapscanner/main.py +++ b/pcapscanner/main.py @@ -13,16 +13,10 @@ import time from multiprocessing import Pool -from analyzers import hosts, conversations import pcap NUM_THREADS = 4 -ANALYZERS = [ - hosts, - conversations -] - ASCII_LOGO = """ @@@@@@@ @@@@@@@ @@@@@@ @@@@@@@ @@@@@@ @@@@@@@ @@@@@@ @@@ @@@ @@ -38,9 +32,10 @@ """ + class Main: - def __init__(self, outputdir, inputdir, parser): + def __init__(self, outputdir, inputdir, elastic): # log files self.outputdir = outputdir @@ -58,12 +53,7 @@ def __init__(self, outputdir, inputdir, parser): .format(inputdir) ) self.inputdir = inputdir - - # initialize all analyzers - for a in ANALYZERS: - a.init() - - self.parser = parser + self.elastic = elastic def _log_errors(self): if not self.ignoredFiles: @@ -75,9 +65,6 @@ def _log_errors(self): print("ignored {} files".format(len(self.ignoredFiles))) - def _log_results(self): - for a in ANALYZERS: - a.log(self.outputdir) def start(self): pcapfiles = pcap.walk(self.inputdir) @@ -86,6 +73,8 @@ def start(self): format(len(pcapfiles), self.inputdir) ) + ouis = pcap.fetch_ouis() + with Pool(processes=NUM_THREADS) as pool: c = 0 # async map the process_pcap function to the list of files @@ -98,7 +87,7 @@ def start(self): # asynchronously pool.apply_async( pcap.process_pcap, - (fn, [a.analyze for a in ANALYZERS], progressbar_position, self.parser) + (fn, progressbar_position, ouis, self.elastic) ) # close pool @@ -108,7 +97,6 @@ def start(self): pool.join() self._log_errors() - self._log_results() # return number of pcap files return len(pcapfiles) @@ -129,10 +117,10 @@ def start(self): help='path to the output directory' ) parser.add_argument( - '-p', '--parser', + '-e', '--elastic', nargs='?', - default=pcap.Parser.DPKT.name, - choices=[p.name for p in pcap.Parser] + default='localhost:9200', + help='elasticsearch location scheme' ) args = parser.parse_args() @@ -141,7 +129,7 @@ def start(self): scanner = Main( outputdir=args.outputdir, inputdir=args.inputdir, - parser=args.parser + elastic=args.elastic ) # measure time startTime = time.time() diff --git a/pcapscanner/pcap.py b/pcapscanner/pcap.py index 3a232ba..8465be0 100644 --- a/pcapscanner/pcap.py +++ b/pcapscanner/pcap.py @@ -3,109 +3,43 @@ import sys import gzip import dpkt -from enum import Enum -from dpkt.compat import compat_ord -import pyshark import socket - -from pypacker import ppcap -from pypacker.layer12 import ethernet -from pypacker.layer3 import ip -from pypacker.layer4 import tcp - -import functools +import requests from tqdm import tqdm from datetime import datetime as dt -from collections import namedtuple - -""" -This is the destination format of parsed pcap packages -to decouple PCAP parser data structures from analysers code -""" -ParsedPackage = namedtuple('ParsedPackage', [ - 'protocol', - 'ip_src', - 'ip_dst', - 'port_src', - 'port_dst', - 'mac_src', - 'mac_dst', - 'pcap_file', - 'timestamp' -]) - -class Parser(Enum): - DPKT = 'dpkt' - PYPACKER = 'pypacker' - SCAPY = 'scapy' - PYSHARK = 'pyshark' - - -def sort_by_date(a, b): +from elasticsearch import Elasticsearch +from 
diff --git a/pcapscanner/pcap.py b/pcapscanner/pcap.py
index 3a232ba..8465be0 100644
--- a/pcapscanner/pcap.py
+++ b/pcapscanner/pcap.py
@@ -3,109 +3,43 @@
 import sys
 import gzip
 import dpkt
-from enum import Enum
-from dpkt.compat import compat_ord
-import pyshark
 import socket
-
-from pypacker import ppcap
-from pypacker.layer12 import ethernet
-from pypacker.layer3 import ip
-from pypacker.layer4 import tcp
-
-import functools
+import requests
 
 from tqdm import tqdm
 from datetime import datetime as dt
-from collections import namedtuple
-
-"""
-This is the destination format of parsed pcap packages
-to decouple PCAP parser data structures from analysers code
-"""
-ParsedPackage = namedtuple('ParsedPackage', [
-    'protocol',
-    'ip_src',
-    'ip_dst',
-    'port_src',
-    'port_dst',
-    'mac_src',
-    'mac_dst',
-    'pcap_file',
-    'timestamp'
-])
-
-class Parser(Enum):
-    DPKT = 'dpkt'
-    PYPACKER = 'pypacker'
-    SCAPY = 'scapy'
-    PYSHARK = 'pyshark'
-
-
-def sort_by_date(a, b):
+from elasticsearch import Elasticsearch
+from elasticsearch import helpers
+
+
+def fetch_ouis():
     """
-    Custom sort function to compare them by their timestamp in filename
+    Fetch OUIs from the internet. For performance reasons, we fetch them
+    from the wireshark sources instead of standards.ieee.org.
+
+    Returns a dictionary with the following layout:
+
+    ouis = {
+        'FC:FB:FB': 'Cisco'
+    }
+
     """
+    ouis = dict()
+    r = requests.get("https://code.wireshark.org/review/gitweb?p=wireshark.git;a=blob_plain;f=manuf;hb=HEAD")
 
-    regex = '[a-zA-Z0-9\-](2017[0-9-]*)-.*pcap'
-    aBase = str(os.path.basename(a))
-    bBase = str(os.path.basename(b))
-    aDateStr = None
-    bDateStr = None
+    if r.status_code != 200:
+        print("Failed to fetch OUIs from {}!".format(r.url))
+        sys.exit(1)
 
-    # parse first filename
-    try:
-        aDateStr = re.search(regex, aBase).group(1)
-    except AttributeError:
-        print('Ignore a', aBase)
+    for line in r.iter_lines():
+        text = line.decode()
 
-    # parse second filename
-    try:
-        bDateStr = re.search(regex, bBase).group(1)
-    except AttributeError:
-        print('Ignore b', bBase)
-
-    # in case we have no valid timestamp return 0
-    if aDateStr is None or bDateStr is None:
-        print(
-            "sort_by_date: Was not able to extract timestamp comparing {} to {}".
-            format(aBase, bBase)
-        )
-        return 0
-
-    # return nagative value, zero or positive value
-    aDate = dt.strptime(aDateStr, "%Y%m%d-%H%M%S")
-    bDate = dt.strptime(bDateStr, "%Y%m%d-%H%M%S")
-
-    # compare and sort from oldest to new
-    if aDate < bDate:
-        return -1
-
-    elif aDate == bDate:
-        try:
-            # in case date is equal there is a integer before the
-            # timestamp to sort
-            regex = '[a-zA-Z\-][0-9]\-([0-9]).*'
-            numA = int(re.search(regex, aBase).group(1))
-            numB = int(re.search(regex, bBase).group(1))
-
-            # also VPN 1 and 2 are present
-            regex = '[a-zA-Z\-]([0-9])\-[0-9].*'
-            vpnA = int(re.search(regex, aBase).group(1))
-            vpnB = int(re.search(regex, bBase).group(1))
-
-        except AttributeError:
-            numA = 0
-            numB = 0
-
-        if numA < numB:
-            return -1
-        elif numA == numB:
-            # should never be the case
-            return 0
-        else:
-            return 1
-    else:
-        return 1
+        if text.startswith("#") or text == '':
+            continue
+
+        fields = text.split('\t')
+        ouis[fields[0]] = fields[1]
+
+    return ouis
 
 
 def walk(directory):
@@ -117,173 +51,85 @@ def walk(directory):
         if re.match(regex, os.path.basename(f))
     ]
 
-    # sort them by timestamp in filename
-    return sorted(
-        pcapFilesUnordered, key=functools.cmp_to_key(sort_by_date)
-    )
+    return pcapFilesUnordered
 
 
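Note: the manuf file keys vendors by uppercase OUI prefixes such as 'FC:FB:FB', while parser_dpkt below formats MAC addresses with lowercase '%02x', so lookups normalize the prefix with .upper(). A quick usage sketch (the MAC address is hypothetical; the vendor value is the one from the docstring example above):

    ouis = fetch_ouis()
    mac = 'fc:fb:fb:01:23:45'          # formatted the way parser_dpkt builds it
    print(ouis.get(mac[:8].upper()))   # -> 'Cisco'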
-def parser_dpkt(pcapfile, progressbar_position):
+def parser_dpkt(pcapfile, progressbar_position, ouis, elastic):
     """
-    Parsing the RawIP encapsulated PCAPs using dpkt. Expects an unpacked file ref.
+    Parse the PCAP with dpkt and bulk-index each packet into
+    Elasticsearch. Expects an unpacked (decompressed) file object.
     https://pypi.python.org/pypi/dpkt
     """
-    out=[]
+
     try:
         pcap = dpkt.pcap.Reader(pcapfile)
-        print("SUCCESS ", pcapfile.name)
-        for ts,buf in tqdm(
+        es = Elasticsearch([elastic])
+        # the mapping must live under "mappings" and use the same
+        # index name as the bulk calls below
+        es.indices.create(index='packets', ignore=400, body={
+            "mappings": {
+                "packet": {
+                    "properties": {
+                        "ip_src": {"type": "ip"},
+                        "ip_dst": {"type": "ip"},
+                        "timestamp": {"type": "date"},
+                        "port_src": {"type": "integer"},
+                        "port_dst": {"type": "integer"}
+                    }
+                }
+            }
+        })
+
+        bulk_data = []
+        for ts, buf in tqdm(
             pcap,
             position=progressbar_position,
             unit=" packages",
             desc=os.path.basename(pcapfile.name)
         ):
-            try:
-                ip = dpkt.ip.IP(buf)
-                tcp = ip.data
-                # fetch the infos we need
-                # we use socket to convert inet IPv4 IP to human readable IP
-                # socket.inet_ntop(socket.AF_INET, inet)
-                #FIXME: get MAC adress
-                parsedPkg = ParsedPackage(
-                    protocol=ip.p,
-                    ip_src=socket.inet_ntop(socket.AF_INET, ip.src),
-                    port_src=tcp.sport,
-                    ip_dst=socket.inet_ntop(socket.AF_INET, ip.dst),
-                    port_dst=tcp.dport,
-                    mac_src='unknown',
-                    mac_dst='unknown',
-                    pcap_file=os.path.abspath(pcapfile.name),
-                    timestamp=str(dt.utcfromtimestamp(ts))
-                )
-                out.append(parsedPkg)
-            except AttributeError:
-                # ignore packets that aren't TCP/UDP or IPv4
-                pass
-    except ValueError:
-        print(
-            "ValueError happend as packages where parsed. We expect RawIP "
-            "encapsulated PCAPs, maybe now we have a Ethernet encapsulated "
-            "one. Abort.")
-        raise
-    except KeyboardInterrupt:
-        raise
-    except:
-        e = sys.exc_info()
-        print("FAILED ", e, str(os.path.abspath(pcapfile.name)))
-    finally:
-        pcapfile.close()
-    return out
+            eth = dpkt.ethernet.Ethernet(buf)
+            # the IP layer is the Ethernet payload; skip non-IPv4
+            # frames (e.g. ARP) instead of raising further down
+            ip = eth.data
+            if not isinstance(ip, dpkt.ip.IP):
+                continue
 
+            data = {
+                # TODO ip.get_proto(ip.p).__name__ would be human
+                # readable, but es only shows an empty field
+                "protocol": ip.p,
+                "ip_src": socket.inet_ntop(socket.AF_INET, ip.src),
+                "ip_dst": socket.inet_ntop(socket.AF_INET, ip.dst),
+                "mac_src": ':'.join(['%02x' % dpkt.compat.compat_ord(x) for x in eth.src]),
+                "mac_dst": ':'.join(['%02x' % dpkt.compat.compat_ord(x) for x in eth.dst]),
+                "pcap_file": os.path.abspath(pcapfile.name),
+                "timestamp": dt.utcfromtimestamp(ts),
+            }
 
-def parser_pyshark(pcapfile, progressbar_position):
-    """
-    Uses tshark CLI in a bash subprocess, parses stdout. Slow but works well with
-    pcap.gz and pcap files.
-    https://github.com/KimiNewt/pyshark
-    """
-    out = []
-    cap = pyshark.FileCapture(os.path.abspath(pcapfile.name), only_summaries=False)
-
-    # read array (to resolve futures) and return only the information
-    # we need to decouple data structures from analysers code
-    for pkt in tqdm(
-        cap,
-        position=progressbar_position,
-        unit=" packages",
-        desc=os.path.basename(pcapfile.name)
-    ):
+            # manuf prefixes are uppercase, our MAC strings lowercase
+            oui_src = data["mac_src"][:8].upper()
+            if oui_src in ouis:
+                data["vendor_src"] = ouis[oui_src]
 
-        try:
-            # fetch the infos we need
-            parsedPkg = ParsedPackage(
-                protocol=pkt.transport_layer,
-                ip_src=pkt.ip.src,
-                port_src=pkt[pkt.transport_layer].srcport,
-                ip_dst=pkt.ip.dst,
-                port_dst=pkt[pkt.transport_layer].dstport,
-                mac_src="IMPLEMENT",
-                mac_dst="IMPLEMENT",
-                pcap_file=os.path.abspath(pcapfile.name),
-                timestamp=pkt.frame_info.get_field('time')
-            )
-            out.append(parsedPkg)
-        except AttributeError:
-            # ignore packets that aren't TCP/UDP or IPv4
-            continue
-    return out
+            oui_dst = data["mac_dst"][:8].upper()
+            if oui_dst in ouis:
+                data["vendor_dst"] = ouis[oui_dst]
 
+            if ip.get_proto(ip.p) == dpkt.tcp.TCP:
+                tcp = ip.data
+                data["port_dst"] = tcp.dport
+                data["port_src"] = tcp.sport
 
-def parser_pypacker(pcapfile, progressbar_position):
-    """
-    Does not work!
-    Very fast, reads only .pcap (no .gz). Problem is it reads PCAPs with LinkType
-    Ethernet, but our dumps are RawIP. We can iterate and print the raw package
-    details, but parsing the packages does not work out of the box (because of RawIP).
-    https://github.com/mike01/pypacker
-
-    for encapsulation RawIP or Ethernet see here:
-    https://osqa-ask.wireshark.org/questions/49568/why-cant-this-wireshark-produced-1-packet-pcap-file-not-be-processed-using-winpcap-or-dpkt
-    """
-    out = []
-    cap = ppcap.Reader(filename=os.path.abspath(pcapfile.name))
-
-    # read array (to resolve futures) and return only the information
-    # we need (to reduce memory needed)
-    for ts,buf in tqdm(
-        cap,
-        position=progressbar_position,
-        unit=" packages",
-        desc=os.path.basename(pcapfile.name)
-    ):
+            bulk_data.append(data)
 
-        try:
-            eth = ethernet.Ethernet(buf)
-            print("timestamp {}: {}", ts, eth)
-#            for d in eth:
-#                print(" datum ",d)
-            # FIXME: this works well for PCAPs with LinkType "Ethernet" ,
-            # but not "RawIP" like our dumps.
-            if eth[tcp.TCP] is not None:
-                print(
-                    "{ts}: {src}:{port_src} -> {dst}:{port_dst}".
-                    format(
-                        ts=ts,
-                        src=eth[ip.IP].src_s,
-                        port_src=eth[tcp.TCP].sport,
-                        dst=eth[ip.IP].dst_s,
-                        port_dst=eth[tcp.TCP].dport
-                    )
-                )
-
-        except AttributeError:
-            # ignore packets that aren't TCP/UDP or IPv4
-            continue
-    cap.close()
-    return out
+            if len(bulk_data) == 1000:
+                helpers.bulk(es, index="packets", actions=bulk_data, doc_type='packet')
+                bulk_data = []
 
+        if bulk_data:
+            helpers.bulk(es, index="packets", actions=bulk_data, doc_type='packet')
 
-def parser_scapy(pcapfile, progressbar_position):
-    """
-    Unfinished, never tested
-    https://phaethon.github.io/scapy/
-    """
-    out = []
-    with PcapReader(pcapfile.name) as pcap_reader:
-        for pkt in pcap_reader:
-            #do something with the packet
-            pass
-    return out
+    except KeyboardInterrupt:
+        raise
+    finally:
+        pcapfile.close()
 
 
-def process_pcap(pcapfilename, analysers, progressbar_position, parser):
+def process_pcap(pcapfilename, progressbar_position, ouis=dict(), elastic="127.0.0.1:9200"):
     """
-    Scan the given file object for hosts data, collect statistics
-    for each. Using pypacker as parser
+    Open the given pcap file (plain or gzip-compressed), parse it
+    with dpkt and index the packets into Elasticsearch.
     """
-    print("processing {} with {}".format(pcapfilename, parser))
 
     f = open(pcapfilename, 'rb')
 
     try:
@@ -293,57 +139,17 @@
         g = gzip.open(f, 'rb')
         # test if this is really GZIP, raises exception if not
         g.peek(1)
-        # if it is a gzipped files pass the unpacked file reference to the parser
+        # if it is a gzipped file, pass the unpacked file
+        # reference to the parser
         f = g
     except:
-        #TODO: remove! just for debug
-        #print("THIS IS NOT A GZIP FILE: ",pcapfilename)
+        # TODO: remove! just for debug
+        # print("THIS IS NOT A GZIP FILE: ",pcapfilename)
        pass
 
-    if parser == Parser.PYSHARK.name:
-        # Pyshark CLI is slow but works (single thread ~1.200pkg/s,
-        # with 8 threads ~4.500pkg/s)
-        parsed_packets = parser_pyshark(f, progressbar_position)
-
-    elif parser == Parser.DPKT.name:
-        # DPKT works for pcap and pcap.gz and is fast (single thread ~50.000pkg/s,
-        # with 8 threads ~240.000pkg/s)
-        parsed_packets = parser_dpkt(f, progressbar_position)
-
-    elif parser == Parser.PYPACKER.name:
-        # TODO implement parser
-        parsed_packets = parser_pypacker(f, progressbar_position)
-
-    elif parser == Parser.SCAPY.name:
-        # TODO implement parser
-        parsed_packets = parser_scapy(f, progressbar_position)
-
-    else:
-        print("illegal parser")
-        return
-
-    #TODO: remove! just for debug
-    print(
-        "FETCHED {amount} PACKAGES FROM PCAP {dir}.\n Example: {pkt} ".
- format( - amount=len(parsed_packets), - dir=os.path.basename(pcapfilename), - pkt=parsed_packets[0] - ) - ) - - # process the stats we need - for p in tqdm(parsed_packets, - position=progressbar_position, - ascii=True, - unit=" packages", - ): - for analyser in analysers: - analyser(p) - + parser_dpkt(f, progressbar_position, ouis, elastic) except KeyboardInterrupt: - print("Bye") sys.exit() finally: if g is not None: diff --git a/requirements.txt b/requirements.txt index 0e8f915..64596ec 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,5 @@ dpkt flake8 pip wheel +elasticsearch +requests
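Note: the scanner now requires a reachable Elasticsearch before processing starts. A minimal smoke test after a run, assuming the default localhost:9200 endpoint and the 'packets' index created in pcap.py:

    from elasticsearch import Elasticsearch

    es = Elasticsearch(['localhost:9200'])
    print(es.count(index='packets')['count'])    # total packets indexed
    newest = es.search(index='packets', body={
        'size': 1,
        'sort': [{'timestamp': {'order': 'desc'}}]
    })
    print(newest['hits']['hits'][0]['_source'])  # most recently captured packet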