Skip to content

Commit e5df306

Browse files
committed
new: implemented adaptive timeout, now certain timeout sensitive plugins such as dns and port.scanner can adjust the workers timeout when getting positive results
1 parent 37b7f2b commit e5df306

File tree

8 files changed

+105
-37
lines changed

8 files changed

+105
-37
lines changed

src/options.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ pub(crate) struct Options {
6767
#[clap(long, value_enum, default_value_t = session::loot::OutputFormat::Text)]
6868
pub output_format: session::loot::OutputFormat,
6969
/// Connection timeout in milliseconds.
70-
#[clap(long, default_value_t = 10000)]
70+
#[clap(long, default_value_t = 1000)]
7171
pub timeout: u64,
7272
/// Number of attempts if a request fails.
73-
#[clap(long, default_value_t = 3)]
73+
#[clap(long, default_value_t = 1)]
7474
pub retries: usize,
7575
/// Delay in milliseconds to wait before a retry.
7676
#[clap(long, default_value_t = 1000)]

src/plugins/dns/mod.rs

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,51 @@ super::manager::register_plugin! {
2525

2626
#[derive(Clone)]
2727
pub(crate) struct DNS {
28-
resolver: Option<TokioAsyncResolver>,
28+
ips: Vec<IpAddr>,
2929
opts: options::Options,
30+
core_options: Options,
3031
hits: Arc<DashMap<IpAddr, usize>>,
3132
domains: Arc<DashSet<String>>,
3233
}
3334

3435
impl DNS {
3536
pub fn new() -> Self {
3637
DNS {
37-
resolver: None,
38+
ips: vec![],
3839
opts: options::Options::default(),
40+
core_options: Options::default(),
3941
hits: Arc::new(DashMap::new()),
4042
domains: Arc::new(DashSet::new()),
4143
}
4244
}
4345

46+
async fn get_resolver(&self, timeout: Duration) -> Result<TokioAsyncResolver, Error> {
47+
if !self.ips.is_empty() {
48+
let nameserver_group =
49+
NameServerConfigGroup::from_ips_clear(&self.ips, self.opts.dns_port, true);
50+
51+
let mut options = ResolverOpts::default();
52+
53+
options.num_concurrent_reqs = self.core_options.concurrency;
54+
options.attempts = self.opts.dns_attempts;
55+
options.timeout = timeout;
56+
options.shuffle_dns_servers = true;
57+
58+
Ok(AsyncResolver::tokio(
59+
ResolverConfig::from_parts(None, vec![], nameserver_group),
60+
options,
61+
))
62+
} else {
63+
let (config, mut options) =
64+
trust_dns_resolver::system_conf::read_system_conf().map_err(|e| e.to_string())?;
65+
66+
options.attempts = self.opts.dns_attempts;
67+
options.timeout = timeout;
68+
69+
Ok(AsyncResolver::tokio(config, options))
70+
}
71+
}
72+
4473
async fn filter(&self, addresses: Vec<IpAddr>) -> Vec<IpAddr> {
4574
// Some domains are configured to resolve any subdomain, whatever it is, to the same IP. We do
4675
// this filtering in order too many positives for an address and work around this behaviour.
@@ -170,8 +199,9 @@ impl Plugin for DNS {
170199
}
171200

172201
async fn setup(&mut self, opts: &Options) -> Result<(), Error> {
202+
self.core_options = opts.clone();
173203
self.opts = opts.dns.clone();
174-
self.resolver = Some(if let Some(resolvers) = opts.dns.dns_resolvers.as_ref() {
204+
self.ips = if let Some(resolvers) = opts.dns.dns_resolvers.as_ref() {
175205
let ips: Vec<IpAddr> = resolvers
176206
.split(',')
177207
.map(|s| s.trim())
@@ -181,25 +211,12 @@ impl Plugin for DNS {
181211

182212
log::info!("using resolvers: {:?}", &ips);
183213

184-
let nameserver_group =
185-
NameServerConfigGroup::from_ips_clear(&ips, opts.dns.dns_port, true);
186-
187-
let mut options = ResolverOpts::default();
188-
189-
options.num_concurrent_reqs = opts.concurrency;
190-
options.attempts = opts.dns.dns_attempts;
191-
options.timeout = Duration::from_millis(opts.timeout);
192-
options.shuffle_dns_servers = true;
193-
194-
AsyncResolver::tokio(
195-
ResolverConfig::from_parts(None, vec![], nameserver_group),
196-
options,
197-
)
214+
ips
198215
} else {
199216
log::info!("using system resolver");
200217

201-
AsyncResolver::tokio_from_system_conf().map_err(|e| e.to_string())?
202-
});
218+
vec![]
219+
};
203220

204221
Ok(())
205222
}
@@ -224,10 +241,11 @@ impl Plugin for DNS {
224241
}
225242

226243
// each worker will use its own resolver object instance
227-
let resolver = self.resolver.as_ref().unwrap().clone();
228-
244+
let resolver = self.get_resolver(timeout).await?;
245+
let started_at = std::time::Instant::now();
229246
// attempt resolving this subdomain to a one or more IP addresses
230247
if let Ok(response) = resolver.lookup_ip(&subdomain).await {
248+
let elapsed = started_at.elapsed();
231249
// collect valid IPs
232250
let addresses: Vec<IpAddr> = response.iter().filter(|ip| !ip.is_loopback()).collect();
233251
// Some domains are configured to resolve any subdomain, whatever it is, to the same IP. We do
@@ -259,6 +277,7 @@ impl Plugin for DNS {
259277
};
260278

261279
loot_data.push(("addresses".to_owned(), addr_data));
280+
loot_data.push(("time_ms".to_owned(), elapsed.as_millis().to_string()));
262281

263282
let mut loot = vec![Loot::new("dns", &subdomain, loot_data)];
264283

src/plugins/manager.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::BTreeMap;
22
use std::sync::{LazyLock, Mutex};
3-
use std::time;
3+
use std::time::{self, Duration};
44

55
use ansi_term::Style;
66
use dashmap::DashSet;
@@ -134,7 +134,6 @@ pub(crate) async fn run(
134134
async fn worker(plugin: &dyn Plugin, unreachables: Arc<DashSet<Arc<str>>>, session: Arc<Session>) {
135135
log::debug!("worker started");
136136

137-
let timeout = time::Duration::from_millis(session.options.timeout);
138137
let retry_time: time::Duration = time::Duration::from_millis(session.options.retry_time);
139138

140139
while let Ok(creds) = session.recv_credentials().await {
@@ -161,6 +160,7 @@ async fn worker(plugin: &dyn Plugin, unreachables: Arc<DashSet<Arc<str>>>, sessi
161160

162161
// skip attempt if we had enough failures from this specific target
163162
if !unreachables.contains(creds.target.as_str()) {
163+
let timeout = session.runtime.get_timeout();
164164
match plugin.attempt(&creds, timeout).await {
165165
Err(err) => {
166166
errors += 1;
@@ -196,6 +196,21 @@ async fn worker(plugin: &dyn Plugin, unreachables: Arc<DashSet<Arc<str>>>, sessi
196196
// do we have new loot?
197197
if let Some(loots) = loot {
198198
for loot in loots {
199+
// some plugins might return the elapsed time in the loot
200+
// if we have it, we can adjust the timeout to avoid waiting
201+
// for too long
202+
if let Some(mut elapsed) = loot.get_elapsed_time() {
203+
if elapsed.as_millis() == 0 {
204+
// make it at least 1ms
205+
elapsed = Duration::from_millis(1);
206+
}
207+
// increase the elapsed time just to be safe
208+
elapsed *= 10;
209+
if elapsed < timeout {
210+
session.runtime.set_timeout(elapsed.as_millis() as u64);
211+
}
212+
}
213+
199214
session.add_loot(loot).await.unwrap();
200215
}
201216
}

src/plugins/port_scanner/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ impl PortScanner {
5050
let mut data = vec![
5151
("transport".to_owned(), "tcp".to_owned()),
5252
("port".to_owned(), creds.username.to_owned()),
53-
("time".to_owned(), format!("{:?}", start.elapsed())),
53+
(
54+
"time_ms".to_owned(),
55+
format!("{}", start.elapsed().as_millis()),
56+
),
5457
];
5558

5659
if !self.opts.port_scanner_no_banners {
@@ -143,7 +146,10 @@ impl PortScanner {
143146
let mut data = vec![
144147
("transport".to_owned(), "udp".to_owned()),
145148
("port".to_owned(), creds.username.to_owned()),
146-
("time".to_owned(), format!("{:?}", start.elapsed())),
149+
(
150+
"time_ms".to_owned(),
151+
format!("{}", start.elapsed().as_millis()),
152+
),
147153
];
148154

149155
for (name, value) in grab_udp_banner(&buf[0..size]).await {

src/plugins/port_scanner/options.rs

Lines changed: 2 additions & 2 deletions
Large diffs are not rendered by default.

src/session/loot.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::fs::OpenOptions;
22
use std::io::prelude::*;
3+
use std::time::Duration;
34
use std::{fmt, path::Path};
45

56
use ansi_term::Colour;
@@ -59,6 +60,15 @@ impl Loot {
5960
&self.data
6061
}
6162

63+
pub fn get_elapsed_time(&self) -> Option<Duration> {
64+
if let Some(time) = self.data.get("time_ms")
65+
&& let Ok(time) = time.parse::<u64>()
66+
{
67+
return Some(Duration::from_millis(time));
68+
}
69+
None
70+
}
71+
6272
pub fn is_partial(&self) -> bool {
6373
self.partial
6474
}

src/session/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ pub(crate) struct Session {
109109
pub results: Mutex<Vec<Loot>>,
110110

111111
#[serde(skip_serializing, skip_deserializing)]
112-
runtime: Runtime,
112+
pub runtime: Runtime,
113113
}
114114

115115
impl Session {
@@ -129,7 +129,7 @@ impl Session {
129129
parse_target(target, 0)?;
130130
}
131131

132-
let runtime = Runtime::new(options.concurrency);
132+
let runtime = Runtime::new(options.concurrency, options.timeout);
133133
let total = AtomicUsize::new(0);
134134
let done = AtomicUsize::new(0);
135135
let errors = AtomicUsize::new(0);
@@ -153,7 +153,7 @@ impl Session {
153153
let file = fs::File::open(path).map_err(|e| e.to_string())?;
154154
let mut session: Session = serde_json::from_reader(file).map_err(|e| e.to_string())?;
155155

156-
session.runtime = Runtime::new(session.options.concurrency);
156+
session.runtime = Runtime::new(session.options.concurrency, session.options.timeout);
157157

158158
Ok(Arc::new(session))
159159
} else {
@@ -341,8 +341,6 @@ impl Session {
341341
pub async fn report_runtime_statistics(&self) {
342342
let report_interval = time::Duration::from_millis(self.options.report_time);
343343
while !self.is_stop() {
344-
tokio::time::sleep(report_interval).await;
345-
346344
let total = self.get_total();
347345
let done = self.get_done();
348346
let perc = (done as f32 / total as f32) * 100.0;
@@ -371,6 +369,8 @@ impl Session {
371369
} else {
372370
log::info!("{}", stats.to_text());
373371
}
372+
373+
tokio::time::sleep(report_interval).await;
374374
}
375375
}
376376
}

src/session/runtime.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1+
use std::{
2+
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
3+
time::Duration,
4+
};
25

36
use super::Error;
47
use crate::Credentials;
@@ -9,21 +12,23 @@ pub(crate) struct Runtime {
912
creds_tx: async_channel::Sender<Credentials>,
1013
creds_rx: async_channel::Receiver<Credentials>,
1114
speed: AtomicUsize,
15+
timeout_ms: AtomicU64,
1216
}
1317

1418
impl Default for Runtime {
1519
fn default() -> Self {
16-
Self::new(1)
20+
Self::new(1, 10000)
1721
}
1822
}
1923

2024
impl Runtime {
21-
pub(crate) fn new(concurrency: usize) -> Self {
25+
pub(crate) fn new(concurrency: usize, timeout_ms: u64) -> Self {
2226
// use a buffer of 100x the concurrency to avoid blocking on sending credentials
2327
let (creds_tx, creds_rx) = async_channel::bounded(concurrency * 100);
2428
Self {
2529
stop: AtomicBool::new(false),
2630
speed: AtomicUsize::new(0),
31+
timeout_ms: AtomicU64::new(timeout_ms),
2732
creds_tx,
2833
creds_rx,
2934
}
@@ -54,4 +59,17 @@ impl Runtime {
5459
pub async fn recv_credentials(&self) -> Result<Credentials, Error> {
5560
self.creds_rx.recv().await.map_err(|e| e.to_string())
5661
}
62+
63+
pub fn get_timeout(&self) -> Duration {
64+
Duration::from_millis(self.timeout_ms.load(Ordering::Relaxed))
65+
}
66+
67+
pub fn set_timeout(&self, timeout_ms: u64) {
68+
log::info!(
69+
"adjusting timeout from {:?} to {}ms",
70+
self.get_timeout(),
71+
timeout_ms
72+
);
73+
self.timeout_ms.store(timeout_ms, Ordering::Relaxed);
74+
}
5775
}

0 commit comments

Comments
 (0)