Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 81f3f38

Browse files
authored
introduce file number variable (#359)
* introduce file number variable, fix regex failing due to base16 change
1 parent 2ea885d commit 81f3f38

File tree

4 files changed

+64
-20
lines changed

4 files changed

+64
-20
lines changed

crates/orchestrator/src/api/routes/storage.rs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,14 @@ async fn request_upload(
172172
0
173173
}
174174
};
175+
let file_number = upload_count.saturating_sub(1);
175176

176177
if file_name.contains("${upload_count}") {
177178
file_name = file_name.replace("${upload_count}", &upload_count.to_string());
178179
}
180+
if file_name.contains("${file_number}") {
181+
file_name = file_name.replace("${file_number}", &file_number.to_string());
182+
}
179183

180184
let file_size = &request_upload.file_size;
181185
let file_type = &request_upload.file_type;
@@ -435,7 +439,7 @@ mod tests {
435439
name: "test-task".to_string(),
436440
storage_config: Some(StorageConfig {
437441
file_name_template: Some(
438-
"model_xyz/dataset_1/${node_group_id}-${node_group_size}-${node_group_index}-${upload_count}.parquet".to_string(),
442+
"model_xyz/dataset_1/${node_group_id}-${node_group_size}-${node_group_index}-${upload_count}-${file_number}.parquet".to_string(),
439443
),
440444
}),
441445
..Default::default()
@@ -472,11 +476,12 @@ mod tests {
472476
assert_eq!(
473477
json["file_name"],
474478
serde_json::Value::String(format!(
475-
"model_xyz/dataset_1/{}-{}-{}-{}.parquet",
479+
"model_xyz/dataset_1/{}-{}-{}-{}-{}.parquet",
476480
group.id,
477481
group.nodes.len(),
478482
0,
479-
1
483+
1,
484+
0
480485
))
481486
);
482487

@@ -500,11 +505,12 @@ mod tests {
500505
assert_eq!(
501506
json["file_name"],
502507
serde_json::Value::String(format!(
503-
"model_xyz/dataset_1/{}-{}-{}-{}.parquet",
508+
"model_xyz/dataset_1/{}-{}-{}-{}-{}.parquet",
504509
group.id,
505510
group.nodes.len(),
506511
0,
507-
1
512+
1,
513+
0
508514
))
509515
);
510516

@@ -528,11 +534,12 @@ mod tests {
528534
assert_eq!(
529535
json["file_name"],
530536
serde_json::Value::String(format!(
531-
"model_xyz/dataset_1/{}-{}-{}-{}.parquet",
537+
"model_xyz/dataset_1/{}-{}-{}-{}-{}.parquet",
532538
group.id,
533539
group.nodes.len(),
534540
0,
535-
2
541+
2,
542+
1
536543
))
537544
);
538545
}
@@ -564,7 +571,9 @@ mod tests {
564571
image: "test-image".to_string(),
565572
name: "test-task".to_string(),
566573
storage_config: Some(StorageConfig {
567-
file_name_template: Some("model_xyz/dataset_1/${upload_count}.parquet".to_string()),
574+
file_name_template: Some(
575+
"model_xyz/dataset_1/${upload_count}-${file_number}.parquet".to_string(),
576+
),
568577
}),
569578
..Default::default()
570579
};
@@ -597,9 +606,12 @@ mod tests {
597606
let body = test::read_body(resp).await;
598607
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
599608
assert_eq!(json["success"], serde_json::Value::Bool(true));
609+
610+
// As defined in the storage template: model_xyz/dataset_1/${upload_count}-${file_number}.parquet
611+
// Upload count var is 1 while file number is 0
600612
assert_eq!(
601613
json["file_name"],
602-
serde_json::Value::String("model_xyz/dataset_1/1.parquet".to_string())
614+
serde_json::Value::String("model_xyz/dataset_1/1-0.parquet".to_string())
603615
);
604616

605617
// Second request with same file name - should not increment count
@@ -619,9 +631,11 @@ mod tests {
619631
let body = test::read_body(resp).await;
620632
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
621633
assert_eq!(json["success"], serde_json::Value::Bool(true));
634+
635+
// Upload count var is 1 while file number is 0
622636
assert_eq!(
623637
json["file_name"],
624-
serde_json::Value::String("model_xyz/dataset_1/1.parquet".to_string())
638+
serde_json::Value::String("model_xyz/dataset_1/1-0.parquet".to_string())
625639
);
626640

627641
// Third request with different file name - should increment count
@@ -643,7 +657,7 @@ mod tests {
643657
assert_eq!(json["success"], serde_json::Value::Bool(true));
644658
assert_eq!(
645659
json["file_name"],
646-
serde_json::Value::String("model_xyz/dataset_1/2.parquet".to_string())
660+
serde_json::Value::String("model_xyz/dataset_1/2-1.parquet".to_string())
647661
);
648662
}
649663
}

crates/orchestrator/src/plugins/node_groups/scheduler_impl.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ impl SchedulerPlugin for NodeGroupsPlugin {
120120
"0".to_string()
121121
}
122122
};
123+
// File number starts with 0 for the first file, while filecount is at 1
124+
let file_number = upload_count.parse::<u32>().unwrap_or(0).saturating_sub(1);
123125

124126
let mut env_vars = task_clone.env_vars.unwrap_or_default();
125127
env_vars.insert("GROUP_INDEX".to_string(), idx.to_string());
@@ -129,7 +131,8 @@ impl SchedulerPlugin for NodeGroupsPlugin {
129131
.replace("${GROUP_SIZE}", &group.nodes.len().to_string())
130132
.replace("${NEXT_P2P_ADDRESS}", &next_p2p_id)
131133
.replace("${GROUP_ID}", &group.id)
132-
.replace("${UPLOAD_COUNT}", &upload_count.to_string());
134+
.replace("${UPLOAD_COUNT}", &upload_count.to_string())
135+
.replace("${FILE_NUMBER}", &file_number.to_string());
133136

134137
*value = new_value;
135138
}
@@ -142,6 +145,7 @@ impl SchedulerPlugin for NodeGroupsPlugin {
142145
.replace("${NEXT_P2P_ADDRESS}", &next_p2p_id)
143146
.replace("${GROUP_ID}", &group.id)
144147
.replace("${UPLOAD_COUNT}", &upload_count.to_string())
148+
.replace("${FILE_NUMBER}", &file_number.to_string())
145149
})
146150
.collect::<Vec<String>>()
147151
});

