
Commit ab05345

Make fetch subcommand faster
1 parent 5115415 commit ab05345

4 files changed: +128, -35 lines

Cargo.lock

Lines changed: 51 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ aws-sdk-s3 = "1"
 chrono = "0.4"
 clap = { version = "4", features = ["derive"] }
 dirs = "6"
+futures = "0.3"
 indexmap = { version = "2", features = ["serde"] }
 indicatif = "0.18"
 percent-encoding = "2"

README.md

Lines changed: 1 addition & 1 deletion
@@ -130,7 +130,7 @@ The subcommand names are all borrowed from similar subcommands in [Git](https://
 
 This is the first command you need to run before you can do anything with `npc`. If you haven't run it before, it first clones the Nixpkgs repository, which can take a little while and will use up to a gigabyte of disk space.
 
-All the other subcommands use this cache and don't try to update it themselves; for instance, if they see a commit that's not in the cache, they'll just give an error. You can always run `npc fetch` again to update the cache, which takes less time once you've already done it at least once: in my experience, usually it takes about a minute.
+All the other subcommands use this cache and don't try to update it themselves; for instance, if they see a commit that's not in the cache, they'll just give an error. You can always run `npc fetch` again to update the cache, which is faster once you've already done it at least once: in my experience, it takes less than a minute.
 
 ### `npc clean`
 

src/main.rs

Lines changed: 75 additions & 34 deletions
@@ -1,4 +1,5 @@
 use std::{
+    collections::VecDeque,
     env, fmt, fs,
     io::{self, BufRead, Write},
     os::unix::ffi::OsStrExt,
@@ -12,8 +13,9 @@ use anyhow::{Context, anyhow, bail};
 use aws_config::{BehaviorVersion, Region};
 use aws_sdk_s3 as s3;
 use clap::{Parser, Subcommand};
+use futures::future::try_join_all;
 use indexmap::IndexMap;
-use indicatif::ProgressBar;
+use indicatif::{MultiProgress, ProgressBar};
 use percent_encoding::{NON_ALPHANUMERIC, percent_encode};
 use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Visitor};
 use strum::{EnumIter, EnumString, IntoEnumIterator, IntoStaticStr};
@@ -440,18 +442,38 @@ impl Remote {
         Self { cache, s3 }
     }
 
-    async fn git_revision(&self, prefix: &str) -> anyhow::Result<Sha> {
-        let dot = prefix.rfind(".").ok_or_else(|| anyhow!("no dot"))?;
-        let short = &prefix[dot + 1..prefix.len() - 1];
-        let output = self.cache.git().args(["rev-parse", short]).output()?;
-        let string = if output.status.success() {
-            pop_newline(String::from_utf8(output.stdout)?)?
-        } else {
-            let key = format!("{prefix}git-revision");
-            let output = self.s3.get_object().bucket(BUCKET).key(key).send().await?;
-            String::from_utf8(output.body.collect().await?.to_vec())?
-        };
-        Ok(string.parse()?)
+    async fn git_revisions(
+        &self,
+        mut prefixes: VecDeque<String>,
+        mut callback: impl FnMut(Sha, String),
+    ) -> anyhow::Result<()> {
+        while !prefixes.is_empty() {
+            let mut cmd = self.cache.git();
+            cmd.arg("rev-parse");
+            // We cap the number of commits we try to ask Git for at once, because otherwise the
+            // retries could potentially cause this function to become quadratic instead of linear.
+            for prefix in prefixes.iter().take(100) {
+                let dot = prefix.rfind(".").ok_or_else(|| anyhow!("no dot"))?;
+                let short = &prefix[dot + 1..prefix.len() - 1];
+                cmd.arg(short);
+            }
+            let output = cmd.output()?;
+            for line in String::from_utf8(output.stdout)?.lines() {
+                let Some(prefix) = prefixes.pop_front() else {
+                    bail!("unexpected extra Git output");
+                };
+                let sha = match line.parse() {
+                    Ok(sha) => sha,
+                    Err(_) => {
+                        let key = format!("{prefix}git-revision");
+                        let output = self.s3.get_object().bucket(BUCKET).key(key).send().await?;
+                        String::from_utf8(output.body.collect().await?.to_vec())?.parse()?
+                    }
+                };
+                callback(sha, prefix);
+            }
+        }
+        Ok(())
     }
 
     async fn list_revisions(
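
The hunk above replaces the old one-process-per-prefix `git_revision` with a batched `git_revisions`: up to 100 abbreviated hashes go to a single `git rev-parse` invocation, and any hash the local clone can't resolve falls back to the per-prefix `git-revision` object in S3. A rough standalone sketch of the batching idea only (the helper name is made up and the S3 fallback is omitted; run it inside any Git checkout):

    // Resolve many abbreviated revisions with one `git rev-parse` child process
    // instead of spawning one process per revision.
    use std::process::Command;

    fn resolve_batch(shorts: &[&str]) -> std::io::Result<Vec<String>> {
        let output = Command::new("git").arg("rev-parse").args(shorts).output()?;
        // `git rev-parse` prints the full hash for each argument it resolves; an
        // argument it can't resolve comes back unchanged, which is why the commit
        // above treats a line that fails to parse as a hash as its cue to ask S3.
        Ok(String::from_utf8_lossy(&output.stdout)
            .lines()
            .map(str::to_owned)
            .collect())
    }

    fn main() -> std::io::Result<()> {
        for full in resolve_batch(&["HEAD", "HEAD~1"])? {
            println!("{full}");
        }
        Ok(())
    }
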
@@ -472,11 +494,13 @@ impl Remote {
                 .set_continuation_token(continuation_token)
                 .send()
                 .await?;
-            for item in output.common_prefixes.unwrap_or_default() {
-                let prefix = item.prefix.ok_or_else(|| anyhow!("missing prefix"))?;
-                let sha = self.git_revision(&prefix).await?;
-                callback(sha, prefix);
-            }
+            let prefixes = output
+                .common_prefixes
+                .unwrap_or_default()
+                .into_iter()
+                .map(|item| item.prefix.ok_or_else(|| anyhow!("missing prefix")))
+                .collect::<anyhow::Result<VecDeque<String>>>()?;
+            self.git_revisions(prefixes, &mut callback).await?;
             match output.next_continuation_token {
                 Some(token) => continuation_token = Some(token),
                 None => break,
@@ -488,10 +512,10 @@ impl Remote {
 
     async fn channel_json(
         &self,
+        spinner: ProgressBar,
         channel: Branch,
         releases: impl IntoIterator<Item = &str>,
     ) -> anyhow::Result<String> {
-        let spinner = ProgressBar::new_spinner();
         spinner.set_message(<&str>::from(channel));
         spinner.enable_steady_tick(Duration::from_millis(100));
         let mut pairs = IndexMap::new();
@@ -504,6 +528,26 @@
         spinner.finish();
         Ok(json)
     }
+
+    async fn fetch_channels<I: IntoIterator<Item = &'static str>>(
+        &self,
+        releases: impl Fn(Branch) -> I,
+        callback: impl Copy + Fn(Branch, String) -> anyhow::Result<()>,
+    ) -> anyhow::Result<()> {
+        let multi = MultiProgress::new();
+        try_join_all(Branch::iter().filter_map(|branch| {
+            let mut iterator = releases(branch).into_iter().peekable();
+            iterator.peek()?; // Ignore this branch if there's nothing to fetch for it.
+            let spinner = multi.add(ProgressBar::new_spinner());
+            Some(async move {
+                let json = self.channel_json(spinner, branch, iterator).await?;
+                callback(branch, json)?;
+                Ok::<_, anyhow::Error>(())
+            })
+        }))
+        .await?;
+        Ok(())
+    }
 }
 
 #[derive(Debug, Deserialize)]
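
The new `futures = "0.3"` dependency in Cargo.toml exists for `try_join_all`, which `fetch_channels` uses to download every channel's JSON concurrently, while `MultiProgress` gives each channel its own spinner instead of the single spinner that `channel_json` used to create for itself. A standalone sketch of that pattern, assuming a Tokio runtime (the channel names and the sleep are placeholders for the real S3 work, not code from this repository):

    // Run several fallible futures at once with `try_join_all`; each one owns a
    // spinner registered with a shared `MultiProgress`. Assumes the `tokio`,
    // `futures`, `indicatif`, and `anyhow` crates.
    use std::time::Duration;

    use futures::future::try_join_all;
    use indicatif::{MultiProgress, ProgressBar};

    async fn fetch_one(spinner: ProgressBar, name: &'static str) -> anyhow::Result<String> {
        spinner.set_message(name);
        spinner.enable_steady_tick(Duration::from_millis(100));
        tokio::time::sleep(Duration::from_secs(1)).await; // placeholder for network I/O
        spinner.finish();
        Ok(format!("{name}: fetched"))
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let multi = MultiProgress::new();
        let results = try_join_all(["nixos-unstable", "nixpkgs-unstable"].map(|name| {
            let spinner = multi.add(ProgressBar::new_spinner());
            fetch_one(spinner, name)
        }))
        .await?;
        println!("{results:?}");
        Ok(())
    }
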
@@ -922,12 +966,12 @@ async fn main() -> anyhow::Result<()> {
             };
 
             let remote = Remote::new(cache).await;
-            for branch in Branch::iter() {
-                if let Some(release) = branch.current_release() {
-                    let json = remote.channel_json(branch, [release]).await?;
-                    fs::write(remote.cache.path(branch.key()), json)?;
-                }
-            }
+            remote
+                .fetch_channels(
+                    |branch| branch.current_release(),
+                    |branch, json| Ok(fs::write(remote.cache.path(branch.key()), json)?),
+                )
+                .await?;
 
             // We fetch from Git after fetching from S3 so that, once we're done, all the commit
             // hashes we got from S3 should also be in our local Git clone.
@@ -1105,15 +1149,12 @@ async fn main() -> anyhow::Result<()> {
         #[cfg(feature = "history")]
         (Ok(cache), Commands::History { dir }) => {
            let remote = Remote::new(cache).await;
-            for branch in Branch::iter() {
-                let releases = branch.past_releases();
-                if !releases.is_empty() {
-                    let json = remote
-                        .channel_json(branch, releases.iter().copied())
-                        .await?;
-                    fs::write(dir.join(branch.key().name()), json)?;
-                }
-            }
+            remote
+                .fetch_channels(
+                    |branch| branch.past_releases().iter().copied(),
+                    |branch, json| Ok(fs::write(dir.join(branch.key().name()), json)?),
+                )
+                .await?;
             Ok(())
         }
     }
