Skip to content

Commit 02a6e8b

Browse files
authored
align toploc api with recent adjustments (#377)
1 parent d57785b commit 02a6e8b

File tree

2 files changed

+55
-30
lines changed

2 files changed

+55
-30
lines changed

crates/validator/src/validators/synthetic_data/mod.rs

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,6 @@ impl SyntheticDataValidator {
732732

733733
Ok(())
734734
}
735-
736735
pub async fn process_group_status_check(&self, group: ToplocGroup) -> Result<(), Error> {
737736
let toploc_config = self
738737
.find_matching_toploc_config(&group.prefix)
@@ -745,39 +744,56 @@ impl SyntheticDataValidator {
745744
.get_group_file_validation_status(&group.group_file_name)
746745
.await?;
747746

748-
info!("Group {} toploc status: {:?}", group.group_id, status);
749-
750747
if let Some(metrics) = &self.metrics {
751748
metrics.record_group_validation_status(&group.group_id, &status.status.to_string());
752749
}
753750

751+
// Calculate total claimed units
752+
let mut total_claimed_units: U256 = U256::from(0);
753+
for work_key in &group.sorted_work_keys {
754+
if let Some(work_info) = self.get_work_info_from_redis(work_key).await? {
755+
total_claimed_units += work_info.work_units;
756+
}
757+
}
758+
759+
// Log basic info for all cases
760+
info!(
761+
"Group {} ({}) - Status: {:?}, Claimed: {}, Toploc: {} flops (input: {} flops)",
762+
group.group_id,
763+
group.group_file_name,
764+
status.status,
765+
total_claimed_units,
766+
status.output_flops,
767+
status.input_flops
768+
);
769+
770+
// Handle rejection case
754771
if status.status == ValidationResult::Reject {
755772
let indices = status.failing_indices;
756-
let mut work_keys_to_invalidate: Vec<String> = Vec::new();
757-
for index in indices {
758-
let work_key = group.sorted_work_keys[index as usize].clone();
759-
work_keys_to_invalidate.push(work_key);
760-
}
773+
let work_keys_to_invalidate: Vec<String> = indices
774+
.iter()
775+
.map(|&idx| group.sorted_work_keys[idx as usize].clone())
776+
.collect();
777+
778+
warn!(
779+
"Group {} rejected - Invalidating keys: {:?}",
780+
group.group_id, work_keys_to_invalidate
781+
);
761782

762783
for work_key in work_keys_to_invalidate {
763784
self.invalidate_work(&work_key).await?;
764785
}
765786
}
766787

767-
let mut total_claimed_units: U256 = U256::from(0);
768-
for work_key in &group.sorted_work_keys {
769-
let work_info = self.get_work_info_from_redis(work_key).await?;
770-
if let Some(work_info) = work_info {
771-
total_claimed_units += work_info.work_units;
772-
}
788+
// Handle mismatched units case
789+
if total_claimed_units != U256::from(status.output_flops as u64) {
790+
warn!(
791+
"Group {} units mismatch - Claimed: {}, Toploc: {}, Keys: {:?}",
792+
group.group_id, total_claimed_units, status.output_flops, group.sorted_work_keys
793+
);
773794
}
774-
let toploc_units = status.flops;
775-
info!(
776-
"Group {} total claimed units: {:?} - toploc units: {:?}",
777-
group.group_id, total_claimed_units, toploc_units
778-
);
779-
// TODO: We need to invalidate work that has claimed too many units
780795

796+
// Update validation status for all work keys
781797
for work_key in &group.sorted_work_keys {
782798
self.update_work_validation_status(work_key, &status.status)
783799
.await?;
@@ -833,7 +849,7 @@ impl SyntheticDataValidator {
833849
{
834850
error!("Failed to process group task: {}", e);
835851
}
836-
info!(
852+
debug!(
837853
"waiting before next task: {}",
838854
validator_clone_group_trigger.grace_interval
839855
);

crates/validator/src/validators/synthetic_data/toploc.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ pub struct Toploc {
2323
#[derive(Debug, PartialEq, Serialize, Deserialize)]
2424
pub struct GroupValidationResult {
2525
pub status: ValidationResult,
26-
pub flops: f64,
26+
pub input_flops: f64,
27+
pub output_flops: f64,
2728
// This tells us which node(s) in a group actually failed the toploc validation
2829
pub failing_indices: Vec<i64>,
2930
}
@@ -86,7 +87,7 @@ impl Toploc {
8687
"{}/validate/{}",
8788
self.config.server_url, processed_file_name
8889
);
89-
info!(
90+
debug!(
9091
"Triggering remote toploc validation for {} {}",
9192
file_name, validate_url
9293
);
@@ -266,10 +267,15 @@ impl Toploc {
266267
_ => ValidationResult::Unknown,
267268
};
268269

269-
let flops = status_json
270-
.get("flops")
270+
let input_flops = status_json
271+
.get("input_flops")
272+
.and_then(|f| f.as_f64())
273+
.unwrap_or(0.0);
274+
let output_flops = status_json
275+
.get("output_flops")
271276
.and_then(|f| f.as_f64())
272277
.unwrap_or(0.0);
278+
273279
let failing_indices = status_json
274280
.get("failing_indices")
275281
.and_then(|f| f.as_array())
@@ -280,7 +286,8 @@ impl Toploc {
280286

281287
Ok(GroupValidationResult {
282288
status: validation_result,
283-
flops,
289+
input_flops,
290+
output_flops,
284291
failing_indices,
285292
})
286293
}
@@ -529,7 +536,7 @@ mod tests {
529536
let _status_mock = server
530537
.mock("GET", "/statusgroup/test-group.parquet")
531538
.with_status(200)
532-
.with_body(r#"{"status": "accept", "flops": 12345.67, "failing_indices": []}"#)
539+
.with_body(r#"{"status": "accept", "input_flops": 12345.67, "output_flops": 12345.67, "failing_indices": []}"#)
533540
.create();
534541

535542
let config = ToplocConfig {
@@ -546,7 +553,8 @@ mod tests {
546553
assert!(result.is_ok());
547554
let group_result = result.unwrap();
548555
assert_eq!(group_result.status, ValidationResult::Accept);
549-
assert_eq!(group_result.flops, 12345.67);
556+
assert_eq!(group_result.input_flops, 12345.67);
557+
assert_eq!(group_result.output_flops, 12345.67);
550558
assert!(group_result.failing_indices.is_empty());
551559
Ok(())
552560
}
@@ -558,7 +566,7 @@ mod tests {
558566
let _status_mock = server
559567
.mock("GET", "/statusgroup/test-group.parquet")
560568
.with_status(200)
561-
.with_body(r#"{"status": "reject", "flops": 0.0, "failing_indices": [1, 3, 5]}"#)
569+
.with_body(r#"{"status": "reject", "input_flops": 0.0, "output_flops": 0.0, "failing_indices": [1, 3, 5]}"#)
562570
.create();
563571

564572
let config = ToplocConfig {
@@ -575,7 +583,8 @@ mod tests {
575583
assert!(result.is_ok());
576584
let group_result = result.unwrap();
577585
assert_eq!(group_result.status, ValidationResult::Reject);
578-
assert_eq!(group_result.flops, 0.0);
586+
assert_eq!(group_result.input_flops, 0.0);
587+
assert_eq!(group_result.output_flops, 0.0);
579588
assert_eq!(group_result.failing_indices, vec![1, 3, 5]);
580589
Ok(())
581590
}

0 commit comments

Comments
 (0)