Skip to content

Commit e7ac602

Browse files
committed
Fix OSS RE client: FindMissingCache staleness, TTL, and silent upload skip
Three related bugs in the OSS RE gRPC client cause `remote_upload_error` when deferred-materialized outputs from one action are used as inputs to another action. **Bug 1: Hardcoded TTL of 0 in OSS shim** `convert_action_result()` hardcodes `ttl: 0` for all output files and `action_result_ttl: 0` for execute responses. The standard REv2 protocol has no TTL field — TTL is a buck2 extension. With ttl=0, the deferred materializer treats every blob as immediately expired, so `get_digests_ttl()` always queries FindMissingBlobs instead of trusting that recently-produced outputs still exist. Fix: use `cas_ttl_secs` from runtime options (configured via `cas_ttl_secs` buckconfig key, default 3 hours) instead of hardcoding 0. **Bug 2: FindMissingCache caches "Missing" state** `get_digests_ttl()` caches `DigestRemoteState::Missing` in the FindMissingCache LRU (500K entries, 12-hour TTL). "Missing" is a transient state — the blob may be uploaded or produced by an RE action at any time. A stale "Missing" entry causes `get_digests_ttl()` to return ttl=0 without sending a FindMissingBlobs RPC, even though the blob now exists in CAS. Since deferred materialization never downloaded the blob, buck2 cannot upload it and fails with `remote_upload_error`. Fix: stop caching `DigestRemoteState::Missing`. Only cache `ExistsOnRemote`. Additionally, mark top-level output file and directory digests as `ExistsOnRemote` when processing action results from Execute, GetActionResult, and WriteActionResult responses. **Bug 3: Silent skip of missing CAS artifacts during upload** When the uploader encounters a `RequiresCasDownload` file that FindMissingBlobs reports as missing, it logs a `soft_error!("cas_missing")` and silently skips the file (`continue`). The downstream action then fails with incomplete inputs. Fix: instead of silently skipping, materialize the file locally (which downloads from CAS, bypassing FindMissingCache) and add it to the upload list. If CAS truly doesn't have the blob, `ensure_materialized` fails with a clear error instead of a mysterious downstream action failure.
1 parent 442a1d6 commit e7ac602

File tree

2 files changed

+63
-16
lines changed

2 files changed

+63
-16
lines changed

app/buck2_execute/src/re/uploader.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,9 @@ impl Uploader {
312312
}
313313
Err(
314314
ref err @ ArtifactNotMaterializedReason::RequiresCasDownload {
315+
ref path,
315316
ref entry,
316317
ref info,
317-
..
318318
},
319319
) => {
320320
if let DirectoryEntry::Leaf(ActionDirectoryMember::File(file)) =
@@ -361,6 +361,19 @@ impl Uploader {
361361
quiet: true
362362
)?;
363363

364+
// Materialize the file locally from CAS and add it to
365+
// the upload list. Silently skipping this file (the old
366+
// behavior) leaves the downstream action with incomplete
367+
// inputs, causing confusing failures. Materializing
368+
// bypasses FindMissingCache and talks directly to CAS;
369+
// if the blob truly is gone, we fail here with a clear
370+
// error rather than later with a mysterious one.
371+
paths_to_materialize.push(path.clone());
372+
upload_files.push(NamedDigest {
373+
name: fs.resolve(path).as_maybe_relativized_str()?.to_owned(),
374+
digest,
375+
..Default::default()
376+
});
364377
continue;
365378
}
366379
}

remote_execution/oss/re_grpc/src/client.rs

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ pub struct REClient {
591591
capabilities: RECapabilities,
592592
instance_name: InstanceName,
593593
// buck2 calls find_missing for same blobs
594-
find_missing_cache: Mutex<FindMissingCache>,
594+
find_missing_cache: Arc<Mutex<FindMissingCache>>,
595595
bystream_compressor: Option<Compressor>,
596596
}
597597

@@ -669,11 +669,11 @@ impl REClient {
669669
grpc_clients,
670670
capabilities,
671671
instance_name,
672-
find_missing_cache: Mutex::new(FindMissingCache {
672+
find_missing_cache: Arc::new(Mutex::new(FindMissingCache {
673673
cache: LruCache::new(NonZeroUsize::new(500_000).unwrap()),
674674
ttl: Duration::from_hours(12), // 12 hours TODO: Tune this parameter
675675
last_check: Instant::now(),
676-
}),
676+
})),
677677
bystream_compressor,
678678
}
679679
}
@@ -697,9 +697,12 @@ impl REClient {
697697
))
698698
.await?;
699699

700+
let action_result = convert_action_result(res.into_inner(), self.runtime_opts.cas_ttl_secs)?;
701+
mark_action_result_outputs_as_existing(&self.find_missing_cache, &action_result);
702+
700703
Ok(ActionResultResponse {
701-
action_result: convert_action_result(res.into_inner())?,
702-
ttl: 0,
704+
action_result,
705+
ttl: self.runtime_opts.cas_ttl_secs,
703706
})
704707
}
705708

@@ -724,9 +727,12 @@ impl REClient {
724727
))
725728
.await?;
726729

