Skip to content

Commit 6f43d9a

Browse files
committed
feat: stop crawl early when frontier stays idle
* End runs once the frontier, active tasks, and agent backlogs remain empty for 15s, logging an [idle] notice before shutdown. * Replace fixed sleep-based stop with the new idle-aware wait loop while preserving duration cap. * Keep final metrics reporting unchanged but ensure shutdown proceeds promptly after idleness.
1 parent 24a0418 commit 6f43d9a

File tree

1 file changed

+51
-8
lines changed

1 file changed

+51
-8
lines changed

src/runtime.rs

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use tokio::sync::mpsc;
3131
#[cfg(feature = "multi_thread")]
3232
use tokio::sync::Mutex;
3333
use tokio::task::{spawn_local, yield_now, LocalSet};
34-
use tokio::time::{sleep, timeout};
34+
use tokio::time::{interval, sleep, sleep_until, timeout, Instant as TokioInstant};
3535
use url::Url;
3636

3737
const USER_AGENT: &str = "fastcrawl-example/0.1 (+https://github.com/aaronlifton/fastcrawl)";
@@ -375,7 +375,9 @@ fn run_multi_thread(cli: Cli, seeds: &[&str], filter: UrlFilter) -> Result<(), D
375375
handle.join().expect("shard thread panicked")?;
376376
}
377377

378-
shared.metrics.report(start.elapsed());
378+
shared
379+
.metrics
380+
.report(start.elapsed(), shared.run_duration);
379381
Ok(())
380382
}
381383

@@ -444,7 +446,7 @@ async fn finish_run(
444446
report_metrics: bool,
445447
heartbeat: Option<tokio::task::JoinHandle<()>>,
446448
) {
447-
sleep(state.run_duration).await;
449+
wait_for_stop_signal(&state).await;
448450
state.stop_requested.store(true, Ordering::Release);
449451
wait_for_drain(
450452
state.registry.as_ref(),
@@ -463,7 +465,39 @@ async fn finish_run(
463465
let _ = hb.await;
464466
}
465467
if report_metrics {
466-
state.metrics.report(start.elapsed());
468+
state
469+
.metrics
470+
.report(start.elapsed(), state.run_duration);
471+
}
472+
}
473+
474+
async fn wait_for_stop_signal(state: &AppState) {
475+
let deadline = TokioInstant::now() + state.run_duration;
476+
let mut idle_ticks = 0usize;
477+
let mut ticker = interval(Duration::from_secs(5));
478+
479+
loop {
480+
tokio::select! {
481+
_ = sleep_until(deadline) => break,
482+
_ = ticker.tick() => {
483+
let pending = state.frontier.pending();
484+
let active = state.active_tasks.load(Ordering::Acquire);
485+
let backlog: usize = state.registry.iter().map(|a| a.backlog()).sum();
486+
if pending == 0 && active == 0 && backlog == 0 {
487+
idle_ticks += 1;
488+
if idle_ticks >= 3 {
489+
eprintln!(
490+
"[idle] frontier empty for {}s, ending early (run window {}s)",
491+
idle_ticks * 5,
492+
state.run_duration.as_secs()
493+
);
494+
break;
495+
}
496+
} else {
497+
idle_ticks = 0;
498+
}
499+
}
500+
}
467501
}
468502
}
469503

@@ -1560,12 +1594,21 @@ impl Metrics {
15601594
self.remote_batches.fetch_add(1, Ordering::Relaxed);
15611595
}
15621596

1563-
fn report(&self, elapsed: Duration) {
1564-
let secs = elapsed.as_secs_f32().max(f32::EPSILON);
1597+
fn report(&self, elapsed: Duration, crawl_window: Duration) {
1598+
let wall_secs = elapsed.as_secs_f32().max(f32::EPSILON);
1599+
let crawl_secs = crawl_window.as_secs_f32().max(f32::EPSILON);
15651600
let fetched = self.pages_fetched.load(Ordering::Relaxed);
1566-
println!("--- crawl metrics ({secs:.2}s) ---");
1601+
println!(
1602+
"--- crawl metrics ({wall_secs:.2}s elapsed, {crawl_secs:.2}s crawl window) ---"
1603+
);
15671604
println!("pages fetched: {}", fetched);
1568-
println!("urls fetched/sec: {:.2}", fetched as f32 / secs);
1605+
println!("urls fetched/sec: {:.2}", fetched as f32 / crawl_secs);
1606+
if wall_secs > crawl_secs + f32::EPSILON {
1607+
println!(
1608+
"urls fetched/sec (incl. drain): {:.2}",
1609+
fetched as f32 / wall_secs
1610+
);
1611+
}
15691612
println!(
15701613
"urls discovered: {}",
15711614
self.urls_discovered.load(Ordering::Relaxed)

0 commit comments

Comments
 (0)