Skip to content

Commit 23d8299

Browse files
committed
Tools: add pcap2flow tool
Extracts NetFlow v5/v9 or IPFIX packets from PCAP and sents them to a collector.
1 parent cea838e commit 23d8299

File tree

1 file changed

+201
-0
lines changed

1 file changed

+201
-0
lines changed

src/tools/pcap2flow/pcap2flow.py

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
#!/usr/bin/env python3
2+
3+
"""
4+
Simple tool for replaying NetFlow v5/v9 and IPFIX packets to a collector.
5+
6+
Author(s):
7+
Lukas Hutak <[email protected]>
8+
Date: July 2019
9+
10+
Copyright(c) 2019 CESNET z.s.p.o.
11+
SPDX-License-Identifier: BSD-3-Clause
12+
"""
13+
14+
import argparse
15+
import struct
16+
from scapy.all import *
17+
18+
# Dictionary with Transport Sessions
19+
sessions = {}
20+
21+
def process_pcap():
22+
"""
23+
Open PCAP and send each NetFlow/IPFIX packet to a collector.
24+
:return: None
25+
"""
26+
# Try to open the file
27+
reader = PcapReader(args.file)
28+
29+
cnt_total = 0
30+
cnt_sent = 0
31+
32+
for pkt in reader:
33+
cnt_total += 1
34+
if args.verbose:
35+
print("Processing {}. packet".format(cnt_total))
36+
if process_packet(pkt):
37+
cnt_sent += 1
38+
39+
print("{} of {} packets have been processed and sent "
40+
"over {} Transport Session(s)!".format(cnt_sent, cnt_total, len(sessions)))
41+
42+
43+
def process_packet(pkt):
44+
"""
45+
Extract NetFlow v5/v9 or IPFIX payload and send it as a new packet to a collector.
46+
47+
:param pkt: Scapy packet to process and send
48+
:return: True if the packet has been send to the destination. False otherwise.
49+
:rtype: bool
50+
"""
51+
# Determine IP adresses
52+
l3_data = pkt.getlayer("IP")
53+
if not l3_data:
54+
print("Unable to locate L3 layer. Skipping...")
55+
ip_src = l3_data.src
56+
ip_dst = l3_data.dst
57+
58+
# Determine protocol of Transport Layer
59+
l4_data = None
60+
proto = None
61+
port_src = None
62+
port_dst = None
63+
64+
for type in ["UDP", "TCP"]:
65+
if not l3_data.haslayer(type):
66+
continue
67+
l4_data = l3_data.getlayer(type)
68+
proto = type
69+
port_src = l4_data.sport
70+
port_dst = l4_data.dport
71+
break
72+
73+
if not proto:
74+
if args.verbose:
75+
print("Failed to locate L4 layer. Skipping...")
76+
return False
77+
78+
# Check if the packet contains NetFlow v5/v9 or IPFIX payload
79+
l7_data = l4_data.payload
80+
raw_payload = l7_data.original
81+
version = struct.unpack("!H", raw_payload[:2])[0]
82+
if version not in [5, 9, 10]:
83+
print("Payload doesn't contain NetFlow/IPFIX packet. Skipping...")
84+
return False
85+
86+
# Send the packet
87+
key = (ip_src, ip_dst, proto, port_src, port_dst)
88+
send_packet(key, raw_payload)
89+
return True
90+
91+
92+
def send_packet(key, payload):
93+
"""
94+
Send packet to a collector.
95+
96+
To make sure that packets from different Transport Session (TS) are not mixed together,
97+
the function creates and maintains independent UDP/TCP session for each original TS.
98+
99+
:param key: Identification of the original Transport Session
100+
(src IP, dst IP, proto, src port, dst port)
101+
:param payload: Raw NetFlow/IPFIX message to send
102+
:return: None
103+
"""
104+
ts = sessions.get(key)
105+
if not ts:
106+
# Create a new Transport Session
107+
proto = key[2]
108+
109+
if args.verbose:
110+
print("Creating a new Transport Session for {}".format(key))
111+
if args.proto != proto:
112+
print("WARNING: Original flow packets exported over {proto_orig} "
113+
"({src_ip}:{src_port} -> {dst_ip}:{dst_port}) are now being send over {proto_now}. "
114+
"Collector could reject these flows due to different formatting rules!".format(
115+
proto_orig=proto, proto_now=args.proto, src_ip=key[0], dst_ip=key[1], src_port=key[3],
116+
dst_port=key[4]))
117+
118+
ts = create_socket()
119+
sessions[key] = ts
120+
121+
# Send the packet
122+
ts.sendall(payload)
123+
124+
125+
def create_socket():
126+
"""
127+
Create a new socket and connect it to the collector.
128+
129+
:return: Socket
130+
:rtype: socket.socket
131+
"""
132+
str2proto = {
133+
"UDP": socket.SOCK_DGRAM,
134+
"TCP": socket.SOCK_STREAM
135+
}
136+
137+
family = socket.AF_UNSPEC
138+
if args.v4_only:
139+
family = socket.AF_INET
140+
if args.v6_only:
141+
family = socket.AF_INET6
142+
143+
net_proto = str2proto[args.proto]
144+
s = None
145+
146+
for res in socket.getaddrinfo(args.addr, args.port, family, net_proto):
147+
net_af, net_type, net_proto, net_cname, net_sa = res
148+
try:
149+
s = socket.socket(net_af, net_type, net_proto)
150+
except socket.error as err:
151+
s = None
152+
continue
153+
154+
try:
155+
s.connect(net_sa)
156+
except socket.error as err:
157+
s.close()
158+
s = None
159+
continue
160+
break
161+
162+
if s is None:
163+
raise RuntimeError("Failed to open socket!")
164+
return s
165+
166+
def arg_check_port(value):
167+
"""
168+
Check if port is valid number
169+
:param str value: String to convert
170+
:return: Port
171+
"""
172+
num = int(value)
173+
if num < 0 or num >= 2**16:
174+
raise argparse.ArgumentTypeError("%s is not valid port number" % value)
175+
return num
176+
177+
if __name__ == "__main__":
178+
# Parse arguments
179+
parser = argparse.ArgumentParser(
180+
description="Simple tool for replaying NetFlow v5/v9 and IPFIX packets to a collector.",
181+
)
182+
parser.add_argument("-i", dest="file", help="PCAP with NetFlow/IPFIX packets", required=True)
183+
parser.add_argument("-d", dest="addr", help="Destination IP address (default: %(default)s)",
184+
default="127.0.0.1")
185+
parser.add_argument("-p", dest="port", help="Destination port number (default: %(default)d)",
186+
default=4739, type=arg_check_port)
187+
parser.add_argument("-t", dest="proto", help="Connection type (default: %(default)s)",
188+
default="UDP", choices=["UDP", "TCP"])
189+
parser.add_argument("-v", dest="verbose", help="Increase verbosity", default=False, action="store_true")
190+
group = parser.add_mutually_exclusive_group()
191+
group.add_argument("-4", dest="v4_only", help="Force the tool to send flows to an IPv4 address only",
192+
default=False, action="store_true")
193+
group.add_argument("-6", dest="v6_only", help="Force the tool to send flows to an IPv6 address only",
194+
default=False, action="store_true")
195+
args = parser.parse_args()
196+
197+
# Process the PCAP file
198+
try:
199+
process_pcap()
200+
except Exception as err:
201+
print("ERROR: {}".format(err))

0 commit comments

Comments
 (0)