Skip to content

Commit 766428f

Browse files
author
Inkedstinct
committed
feat(jobs): Submit job if it can be completed before the next day/night slot boundary, fix rsync mechanism
1 parent 8fa12cb commit 766428f

File tree

2 files changed

+76
-21
lines changed

2 files changed

+76
-21
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
/test_results/**/*
1111
/jobs.yaml
1212
/menage.sh
13+
/resources
1314

1415

1516
# Added by cargo

src/jobs.rs

Lines changed: 75 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@ use serde_yaml::{self};
1010
use std::collections::HashMap;
1111
use std::fmt::{self, Display};
1212
use std::str::{self};
13-
use std::time::Duration;
1413
use std::{env, fs};
1514
use std::path::{Path, PathBuf};
1615
use subprocess::{Popen, PopenConfig, Redirection};
1716
use thiserror::Error;
1817
use std::process::Command;
18+
use chrono::{Local, Timelike, Duration};
1919

2020
const MAX_CONCURRENT_JOBS: usize = 30;
21+
const G5K_DAY_BOTTOM_BOUNDARY: i64 = 9;
22+
const G5K_DAY_UP_BOUNDARY: i64 = 19;
2123

2224
#[derive(Error, Debug)]
2325
pub enum JobError {
@@ -168,6 +170,7 @@ impl Job {
168170
}
169171

170172
pub async fn submit_job(&mut self) -> JobResult {
173+
info!("Submitting job on {}", &self.node.uid);
171174
let session = ssh::ssh_connect(&self.site).await?;
172175
ssh::create_remote_directory(&session, &self.script_file).await?;
173176
ssh::sftp_upload(&session, &self.script_file, &self.script_file).await?;
@@ -184,15 +187,15 @@ impl Job {
184187
} else {
185188
let client = reqwest::Client::builder().build()?;
186189
let endpoint = format!("{}/sites/{}/jobs", super::BASE_URL, self.site);
187-
let data = serde_json::json!({"properties": format!("host={}",self.node.uid), "resources": "walltime=5", "types": ["deploy"], "command": "sleep 14500"});
190+
let data = serde_json::json!({"properties": format!("host={}",self.node.uid), "resources": format!("walltime={}", scripts::WALLTIME), "types": ["deploy"], "command": "sleep 14500"});
188191

189192
if let Ok(response) = inventories::post_api_call(&client, &endpoint, &data).await {
190193
debug!("Job has been posted on deploy mode");
191194
self.state = OARState::WaitingToBeDeployed;
192195
let job_id = response.get("uid").unwrap();
193196
self.oar_job_id = job_id.as_u64();
194197
} else {
195-
debug!("Job has failed to be posted on deploy mode");
198+
error!("Job has failed to be posted on deploy mode");
196199
self.state = OARState::Failed;
197200
}
198201
}
@@ -240,7 +243,9 @@ impl Job {
240243
let str_state = response.get("state").unwrap().as_str();
241244
if str_state == Some("waiting") && self.state == OARState::WaitingToBeDeployed {
242245
state = OARState::WaitingToBeDeployed;
243-
} else {
246+
} else if str_state == Some("launching") || str_state == Some("to_launch") {
247+
state = self.state.clone();
248+
} else {
244249
state = OARState::try_from(str_state.unwrap()).unwrap();
245250
}
246251
}
@@ -271,8 +276,10 @@ impl Job {
271276

272277
pub async fn job_running(&mut self) -> JobResult {
273278
if self.os_flavor == super::DEFAULT_OS_FLAVOR {
279+
info!("Starting script on {}", &self.node.uid);
274280
return Ok(())
275281
}
282+
info!("Deploying new environement on {}", &self.node.uid);
276283
// CURL KADEPLOY
277284
let client = reqwest::Client::builder().build()?;
278285
let endpoint = format!("{}/sites/{}/deployments", super::BASE_URL, self.site);
@@ -294,7 +301,7 @@ impl Job {
294301
self.deployment_id = Some(deployment_id.as_str().unwrap().to_owned());
295302
}
296303
Err(e) => {
297-
debug!("Job os_flavor has failed to be deployed : {:?}", e);
304+
error!("Job os_flavor has failed to be deployed : {:?}", e);
298305
self.state = OARState::Failed;
299306
}
300307
}
@@ -303,6 +310,7 @@ impl Job {
303310
}
304311

305312
pub async fn job_os_deployed(&mut self) -> JobResult {
313+
info!("Running script on {}", &self.node.uid);
306314

307315
let session = ssh::ssh_connect(&self.site).await?;
308316
let host = format!("{}.{}.grid5000.fr", self.node.uid, self.site);
@@ -315,18 +323,18 @@ impl Job {
315323
}
316324

317325
pub async fn job_terminated(&mut self) -> JobResult {
318-
let script_dir = Path::new(&self.script_file)
319-
.components() // Break the path into components
326+
info!("Downloading and processing results from {}", &self.node.uid);
327+
let root_results_dir = Path::new(&self.results_dir)
328+
.components()
320329
.filter_map(|comp| match comp {
321330
std::path::Component::Normal(name) => name.to_str(),
322331
_ => None,
323-
}) // Keep only "normal" components (skip root, prefix, etc.)
332+
})
324333
.next();
325334
if let Err(rsync_result) = rsync_results(
326-
script_dir.unwrap(),
327335
&self.site,
328-
self.node.cluster.as_deref().unwrap(),
329-
&self.node.uid,
336+
&self.results_dir,
337+
root_results_dir.unwrap(),
330338
) {
331339
self.state = OARState::UnknownState;
332340
} else {
@@ -426,6 +434,20 @@ impl Jobs {
426434
self.check_unfinished_jobs(&client, super::BASE_URL, jobs_file)
427435
.await?;
428436
}
437+
while !within_time_window(scripts::WALLTIME) {
438+
info!(
439+
"Too close of day|night boundaries for {} WALLTIME",
440+
scripts::WALLTIME
441+
);
442+
tokio::time::sleep(std::time::Duration::from_secs(
443+
super::SLEEP_CHECK_TIME_IN_SECONDES,
444+
))
445+
.await;
446+
447+
let client = reqwest::Client::builder().build()?;
448+
self.check_unfinished_jobs(&client, super::BASE_URL, jobs_file)
449+
.await?;
450+
}
429451
// Job creation and submission
430452
let core_values =
431453
configs::generate_core_values(5, node.architecture.nb_cores);
@@ -441,8 +463,8 @@ impl Jobs {
441463
job.submit_job().await?;
442464
self.jobs.push(job);
443465
info!("Job submitted for {} node", node_uid);
444-
info!("Wait 300 ms before another submission");
445-
tokio::time::sleep(Duration::from_millis(300)).await;
466+
debug!("Wait 300 ms before another submission");
467+
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
446468

447469
let client = reqwest::Client::builder().build()?;
448470
self.check_unfinished_jobs(&client, super::BASE_URL, jobs_file)
@@ -451,7 +473,7 @@ impl Jobs {
451473
// Throttling based on the maximum allowed concurrent jobs
452474
} else {
453475
info!("Job already listed on {} node, skipping", node_uid);
454-
tokio::time::sleep(Duration::from_millis(300)).await;
476+
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
455477
}
456478
}
457479
}
@@ -476,12 +498,12 @@ impl Jobs {
476498
for job in self.jobs.iter_mut().filter(|j| !j.finished()) {
477499
job.update_job_state(client, base_url).await?;
478500
if !job.finished() {
479-
debug!(
501+
info!(
480502
"Job {:?} is still in '{}' state.",
481503
job.oar_job_id, job.state
482504
);
483505
}
484-
tokio::time::sleep(Duration::from_millis(300)).await;
506+
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
485507
}
486508

487509
self.dump_to_file(file_to_dump_to)?;
@@ -515,8 +537,8 @@ impl Jobs {
515537
}
516538
}
517539

518-
pub fn rsync_results(site: &str, cluster: &str, node: &str, results_dir: &str) -> JobResult {
519-
let remote_directory = format!("{}:/home/nleblond/results.d", site);
540+
pub fn rsync_results(site: &str, results_dir: &str, root_results_dir: &str) -> JobResult {
541+
let remote_directory = format!("{}:/home/nleblond/{}", site, root_results_dir);
520542
let mut p = Popen::create(
521543
&["rsync", "-avzP", &remote_directory, "."],
522544
PopenConfig {
@@ -527,7 +549,7 @@ pub fn rsync_results(site: &str, cluster: &str, node: &str, results_dir: &str) -
527549

528550
let (out, err) = p.communicate(None)?;
529551

530-
if let Ok(Some(exit_status)) = p.wait_timeout(Duration::from_secs(120)) {
552+
if let Ok(Some(exit_status)) = p.wait_timeout(std::time::Duration::from_secs(120)) {
531553
if exit_status.success() {
532554
debug!("Rsync with site {} done.\n{:?}", site, out);
533555
} else {
@@ -537,7 +559,7 @@ pub fn rsync_results(site: &str, cluster: &str, node: &str, results_dir: &str) -
537559
} else {
538560
p.terminate()?;
539561
}
540-
let checksum_file = format!("{}/{}/{}/{}.tar.xz.md5", results_dir, site, cluster, node);
562+
let checksum_file = format!("{}.tar.xz.md5", results_dir);
541563
let mut p = Popen::create(
542564
&["md5sum", "-c", &checksum_file],
543565
PopenConfig {
@@ -548,7 +570,7 @@ pub fn rsync_results(site: &str, cluster: &str, node: &str, results_dir: &str) -
548570

549571
let (out, err) = p.communicate(None)?;
550572

551-
if let Ok(Some(exit_status)) = p.wait_timeout(Duration::from_secs(120)) {
573+
if let Ok(Some(exit_status)) = p.wait_timeout(std::time::Duration::from_secs(120)) {
552574
if exit_status.success() {
553575
debug!("Checksum success.\n{:?}", out);
554576
} else {
@@ -610,3 +632,35 @@ fn extract_tar_xz(dir_path: &str) -> Result <(), String> {
610632

611633
Ok(())
612634
}
635+
636+
fn parse_walltime(walltime: &str) -> Option<Duration> {
637+
let parts: Vec<&str> = walltime.split(':').collect();
638+
match parts.len() {
639+
1 => parts[0].parse::<i64>().ok().map(|h| Duration::hours(h)),
640+
2 => {
641+
let hours = parts[0].parse::<i64>().ok()?;
642+
let minutes = parts[1].parse::<i64>().ok()?;
643+
Some(Duration::hours(hours) + Duration::minutes(minutes))
644+
}
645+
3 => {
646+
let hours = parts[0].parse::<i64>().ok()?;
647+
let minutes = parts[1].parse::<i64>().ok()?;
648+
let seconds = parts[2].parse::<i64>().ok()?;
649+
Some(Duration::hours(hours) + Duration::minutes(minutes) + Duration::seconds(seconds))
650+
}
651+
_ => None,
652+
}
653+
}
654+
655+
fn within_time_window(walltime: &str) -> bool {
656+
let now = Local::now();
657+
let current_hour = now.hour() as i64;
658+
let walltime_duration = parse_walltime(walltime).unwrap_or_else(|| Duration::hours(0));
659+
let adjusted_hour = current_hour + walltime_duration.num_hours();
660+
661+
if (G5K_DAY_BOTTOM_BOUNDARY..G5K_DAY_UP_BOUNDARY).contains(&current_hour) {
662+
adjusted_hour < G5K_DAY_UP_BOUNDARY
663+
} else {
664+
adjusted_hour < G5K_DAY_BOTTOM_BOUNDARY || adjusted_hour >= 24
665+
}
666+
}

0 commit comments

Comments
 (0)