From 0e82b01d598f5d80315167d5a28652065affc93a Mon Sep 17 00:00:00 2001 From: "Adam C. Foltzer" Date: Fri, 1 Aug 2025 16:02:05 -0700 Subject: [PATCH 1/9] kinda sorta working local backend via Generic --- wdl-engine/src/backend.rs | 2 + wdl-engine/src/backend/generic.rs | 495 ++++++++++++++++++++++++++++++ wdl-engine/src/config.rs | 70 +++++ wdl-engine/src/eval/v1/task.rs | 2 + wdl-engine/tests/tasks.rs | 15 + 5 files changed, 584 insertions(+) create mode 100644 wdl-engine/src/backend/generic.rs diff --git a/wdl-engine/src/backend.rs b/wdl-engine/src/backend.rs index 5b4dadb5c..5e633989c 100644 --- a/wdl-engine/src/backend.rs +++ b/wdl-engine/src/backend.rs @@ -28,10 +28,12 @@ use crate::http::HttpDownloader; use crate::path::EvaluationPath; mod docker; +mod generic; mod local; mod tes; pub use docker::*; +pub use generic::*; pub use local::*; pub use tes::*; diff --git a/wdl-engine/src/backend/generic.rs b/wdl-engine/src/backend/generic.rs new file mode 100644 index 000000000..fcdc9cb9a --- /dev/null +++ b/wdl-engine/src/backend/generic.rs @@ -0,0 +1,495 @@ +use std::collections::HashMap; +use std::ffi::OsStr; +use std::fs::File; +use std::fs::Permissions; +use std::fs::{self}; +use std::os::unix::fs::PermissionsExt; +use std::path::Path; +use std::process::ExitCode; +use std::process::Stdio; +use std::sync::Arc; + +use anyhow::Context as _; +use anyhow::Result; +use anyhow::bail; +use futures::FutureExt as _; +use futures::future::BoxFuture; +use nonempty::NonEmpty; +use tokio::process::Command; +use tokio::sync::oneshot; +use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; +use tracing::info; +use tracing::warn; + +use super::TaskExecutionBackend; +use super::TaskExecutionConstraints; +use super::TaskExecutionEvents; +use super::TaskManager; +use super::TaskManagerRequest; +use super::TaskSpawnRequest; +use crate::COMMAND_FILE_NAME; +use crate::Input; +use crate::ONE_GIBIBYTE; +use crate::PrimitiveValue; +use crate::STDERR_FILE_NAME; +use crate::STDOUT_FILE_NAME; +use crate::SYSTEM; +use crate::TaskExecutionResult; +use crate::Value; +use crate::WORK_DIR_NAME; +use crate::config::Config; +use crate::config::DEFAULT_TASK_SHELL; +use crate::config::GenericBackendConfig; +use crate::config::TaskResourceLimitBehavior; +use crate::convert_unit_string; +use crate::http::Downloader as _; +use crate::http::HttpDownloader; +use crate::http::Location; +use crate::path::EvaluationPath; + +/// Represents a generic task request. +#[derive(Debug)] +struct GenericTaskRequest { + /// The engine configuration. + config: Arc, + /// The inner task spawn request. + inner: TaskSpawnRequest, + /// The requested CPU reservation for the task. + /// + /// Note that CPU isn't actually reserved for the task process. + cpu: f64, + /// The requested memory reservation for the task. + /// + /// Note that memory isn't actually reserved for the task process. + memory: u64, + /// The cancellation token for the request. + token: CancellationToken, +} + +impl TaskManagerRequest for GenericTaskRequest { + fn cpu(&self) -> f64 { + self.cpu + } + + fn memory(&self) -> u64 { + self.memory + } + + async fn run(self, spawned: oneshot::Sender<()>) -> Result { + // Create the working directory + let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME); + fs::create_dir_all(&work_dir).with_context(|| { + format!( + "failed to create directory `{path}`", + path = work_dir.display() + ) + })?; + + // Write the evaluated command to disk + let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME); + fs::write(&command_path, self.inner.command()).with_context(|| { + format!( + "failed to write command contents to `{path}`", + path = command_path.display() + ) + })?; + fs::set_permissions(&command_path, Permissions::from_mode(0o777))?; + + // Create a file for the stdout + let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME); + let stdout = File::create(&stdout_path).with_context(|| { + format!( + "failed to create stdout file `{path}`", + path = stdout_path.display() + ) + })?; + + // Create a file for the stderr + let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME); + let stderr = File::create(&stderr_path).with_context(|| { + format!( + "failed to create stderr file `{path}`", + path = stderr_path.display() + ) + })?; + + let mut command = Command::new( + self.config + .task + .shell + .as_deref() + .unwrap_or(DEFAULT_TASK_SHELL), + ); + command + .current_dir(&work_dir) + .arg("-C") + .arg(&command_path) + .stdin(Stdio::null()) + .stdout(stdout) + .stderr(stderr) + .envs( + self.inner + .env() + .iter() + .map(|(k, v)| (OsStr::new(k), OsStr::new(v))), + ) + .kill_on_drop(true); + + let crankshaft_generic_backend_driver = + crankshaft::config::backend::generic::driver::Config::builder() + .locale(crankshaft::config::backend::generic::driver::Locale::Local) + .shell(crankshaft::config::backend::generic::driver::Shell::Bash) + .build(); + let crankshaft_generic_backend_config = + crankshaft::config::backend::generic::Config::builder() + .driver(crankshaft_generic_backend_driver) + .submit(format!( + "(~{{command}} > {} 2> {} & echo $!)", + stdout_path.display(), + stderr_path.display(), + )) + .job_id_regex(r#"(\d+)"#) + .monitor("kill -0 ~{job_id}") + .kill("kill ~{job_id}") + .build(); + + const BACKEND_NAME: &'static str = "crankshaft_generic"; + let backend = crankshaft::Engine::default() + .with( + crankshaft::config::backend::Config::builder() + .name(BACKEND_NAME) + .max_tasks(10) + .kind(crankshaft::config::backend::Kind::Generic( + crankshaft_generic_backend_config, + )) + .build(), + ) + .await?; + + let generic_task = crankshaft::engine::Task::builder() + .executions(NonEmpty::new( + crankshaft::engine::task::Execution::builder() + .image("not_an_image") + .program(command_path.to_str().unwrap()) + .work_dir(work_dir.to_str().unwrap()) + // .stdout(stdout_path.to_str().unwrap()) + // .stderr(stderr_path.to_str().unwrap()) + .build(), + )) + .build(); + let handle = backend.spawn(BACKEND_NAME, generic_task, self.token.clone())?; + spawned.send(()).ok(); + + let res = handle.wait().await?; + + // Set the PATH variable for the child on Windows to get consistent PATH + // searching. See: https://github.com/rust-lang/rust/issues/122660 + #[cfg(windows)] + if let Ok(path) = std::env::var("PATH") { + command.env("PATH", path); + } + + return Ok(TaskExecutionResult { + inputs: self.inner.info.inputs, + exit_code: if res.last().code() == Some(1) { 0 } else { 127 }, + work_dir: EvaluationPath::Local(work_dir), + stdout: PrimitiveValue::new_file( + stdout_path + .into_os_string() + .into_string() + .expect("path should be UTF-8"), + ) + .into(), + stderr: PrimitiveValue::new_file( + stderr_path + .into_os_string() + .into_string() + .expect("path should be UTF-8"), + ) + .into(), + }); + + let mut child = command.spawn().context("failed to spawn `bash`")?; + + // Notify that the process has spawned + spawned.send(()).ok(); + + let id = child.id().expect("should have id"); + info!("spawned local `bash` process {id} for task execution"); + + tokio::select! { + // Poll the cancellation token before the child future + biased; + + _ = self.token.cancelled() => { + bail!("task was cancelled"); + } + status = child.wait() => { + let status = status.with_context(|| { + format!("failed to wait for termination of task child process {id}") + })?; + + #[cfg(unix)] + { + use std::os::unix::process::ExitStatusExt; + if let Some(signal) = status.signal() { + tracing::warn!("task process {id} has terminated with signal {signal}"); + + bail!( + "task child process {id} has terminated with signal {signal}; see stderr file \ + `{path}` for more details", + path = stderr_path.display() + ); + } + } + + let exit_code = status.code().expect("process should have exited"); + info!("task process {id} has terminated with status code {exit_code}"); + Ok(TaskExecutionResult { + inputs: self.inner.info.inputs, + exit_code, + work_dir: EvaluationPath::Local(work_dir), + stdout: PrimitiveValue::new_file( + stdout_path + .into_os_string() + .into_string() + .expect("path should be UTF-8") + ).into(), + stderr: PrimitiveValue::new_file( + stderr_path + .into_os_string() + .into_string() + .expect("path should be UTF-8") + ).into(), + }) + } + } + } +} + +/// Represents a task execution backend that uses Crankshaft's generic backend +/// to execute tasks. +pub struct GenericBackend { + /// The engine configuration. + config: Arc, + /// The total CPU of the host. + cpu: u64, + /// The total memory of the host. + memory: u64, + /// The underlying task manager. + manager: TaskManager, +} + +impl GenericBackend { + /// Constructs a new generic task execution backend with the given + /// configuration. + /// + /// The provided configuration is expected to have already been validated. + pub fn new(config: Arc, backend_config: &GenericBackendConfig) -> Result { + info!("initializing generic backend"); + + let cpu = backend_config + .cpu + .unwrap_or_else(|| SYSTEM.cpus().len() as u64); + let memory = backend_config + .memory + .as_ref() + .map(|s| convert_unit_string(s).expect("value should be valid")) + .unwrap_or_else(|| SYSTEM.total_memory()); + let manager = TaskManager::new(cpu, cpu, memory, memory); + + Ok(Self { + config, + cpu, + memory, + manager, + }) + } +} + +impl TaskExecutionBackend for GenericBackend { + fn max_concurrency(&self) -> u64 { + self.cpu + } + + fn constraints( + &self, + requirements: &HashMap, + _: &HashMap, + ) -> Result { + let mut cpu = crate::v1::cpu(requirements); + if (self.cpu as f64) < cpu { + match self.config.task.cpu_limit_behavior { + TaskResourceLimitBehavior::TryWithMax => { + warn!( + "task requires at least {cpu} CPU{s}, but the host only has {total_cpu} \ + available", + s = if cpu == 1.0 { "" } else { "s" }, + total_cpu = self.cpu, + ); + // clamp the reported constraint to what's available + cpu = self.cpu as f64; + } + TaskResourceLimitBehavior::Deny => { + bail!( + "task requires at least {cpu} CPU{s}, but the host only has {total_cpu} \ + available", + s = if cpu == 1.0 { "" } else { "s" }, + total_cpu = self.cpu, + ); + } + } + } + + let mut memory = crate::v1::memory(requirements)?; + if self.memory < memory as u64 { + match self.config.task.memory_limit_behavior { + TaskResourceLimitBehavior::TryWithMax => { + warn!( + "task requires at least {memory} GiB of memory, but the host only has \ + {total_memory} GiB available", + // Display the error in GiB, as it is the most common unit for memory + memory = memory as f64 / ONE_GIBIBYTE, + total_memory = self.memory as f64 / ONE_GIBIBYTE, + ); + // clamp the reported constraint to what's available + memory = self.memory.try_into().unwrap_or(i64::MAX); + } + TaskResourceLimitBehavior::Deny => { + bail!( + "task requires at least {memory} GiB of memory, but the host only has \ + {total_memory} GiB available", + // Display the error in GiB, as it is the most common unit for memory + memory = memory as f64 / ONE_GIBIBYTE, + total_memory = self.memory as f64 / ONE_GIBIBYTE, + ); + } + } + } + + Ok(TaskExecutionConstraints { + container: None, + cpu, + memory, + gpu: Default::default(), + fpga: Default::default(), + disks: Default::default(), + }) + } + + fn guest_work_dir(&self) -> Option<&Path> { + None + } + + fn localize_inputs<'a, 'b, 'c, 'd>( + &'a self, + downloader: &'b HttpDownloader, + inputs: &'c mut [Input], + ) -> BoxFuture<'d, Result<()>> + where + 'a: 'd, + 'b: 'd, + 'c: 'd, + Self: 'd, + { + async move { + let mut downloads = JoinSet::new(); + + for (idx, input) in inputs.iter_mut().enumerate() { + match input.path() { + EvaluationPath::Local(path) => { + let location = Location::Path(path.clone().into()); + let guest_path = location + .to_str() + .with_context(|| { + format!("path `{path}` is not UTF-8", path = path.display()) + })? + .to_string(); + input.set_location(location.into_owned()); + input.set_guest_path(guest_path); + } + EvaluationPath::Remote(url) => { + let downloader = downloader.clone(); + let url = url.clone(); + downloads.spawn(async move { + let location_result = downloader.download(&url).await; + + match location_result { + Ok(location) => Ok((idx, location.into_owned())), + Err(e) => bail!("failed to localize `{url}`: {e:?}"), + } + }); + } + } + } + + while let Some(result) = downloads.join_next().await { + match result { + Ok(Ok((idx, location))) => { + let guest_path = location + .to_str() + .with_context(|| { + format!( + "downloaded path `{path}` is not UTF-8", + path = location.display() + ) + })? + .to_string(); + + let input = inputs.get_mut(idx).expect("index should be valid"); + input.set_location(location); + input.set_guest_path(guest_path); + } + Ok(Err(e)) => { + // Futures are aborted when the `JoinSet` is dropped. + bail!(e); + } + Err(e) => { + // Futures are aborted when the `JoinSet` is dropped. + bail!("download task failed: {e}"); + } + } + } + + Ok(()) + } + .boxed() + } + + fn spawn( + &self, + request: TaskSpawnRequest, + token: CancellationToken, + ) -> Result { + let (spawned_tx, spawned_rx) = oneshot::channel(); + let (completed_tx, completed_rx) = oneshot::channel(); + + let requirements = request.requirements(); + let mut cpu = crate::v1::cpu(requirements); + if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior { + cpu = std::cmp::min(cpu.ceil() as u64, self.cpu) as f64; + } + let mut memory = crate::v1::memory(requirements)? as u64; + if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior { + memory = std::cmp::min(memory, self.memory); + } + + self.manager.send( + GenericTaskRequest { + config: self.config.clone(), + inner: request, + cpu, + memory, + token, + }, + spawned_tx, + completed_tx, + ); + + Ok(TaskExecutionEvents { + spawned: spawned_rx, + completed: completed_rx, + }) + } +} diff --git a/wdl-engine/src/config.rs b/wdl-engine/src/config.rs index 5af487c51..2d1de5240 100644 --- a/wdl-engine/src/config.rs +++ b/wdl-engine/src/config.rs @@ -15,6 +15,7 @@ use tracing::warn; use url::Url; use crate::DockerBackend; +use crate::GenericBackend; use crate::LocalBackend; use crate::SYSTEM; use crate::TaskExecutionBackend; @@ -139,6 +140,9 @@ impl Config { BackendConfig::Tes(config) => { Ok(Arc::new(TesBackend::new(self.clone(), config).await?)) } + BackendConfig::Generic(config) => { + Ok(Arc::new(GenericBackend::new(self.clone(), config)?)) + } } } } @@ -426,6 +430,8 @@ pub enum BackendConfig { Docker(DockerBackendConfig), /// Use the TES task execution backend. Tes(Box), + /// Use the generic task execution backend. + Generic(GenericBackendConfig), } impl Default for BackendConfig { @@ -441,6 +447,7 @@ impl BackendConfig { Self::Local(config) => config.validate(), Self::Docker(config) => config.validate(), Self::Tes(config) => config.validate(), + Self::Generic(config) => config.validate(), } } @@ -541,6 +548,69 @@ impl LocalBackendConfig { } } +/// Represents configuration for the generic task execution backend. +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub struct GenericBackendConfig { + #[serde(default)] + pub backend_config: crankshaft::config::backend::generic::Config, + /// Set the number of CPUs available for task execution. + /// + /// Defaults to the number of logical CPUs for the host. + /// + /// The value cannot be zero or exceed the host's number of CPUs. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub cpu: Option, + + /// Set the total amount of memory for task execution as a unit string (e.g. + /// `2 GiB`). + /// + /// Defaults to the total amount of memory for the host. + /// + /// The value cannot be zero or exceed the host's total amount of memory. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub memory: Option, +} + +impl GenericBackendConfig { + /// Validates the local task execution backend configuration. + pub fn validate(&self) -> Result<()> { + if let Some(cpu) = self.cpu { + if cpu == 0 { + bail!("local backend configuration value `cpu` cannot be zero"); + } + + let total = SYSTEM.cpus().len() as u64; + if cpu > total { + bail!( + "local backend configuration value `cpu` cannot exceed the virtual CPUs \ + available to the host ({total})" + ); + } + } + + if let Some(memory) = &self.memory { + let memory = convert_unit_string(memory).with_context(|| { + format!("local backend configuration value `memory` has invalid value `{memory}`") + })?; + + if memory == 0 { + bail!("local backend configuration value `memory` cannot be zero"); + } + + let total = SYSTEM.total_memory(); + if memory > total { + bail!( + "local backend configuration value `memory` cannot exceed the total memory of \ + the host ({total} bytes)" + ); + } + } + + Ok(()) + } +} + /// Gets the default value for the docker `cleanup` field. const fn cleanup_default() -> bool { true diff --git a/wdl-engine/src/eval/v1/task.rs b/wdl-engine/src/eval/v1/task.rs index c390ebefe..409047cd6 100644 --- a/wdl-engine/src/eval/v1/task.rs +++ b/wdl-engine/src/eval/v1/task.rs @@ -963,6 +963,8 @@ impl TaskEvaluator { let result = events .completed .await + // TODO ACF 2025-08-01: this can reasonably hit if a backend error occurs and + // shouldn't just be a panic .expect("failed to receive response from spawned task"); progress(ProgressKind::TaskExecutionCompleted { diff --git a/wdl-engine/tests/tasks.rs b/wdl-engine/tests/tasks.rs index a72687621..8bb7997a6 100644 --- a/wdl-engine/tests/tasks.rs +++ b/wdl-engine/tests/tasks.rs @@ -145,6 +145,21 @@ fn configs(path: &Path) -> Result, config::Config)>, anyh ..Default::default() } }), + ("local_generic".into(), { + config::Config { + backends: [( + "default".to_string(), + BackendConfig::Generic(Default::default()), + )] + .into(), + task: config::TaskConfig { + cpu_limit_behavior: config::TaskResourceLimitBehavior::TryWithMax, + memory_limit_behavior: config::TaskResourceLimitBehavior::TryWithMax, + ..Default::default() + }, + ..Default::default() + } + }), // Currently we limit running the Docker backend to Linux as GitHub does not have // Docker installed on macOS hosted runners and the Windows hosted runners // are configured to use Windows containers From 5244966368eb42ec419080d27e429987af4bf45b Mon Sep 17 00:00:00 2001 From: "Adam C. Foltzer" Date: Tue, 5 Aug 2025 11:46:57 -0700 Subject: [PATCH 2/9] wip: get a lot more tests passing by using the `cwd` variable --- wdl-engine/src/backend/generic.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wdl-engine/src/backend/generic.rs b/wdl-engine/src/backend/generic.rs index fcdc9cb9a..cee581758 100644 --- a/wdl-engine/src/backend/generic.rs +++ b/wdl-engine/src/backend/generic.rs @@ -145,7 +145,7 @@ impl TaskManagerRequest for GenericTaskRequest { crankshaft::config::backend::generic::Config::builder() .driver(crankshaft_generic_backend_driver) .submit(format!( - "(~{{command}} > {} 2> {} & echo $!)", + "(cd ~{{cwd}}; ~{{command}} > {} 2> {} & echo $!)", stdout_path.display(), stderr_path.display(), )) From d474b9d59a19c5b477d6dcc39fb0e21b71dbea43 Mon Sep 17 00:00:00 2001 From: "Adam C. Foltzer" Date: Thu, 7 Aug 2025 12:17:39 -0700 Subject: [PATCH 3/9] use an exit code file --- wdl-engine/src/backend/generic.rs | 44 ++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/wdl-engine/src/backend/generic.rs b/wdl-engine/src/backend/generic.rs index cee581758..3fcd4dfb7 100644 --- a/wdl-engine/src/backend/generic.rs +++ b/wdl-engine/src/backend/generic.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::collections::HashMap; use std::ffi::OsStr; use std::fs::File; @@ -11,6 +12,7 @@ use std::sync::Arc; use anyhow::Context as _; use anyhow::Result; +use anyhow::anyhow; use anyhow::bail; use futures::FutureExt as _; use futures::future::BoxFuture; @@ -141,17 +143,30 @@ impl TaskManagerRequest for GenericTaskRequest { .locale(crankshaft::config::backend::generic::driver::Locale::Local) .shell(crankshaft::config::backend::generic::driver::Shell::Bash) .build(); + let mut attributes = HashMap::new(); + attributes.insert( + Cow::Borrowed("temp_dir"), + Cow::Owned(temp_dir.path().display().to_string()), + ); + let task_exit_code = temp_dir.path().join("task_exit_code"); + attributes.insert( + Cow::Borrowed("task_exit_code"), + Cow::Owned(task_exit_code.display().to_string()), + ); let crankshaft_generic_backend_config = crankshaft::config::backend::generic::Config::builder() .driver(crankshaft_generic_backend_driver) .submit(format!( - "(cd ~{{cwd}}; ~{{command}} > {} 2> {} & echo $!)", + "((cd ~{{cwd}}; ~{{command}} > {} 2> {}; sleep 1; echo $? > ~{{task_exit_code}}) & \ + echo $!)", stdout_path.display(), stderr_path.display(), )) .job_id_regex(r#"(\d+)"#) - .monitor("kill -0 ~{job_id}") + .monitor("file -E ~{task_exit_code}") + .get_exit_code("cat ~{task_exit_code}") .kill("kill ~{job_id}") + .attributes(attributes) .build(); const BACKEND_NAME: &'static str = "crankshaft_generic"; @@ -173,8 +188,8 @@ impl TaskManagerRequest for GenericTaskRequest { .image("not_an_image") .program(command_path.to_str().unwrap()) .work_dir(work_dir.to_str().unwrap()) - // .stdout(stdout_path.to_str().unwrap()) - // .stderr(stderr_path.to_str().unwrap()) + .stdout(stdout_path.display().to_string()) + .stderr(stderr_path.display().to_string()) .build(), )) .build(); @@ -192,15 +207,20 @@ impl TaskManagerRequest for GenericTaskRequest { return Ok(TaskExecutionResult { inputs: self.inner.info.inputs, - exit_code: if res.last().code() == Some(1) { 0 } else { 127 }, + exit_code: res + .last() + .code() + .ok_or(anyhow!("task did not return an exit code"))?, work_dir: EvaluationPath::Local(work_dir), - stdout: PrimitiveValue::new_file( - stdout_path - .into_os_string() - .into_string() - .expect("path should be UTF-8"), - ) - .into(), + stdout: dbg!( + PrimitiveValue::new_file( + stdout_path + .into_os_string() + .into_string() + .expect("path should be UTF-8"), + ) + .into() + ), stderr: PrimitiveValue::new_file( stderr_path .into_os_string() From cd6e136ef34535681717328835aa331b37fdbcf9 Mon Sep 17 00:00:00 2001 From: "Adam C. Foltzer" Date: Thu, 7 Aug 2025 16:44:46 -0700 Subject: [PATCH 4/9] wip: revert some debugging code and use crankshaft submodule --- .gitmodules | 3 +++ Cargo.toml | 5 ++++- crankshaft | 1 + wdl-engine/src/backend/generic.rs | 20 ++++++++++---------- wdl-engine/tests/tasks.rs | 3 ++- 5 files changed, 20 insertions(+), 12 deletions(-) create mode 100644 .gitmodules create mode 160000 crankshaft diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000000000..99f1a04a8 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "crankshaft"] + path = crankshaft + url = https://github.com/stjude-rust-labs/crankshaft.git diff --git a/Cargo.toml b/Cargo.toml index 2eeb3208e..d4ebe295e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,9 @@ members = [ "wdl-lint", "wdl-lsp", ] +exclude = [ + "crankshaft", +] resolver = "2" [workspace.package] @@ -33,7 +36,7 @@ clap-verbosity-flag = { version = "3.0.3", features = ["tracing"] } codespan-reporting = "0.12.0" colored = "3.0.0" convert_case = "0.8.0" -crankshaft = "0.4.0" +crankshaft = { path = "./crankshaft/crankshaft" } dirs = "6.0.0" faster-hex = "0.10.0" fs_extra = "1.3.0" diff --git a/crankshaft b/crankshaft new file mode 160000 index 000000000..f92f2192c --- /dev/null +++ b/crankshaft @@ -0,0 +1 @@ +Subproject commit f92f2192cdd98982f8895ee61980b68986874b95 diff --git a/wdl-engine/src/backend/generic.rs b/wdl-engine/src/backend/generic.rs index 3fcd4dfb7..0fbaaffb6 100644 --- a/wdl-engine/src/backend/generic.rs +++ b/wdl-engine/src/backend/generic.rs @@ -17,6 +17,7 @@ use anyhow::bail; use futures::FutureExt as _; use futures::future::BoxFuture; use nonempty::NonEmpty; +use tempfile::TempDir; use tokio::process::Command; use tokio::sync::oneshot; use tokio::task::JoinSet; @@ -143,6 +144,7 @@ impl TaskManagerRequest for GenericTaskRequest { .locale(crankshaft::config::backend::generic::driver::Locale::Local) .shell(crankshaft::config::backend::generic::driver::Shell::Bash) .build(); + let temp_dir = TempDir::new()?; let mut attributes = HashMap::new(); attributes.insert( Cow::Borrowed("temp_dir"), @@ -157,7 +159,7 @@ impl TaskManagerRequest for GenericTaskRequest { crankshaft::config::backend::generic::Config::builder() .driver(crankshaft_generic_backend_driver) .submit(format!( - "((cd ~{{cwd}}; ~{{command}} > {} 2> {}; sleep 1; echo $? > ~{{task_exit_code}}) & \ + "((cd ~{{cwd}}; ~{{command}} > {} 2> {}; echo $? > ~{{task_exit_code}}) & \ echo $!)", stdout_path.display(), stderr_path.display(), @@ -212,15 +214,13 @@ impl TaskManagerRequest for GenericTaskRequest { .code() .ok_or(anyhow!("task did not return an exit code"))?, work_dir: EvaluationPath::Local(work_dir), - stdout: dbg!( - PrimitiveValue::new_file( - stdout_path - .into_os_string() - .into_string() - .expect("path should be UTF-8"), - ) - .into() - ), + stdout: PrimitiveValue::new_file( + stdout_path + .into_os_string() + .into_string() + .expect("path should be UTF-8"), + ) + .into(), stderr: PrimitiveValue::new_file( stderr_path .into_os_string() diff --git a/wdl-engine/tests/tasks.rs b/wdl-engine/tests/tasks.rs index 8bb7997a6..f180852ef 100644 --- a/wdl-engine/tests/tasks.rs +++ b/wdl-engine/tests/tasks.rs @@ -311,7 +311,8 @@ async fn run_test(test: &Path, config: config::Config) -> Result<()> { inputs.join_paths(task, |_| Ok(&test_dir))?; let evaluator = TaskEvaluator::new(config, CancellationToken::new()).await?; - let dir = TempDir::new().context("failed to create temporary directory")?; + let mut dir = TempDir::new().context("failed to create temporary directory")?; + // dir.disable_cleanup(true); match evaluator .evaluate(result.document(), task, &inputs, dir.path(), |_| async {}) .await From 8e531b76990d42cdb1589aed5c58ecb22732eabd Mon Sep 17 00:00:00 2001 From: "Adam C. Foltzer" Date: Tue, 12 Aug 2025 15:30:33 -0700 Subject: [PATCH 5/9] wip: cleanups and get generic config reading from file --- crankshaft | 2 +- wdl-engine/src/backend/generic.rs | 142 +++++------------- wdl-engine/src/config.rs | 2 + wdl-engine/tests/tasks.rs | 28 +++- .../tasks/task-with-comments/config.toml | 10 ++ 5 files changed, 75 insertions(+), 109 deletions(-) create mode 100644 wdl-engine/tests/tasks/task-with-comments/config.toml diff --git a/crankshaft b/crankshaft index f92f2192c..42400beeb 160000 --- a/crankshaft +++ b/crankshaft @@ -1 +1 @@ -Subproject commit f92f2192cdd98982f8895ee61980b68986874b95 +Subproject commit 42400beeb8f629a86a60f955e6094ffbcb5ec482 diff --git a/wdl-engine/src/backend/generic.rs b/wdl-engine/src/backend/generic.rs index 0fbaaffb6..2fc48f227 100644 --- a/wdl-engine/src/backend/generic.rs +++ b/wdl-engine/src/backend/generic.rs @@ -1,13 +1,10 @@ use std::borrow::Cow; use std::collections::HashMap; -use std::ffi::OsStr; use std::fs::File; use std::fs::Permissions; use std::fs::{self}; use std::os::unix::fs::PermissionsExt; use std::path::Path; -use std::process::ExitCode; -use std::process::Stdio; use std::sync::Arc; use anyhow::Context as _; @@ -18,7 +15,6 @@ use futures::FutureExt as _; use futures::future::BoxFuture; use nonempty::NonEmpty; use tempfile::TempDir; -use tokio::process::Command; use tokio::sync::oneshot; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; @@ -42,7 +38,6 @@ use crate::TaskExecutionResult; use crate::Value; use crate::WORK_DIR_NAME; use crate::config::Config; -use crate::config::DEFAULT_TASK_SHELL; use crate::config::GenericBackendConfig; use crate::config::TaskResourceLimitBehavior; use crate::convert_unit_string; @@ -56,6 +51,8 @@ use crate::path::EvaluationPath; struct GenericTaskRequest { /// The engine configuration. config: Arc, + /// The backend configuration. + backend_config: Arc, /// The inner task spawn request. inner: TaskSpawnRequest, /// The requested CPU reservation for the task. @@ -99,53 +96,30 @@ impl TaskManagerRequest for GenericTaskRequest { })?; fs::set_permissions(&command_path, Permissions::from_mode(0o777))?; - // Create a file for the stdout + // Create an empty file for the stdout let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME); - let stdout = File::create(&stdout_path).with_context(|| { + let _ = File::create(&stdout_path).with_context(|| { format!( "failed to create stdout file `{path}`", path = stdout_path.display() ) })?; - // Create a file for the stderr + // Create an empty file for the stderr let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME); - let stderr = File::create(&stderr_path).with_context(|| { + let _ = File::create(&stderr_path).with_context(|| { format!( "failed to create stderr file `{path}`", path = stderr_path.display() ) })?; - let mut command = Command::new( - self.config - .task - .shell - .as_deref() - .unwrap_or(DEFAULT_TASK_SHELL), - ); - command - .current_dir(&work_dir) - .arg("-C") - .arg(&command_path) - .stdin(Stdio::null()) - .stdout(stdout) - .stderr(stderr) - .envs( - self.inner - .env() - .iter() - .map(|(k, v)| (OsStr::new(k), OsStr::new(v))), - ) - .kill_on_drop(true); - - let crankshaft_generic_backend_driver = - crankshaft::config::backend::generic::driver::Config::builder() - .locale(crankshaft::config::backend::generic::driver::Locale::Local) - .shell(crankshaft::config::backend::generic::driver::Shell::Bash) - .build(); let temp_dir = TempDir::new()?; let mut attributes = HashMap::new(); + // TODO ACF 2025-08-12: might want to bake a tmpdir in as a more fully-fledged + // concept eventually, but for now putting things in a separate location + // is required to avoid spurious test failures from the fixture checking + // every file in `cwd` for equivalence attributes.insert( Cow::Borrowed("temp_dir"), Cow::Owned(temp_dir.path().display().to_string()), @@ -155,21 +129,26 @@ impl TaskManagerRequest for GenericTaskRequest { Cow::Borrowed("task_exit_code"), Cow::Owned(task_exit_code.display().to_string()), ); - let crankshaft_generic_backend_config = - crankshaft::config::backend::generic::Config::builder() - .driver(crankshaft_generic_backend_driver) - .submit(format!( - "((cd ~{{cwd}}; ~{{command}} > {} 2> {}; echo $? > ~{{task_exit_code}}) & \ - echo $!)", - stdout_path.display(), - stderr_path.display(), - )) - .job_id_regex(r#"(\d+)"#) - .monitor("file -E ~{task_exit_code}") - .get_exit_code("cat ~{task_exit_code}") - .kill("kill ~{job_id}") - .attributes(attributes) - .build(); + // let crankshaft_generic_backend_driver = + // crankshaft::config::backend::generic::driver::Config::builder() + // .locale(crankshaft::config::backend::generic::driver::Locale::Local) + // .shell(crankshaft::config::backend::generic::driver::Shell::Bash) + // .build(); + // let crankshaft_generic_backend_config = + // crankshaft::config::backend::generic::Config::builder() + // .driver(crankshaft_generic_backend_driver) + // .submit( + // r#"((cd ~{cwd}; ~{command} > ~{stdout} 2> ~{stderr}; echo $? > + // ~{task_exit_code}) & echo $!)"# ) + // .job_id_regex(r#"(\d+)"#) + // .monitor("file -E ~{task_exit_code}") + // .get_exit_code("cat ~{task_exit_code}") + // .kill("kill ~{job_id}") + // .attributes(attributes) + // .build(); + + let mut crankshaft_generic_backend_config = self.backend_config.backend_config.clone(); + *crankshaft_generic_backend_config.attributes_mut() = attributes; const BACKEND_NAME: &'static str = "crankshaft_generic"; let backend = crankshaft::Engine::default() @@ -229,62 +208,6 @@ impl TaskManagerRequest for GenericTaskRequest { ) .into(), }); - - let mut child = command.spawn().context("failed to spawn `bash`")?; - - // Notify that the process has spawned - spawned.send(()).ok(); - - let id = child.id().expect("should have id"); - info!("spawned local `bash` process {id} for task execution"); - - tokio::select! { - // Poll the cancellation token before the child future - biased; - - _ = self.token.cancelled() => { - bail!("task was cancelled"); - } - status = child.wait() => { - let status = status.with_context(|| { - format!("failed to wait for termination of task child process {id}") - })?; - - #[cfg(unix)] - { - use std::os::unix::process::ExitStatusExt; - if let Some(signal) = status.signal() { - tracing::warn!("task process {id} has terminated with signal {signal}"); - - bail!( - "task child process {id} has terminated with signal {signal}; see stderr file \ - `{path}` for more details", - path = stderr_path.display() - ); - } - } - - let exit_code = status.code().expect("process should have exited"); - info!("task process {id} has terminated with status code {exit_code}"); - Ok(TaskExecutionResult { - inputs: self.inner.info.inputs, - exit_code, - work_dir: EvaluationPath::Local(work_dir), - stdout: PrimitiveValue::new_file( - stdout_path - .into_os_string() - .into_string() - .expect("path should be UTF-8") - ).into(), - stderr: PrimitiveValue::new_file( - stderr_path - .into_os_string() - .into_string() - .expect("path should be UTF-8") - ).into(), - }) - } - } } } @@ -293,6 +216,8 @@ impl TaskManagerRequest for GenericTaskRequest { pub struct GenericBackend { /// The engine configuration. config: Arc, + /// The backend configuration. + backend_config: Arc, /// The total CPU of the host. cpu: u64, /// The total memory of the host. @@ -321,6 +246,8 @@ impl GenericBackend { Ok(Self { config, + // TODO ACF 2025-08-12: sort out this excess cloning nonsense + backend_config: Arc::new(backend_config.clone()), cpu, memory, manager, @@ -498,6 +425,7 @@ impl TaskExecutionBackend for GenericBackend { self.manager.send( GenericTaskRequest { config: self.config.clone(), + backend_config: self.backend_config.clone(), inner: request, cpu, memory, diff --git a/wdl-engine/src/config.rs b/wdl-engine/src/config.rs index 2d1de5240..dba13c05d 100644 --- a/wdl-engine/src/config.rs +++ b/wdl-engine/src/config.rs @@ -552,6 +552,8 @@ impl LocalBackendConfig { #[derive(Debug, Default, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case", deny_unknown_fields)] pub struct GenericBackendConfig { + /// The Crankshaft generic backend config. + // TODO ACF 2025-08-25: this maybe should be the entire struct? #[serde(default)] pub backend_config: crankshaft::config::backend::generic::Config, /// Set the number of CPUs available for task execution. diff --git a/wdl-engine/tests/tasks.rs b/wdl-engine/tests/tasks.rs index f180852ef..154a150a1 100644 --- a/wdl-engine/tests/tasks.rs +++ b/wdl-engine/tests/tasks.rs @@ -43,6 +43,7 @@ use wdl_engine::EvaluatedTask; use wdl_engine::EvaluationError; use wdl_engine::Inputs; use wdl_engine::config::BackendConfig; +use wdl_engine::config::GenericBackendConfig; use wdl_engine::config::{self}; use wdl_engine::v1::TaskEvaluator; @@ -130,6 +131,31 @@ fn configs(path: &Path) -> Result, config::Config)>, anyh let config = toml::from_str(&std::fs::read_to_string(file.path())?)?; configs_on_disk.push((config_name, config)); } + let generic_backend_config = GenericBackendConfig { + backend_config: crankshaft::config::backend::generic::Config::builder() + .driver( + crankshaft::config::backend::generic::driver::Config::builder() + .locale(crankshaft::config::backend::generic::driver::Locale::Local) + .shell(crankshaft::config::backend::generic::driver::Shell::Bash) + .build(), + ) + .submit( + r#"((cd ~{cwd}; ~{command} > ~{stdout} 2> ~{stderr}; echo $? > + ~{task_exit_code}) & echo $!)"#, + ) + .job_id_regex(r#"(\d+)"#) + .monitor("file -E ~{task_exit_code}") + .get_exit_code("cat ~{task_exit_code}") + .kill("kill ~{job_id}") + .build(), + cpu: None, + memory: None, + }; + // std::fs::write( + // "/tmp/generic_config.toml", + // toml::to_string_pretty(&generic_backend_config).unwrap(), + // ) + // .unwrap(); if !configs_on_disk.is_empty() || any_config_toml_found { Ok(configs_on_disk) } else { @@ -149,7 +175,7 @@ fn configs(path: &Path) -> Result, config::Config)>, anyh config::Config { backends: [( "default".to_string(), - BackendConfig::Generic(Default::default()), + BackendConfig::Generic(generic_backend_config), )] .into(), task: config::TaskConfig { diff --git a/wdl-engine/tests/tasks/task-with-comments/config.toml b/wdl-engine/tests/tasks/task-with-comments/config.toml new file mode 100644 index 000000000..6b3bd89fb --- /dev/null +++ b/wdl-engine/tests/tasks/task-with-comments/config.toml @@ -0,0 +1,10 @@ +backends.default.type = "generic" +backends.default.backend_config.locale.kind = "Local" +backends.default.backend_config.shell = "bash" +backends.default.backend_config.submit = """ +((cd ~{cwd}; ~{command} > ~{stdout} 2> ~{stderr}; echo $? > + ~{task_exit_code}) & echo $!)""" +backends.default.backend_config.job-id-regex = '(\d+)' +backends.default.backend_config.monitor = "file -E ~{task_exit_code}" +backends.default.backend_config.get-exit-code = "cat ~{task_exit_code}" +backends.default.backend_config.kill = "kill ~{job_id}" From 02af6572645a137c5946278fdd8fd240b781fdb4 Mon Sep 17 00:00:00 2001 From: "Adam C. Foltzer" Date: Wed, 13 Aug 2025 15:16:50 -0500 Subject: [PATCH 6/9] wip: save the working bare LSF config --- crankshaft | 2 +- .../tasks/task-with-comments/config.toml | 22 ++++++++++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/crankshaft b/crankshaft index 42400beeb..ce9d63a87 160000 --- a/crankshaft +++ b/crankshaft @@ -1 +1 @@ -Subproject commit 42400beeb8f629a86a60f955e6094ffbcb5ec482 +Subproject commit ce9d63a876e71fe255dc9bd871c5b1d7746e236a diff --git a/wdl-engine/tests/tasks/task-with-comments/config.toml b/wdl-engine/tests/tasks/task-with-comments/config.toml index 6b3bd89fb..b6f9f5cda 100644 --- a/wdl-engine/tests/tasks/task-with-comments/config.toml +++ b/wdl-engine/tests/tasks/task-with-comments/config.toml @@ -1,10 +1,20 @@ backends.default.type = "generic" backends.default.backend_config.locale.kind = "Local" backends.default.backend_config.shell = "bash" -backends.default.backend_config.submit = """ -((cd ~{cwd}; ~{command} > ~{stdout} 2> ~{stderr}; echo $? > - ~{task_exit_code}) & echo $!)""" +backends.default.backend_config.submit = """\ +( \ +cd ~{cwd}; \ +bsub -Ne -oo ~{stdout} -eo ~{stderr} ~{command} | \ +sed -n -e 's/Job <\\([[:digit:]]\\+\\)>.*/\\1/p' \ +) +""" backends.default.backend_config.job-id-regex = '(\d+)' -backends.default.backend_config.monitor = "file -E ~{task_exit_code}" -backends.default.backend_config.get-exit-code = "cat ~{task_exit_code}" -backends.default.backend_config.kill = "kill ~{job_id}" +backends.default.backend_config.monitor = "(stat=$(bjobs -noheader -o \"stat\" ~{job_id}); [ $stat == \"DONE\" ] || [ $stat == \"EXIT\" ])" +backends.default.backend_config.get-exit-code = """\ +if [ $(bjobs -noheader -o \"stat\" ~{job_id}) == \"DONE\" ]; then \ +echo 0; \ +else \ +bjobs -noheader -o \"exit_code\" ~{job_id}; +fi +""" +backends.default.backend_config.kill = "bkill ~{job_id}" From f32448cb8cb1df807f32f183d2d5c27e57457186 Mon Sep 17 00:00:00 2001 From: "Adam C. Foltzer" Date: Thu, 14 Aug 2025 12:38:07 -0500 Subject: [PATCH 7/9] wip: branch that only works on Adam's HPC account! I've been doing some lightweight editing in-place on the HPC, but need to get this into git both to avoid losing progress and to be able to edit in a friendlier setting --- wdl-engine/src/backend/generic.rs | 6 ++ wdl-engine/tests/tasks.rs | 65 +++++-------------- .../tasks/task-with-comments/config.toml | 2 +- 3 files changed, 23 insertions(+), 50 deletions(-) diff --git a/wdl-engine/src/backend/generic.rs b/wdl-engine/src/backend/generic.rs index 2fc48f227..f6649829f 100644 --- a/wdl-engine/src/backend/generic.rs +++ b/wdl-engine/src/backend/generic.rs @@ -129,6 +129,12 @@ impl TaskManagerRequest for GenericTaskRequest { Cow::Borrowed("task_exit_code"), Cow::Owned(task_exit_code.display().to_string()), ); + let container = crate::v1::container( + self.inner.requirements(), + self.config.task.container.as_deref(), + ) + .into_owned(); + attributes.insert(Cow::Borrowed("container"), container.into()); // let crankshaft_generic_backend_driver = // crankshaft::config::backend::generic::driver::Config::builder() // .locale(crankshaft::config::backend::generic::driver::Locale::Local) diff --git a/wdl-engine/tests/tasks.rs b/wdl-engine/tests/tasks.rs index 154a150a1..7e612b41b 100644 --- a/wdl-engine/tests/tasks.rs +++ b/wdl-engine/tests/tasks.rs @@ -140,13 +140,12 @@ fn configs(path: &Path) -> Result, config::Config)>, anyh .build(), ) .submit( - r#"((cd ~{cwd}; ~{command} > ~{stdout} 2> ~{stderr}; echo $? > - ~{task_exit_code}) & echo $!)"#, + r#"( cd ~{cwd}; bsub -Ne -oo ~{stdout} -eo ~{stderr} ~{command} | sed -n -e 's/Job <\([[:digit:]]\+\)>.*/\1/p' )"#, ) .job_id_regex(r#"(\d+)"#) - .monitor("file -E ~{task_exit_code}") - .get_exit_code("cat ~{task_exit_code}") - .kill("kill ~{job_id}") + .monitor(r#"(stat=$(bjobs -noheader -o "stat" ~{job_id}); [ $stat == "DONE" ] || [ $stat == "EXIT" ])"#) + .get_exit_code(r#"if [ $(bjobs -noheader -o "stat" ~{job_id}) == "DONE" ]; then echo 0; else bjobs -noheader -o "exit_code" ~{job_id}; fi"#) + .kill("bkill ~{job_id}") .build(), cpu: None, memory: None, @@ -159,49 +158,16 @@ fn configs(path: &Path) -> Result, config::Config)>, anyh if !configs_on_disk.is_empty() || any_config_toml_found { Ok(configs_on_disk) } else { - Ok(vec![ - ("local".into(), { - config::Config { - backends: [( - "default".to_string(), - BackendConfig::Local(Default::default()), - )] - .into(), - suppress_env_specific_output: true, - ..Default::default() - } - }), - ("local_generic".into(), { - config::Config { - backends: [( - "default".to_string(), - BackendConfig::Generic(generic_backend_config), - )] - .into(), - task: config::TaskConfig { - cpu_limit_behavior: config::TaskResourceLimitBehavior::TryWithMax, - memory_limit_behavior: config::TaskResourceLimitBehavior::TryWithMax, - ..Default::default() - }, - ..Default::default() - } - }), - // Currently we limit running the Docker backend to Linux as GitHub does not have - // Docker installed on macOS hosted runners and the Windows hosted runners - // are configured to use Windows containers - #[cfg(target_os = "linux")] - ("docker".into(), { - config::Config { - backends: [( - "default".to_string(), - BackendConfig::Docker(Default::default()), - )] - .into(), - suppress_env_specific_output: true, - ..Default::default() - } - }), - ]) + Ok(vec![("generic_lsf".into(), { + config::Config { + backends: [( + "default".to_string(), + BackendConfig::Generic(generic_backend_config), + )] + .into(), + ..Default::default() + } + })]) } } @@ -337,7 +303,8 @@ async fn run_test(test: &Path, config: config::Config) -> Result<()> { inputs.join_paths(task, |_| Ok(&test_dir))?; let evaluator = TaskEvaluator::new(config, CancellationToken::new()).await?; - let mut dir = TempDir::new().context("failed to create temporary directory")?; + let mut dir = TempDir::new_in("/home/afoltzer/faketmp") + .context("failed to create temporary directory")?; // dir.disable_cleanup(true); match evaluator .evaluate(result.document(), task, &inputs, dir.path(), |_| async {}) diff --git a/wdl-engine/tests/tasks/task-with-comments/config.toml b/wdl-engine/tests/tasks/task-with-comments/config.toml index b6f9f5cda..7f8100c92 100644 --- a/wdl-engine/tests/tasks/task-with-comments/config.toml +++ b/wdl-engine/tests/tasks/task-with-comments/config.toml @@ -4,7 +4,7 @@ backends.default.backend_config.shell = "bash" backends.default.backend_config.submit = """\ ( \ cd ~{cwd}; \ -bsub -Ne -oo ~{stdout} -eo ~{stderr} ~{command} | \ +bsub -Ne -oo ~{stdout} -eo ~{stderr} 'apptainer -q exec docker://~{container} ~{command}' | \ sed -n -e 's/Job <\\([[:digit:]]\\+\\)>.*/\\1/p' \ ) """ From 9d2d8ac8c2b84700a1d269b01c6de4fc8c9bd261 Mon Sep 17 00:00:00 2001 From: "Adam C. Foltzer" Date: Thu, 14 Aug 2025 14:37:40 -0700 Subject: [PATCH 8/9] wip: add cpu and memory_mb template variables --- wdl-engine/src/backend/generic.rs | 9 +++++---- wdl-engine/src/lib.rs | 5 +++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/wdl-engine/src/backend/generic.rs b/wdl-engine/src/backend/generic.rs index f6649829f..c47b400c7 100644 --- a/wdl-engine/src/backend/generic.rs +++ b/wdl-engine/src/backend/generic.rs @@ -30,6 +30,7 @@ use super::TaskSpawnRequest; use crate::COMMAND_FILE_NAME; use crate::Input; use crate::ONE_GIBIBYTE; +use crate::ONE_MEGABYTE; use crate::PrimitiveValue; use crate::STDERR_FILE_NAME; use crate::STDOUT_FILE_NAME; @@ -56,12 +57,8 @@ struct GenericTaskRequest { /// The inner task spawn request. inner: TaskSpawnRequest, /// The requested CPU reservation for the task. - /// - /// Note that CPU isn't actually reserved for the task process. cpu: f64, /// The requested memory reservation for the task. - /// - /// Note that memory isn't actually reserved for the task process. memory: u64, /// The cancellation token for the request. token: CancellationToken, @@ -135,6 +132,10 @@ impl TaskManagerRequest for GenericTaskRequest { ) .into_owned(); attributes.insert(Cow::Borrowed("container"), container.into()); + let cpu = crate::v1::cpu(self.inner.requirements()).floor() as u64; + attributes.insert(Cow::Borrowed("cpu"), cpu.to_string().into()); + let memory_mb = crate::v1::memory(self.inner.requirements())? as f64 / ONE_MEGABYTE; + attributes.insert(Cow::Borrowed("memory_mb"), memory_mb.to_string().into()); // let crankshaft_generic_backend_driver = // crankshaft::config::backend::generic::driver::Config::builder() // .locale(crankshaft::config::backend::generic::driver::Locale::Local) diff --git a/wdl-engine/src/lib.rs b/wdl-engine/src/lib.rs index 58c69dd88..e20252685 100644 --- a/wdl-engine/src/lib.rs +++ b/wdl-engine/src/lib.rs @@ -38,6 +38,11 @@ use wdl_ast::TreeNode; /// This is defined as a constant as it's a commonly performed conversion. const ONE_GIBIBYTE: f64 = 1024.0 * 1024.0 * 1024.0; +/// One megabyte (MB) as a float. +/// +/// This is defined as a constant as it's a commonly performed conversion. +const ONE_MEGABYTE: f64 = 1000.0 * 1000.0; + /// Resolves a type name from a document. /// /// This function will import the type into the type cache if not already From 1f462e744ec57723374dbc456b0483654e6ddc32 Mon Sep 17 00:00:00 2001 From: "Adam C. Foltzer" Date: Wed, 20 Aug 2025 15:55:54 -0500 Subject: [PATCH 9/9] wip: fix fractional cpu handling in line with spec --- wdl-engine/src/backend/generic.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wdl-engine/src/backend/generic.rs b/wdl-engine/src/backend/generic.rs index c47b400c7..bb24938e6 100644 --- a/wdl-engine/src/backend/generic.rs +++ b/wdl-engine/src/backend/generic.rs @@ -132,7 +132,7 @@ impl TaskManagerRequest for GenericTaskRequest { ) .into_owned(); attributes.insert(Cow::Borrowed("container"), container.into()); - let cpu = crate::v1::cpu(self.inner.requirements()).floor() as u64; + let cpu = crate::v1::cpu(self.inner.requirements()).ceil() as u64; attributes.insert(Cow::Borrowed("cpu"), cpu.to_string().into()); let memory_mb = crate::v1::memory(self.inner.requirements())? as f64 / ONE_MEGABYTE; attributes.insert(Cow::Borrowed("memory_mb"), memory_mb.to_string().into());