From b34e44c885abd657432983ceecb61007de78b12a Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Fri, 27 Feb 2026 00:44:01 +0530 Subject: [PATCH 1/3] Fix Fast slow store Not Found error by returning failed precondition --- nativelink-worker/src/local_worker.rs | 33 +++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/nativelink-worker/src/local_worker.rs b/nativelink-worker/src/local_worker.rs index 0b9ff40e2..ccf53a3a4 100644 --- a/nativelink-worker/src/local_worker.rs +++ b/nativelink-worker/src/local_worker.rs @@ -369,11 +369,34 @@ impl<'a, T: WorkerApiClientTrait + 'static, U: RunningActionsManager> LocalWorke .err_tip(|| "Error while calling execution_response")?; }, Err(e) => { - grpc_client.execution_response(ExecuteResult{ - instance_name, - operation_id, - result: Some(execute_result::Result::InternalError(e.into())), - }).await.err_tip(|| "Error calling execution_response with error")?; + let is_cas_blob_missing = e.code == Code::NotFound + && e.message_string().contains("not found in either fast or slow store"); + if is_cas_blob_missing { + warn!( + ?e, + "Missing CAS inputs during prepare_action, returning FAILED_PRECONDITION" + ); + let action_result = ActionResult { + error: Some(make_err!( + Code::FailedPrecondition, + "{}", + e.message_string() + )), + ..ActionResult::default() + }; + let action_stage = ActionStage::Completed(action_result); + grpc_client.execution_response(ExecuteResult{ + instance_name, + operation_id, + result: Some(execute_result::Result::ExecuteResponse(action_stage.into())), + }).await.err_tip(|| "Error calling execution_response with missing inputs")?; + } else { + grpc_client.execution_response(ExecuteResult{ + instance_name, + operation_id, + result: Some(execute_result::Result::InternalError(e.into())), + }).await.err_tip(|| "Error calling execution_response with error")?; + } }, } Ok(()) From 3ff25a797810d3d7658575be648d17328071c29e Mon Sep 17 00:00:00 2001 From: Marcus Date: Fri, 27 Feb 2026 18:20:16 -0500 Subject: [PATCH 2/3] Add tests for CAS NotFound to FailedPrecondition conversion Tests the conditional that converts NotFound errors containing "not found in either fast or slow store" to FailedPrecondition, and verifies other NotFound errors still return InternalError. Co-Authored-By: Claude Opus 4.6 --- nativelink-worker/tests/local_worker_test.rs | 209 +++++++++++++++++++ 1 file changed, 209 insertions(+) diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs index efc3a61fa..d6398a04d 100644 --- a/nativelink-worker/tests/local_worker_test.rs +++ b/nativelink-worker/tests/local_worker_test.rs @@ -752,6 +752,215 @@ async fn kill_action_request_kills_action() -> Result<(), Error> { Ok(()) } +#[nativelink_test] +async fn cas_not_found_returns_failed_precondition_test() -> Result<(), Error> { + let mut test_context = setup_local_worker(HashMap::new()).await; + let streaming_response = test_context.maybe_streaming_response.take().unwrap(); + + { + let props = test_context + .client + .expect_connect_worker(Ok(streaming_response)) + .await; + assert_eq!(props, ConnectWorkerRequest::default()); + } + + let expected_worker_id = "foobar".to_string(); + + let tx_stream = test_context.maybe_tx_stream.take().unwrap(); + { + tx_stream + .send(Frame::data( + encode_stream_proto(&UpdateForWorker { + update: Some(Update::ConnectionResult(ConnectionResult { + worker_id: expected_worker_id.clone(), + })), + }) + .unwrap(), + )) + .await + .map_err(|e| make_input_err!("Could not send : {:?}", e))?; + } + + let action_digest = DigestInfo::new([3u8; 32], 10); + let action_info = ActionInfo { + command_digest: DigestInfo::new([1u8; 32], 10), + input_root_digest: DigestInfo::new([2u8; 32], 10), + timeout: Duration::from_secs(1), + platform_properties: HashMap::new(), + priority: 0, + load_timestamp: SystemTime::UNIX_EPOCH, + insert_timestamp: SystemTime::UNIX_EPOCH, + unique_qualifier: ActionUniqueQualifier::Uncacheable(ActionUniqueKey { + instance_name: INSTANCE_NAME.to_string(), + digest_function: DigestHasherFunc::Sha256, + digest: action_digest, + }), + }; + + { + tx_stream + .send(Frame::data( + encode_stream_proto(&UpdateForWorker { + update: Some(Update::StartAction(StartExecute { + execute_request: Some((&action_info).into()), + operation_id: String::new(), + queued_timestamp: None, + platform: Some(Platform::default()), + worker_id: expected_worker_id.clone(), + })), + }) + .unwrap(), + )) + .await + .map_err(|e| make_input_err!("Could not send : {:?}", e))?; + } + + let running_action = Arc::new(MockRunningAction::new()); + + // Send and wait for response from create_and_add_action. + test_context + .actions_manager + .expect_create_and_add_action(Ok(running_action.clone())) + .await; + + // Simulate prepare_action failing with a CAS NotFound error containing the + // specific "not found in either fast or slow store" message. This is the exact + // condition that the code checks to decide whether to return FailedPrecondition. + running_action + .expect_prepare_action(Err(make_err!( + Code::NotFound, + "Hash 0123456789abcdef not found in either fast or slow store" + ))) + .await?; + + // Cleanup is still called even when prepare_action fails. + running_action.cleanup(Ok(())).await?; + + // The worker should respond with FailedPrecondition wrapped in an ExecuteResponse, + // NOT an InternalError. This allows Bazel to re-upload the missing artifacts. + let execution_response = test_context.client.expect_execution_response(Ok(())).await; + + let expected_action_result = ActionResult { + error: Some(make_err!( + Code::FailedPrecondition, + "Hash 0123456789abcdef not found in either fast or slow store" + )), + ..ActionResult::default() + }; + assert_eq!( + execution_response, + ExecuteResult { + instance_name: INSTANCE_NAME.to_string(), + operation_id: String::new(), + result: Some(execute_result::Result::ExecuteResponse( + ActionStage::Completed(expected_action_result).into() + )), + } + ); + + Ok(()) +} + +#[nativelink_test] +async fn non_cas_not_found_returns_internal_error_test() -> Result<(), Error> { + let mut test_context = setup_local_worker(HashMap::new()).await; + let streaming_response = test_context.maybe_streaming_response.take().unwrap(); + + { + let props = test_context + .client + .expect_connect_worker(Ok(streaming_response)) + .await; + assert_eq!(props, ConnectWorkerRequest::default()); + } + + let expected_worker_id = "foobar".to_string(); + + let tx_stream = test_context.maybe_tx_stream.take().unwrap(); + { + tx_stream + .send(Frame::data( + encode_stream_proto(&UpdateForWorker { + update: Some(Update::ConnectionResult(ConnectionResult { + worker_id: expected_worker_id.clone(), + })), + }) + .unwrap(), + )) + .await + .map_err(|e| make_input_err!("Could not send : {:?}", e))?; + } + + let action_digest = DigestInfo::new([3u8; 32], 10); + let action_info = ActionInfo { + command_digest: DigestInfo::new([1u8; 32], 10), + input_root_digest: DigestInfo::new([2u8; 32], 10), + timeout: Duration::from_secs(1), + platform_properties: HashMap::new(), + priority: 0, + load_timestamp: SystemTime::UNIX_EPOCH, + insert_timestamp: SystemTime::UNIX_EPOCH, + unique_qualifier: ActionUniqueQualifier::Uncacheable(ActionUniqueKey { + instance_name: INSTANCE_NAME.to_string(), + digest_function: DigestHasherFunc::Sha256, + digest: action_digest, + }), + }; + + { + tx_stream + .send(Frame::data( + encode_stream_proto(&UpdateForWorker { + update: Some(Update::StartAction(StartExecute { + execute_request: Some((&action_info).into()), + operation_id: String::new(), + queued_timestamp: None, + platform: Some(Platform::default()), + worker_id: expected_worker_id.clone(), + })), + }) + .unwrap(), + )) + .await + .map_err(|e| make_input_err!("Could not send : {:?}", e))?; + } + + let running_action = Arc::new(MockRunningAction::new()); + + test_context + .actions_manager + .expect_create_and_add_action(Ok(running_action.clone())) + .await; + + // Simulate prepare_action failing with a NotFound error that does NOT contain + // the CAS-specific message. This should result in an InternalError, not + // FailedPrecondition. + let other_not_found_error = make_err!(Code::NotFound, "Some other resource was not found"); + running_action + .expect_prepare_action(Err(other_not_found_error.clone())) + .await?; + + // Cleanup is still called even when prepare_action fails. + running_action.cleanup(Ok(())).await?; + + // The worker should respond with InternalError since this is not a CAS blob miss. + let execution_response = test_context.client.expect_execution_response(Ok(())).await; + + assert_eq!( + execution_response, + ExecuteResult { + instance_name: INSTANCE_NAME.to_string(), + operation_id: String::new(), + result: Some(execute_result::Result::InternalError( + other_not_found_error.into() + )), + } + ); + + Ok(()) +} + #[cfg(target_family = "unix")] #[nativelink_test] async fn preconditions_met_extra_envs() -> Result<(), Error> { From 4a661d3d0da0df927b5296a8d121fe24bd8cd1a5 Mon Sep 17 00:00:00 2001 From: Marcus Date: Fri, 27 Feb 2026 19:46:05 -0500 Subject: [PATCH 3/3] Disable per-RPC timeout by default to prevent large upload retry loops The GrpcStore rpc_timeout_s defaulted to 120 seconds, which is too short for multi-GB uploads. This caused DeadlineExceeded errors that triggered retries, restarting the upload and compounding the problem. Dead connections are already detected by HTTP/2 keepalive (30s ping, 20s timeout) and TCP keepalive (30s) on each endpoint, so the per-RPC total timeout is unnecessary for that purpose. Setting rpc_timeout_s=0 now correctly disables the timeout instead of silently falling through to the 120s default. Fixes #2185 --- nativelink-config/src/stores.rs | 12 +++++++++--- nativelink-store/src/grpc_store.rs | 6 +----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 1c9ea27bd..31a0ae443 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -1126,10 +1126,16 @@ pub struct GrpcSpec { pub connections_per_endpoint: usize, /// Maximum time (seconds) allowed for a single RPC request (e.g. a - /// ByteStream.Write call) before it is cancelled. This prevents - /// individual RPCs from hanging forever on dead connections. + /// ByteStream.Write call) before it is cancelled. /// - /// Default: 120 (seconds) + /// A value of 0 (the default) disables the per-RPC timeout. Dead + /// connections are still detected by the HTTP/2 and TCP keepalive + /// mechanisms configured on each endpoint. + /// + /// For large uploads (multi-GB), either leave this at 0 or set it + /// large enough to accommodate the full transfer time. + /// + /// Default: 0 (disabled) #[serde(default, deserialize_with = "convert_duration_with_shellexpand")] pub rpc_timeout_s: u64, } diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index 3fc3625d3..2966cd1e3 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -90,11 +90,7 @@ impl GrpcStore { endpoints.push(endpoint); } - let rpc_timeout = if spec.rpc_timeout_s > 0 { - Duration::from_secs(spec.rpc_timeout_s) - } else { - Duration::from_secs(120) - }; + let rpc_timeout = Duration::from_secs(spec.rpc_timeout_s); Ok(Arc::new(Self { instance_name: spec.instance_name.clone(),