crates/orchestrator/src/plugins/node_groups/tests.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ async fn test_group_scheduling() {
401401
env_vars.insert("WORLD_SIZE".to_string(), "${GROUP_SIZE}".to_string());
402402
env_vars.insert("GROUP_ID".to_string(), "${GROUP_ID}".to_string());
403403
env_vars.insert("UPLOAD_COUNT".to_string(), "${UPLOAD_COUNT}".to_string());
404+
env_vars.insert("FILE_NUMBER".to_string(), "${FILE_NUMBER}".to_string());
404405

405406
let task1 = Task {
406407
id: Uuid::new_v4(),
@@ -419,6 +420,8 @@ async fn test_group_scheduling() {
419420
"${GROUP_ID}".to_string(),
420421
"--upload-count".to_string(),
421422
"${UPLOAD_COUNT}".to_string(),
423+
"--file-number".to_string(),
424+
"${FILE_NUMBER}".to_string(),
422425
]),
423426
state: TaskState::PENDING,
424427
created_at: 0,
@@ -469,6 +472,7 @@ async fn test_group_scheduling() {
469472
assert_eq!(task_node_1.args.as_ref().unwrap()[3], "model/Qwen3-14B-0.2");
470473
assert_ne!(env_vars_1.get("GROUP_ID").unwrap(), "${GROUP_ID}");
471474
assert_eq!(env_vars_1.get("UPLOAD_COUNT").unwrap(), "1");
475+
assert_eq!(env_vars_1.get("FILE_NUMBER").unwrap(), "0");
472476
assert_eq!(task_node_1.args.as_ref().unwrap()[9], "1"); // Check upload count in args
473477

474478
assert_eq!(filtered_tasks_2.len(), 1);
@@ -480,6 +484,7 @@ async fn test_group_scheduling() {
480484
assert_eq!(task_node_2.args.as_ref().unwrap()[3], "model/Qwen3-14B-1.2");
481485
assert_ne!(env_vars_2.get("GROUP_ID").unwrap(), "${GROUP_ID}");
482486
assert_eq!(env_vars_2.get("UPLOAD_COUNT").unwrap(), "0");
487+
assert_eq!(env_vars_2.get("FILE_NUMBER").unwrap(), "0");
483488
assert_eq!(task_node_2.args.as_ref().unwrap()[9], "0"); // Check upload count in args
484489

485490
assert_eq!(task_node_1.id, task_node_2.id);

crates/validator/src/validators/synthetic_data/mod.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ struct GroupInformation {
9292
/// The group size from the filename
9393
group_size: u32,
9494
/// The file number from the filename
95+
/// This is not the file upload count! The first file starts with 0
9596
file_number: u32,
9697
/// The index number from the filename
9798
idx: String,
@@ -102,7 +103,7 @@ impl FromStr for GroupInformation {
102103
/// Parse a filename into GroupInformation
103104
/// Expected format: "prefix-groupid-groupsize-filenumber-idx.parquet"
104105
fn from_str(file_name: &str) -> Result<Self, Self::Err> {
105-
let re = regex::Regex::new(r".*?-(\d+)-(\d+)-(\d+)-(\d+)(\.[^.]+)$")
106+
let re = regex::Regex::new(r".*?-([0-9a-fA-F]+)-(\d+)-(\d+)-(\d+)(\.[^.]+)$")
106107
.map_err(|e| Error::msg(format!("Failed to compile regex: {}", e)))?;
107108

108109
let caps = re
@@ -497,9 +498,16 @@ impl SyntheticDataValidator {
497498
async fn get_group(&self, work_key: &str) -> Result<Option<ToplocGroup>, Error> {
498499
let group_key = match self.build_group_for_key(work_key).await {
499500
Ok(key) => key,
500-
Err(_) => return Ok(None),
501+
Err(e) => {
502+
error!("Failed to build group key for work key {}: {}", work_key, e);
503+
return Ok(None);
504+
}
501505
};
502506
let ready_for_validation = self.is_group_ready_for_validation(&group_key).await?;
507+
debug!(
508+
"Group for key {:?} ready for validation: {:?}",
509+
work_key, ready_for_validation
510+
);
503511
if ready_for_validation {
504512
let mut redis: redis::Connection = self.redis_store.client.get_connection()?;
505513
let group_entries: HashMap<String, String> = redis.hgetall(&group_key)?;
@@ -617,9 +625,13 @@ impl SyntheticDataValidator {
617625
Some(_) | None => {
618626
// Needs triggering (covers Pending, Invalidated, and None cases)
619627
if self.with_node_grouping {
628+
debug!("Checking group for work key: {:?}", work_key);
620629
let check_group = self.get_group(&work_key).await?;
621630
if let Some(group) = check_group {
631+
debug!("Group found for work key: {:?}", work_key);
622632
group_trigger_tasks.push(group);
633+
} else {
634+
debug!("Could not build a final group for work key: {:?}", work_key);
623635
}
624636
} else {
625637
single_trigger_tasks.push((work_key.clone(), work_info));
@@ -1129,22 +1141,22 @@ mod tests {
11291141

11301142
let mock_storage = MockStorageProvider::new();
11311143
mock_storage.add_file(
1132-
&format!("Qwen/Qwen0.6/dataset/samplingn-{}-1-9-1.parquet", group_id),
1144+
&format!("Qwen/Qwen0.6/dataset/samplingn-{}-1-0-0.parquet", group_id),
11331145
"file1",
11341146
);
11351147
mock_storage.add_mapping_file(
11361148
file_sha,
1137-
&format!("Qwen/Qwen0.6/dataset/samplingn-{}-1-9-1.parquet", group_id),
1149+
&format!("Qwen/Qwen0.6/dataset/samplingn-{}-1-0-0.parquet", group_id),
11381150
);
11391151
server
11401152
.mock(
11411153
"POST",
1142-
"/validategroup/dataset/samplingn-3450756714426841564-1-9.parquet",
1154+
"/validategroup/dataset/samplingn-3450756714426841564-1-0.parquet",
11431155
)
11441156
.match_body(mockito::Matcher::Json(serde_json::json!({
11451157
"file_shas": [file_sha],
11461158
"group_id": group_id,
1147-
"file_number": 9,
1159+
"file_number": 0,
11481160
"group_size": 1
11491161
})))
11501162
.with_status(200)
@@ -1153,7 +1165,7 @@ mod tests {
11531165
server
11541166
.mock(
11551167
"GET",
1156-
"/statusgroup/dataset/samplingn-3450756714426841564-1-9.parquet",
1168+
"/statusgroup/dataset/samplingn-3450756714426841564-1-0.parquet",
11571169
)
11581170
.with_status(200)
11591171
.with_body(r#"{"status": "accept"}"#)
@@ -1201,7 +1213,7 @@ mod tests {
12011213
let group = group.unwrap();
12021214
assert_eq!(group.group_id, group_id);
12031215
assert_eq!(group.group_size, 1);
1204-
assert_eq!(group.file_number, 9);
1216+
assert_eq!(group.file_number, 0);
12051217

12061218
let result = validator.process_group_task(group).await;
12071219
assert!(result.is_ok());
@@ -1302,4 +1314,13 @@ mod tests {
13021314

13031315
Ok(())
13041316
}
1317+
1318+
#[tokio::test]
1319+
async fn test_group_information_from_prod_string() -> Result<(), Error> {
1320+
let file =
1321+
"Qwen/Qwen3-14B/PrimeIntellect/INTELLECT-2-RL-Dataset/1-d4eb155339fc64e-1-20-0.parquet";
1322+
let group_info = GroupInformation::from_str(file)?;
1323+
println!("group_info: {:?}", group_info);
1324+
Ok(())
1325+
}
13051326
}

0 commit comments

Comments
 (0)