Skip to content

Commit a44bbe4

Browse files
authored
added timeout option to terminate capture (#172)
1 parent 597ea81 commit a44bbe4

File tree

3 files changed

+87
-10
lines changed

3 files changed

+87
-10
lines changed

util/captool/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ ipnet = "2.7.2"
1313
threadpool = "1.8.1"
1414
pnet = "0.33.0"
1515
hmac = "0.12.1"
16-
sha2 = "*"
16+
sha2 = "0.10.6"
1717
rand = "0.8.5"
1818
pcap-file = "2.0.0"
1919
clap = { version = "4.2.2", features = ["derive"] }
2020
signal-hook = "0.3.15"
2121
flate2 = "1.0.25"
2222
log = "0.4.17"
2323
simple_logger = "4.1.0"
24+
humantime = "2.1.0"
2425

2526
[dev-dependencies]
2627
tempfile = "3.5.0"

util/captool/src/main.rs

Lines changed: 84 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ mod ip;
1111
mod limit;
1212
mod packet_handler;
1313
use ip::MutableIpPacket;
14-
use packet_handler::{PacketHandler, SupplementalFields};
14+
use packet_handler::{PacketError, PacketHandler, SupplementalFields};
1515

1616
use clap::Parser;
1717
use flate2::write::GzEncoder;
1818
use flate2::Compression;
19+
use humantime::parse_duration;
1920
use ipnet::IpNet;
2021
use pcap::{Activated, Capture, Device, Linktype};
2122
use pcap_file::pcapng::blocks::enhanced_packet::EnhancedPacketBlock;
@@ -34,7 +35,8 @@ use std::io::stdin;
3435
use std::io::Write;
3536
use std::sync::atomic::{AtomicBool, Ordering};
3637
use std::sync::{Arc, Mutex};
37-
use std::time::Duration;
38+
use std::thread;
39+
use std::time::{self, Duration};
3840

3941
const ASNDB_PATH: &str = "/usr/share/GeoIP/GeoLite2-ASN.mmdb";
4042
const CCDB_PATH: &str = "/usr/share/GeoIP/GeoLite2-Country.mmdb";
@@ -75,6 +77,11 @@ struct Args {
7577
#[arg(long)]
7678
lf: Option<u64>,
7779

80+
/// Limit Duration (ld) takes a human parse-able time duration. If after that long the capture
81+
/// has not terminated due to a packet limit, the timer will terminate the capture.
82+
#[arg(long,value_parser=parse_duration)]
83+
timeout: Option<std::time::Duration>,
84+
7885
/// This limits the number of packets captured in any given flow.
7986
#[arg(long)]
8087
ppf: Option<u64>,
@@ -226,9 +233,26 @@ fn main() -> Result<(), Box<dyn Error>> {
226233
let arc_writer = Arc::new(Mutex::new(writer));
227234

228235
match args.pcap_dir {
229-
Some(pcap_dir) => read_pcap_dir(pcap_dir, handler, arc_writer, flag),
230-
None => read_interfaces(args.interfaces, handler, arc_writer, flag),
236+
Some(pcap_dir) => read_pcap_dir(
237+
pcap_dir,
238+
handler,
239+
arc_writer,
240+
Arc::clone(&flag),
241+
args.timeout,
242+
),
243+
None => read_interfaces(
244+
args.interfaces,
245+
handler,
246+
arc_writer,
247+
Arc::clone(&flag),
248+
args.timeout,
249+
),
231250
};
251+
252+
if !flag.load(Ordering::Relaxed) {
253+
flag.store(true, Ordering::Relaxed);
254+
}
255+
232256
Ok(())
233257
}
234258

@@ -237,10 +261,11 @@ fn read_interfaces<W>(
237261
handler: Arc<Mutex<PacketHandler>>,
238262
arc_writer: Arc<Mutex<PcapNgWriter<W>>>,
239263
term: Arc<AtomicBool>,
264+
timeout: Option<Duration>,
240265
) where
241266
W: Write + std::marker::Send + 'static,
242267
{
243-
let pool = ThreadPool::new(interfaces.matches(',').count() + 1);
268+
let pool = ThreadPool::new(interfaces.matches(',').count() + 2);
244269

245270
for sig in TERM_SIGNALS {
246271
register(*sig, Arc::clone(&term)).unwrap();
@@ -269,6 +294,30 @@ fn read_interfaces<W>(
269294
}
270295
}
271296

297+
// if we were given a duration timeout sleep-wait until the flag is set or we hit the timeout.
298+
if let Some(duration_limit) = timeout {
299+
debug!("duration: {duration_limit:?}");
300+
let term_timeout = Arc::clone(&term);
301+
pool.execute(move || {
302+
let beginning_park = time::Instant::now();
303+
let mut timeout_remaining = duration_limit;
304+
loop {
305+
thread::park_timeout(timeout_remaining);
306+
let elapsed = beginning_park.elapsed();
307+
debug!("t: {elapsed:?}");
308+
if term_timeout.load(Ordering::Relaxed) {
309+
break;
310+
}
311+
if elapsed >= duration_limit {
312+
term_timeout.store(true, Ordering::Relaxed);
313+
break;
314+
}
315+
316+
timeout_remaining = duration_limit - elapsed;
317+
}
318+
})
319+
};
320+
272321
pool.join();
273322
}
274323

@@ -277,11 +326,12 @@ fn read_pcap_dir<W>(
277326
handler: Arc<Mutex<PacketHandler>>,
278327
arc_writer: Arc<Mutex<PcapNgWriter<W>>>,
279328
term: Arc<AtomicBool>,
329+
timeout: Option<Duration>,
280330
) where
281331
W: Write + std::marker::Send + 'static,
282332
{
283333
let mut paths = fs::read_dir(pcap_dir.clone()).unwrap();
284-
let pool = ThreadPool::new(paths.count());
334+
let pool = ThreadPool::new(paths.count() + 2);
285335
signal_hook::flag::register(signal_hook::consts::SIGTERM, Arc::clone(&term)).unwrap();
286336

287337
// refresh the path list and launch jobs
@@ -302,6 +352,30 @@ fn read_pcap_dir<W>(
302352
}
303353
}
304354

355+
// if we were given a duration timeout sleep-wait until the flag is set or we hit the timeout.
356+
if let Some(duration_limit) = timeout {
357+
debug!("duration: {duration_limit:?}");
358+
let term_timeout = Arc::clone(&term);
359+
pool.execute(move || {
360+
let beginning_park = time::Instant::now();
361+
let mut timeout_remaining = duration_limit;
362+
loop {
363+
thread::park_timeout(timeout_remaining);
364+
let elapsed = beginning_park.elapsed();
365+
debug!("t: {elapsed:?}");
366+
if term_timeout.load(Ordering::Relaxed) {
367+
break;
368+
}
369+
if elapsed >= duration_limit {
370+
term_timeout.store(true, Ordering::Relaxed);
371+
break;
372+
}
373+
374+
timeout_remaining = duration_limit - elapsed;
375+
}
376+
})
377+
};
378+
305379
pool.join(); // all threads must complete or the process will hang
306380
}
307381

@@ -365,7 +439,10 @@ fn read_packets<T, W>(
365439
} {
366440
Ok(sf) => sf,
367441
Err(e) => {
368-
debug!("supplemental info error {e}");
442+
match e {
443+
PacketError::Skip => {}
444+
e_other => debug!("supplemental info error {e_other}"),
445+
}
369446
continue;
370447
}
371448
};

util/captool/src/packet_handler.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,6 @@ impl PacketHandler {
132132
src: IpAddr,
133133
dst: IpAddr,
134134
) -> Result<SupplementalFields, PacketError> {
135-
debug!("{src} -> {dst}");
136-
137135
let direction = self.should_anonymize(src, dst);
138136
let ip_of_interest = match direction {
139137
AnonymizeTypes::None => Err(PacketError::Skip)?,
@@ -147,6 +145,7 @@ impl PacketHandler {
147145
.trunc();
148146

149147
let country = self.get_cc(ip_of_interest)?;
148+
debug!("{src} -> {dst}");
150149

151150
Ok(SupplementalFields {
152151
cc: country,

0 commit comments

Comments
 (0)