
Commit 1583846

feat: reduce cpu drag by making channel drop logging optional

1 parent 117786f

3 files changed: +30 additions, -7 deletions

README.md

Lines changed: 4 additions & 2 deletions
@@ -6,8 +6,8 @@ core runtime.
 
 Current fastest speed, with default controls of `max-depth` 4, `max-links-per-page` 16, `politeness-ms` 250,
 `partition-strategy` 'wiki-prefix' (instead of 'hash'), `partition-buckets` 26, `remote-batch-size` 32, and
-`duration-secs` 4 (it crawls for 4 seconds, but any enqueued link is still awaited, so it runs for approximately 26
-seconds) is **71.95 pages/sec**.
+`duration-secs` 4 (it crawls for 4 seconds, but any enqueued link is still awaited, so it ran for 26.61s) is **75.12
+pages/sec**.
 
 ## Metrics
 
@@ -89,6 +89,8 @@ cargo run --example wiki --features multi_thread -- --duration-secs 60
 the shards, and `--partition-namespace` to keep namespaces like `Talk:` or `Help:` on stable shards.
 - Tune `--remote-batch-size <n>` (default 8) to control how many cross-shard links get buffered before the router sends
 them; higher values reduce channel wakeups at the cost of slightly delayed enqueues on the destination shard.
+- Enable `--remote-channel-logs` only when debugging channel shutdowns; it reintroduces the verbose “remote shard …
+closed” logs.
 
 ## Customizing Crawls
 
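For reference, a usage sketch of the new flag. The base invocation is the one quoted in the README hunk above; the environment-variable form assumes the `FASTCRAWL_REMOTE_CHANNEL_LOGS` variable declared in `src/controls.rs` below.

```sh
# Default run: channel-closed warnings stay suppressed.
cargo run --example wiki --features multi_thread -- --duration-secs 60

# Opt back into the verbose "remote shard … closed" logs while debugging shutdowns.
cargo run --example wiki --features multi_thread -- --duration-secs 60 --remote-channel-logs

# Equivalent via the environment variable wired up in src/controls.rs.
FASTCRAWL_REMOTE_CHANNEL_LOGS=true cargo run --example wiki --features multi_thread -- --duration-secs 60
```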

src/controls.rs

Lines changed: 8 additions & 0 deletions
@@ -110,6 +110,11 @@ pub struct Cli {
     #[cfg(feature = "multi_thread")]
     #[arg(long, env = "FASTCRAWL_REMOTE_BATCH_SIZE", default_value_t = 0)]
     pub remote_batch_size: usize,
+
+    /// Emit per-shard channel-closed logs (helps debugging shutdown races).
+    #[cfg(feature = "multi_thread")]
+    #[arg(long, env = "FASTCRAWL_REMOTE_CHANNEL_LOGS", default_value_t = false)]
+    pub remote_channel_logs: bool,
 }
 
 impl Cli {
@@ -144,6 +149,7 @@ impl Cli {
             wiki_bucket_count: (self.partition_buckets > 0).then_some(self.partition_buckets),
             wiki_include_namespace: self.partition_namespace,
             remote_batch_size: (self.remote_batch_size > 0).then_some(self.remote_batch_size),
+            remote_channel_logs: self.remote_channel_logs,
         }
     }
 }
@@ -170,4 +176,6 @@ pub struct PartitionSettings {
     pub wiki_include_namespace: bool,
     /// Optional remote batch size override.
     pub remote_batch_size: Option<usize>,
+    /// Whether to log channel-closed warnings for cross-shard sends.
+    pub remote_channel_logs: bool,
 }
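A minimal, self-contained sketch of the same wiring, assuming clap 4 with the `derive` and `env` features; `MiniCli` and `MiniPartitionSettings` are illustrative stand-ins for the crate's much larger `Cli` and `PartitionSettings` types.

```rust
// Sketch only; assumes clap = { version = "4", features = ["derive", "env"] }.
use clap::Parser;

/// Illustrative stand-in for the crate's `Cli`; only the new flag is shown.
#[derive(Parser, Debug)]
struct MiniCli {
    /// Emit per-shard channel-closed logs (helps debugging shutdown races).
    #[arg(long, env = "FASTCRAWL_REMOTE_CHANNEL_LOGS", default_value_t = false)]
    remote_channel_logs: bool,
}

/// Illustrative stand-in for `PartitionSettings`.
#[derive(Debug)]
struct MiniPartitionSettings {
    remote_channel_logs: bool,
}

fn main() {
    let cli = MiniCli::parse();
    // Copied through verbatim, mirroring the struct-literal assignment in `impl Cli` above.
    let settings = MiniPartitionSettings {
        remote_channel_logs: cli.remote_channel_logs,
    };
    println!("{settings:?}");
}
```

Because the field is a `bool`, clap's derive treats `--remote-channel-logs` as a value-less switch, which matches the README's "Enable `--remote-channel-logs`" wording.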

src/runtime.rs

Lines changed: 18 additions & 5 deletions
@@ -302,6 +302,7 @@ fn run_multi_thread(cli: Cli, seeds: &[&str], filter: UrlFilter) -> Result<(), D
         .remote_batch_size
         .unwrap_or(DEFAULT_REMOTE_BATCH_SIZE)
         .max(1);
+    let remote_channel_logs = partition_settings.remote_channel_logs;
     let seeds_owned: Vec<String> = seeds.iter().map(|s| s.to_string()).collect();
     let start = Instant::now();
 
@@ -335,6 +336,7 @@ fn run_multi_thread(cli: Cli, seeds: &[&str], filter: UrlFilter) -> Result<(), D
             senders_clone,
             receiver,
             remote_batch_size,
+            remote_channel_logs,
         )))
     })?);
     }
@@ -359,6 +361,7 @@ async fn run_shard_streaming(
     remotes: Arc<Vec<mpsc::Sender<LinkBatch>>>,
     inbox: mpsc::Receiver<LinkBatch>,
     remote_batch_size: usize,
+    remote_channel_logs: bool,
 ) -> Result<(), DynError> {
     let state = AppState::new_with_shared(filter, &shared)?;
     seed_partitioned_frontier(state.frontier.as_ref(), &seeds, &partition).await;
@@ -383,7 +386,12 @@ async fn run_shard_streaming(
         })
     };
 
-    let router = Arc::new(ShardRouter::new(partition, remotes, remote_batch_size));
+    let router = Arc::new(ShardRouter::new(
+        partition,
+        remotes,
+        remote_batch_size,
+        remote_channel_logs,
+    ));
     let mut workers = Vec::new();
     for id in 0..DEFAULT_AGENT_CAPACITY {
         workers.push(spawn_streaming_worker_sharded(
@@ -828,6 +836,7 @@ struct ShardRouter {
     remotes: Arc<Vec<mpsc::Sender<LinkBatch>>>,
     batch_size: usize,
     buffers: Arc<RemoteBuffers>,
+    log_channel_drop: bool,
 }
 
 #[cfg(feature = "multi_thread")]
@@ -836,12 +845,14 @@ impl ShardRouter {
         partition: ShardPartition,
         remotes: Arc<Vec<mpsc::Sender<LinkBatch>>>,
         batch_size: usize,
+        log_channel_drop: bool,
     ) -> Self {
         Self {
             partition,
             remotes,
             batch_size: batch_size.max(1),
             buffers: Arc::new(RemoteBuffers::default()),
+            log_channel_drop,
         }
     }
 
@@ -876,10 +887,12 @@ impl ShardRouter {
         if let Some(sender) = self.remotes.get(shard) {
             let count = links.len();
             if sender.send(LinkBatch { depth, links }).await.is_err() {
-                eprintln!(
-                    "shard {}: remote shard {shard} channel closed",
-                    self.partition.index()
-                );
+                if self.log_channel_drop {
+                    eprintln!(
+                        "shard {}: remote shard {shard} channel closed",
+                        self.partition.index()
+                    );
+                }
             } else {
                 metrics.record_remote_shard_links(count);
             }
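The heart of the change is the gate around the channel-closed warning. Below is a compilable sketch of that pattern, assuming tokio's `mpsc` channels; `MiniRouter`, `forward`, and `shard_index` are illustrative stand-ins, and the real `ShardRouter` additionally buffers links per shard up to `remote_batch_size` and records metrics on successful sends.

```rust
// Sketch only; assumes tokio = { version = "1", features = ["full"] }.
use tokio::sync::mpsc;

/// Illustrative stand-in for the crate's `LinkBatch`.
#[derive(Debug)]
struct LinkBatch {
    depth: usize,
    links: Vec<String>,
}

/// Trimmed-down stand-in for `ShardRouter`: only the pieces this commit touches.
struct MiniRouter {
    shard_index: usize,
    remotes: Vec<mpsc::Sender<LinkBatch>>,
    log_channel_drop: bool,
}

impl MiniRouter {
    /// Forward a batch to `shard`; the channel-closed warning is now opt-in.
    async fn forward(&self, shard: usize, depth: usize, links: Vec<String>) {
        if let Some(sender) = self.remotes.get(shard) {
            if sender.send(LinkBatch { depth, links }).await.is_err() && self.log_channel_drop {
                // Previously printed unconditionally; now gated behind --remote-channel-logs.
                eprintln!(
                    "shard {}: remote shard {shard} channel closed",
                    self.shard_index
                );
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<LinkBatch>(8);
    drop(rx); // simulate the destination shard having already shut down
    let router = MiniRouter {
        shard_index: 0,
        remotes: vec![tx],
        log_channel_drop: false, // the new default: the failed send stays silent
    };
    router
        .forward(0, 1, vec!["https://en.wikipedia.org/wiki/Rust".to_string()])
        .await;
}
```

With the flag left at its default of `false`, a shutdown race no longer produces a burst of `eprintln!` calls for every in-flight cross-shard batch, which is presumably the CPU drag the commit message refers to.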