730+
let action_result = convert_action_result(res.into_inner(), self.runtime_opts.cas_ttl_secs)?;
731+
mark_action_result_outputs_as_existing(&self.find_missing_cache, &action_result);
732+
727733
Ok(WriteActionResultResponse {
728-
actual_action_result: convert_action_result(res.into_inner())?,
729-
ttl_seconds: 0,
734+
actual_action_result: action_result,
735+
ttl_seconds: self.runtime_opts.cas_ttl_secs,
730736
})
731737
}
732738

@@ -756,6 +762,9 @@ impl REClient {
756762
..Default::default()
757763
};
758764

765+
let cas_ttl_secs = self.runtime_opts.cas_ttl_secs;
766+
let find_missing_cache = self.find_missing_cache.clone();
767+
759768
let stream = client
760769
.execute(with_re_metadata(
761770
request,
@@ -765,7 +774,9 @@ impl REClient {
765774
.await?
766775
.into_inner();
767776

768-
let stream = futures::stream::try_unfold(stream, move |mut stream| async {
777+
let stream = futures::stream::try_unfold(stream, move |mut stream| {
778+
let find_missing_cache = find_missing_cache.clone();
779+
async move {
769780
let msg = match stream.try_next().await.context("RE channel error")? {
770781
Some(msg) => msg,
771782
None => return Ok(None),
@@ -794,12 +805,13 @@ impl REClient {
794805
.result
795806
.with_context(|| "The action result is not defined.")?;
796807

797-
let action_result = convert_action_result(action_result)?;
808+
let action_result = convert_action_result(action_result, cas_ttl_secs)?;
809+
mark_action_result_outputs_as_existing(&find_missing_cache, &action_result);
798810

799811
let execute_response = ExecuteResponse {
800812
action_result,
801813
action_result_digest: TDigest::default(),
802-
action_result_ttl: 0,
814+
action_result_ttl: cas_ttl_secs,
803815
status: TStatus {
804816
code: TCode::OK,
805817
message: execute_response_grpc.message,
@@ -837,6 +849,7 @@ impl REClient {
837849
};
838850

839851
anyhow::Ok(Some((status, stream)))
852+
}
840853
});
841854

842855
// We fill in the action digest a little later here. We do it this way so we don't have to
@@ -1019,10 +1032,13 @@ impl REClient {
10191032
}
10201033

10211034
for digest in &resp.missing_blob_digests.map(|d| tdigest_from(d.clone())) {
1022-
// If it's present in the MissingBlobsResponse, it's expired on the remote and
1023-
// needs to be refetched.
1035+
// Missing is a transient state: the blob may be uploaded or
1036+
// produced by an RE action at any time. Only record it in
1037+
// remote_results for the current call; do NOT cache it, as a
1038+
// stale "Missing" entry would prevent future get_digests_ttl
1039+
// calls from sending a FindMissingBlobs RPC, causing
1040+
// remote_upload_error for blobs that actually exist in CAS.
10241041
remote_results.insert(digest.clone(), DigestRemoteState::Missing);
1025-
find_missing_cache.put(digest.clone(), DigestRemoteState::Missing);
10261042
}
10271043
digests_to_check.clear();
10281044
}
@@ -1080,7 +1096,7 @@ impl REClient {
10801096
}
10811097
}
10821098

1083-
fn convert_action_result(action_result: ActionResult) -> anyhow::Result<TActionResult2> {
1099+
fn convert_action_result(action_result: ActionResult, cas_ttl_secs: i64) -> anyhow::Result<TActionResult2> {
10841100
let execution_metadata = action_result
10851101
.execution_metadata
10861102
.with_context(|| "The execution metadata are not defined.")?;
@@ -1097,7 +1113,7 @@ fn convert_action_result(action_result: ActionResult) -> anyhow::Result<TActionR
10971113
name: output_file.path,
10981114
existed: false,
10991115
executable: output_file.is_executable,
1100-
ttl: 0,
1116+
ttl: cas_ttl_secs,
11011117
_dot_dot_default: (),
11021118
})
11031119
})?;
@@ -1176,6 +1192,24 @@ fn convert_action_result(action_result: ActionResult) -> anyhow::Result<TActionR
11761192
Ok(action_result)
11771193
}
11781194

1195+
/// Mark all output digests from an action result as existing in the
1196+
/// FindMissingCache. This prevents stale "Missing" entries from causing
1197+
/// downstream actions to fail with remote_upload_error when they reference
1198+
/// outputs produced by earlier RE actions.
1199+
fn mark_action_result_outputs_as_existing(
1200+
cache: &Mutex<FindMissingCache>,
1201+
action_result: &TActionResult2,
1202+
) {
1203+
let mut cache = cache.lock().unwrap();
1204+
for file in &action_result.output_files {
1205+
cache.put(file.digest.digest.clone(), DigestRemoteState::ExistsOnRemote);
1206+
}
1207+
for dir in &action_result.output_directories {
1208+
cache.put(dir.tree_digest.clone(), DigestRemoteState::ExistsOnRemote);
1209+
cache.put(dir.root_directory_digest.clone(), DigestRemoteState::ExistsOnRemote);
1210+
}
1211+
}
1212+
11791213
fn convert_t_action_result2(t_action_result: TActionResult2) -> anyhow::Result<ActionResult> {
11801214
let t_execution_metadata = t_action_result.execution_metadata;
11811215
let virtual_execution_duration = prost_types::Duration::try_from(

0 commit comments

Comments
 (0)