Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions magic-nix-cache/src/gha.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl GhaCache {

let worker_result = tokio::task::spawn(async move {
worker(
&api2,
api2,
store,
channel_rx,
metrics,
Expand Down Expand Up @@ -115,14 +115,15 @@ impl GhaCache {
}

async fn worker(
api: &Api,
api: Arc<Api>,
store: Arc<NixStore>,
mut channel_rx: UnboundedReceiver<Request>,
metrics: Arc<telemetry::TelemetryReport>,
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
) -> Result<()> {
let mut done = HashSet::new();

let mut handles = tokio::task::JoinSet::new();
while let Some(req) = channel_rx.recv().await {
match req {
Request::Shutdown => {
Expand All @@ -138,31 +139,39 @@ async fn worker(
continue;
}

if let Err(err) = upload_path(
api,
store.clone(),
&path,
metrics.clone(),
narinfo_negative_cache.clone(),
)
.await
{
tracing::error!(
"Upload of path '{}' failed: {}",
store.get_full_path(&path).display(),
err
);
}
let path = path.clone();
let store = Arc::clone(&store);
let api = Arc::clone(&api);
let metrics = Arc::clone(&metrics);
let narinfo_negative_cache = Arc::clone(&narinfo_negative_cache);

handles.spawn(async move {
if let Err(err) =
upload_path(&api, &store, &path, metrics, narinfo_negative_cache).await
{
tracing::error!(
"Upload of path '{}' failed: {}",
store.get_full_path(&path).display(),
err
);
}
});
}
}
}

while let Some(ret) = handles.join_next().await {
if let Err(e) = ret {
tracing::error!("Upload task failed: {}", e);
}
}

Ok(())
}

async fn upload_path(
api: &Api,
store: Arc<NixStore>,
store: &Arc<NixStore>,
path: &StorePath,
metrics: Arc<telemetry::TelemetryReport>,
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
Expand Down Expand Up @@ -197,7 +206,7 @@ async fn upload_path(

let narinfo_allocation = api.allocate_file_with_random_suffix(&narinfo_path).await?;

let narinfo = path_info_to_nar_info(store.clone(), &path_info, format!("nar/{}", nar_path))
let narinfo = path_info_to_nar_info(Arc::clone(store), &path_info, format!("nar/{}", nar_path))
.to_string()
.expect("failed to convert path into to nar info");

Expand Down