Skip to content

Commit c1f1e1f

Browse files
feat: Collect and return all bulk dataset processing errors in Vec<ReplicateStatusCause>
1 parent 4f8051a commit c1f1e1f

File tree

4 files changed

+454
-120
lines changed

4 files changed

+454
-120
lines changed

pre-compute/src/compute/app_runner.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,35 +42,33 @@ pub fn start_with_app<A: PreComputeAppTrait>(
4242
pre_compute_app: &mut A,
4343
chain_task_id: &str,
4444
) -> ExitMode {
45-
let exit_cause = match pre_compute_app.run() {
45+
let exit_causes = match pre_compute_app.run() {
4646
Ok(_) => {
4747
info!("TEE pre-compute completed");
4848
return ExitMode::Success;
4949
}
50-
Err(exit_cause) => {
51-
error!("TEE pre-compute failed with known exit cause [{exit_cause:?}]");
52-
exit_cause
50+
Err(exit_causes) => {
51+
error!("TEE pre-compute failed with known exit cause [{exit_causes:?}]");
52+
exit_causes
5353
}
5454
};
5555

5656
let authorization = match get_challenge(chain_task_id) {
5757
Ok(auth) => auth,
5858
Err(_) => {
59-
error!("Failed to sign exitCause message [{exit_cause:?}]");
59+
error!("Failed to sign exitCause message [{exit_causes:?}]");
6060
return ExitMode::UnreportedFailure;
6161
}
6262
};
6363

64-
let exit_causes = vec![exit_cause.clone()];
65-
6664
match WorkerApiClient::from_env().send_exit_causes_for_pre_compute_stage(
6765
&authorization,
6866
chain_task_id,
6967
&exit_causes,
7068
) {
7169
Ok(_) => ExitMode::ReportedFailure,
7270
Err(_) => {
73-
error!("Failed to report exitCause [{exit_cause:?}]");
71+
error!("Failed to report exitCause [{exit_causes:?}]");
7472
ExitMode::UnreportedFailure
7573
}
7674
}
@@ -150,7 +148,7 @@ mod pre_compute_start_with_app_tests {
150148

151149
let mut mock = MockPreComputeAppTrait::new();
152150
mock.expect_run()
153-
.returning(|| Err(ReplicateStatusCause::PreComputeWorkerAddressMissing));
151+
.returning(|| Err(vec![ReplicateStatusCause::PreComputeWorkerAddressMissing]));
154152

155153
temp_env::with_vars(env_vars_to_set, || {
156154
temp_env::with_vars_unset(env_vars_to_unset, || {
@@ -172,8 +170,11 @@ mod pre_compute_start_with_app_tests {
172170
let env_vars_to_unset = vec![ENV_SIGN_TEE_CHALLENGE_PRIVATE_KEY];
173171

174172
let mut mock = MockPreComputeAppTrait::new();
175-
mock.expect_run()
176-
.returning(|| Err(ReplicateStatusCause::PreComputeTeeChallengePrivateKeyMissing));
173+
mock.expect_run().returning(|| {
174+
Err(vec![
175+
ReplicateStatusCause::PreComputeTeeChallengePrivateKeyMissing,
176+
])
177+
});
177178

178179
temp_env::with_vars(env_vars_to_set, || {
179180
temp_env::with_vars_unset(env_vars_to_unset, || {
@@ -199,8 +200,11 @@ mod pre_compute_start_with_app_tests {
199200
let mock_server_addr_string = mock_server.address().to_string();
200201

201202
let mut mock = MockPreComputeAppTrait::new();
202-
mock.expect_run()
203-
.returning(|| Err(ReplicateStatusCause::PreComputeTeeChallengePrivateKeyMissing));
203+
mock.expect_run().returning(|| {
204+
Err(vec![
205+
ReplicateStatusCause::PreComputeTeeChallengePrivateKeyMissing,
206+
])
207+
});
204208

205209
let result_code = tokio::task::spawn_blocking(move || {
206210
let env_vars = vec![
@@ -248,7 +252,7 @@ mod pre_compute_start_with_app_tests {
248252
let mut mock = MockPreComputeAppTrait::new();
249253
mock.expect_run()
250254
.times(1)
251-
.returning(|| Err(ReplicateStatusCause::PreComputeOutputFolderNotFound));
255+
.returning(|| Err(vec![ReplicateStatusCause::PreComputeOutputFolderNotFound]));
252256

253257
// Move the blocking operations into spawn_blocking
254258
let result_code = tokio::task::spawn_blocking(move || {

pre-compute/src/compute/errors.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ pub enum ReplicateStatusCause {
2727
PreComputeInvalidTeeSignature,
2828
#[error("IS_DATASET_REQUIRED environment variable is missing")]
2929
PreComputeIsDatasetRequiredMissing,
30-
#[error("Input files download failed")]
31-
PreComputeInputFileDownloadFailed,
30+
#[error("Input file download failed for input {0}")]
31+
PreComputeInputFileDownloadFailed(usize),
3232
#[error("Input files number related environment variable is missing")]
3333
PreComputeInputFilesNumberMissing,
3434
#[error("Invalid dataset checksum for dataset {0}")]

pre-compute/src/compute/pre_compute_app.rs

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ use std::path::{Path, PathBuf};
99

1010
#[cfg_attr(test, automock)]
1111
pub trait PreComputeAppTrait {
12-
fn run(&mut self) -> Result<(), ReplicateStatusCause>;
12+
fn run(&mut self) -> Result<(), Vec<ReplicateStatusCause>>;
1313
fn check_output_folder(&self) -> Result<(), ReplicateStatusCause>;
14-
fn download_input_files(&self) -> Result<(), ReplicateStatusCause>;
14+
fn download_input_files(&self) -> Result<(), Vec<ReplicateStatusCause>>;
1515
fn save_plain_dataset_file(
1616
&self,
1717
plain_content: &[u8],
@@ -55,17 +55,33 @@ impl PreComputeAppTrait for PreComputeApp {
5555
/// let mut app = PreComputeApp::new("task_id".to_string());
5656
/// app.run();
5757
/// ```
58-
fn run(&mut self) -> Result<(), ReplicateStatusCause> {
59-
// TODO: Collect all errors instead of propagating immediately, and return the list of errors
60-
self.pre_compute_args = PreComputeArgs::read_args()?;
61-
self.check_output_folder()?;
58+
fn run(&mut self) -> Result<(), Vec<ReplicateStatusCause>> {
59+
let (args, mut exit_causes) = PreComputeArgs::read_args();
60+
self.pre_compute_args = args;
61+
62+
if let Err(exit_cause) = self.check_output_folder() {
63+
return Err(vec![exit_cause]);
64+
}
65+
6266
for dataset in self.pre_compute_args.datasets.iter() {
63-
let encrypted_content = dataset.download_encrypted_dataset(&self.chain_task_id)?;
64-
let plain_content = dataset.decrypt_dataset(&encrypted_content)?;
65-
self.save_plain_dataset_file(&plain_content, &dataset.filename)?;
67+
if let Err(exit_cause) = dataset
68+
.download_encrypted_dataset(&self.chain_task_id)
69+
.and_then(|encrypted_content| dataset.decrypt_dataset(&encrypted_content))
70+
.and_then(|plain_content| {
71+
self.save_plain_dataset_file(&plain_content, &dataset.filename)
72+
})
73+
{
74+
exit_causes.push(exit_cause);
75+
};
76+
}
77+
if let Err(exit_cause) = self.download_input_files() {
78+
exit_causes.extend(exit_cause);
79+
};
80+
if !exit_causes.is_empty() {
81+
Err(exit_causes)
82+
} else {
83+
Ok(())
6684
}
67-
self.download_input_files()?;
68-
Ok(())
6985
}
7086

7187
/// Checks whether the output folder specified in `pre_compute_args` exists.
@@ -105,19 +121,27 @@ impl PreComputeAppTrait for PreComputeApp {
105121
/// This function panics if:
106122
/// - `pre_compute_args` is `None`.
107123
/// - `chain_task_id` is `None`.
108-
fn download_input_files(&self) -> Result<(), ReplicateStatusCause> {
124+
fn download_input_files(&self) -> Result<(), Vec<ReplicateStatusCause>> {
125+
let mut exit_causes: Vec<ReplicateStatusCause> = vec![];
109126
let args = &self.pre_compute_args;
110127
let chain_task_id: &str = &self.chain_task_id;
111128

112-
for url in &args.input_files {
129+
for (index, url) in args.input_files.iter().enumerate() {
113130
info!("Downloading input file [chainTaskId:{chain_task_id}, url:{url}]");
114131

115132
let filename = sha256(url.to_string());
116133
if download_file(url, &args.output_dir, &filename).is_none() {
117-
return Err(ReplicateStatusCause::PreComputeInputFileDownloadFailed);
134+
exit_causes.push(ReplicateStatusCause::PreComputeInputFileDownloadFailed(
135+
index,
136+
));
118137
}
119138
}
120-
Ok(())
139+
140+
if !exit_causes.is_empty() {
141+
Err(exit_causes)
142+
} else {
143+
Ok(())
144+
}
121145
}
122146

123147
/// Saves the decrypted (plain) dataset to disk in the configured output directory.
@@ -293,12 +317,12 @@ mod tests {
293317
let result = app.download_input_files();
294318
assert_eq!(
295319
result.unwrap_err(),
296-
ReplicateStatusCause::PreComputeInputFileDownloadFailed
320+
vec![ReplicateStatusCause::PreComputeInputFileDownloadFailed(0)]
297321
);
298322
}
299323

300324
#[test]
301-
fn test_partial_failure_stops_on_first_error() {
325+
fn test_partial_failure_dont_stops_on_first_error() {
302326
let (_container, json_url, xml_url) = start_container();
303327

304328
let temp_dir = TempDir::new().unwrap();
@@ -307,24 +331,24 @@ mod tests {
307331
vec![
308332
&json_url, // This should succeed
309333
"https://invalid-url-that-should-fail.com/file.txt", // This should fail
310-
&xml_url, // This shouldn't be reached
334+
&xml_url, // This should succeed
311335
],
312336
temp_dir.path().to_str().unwrap(),
313337
);
314338

315339
let result = app.download_input_files();
316340
assert_eq!(
317341
result.unwrap_err(),
318-
ReplicateStatusCause::PreComputeInputFileDownloadFailed
342+
vec![ReplicateStatusCause::PreComputeInputFileDownloadFailed(1)]
319343
);
320344

321345
// First file should be downloaded with SHA256 filename
322346
let json_hash = sha256(json_url);
323347
assert!(temp_dir.path().join(json_hash).exists());
324348

325-
// Third file should NOT be downloaded (stopped on second failure)
349+
// Third file should be downloaded (not stopped on second failure)
326350
let xml_hash = sha256(xml_url);
327-
assert!(!temp_dir.path().join(xml_hash).exists());
351+
assert!(temp_dir.path().join(xml_hash).exists());
328352
}
329353
// endregion
330354

0 commit comments

Comments
 (0)