Skip to content

Commit 3ff25a7

Browse files
Add tests for CAS NotFound to FailedPrecondition conversion
Tests the conditional that converts NotFound errors containing "not found in either fast or slow store" to FailedPrecondition, and verifies other NotFound errors still return InternalError. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b34e44c commit 3ff25a7

File tree

1 file changed

+209
-0
lines changed

1 file changed

+209
-0
lines changed

nativelink-worker/tests/local_worker_test.rs

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,215 @@ async fn kill_action_request_kills_action() -> Result<(), Error> {
752752
Ok(())
753753
}
754754

755+
#[nativelink_test]
756+
async fn cas_not_found_returns_failed_precondition_test() -> Result<(), Error> {
757+
let mut test_context = setup_local_worker(HashMap::new()).await;
758+
let streaming_response = test_context.maybe_streaming_response.take().unwrap();
759+
760+
{
761+
let props = test_context
762+
.client
763+
.expect_connect_worker(Ok(streaming_response))
764+
.await;
765+
assert_eq!(props, ConnectWorkerRequest::default());
766+
}
767+
768+
let expected_worker_id = "foobar".to_string();
769+
770+
let tx_stream = test_context.maybe_tx_stream.take().unwrap();
771+
{
772+
tx_stream
773+
.send(Frame::data(
774+
encode_stream_proto(&UpdateForWorker {
775+
update: Some(Update::ConnectionResult(ConnectionResult {
776+
worker_id: expected_worker_id.clone(),
777+
})),
778+
})
779+
.unwrap(),
780+
))
781+
.await
782+
.map_err(|e| make_input_err!("Could not send : {:?}", e))?;
783+
}
784+
785+
let action_digest = DigestInfo::new([3u8; 32], 10);
786+
let action_info = ActionInfo {
787+
command_digest: DigestInfo::new([1u8; 32], 10),
788+
input_root_digest: DigestInfo::new([2u8; 32], 10),
789+
timeout: Duration::from_secs(1),
790+
platform_properties: HashMap::new(),
791+
priority: 0,
792+
load_timestamp: SystemTime::UNIX_EPOCH,
793+
insert_timestamp: SystemTime::UNIX_EPOCH,
794+
unique_qualifier: ActionUniqueQualifier::Uncacheable(ActionUniqueKey {
795+
instance_name: INSTANCE_NAME.to_string(),
796+
digest_function: DigestHasherFunc::Sha256,
797+
digest: action_digest,
798+
}),
799+
};
800+
801+
{
802+
tx_stream
803+
.send(Frame::data(
804+
encode_stream_proto(&UpdateForWorker {
805+
update: Some(Update::StartAction(StartExecute {
806+
execute_request: Some((&action_info).into()),
807+
operation_id: String::new(),
808+
queued_timestamp: None,
809+
platform: Some(Platform::default()),
810+
worker_id: expected_worker_id.clone(),
811+
})),
812+
})
813+
.unwrap(),
814+
))
815+
.await
816+
.map_err(|e| make_input_err!("Could not send : {:?}", e))?;
817+
}
818+
819+
let running_action = Arc::new(MockRunningAction::new());
820+
821+
// Send and wait for response from create_and_add_action.
822+
test_context
823+
.actions_manager
824+
.expect_create_and_add_action(Ok(running_action.clone()))
825+
.await;
826+
827+
// Simulate prepare_action failing with a CAS NotFound error containing the
828+
// specific "not found in either fast or slow store" message. This is the exact
829+
// condition that the code checks to decide whether to return FailedPrecondition.
830+
running_action
831+
.expect_prepare_action(Err(make_err!(
832+
Code::NotFound,
833+
"Hash 0123456789abcdef not found in either fast or slow store"
834+
)))
835+
.await?;
836+
837+
// Cleanup is still called even when prepare_action fails.
838+
running_action.cleanup(Ok(())).await?;
839+
840+
// The worker should respond with FailedPrecondition wrapped in an ExecuteResponse,
841+
// NOT an InternalError. This allows Bazel to re-upload the missing artifacts.
842+
let execution_response = test_context.client.expect_execution_response(Ok(())).await;
843+
844+
let expected_action_result = ActionResult {
845+
error: Some(make_err!(
846+
Code::FailedPrecondition,
847+
"Hash 0123456789abcdef not found in either fast or slow store"
848+
)),
849+
..ActionResult::default()
850+
};
851+
assert_eq!(
852+
execution_response,
853+
ExecuteResult {
854+
instance_name: INSTANCE_NAME.to_string(),
855+
operation_id: String::new(),
856+
result: Some(execute_result::Result::ExecuteResponse(
857+
ActionStage::Completed(expected_action_result).into()
858+
)),
859+
}
860+
);
861+
862+
Ok(())
863+
}
864+
865+
#[nativelink_test]
866+
async fn non_cas_not_found_returns_internal_error_test() -> Result<(), Error> {
867+
let mut test_context = setup_local_worker(HashMap::new()).await;
868+
let streaming_response = test_context.maybe_streaming_response.take().unwrap();
869+
870+
{
871+
let props = test_context
872+
.client
873+
.expect_connect_worker(Ok(streaming_response))
874+
.await;
875+
assert_eq!(props, ConnectWorkerRequest::default());
876+
}
877+
878+
let expected_worker_id = "foobar".to_string();
879+
880+
let tx_stream = test_context.maybe_tx_stream.take().unwrap();
881+
{
882+
tx_stream
883+
.send(Frame::data(
884+
encode_stream_proto(&UpdateForWorker {
885+
update: Some(Update::ConnectionResult(ConnectionResult {
886+
worker_id: expected_worker_id.clone(),
887+
})),
888+
})
889+
.unwrap(),
890+
))
891+
.await
892+
.map_err(|e| make_input_err!("Could not send : {:?}", e))?;
893+
}
894+
895+
let action_digest = DigestInfo::new([3u8; 32], 10);
896+
let action_info = ActionInfo {
897+
command_digest: DigestInfo::new([1u8; 32], 10),
898+
input_root_digest: DigestInfo::new([2u8; 32], 10),
899+
timeout: Duration::from_secs(1),
900+
platform_properties: HashMap::new(),
901+
priority: 0,
902+
load_timestamp: SystemTime::UNIX_EPOCH,
903+
insert_timestamp: SystemTime::UNIX_EPOCH,
904+
unique_qualifier: ActionUniqueQualifier::Uncacheable(ActionUniqueKey {
905+
instance_name: INSTANCE_NAME.to_string(),
906+
digest_function: DigestHasherFunc::Sha256,
907+
digest: action_digest,
908+
}),
909+
};
910+
911+
{
912+
tx_stream
913+
.send(Frame::data(
914+
encode_stream_proto(&UpdateForWorker {
915+
update: Some(Update::StartAction(StartExecute {
916+
execute_request: Some((&action_info).into()),
917+
operation_id: String::new(),
918+
queued_timestamp: None,
919+
platform: Some(Platform::default()),
920+
worker_id: expected_worker_id.clone(),
921+
})),
922+
})
923+
.unwrap(),
924+
))
925+
.await
926+
.map_err(|e| make_input_err!("Could not send : {:?}", e))?;
927+
}
928+
929+
let running_action = Arc::new(MockRunningAction::new());
930+
931+
test_context
932+
.actions_manager
933+
.expect_create_and_add_action(Ok(running_action.clone()))
934+
.await;
935+
936+
// Simulate prepare_action failing with a NotFound error that does NOT contain
937+
// the CAS-specific message. This should result in an InternalError, not
938+
// FailedPrecondition.
939+
let other_not_found_error = make_err!(Code::NotFound, "Some other resource was not found");
940+
running_action
941+
.expect_prepare_action(Err(other_not_found_error.clone()))
942+
.await?;
943+
944+
// Cleanup is still called even when prepare_action fails.
945+
running_action.cleanup(Ok(())).await?;
946+
947+
// The worker should respond with InternalError since this is not a CAS blob miss.
948+
let execution_response = test_context.client.expect_execution_response(Ok(())).await;
949+
950+
assert_eq!(
951+
execution_response,
952+
ExecuteResult {
953+
instance_name: INSTANCE_NAME.to_string(),
954+
operation_id: String::new(),
955+
result: Some(execute_result::Result::InternalError(
956+
other_not_found_error.into()
957+
)),
958+
}
959+
);
960+
961+
Ok(())
962+
}
963+
755964
#[cfg(target_family = "unix")]
756965
#[nativelink_test]
757966
async fn preconditions_met_extra_envs() -> Result<(), Error> {

0 commit comments

Comments
 (0)