Skip to content

Commit 59dc7ac

Browse files
authored
Resolve run teardown on individual slice retry exhaustion (#432)
1 parent 0d2ff1a commit 59dc7ac

File tree

6 files changed

+106
-52
lines changed

6 files changed

+106
-52
lines changed

Cargo.lock

Lines changed: 16 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ futures-util = "0.3"
5050
shellexpand = "3.1.2"
5151
openssl = { version = "0.10", features = ["vendored"] }
5252
flate2 = "1"
53-
dsperse = { git = "https://github.com/inference-labs-inc/dsperse", rev = "176ed08ee9552ac23f4a6470d1e7b53b67305b08" }
53+
dsperse = { git = "https://github.com/inference-labs-inc/dsperse", rev = "3d1db73f" }
5454
ndarray = { version = "0.17", features = ["serde"] }
5555
zip = { version = "2", default-features = false, features = ["deflate"] }
5656
bittensor-drand = { git = "https://github.com/inference-labs-inc/bittensor-drand.git", rev = "e55ed98", default-features = false }

crates/sn2-validator/src/incremental_runner.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,6 @@ impl IncrementalRunManager {
189189
self.evicted.contains(run_uid)
190190
}
191191

192-
pub fn get_run_source(&self, run_uid: &str) -> Option<RunSource> {
193-
self.runs.get(run_uid).map(|r| r.run_source)
194-
}
195-
196192
pub fn all_circuit_work(&self, run_uid: &str) -> anyhow::Result<Vec<SliceWork>> {
197193
let run = self
198194
.runs
@@ -323,6 +319,34 @@ impl IncrementalRunManager {
323319
false
324320
}
325321

322+
pub fn mark_slice_failed(&mut self, run_uid: &str, slice_id: &str) -> usize {
323+
self.tile_counters
324+
.remove(&(run_uid.to_string(), slice_id.to_string()));
325+
if let Some(run) = self.runs.get_mut(run_uid) {
326+
run.last_activity = Instant::now();
327+
if let Some(ref mut combined) = run.combined {
328+
combined.mark_slice_failed(slice_id);
329+
return combined.failed_count();
330+
}
331+
}
332+
0
333+
}
334+
335+
pub fn is_slice_failed(&self, run_uid: &str, slice_id: &str) -> bool {
336+
self.runs
337+
.get(run_uid)
338+
.and_then(|r| r.combined.as_ref())
339+
.is_some_and(|c| c.is_slice_failed(slice_id))
340+
}
341+
342+
pub fn failed_slice_count(&self, run_uid: &str) -> usize {
343+
self.runs
344+
.get(run_uid)
345+
.and_then(|r| r.combined.as_ref())
346+
.map(|c| c.failed_count())
347+
.unwrap_or(0)
348+
}
349+
326350
pub fn is_run_complete(&self, run_uid: &str) -> bool {
327351
self.runs
328352
.get(run_uid)

crates/sn2-validator/src/validator_loop/dslice.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,8 @@ impl ValidatorLoop {
377377
self.dslice_input_scales
378378
.retain(|(uid, _), _| uid != run_uid);
379379

380-
info!(run_uid = %run_uid, "combined run complete");
380+
let failed_count = self.run_manager.failed_slice_count(run_uid);
381+
info!(run_uid = %run_uid, failed_count, "combined run complete");
381382

382383
let final_output = self.run_manager.final_output_json(run_uid);
383384
let mut active_run = self.run_manager.remove_run(run_uid);
@@ -389,7 +390,7 @@ impl ValidatorLoop {
389390

390391
let relay_output = final_output.clone();
391392
self.spawn_artifact_upload(run_uid, &mut active_run, final_output);
392-
self.notify_run_completed(run_uid, &active_run, relay_output)
393+
self.notify_run_completed(run_uid, &active_run, relay_output, failed_count)
393394
.await;
394395
}
395396

crates/sn2-validator/src/validator_loop/relay.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -125,26 +125,41 @@ impl ValidatorLoop {
125125
run_uid: &str,
126126
active_run: &Option<crate::incremental_runner::ActiveRun>,
127127
final_output: Option<serde_json::Value>,
128+
failed_count: usize,
128129
) {
129130
let notify_circuit_id = active_run
130131
.as_ref()
131132
.map(|r| r.circuit_id.as_str())
132133
.unwrap_or_default()
133134
.to_string();
134-
let mut result = serde_json::json!({"run_uid": run_uid, "status": "complete"});
135+
let status = if failed_count > 0 {
136+
"partial"
137+
} else {
138+
"complete"
139+
};
140+
let mut result = serde_json::json!({"run_uid": run_uid, "status": status});
135141
if let Some(output) = final_output {
136142
result["output"] = output;
137143
}
144+
if failed_count > 0 {
145+
result["failed_slices"] = serde_json::json!(failed_count);
146+
}
138147
self.relay_set_request_result(run_uid, result).await;
139-
self.relay_send_notification(
140-
"subnet-2.batch_completed",
141-
serde_json::json!({
142-
"run_uid": run_uid,
143-
"circuit_id": notify_circuit_id,
144-
"status": "completed",
145-
}),
146-
)
147-
.await;
148+
let notification_status = if failed_count > 0 {
149+
"partial"
150+
} else {
151+
"completed"
152+
};
153+
let mut notification = serde_json::json!({
154+
"run_uid": run_uid,
155+
"circuit_id": notify_circuit_id,
156+
"status": notification_status,
157+
});
158+
if failed_count > 0 {
159+
notification["failed_slices"] = serde_json::json!(failed_count);
160+
}
161+
self.relay_send_notification("subnet-2.batch_completed", notification)
162+
.await;
148163
}
149164

150165
pub(super) fn report_dsperse_completion(&self, run: &crate::incremental_runner::ActiveRun) {

crates/sn2-validator/src/validator_loop/results.rs

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use sn2_types::*;
2-
use tracing::{info, warn};
2+
use tracing::{debug, info, warn};
33

44
use super::{event_slice_num, RetryPayload, ValidatorLoop};
55
use crate::incremental_runner::SliceArtifact;
@@ -126,27 +126,32 @@ impl ValidatorLoop {
126126
reason: &str,
127127
) {
128128
if let Some(run_uid) = run_uid {
129+
if !self.run_manager.has_run(run_uid) {
130+
return;
131+
}
132+
129133
if let Some(snum) = slice_num {
130-
let ruid = run_uid.clone();
131-
let event_snum = event_slice_num(snum, is_tile, tile_idx);
132-
let err = reason.to_string();
133-
self.emit_event(move |ev| async move {
134-
ev.emit_slice_failed(&ruid, &event_snum, &err).await;
135-
});
134+
if !self.run_manager.is_slice_failed(run_uid, snum) {
135+
let ruid = run_uid.clone();
136+
let event_snum = event_slice_num(snum, is_tile, tile_idx);
137+
let err = reason.to_string();
138+
self.emit_event(move |ev| async move {
139+
ev.emit_slice_failed(&ruid, &event_snum, &err).await;
140+
});
141+
142+
let failed_count = self.run_manager.mark_slice_failed(run_uid, snum);
143+
warn!(
144+
run_uid = %run_uid,
145+
slice = %snum,
146+
failed_count,
147+
"slice max retries exceeded, continuing run"
148+
);
149+
}
136150
}
137-
warn!(run_uid = %run_uid, "dslice max retries exceeded, removing run");
138-
if self.run_manager.get_run_source(run_uid) == Some(RunSource::Api) {
139-
self.relay_set_request_result(
140-
run_uid,
141-
serde_json::json!({
142-
"run_uid": run_uid,
143-
"status": "failed",
144-
"error": "max retries exceeded",
145-
}),
146-
)
147-
.await;
151+
152+
if self.run_manager.is_run_complete(run_uid) {
153+
self.finalize_combined_run(run_uid).await;
148154
}
149-
self.teardown_run(run_uid).await;
150155
}
151156
}
152157

@@ -180,6 +185,15 @@ impl ValidatorLoop {
180185
return;
181186
}
182187

188+
if self.run_manager.is_slice_failed(&run_uid, &slice_num) {
189+
debug!(
190+
run_uid = %run_uid,
191+
slice = %slice_num,
192+
"ignoring late success for failed slice"
193+
);
194+
return;
195+
}
196+
183197
self.run_manager.push_artifact(
184198
&run_uid,
185199
SliceArtifact {

0 commit comments

Comments
 (0)