diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000000000..99f1a04a8 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "crankshaft"] + path = crankshaft + url = https://github.com/stjude-rust-labs/crankshaft.git diff --git a/Cargo.toml b/Cargo.toml index 2eeb3208e..d4ebe295e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,9 @@ members = [ "wdl-lint", "wdl-lsp", ] +exclude = [ + "crankshaft", +] resolver = "2" [workspace.package] @@ -33,7 +36,7 @@ clap-verbosity-flag = { version = "3.0.3", features = ["tracing"] } codespan-reporting = "0.12.0" colored = "3.0.0" convert_case = "0.8.0" -crankshaft = "0.4.0" +crankshaft = { path = "./crankshaft/crankshaft" } dirs = "6.0.0" faster-hex = "0.10.0" fs_extra = "1.3.0" diff --git a/crankshaft b/crankshaft new file mode 160000 index 000000000..ce9d63a87 --- /dev/null +++ b/crankshaft @@ -0,0 +1 @@ +Subproject commit ce9d63a876e71fe255dc9bd871c5b1d7746e236a diff --git a/wdl-engine/src/backend.rs b/wdl-engine/src/backend.rs index 5b4dadb5c..5e633989c 100644 --- a/wdl-engine/src/backend.rs +++ b/wdl-engine/src/backend.rs @@ -28,10 +28,12 @@ use crate::http::HttpDownloader; use crate::path::EvaluationPath; mod docker; +mod generic; mod local; mod tes; pub use docker::*; +pub use generic::*; pub use local::*; pub use tes::*; diff --git a/wdl-engine/src/backend/generic.rs b/wdl-engine/src/backend/generic.rs new file mode 100644 index 000000000..bb24938e6 --- /dev/null +++ b/wdl-engine/src/backend/generic.rs @@ -0,0 +1,450 @@ +use std::borrow::Cow; +use std::collections::HashMap; +use std::fs::File; +use std::fs::Permissions; +use std::fs::{self}; +use std::os::unix::fs::PermissionsExt; +use std::path::Path; +use std::sync::Arc; + +use anyhow::Context as _; +use anyhow::Result; +use anyhow::anyhow; +use anyhow::bail; +use futures::FutureExt as _; +use futures::future::BoxFuture; +use nonempty::NonEmpty; +use tempfile::TempDir; +use tokio::sync::oneshot; +use tokio::task::JoinSet; +use 
tokio_util::sync::CancellationToken; +use tracing::info; +use tracing::warn; + +use super::TaskExecutionBackend; +use super::TaskExecutionConstraints; +use super::TaskExecutionEvents; +use super::TaskManager; +use super::TaskManagerRequest; +use super::TaskSpawnRequest; +use crate::COMMAND_FILE_NAME; +use crate::Input; +use crate::ONE_GIBIBYTE; +use crate::ONE_MEGABYTE; +use crate::PrimitiveValue; +use crate::STDERR_FILE_NAME; +use crate::STDOUT_FILE_NAME; +use crate::SYSTEM; +use crate::TaskExecutionResult; +use crate::Value; +use crate::WORK_DIR_NAME; +use crate::config::Config; +use crate::config::GenericBackendConfig; +use crate::config::TaskResourceLimitBehavior; +use crate::convert_unit_string; +use crate::http::Downloader as _; +use crate::http::HttpDownloader; +use crate::http::Location; +use crate::path::EvaluationPath; + +/// Represents a generic task request. +#[derive(Debug)] +struct GenericTaskRequest { + /// The engine configuration. + config: Arc, + /// The backend configuration. + backend_config: Arc, + /// The inner task spawn request. + inner: TaskSpawnRequest, + /// The requested CPU reservation for the task. + cpu: f64, + /// The requested memory reservation for the task. + memory: u64, + /// The cancellation token for the request. 
+ token: CancellationToken, +} + +impl TaskManagerRequest for GenericTaskRequest { + fn cpu(&self) -> f64 { + self.cpu + } + + fn memory(&self) -> u64 { + self.memory + } + + async fn run(self, spawned: oneshot::Sender<()>) -> Result { + // Create the working directory + let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME); + fs::create_dir_all(&work_dir).with_context(|| { + format!( + "failed to create directory `{path}`", + path = work_dir.display() + ) + })?; + + // Write the evaluated command to disk + let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME); + fs::write(&command_path, self.inner.command()).with_context(|| { + format!( + "failed to write command contents to `{path}`", + path = command_path.display() + ) + })?; + fs::set_permissions(&command_path, Permissions::from_mode(0o777))?; + + // Create an empty file for the stdout + let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME); + let _ = File::create(&stdout_path).with_context(|| { + format!( + "failed to create stdout file `{path}`", + path = stdout_path.display() + ) + })?; + + // Create an empty file for the stderr + let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME); + let _ = File::create(&stderr_path).with_context(|| { + format!( + "failed to create stderr file `{path}`", + path = stderr_path.display() + ) + })?; + + let temp_dir = TempDir::new()?; + let mut attributes = HashMap::new(); + // TODO ACF 2025-08-12: might want to bake a tmpdir in as a more fully-fledged + // concept eventually, but for now putting things in a separate location + // is required to avoid spurious test failures from the fixture checking + // every file in `cwd` for equivalence + attributes.insert( + Cow::Borrowed("temp_dir"), + Cow::Owned(temp_dir.path().display().to_string()), + ); + let task_exit_code = temp_dir.path().join("task_exit_code"); + attributes.insert( + Cow::Borrowed("task_exit_code"), + Cow::Owned(task_exit_code.display().to_string()), + ); + let 
container = crate::v1::container( + self.inner.requirements(), + self.config.task.container.as_deref(), + ) + .into_owned(); + attributes.insert(Cow::Borrowed("container"), container.into()); + let cpu = crate::v1::cpu(self.inner.requirements()).ceil() as u64; + attributes.insert(Cow::Borrowed("cpu"), cpu.to_string().into()); + let memory_mb = crate::v1::memory(self.inner.requirements())? as f64 / ONE_MEGABYTE; + attributes.insert(Cow::Borrowed("memory_mb"), memory_mb.to_string().into()); + // let crankshaft_generic_backend_driver = + // crankshaft::config::backend::generic::driver::Config::builder() + // .locale(crankshaft::config::backend::generic::driver::Locale::Local) + // .shell(crankshaft::config::backend::generic::driver::Shell::Bash) + // .build(); + // let crankshaft_generic_backend_config = + // crankshaft::config::backend::generic::Config::builder() + // .driver(crankshaft_generic_backend_driver) + // .submit( + // r#"((cd ~{cwd}; ~{command} > ~{stdout} 2> ~{stderr}; echo $? 
> + // ~{task_exit_code}) & echo $!)"# ) + // .job_id_regex(r#"(\d+)"#) + // .monitor("file -E ~{task_exit_code}") + // .get_exit_code("cat ~{task_exit_code}") + // .kill("kill ~{job_id}") + // .attributes(attributes) + // .build(); + + let mut crankshaft_generic_backend_config = self.backend_config.backend_config.clone(); + *crankshaft_generic_backend_config.attributes_mut() = attributes; + + const BACKEND_NAME: &'static str = "crankshaft_generic"; + let backend = crankshaft::Engine::default() + .with( + crankshaft::config::backend::Config::builder() + .name(BACKEND_NAME) + .max_tasks(10) + .kind(crankshaft::config::backend::Kind::Generic( + crankshaft_generic_backend_config, + )) + .build(), + ) + .await?; + + let generic_task = crankshaft::engine::Task::builder() + .executions(NonEmpty::new( + crankshaft::engine::task::Execution::builder() + .image("not_an_image") + .program(command_path.to_str().unwrap()) + .work_dir(work_dir.to_str().unwrap()) + .stdout(stdout_path.display().to_string()) + .stderr(stderr_path.display().to_string()) + .build(), + )) + .build(); + let handle = backend.spawn(BACKEND_NAME, generic_task, self.token.clone())?; + spawned.send(()).ok(); + + let res = handle.wait().await?; + + // Set the PATH variable for the child on Windows to get consistent PATH + // searching. 
See: https://github.com/rust-lang/rust/issues/122660 +        // FIXME(review): dead code copied from the local backend — `command` is +        // not defined in this scope, so this block fails to compile on Windows; +        // re-add PATH forwarding through the crankshaft execution API instead: +        // #[cfg(windows)] command.env("PATH", std::env::var("PATH")?) + +        return Ok(TaskExecutionResult { +            inputs: self.inner.info.inputs, +            exit_code: res +                .last() +                .code() +                .ok_or(anyhow!("task did not return an exit code"))?, +            work_dir: EvaluationPath::Local(work_dir), +            stdout: PrimitiveValue::new_file( +                stdout_path +                    .into_os_string() +                    .into_string() +                    .expect("path should be UTF-8"), +            ) +            .into(), +            stderr: PrimitiveValue::new_file( +                stderr_path +                    .into_os_string() +                    .into_string() +                    .expect("path should be UTF-8"), +            ) +            .into(), +        }); +    } +} + +/// Represents a task execution backend that uses Crankshaft's generic backend +/// to execute tasks. +pub struct GenericBackend { +    /// The engine configuration. +    config: Arc, +    /// The backend configuration. +    backend_config: Arc, +    /// The total CPU of the host. +    cpu: u64, +    /// The total memory of the host. +    memory: u64, +    /// The underlying task manager. +    manager: TaskManager, +} + +impl GenericBackend { +    /// Constructs a new generic task execution backend with the given +    /// configuration. +    /// +    /// The provided configuration is expected to have already been validated.
+ pub fn new(config: Arc, backend_config: &GenericBackendConfig) -> Result { + info!("initializing generic backend"); + + let cpu = backend_config + .cpu + .unwrap_or_else(|| SYSTEM.cpus().len() as u64); + let memory = backend_config + .memory + .as_ref() + .map(|s| convert_unit_string(s).expect("value should be valid")) + .unwrap_or_else(|| SYSTEM.total_memory()); + let manager = TaskManager::new(cpu, cpu, memory, memory); + + Ok(Self { + config, + // TODO ACF 2025-08-12: sort out this excess cloning nonsense + backend_config: Arc::new(backend_config.clone()), + cpu, + memory, + manager, + }) + } +} + +impl TaskExecutionBackend for GenericBackend { + fn max_concurrency(&self) -> u64 { + self.cpu + } + + fn constraints( + &self, + requirements: &HashMap, + _: &HashMap, + ) -> Result { + let mut cpu = crate::v1::cpu(requirements); + if (self.cpu as f64) < cpu { + match self.config.task.cpu_limit_behavior { + TaskResourceLimitBehavior::TryWithMax => { + warn!( + "task requires at least {cpu} CPU{s}, but the host only has {total_cpu} \ + available", + s = if cpu == 1.0 { "" } else { "s" }, + total_cpu = self.cpu, + ); + // clamp the reported constraint to what's available + cpu = self.cpu as f64; + } + TaskResourceLimitBehavior::Deny => { + bail!( + "task requires at least {cpu} CPU{s}, but the host only has {total_cpu} \ + available", + s = if cpu == 1.0 { "" } else { "s" }, + total_cpu = self.cpu, + ); + } + } + } + + let mut memory = crate::v1::memory(requirements)?; + if self.memory < memory as u64 { + match self.config.task.memory_limit_behavior { + TaskResourceLimitBehavior::TryWithMax => { + warn!( + "task requires at least {memory} GiB of memory, but the host only has \ + {total_memory} GiB available", + // Display the error in GiB, as it is the most common unit for memory + memory = memory as f64 / ONE_GIBIBYTE, + total_memory = self.memory as f64 / ONE_GIBIBYTE, + ); + // clamp the reported constraint to what's available + memory = 
self.memory.try_into().unwrap_or(i64::MAX); + } + TaskResourceLimitBehavior::Deny => { + bail!( + "task requires at least {memory} GiB of memory, but the host only has \ + {total_memory} GiB available", + // Display the error in GiB, as it is the most common unit for memory + memory = memory as f64 / ONE_GIBIBYTE, + total_memory = self.memory as f64 / ONE_GIBIBYTE, + ); + } + } + } + + Ok(TaskExecutionConstraints { + container: None, + cpu, + memory, + gpu: Default::default(), + fpga: Default::default(), + disks: Default::default(), + }) + } + + fn guest_work_dir(&self) -> Option<&Path> { + None + } + + fn localize_inputs<'a, 'b, 'c, 'd>( + &'a self, + downloader: &'b HttpDownloader, + inputs: &'c mut [Input], + ) -> BoxFuture<'d, Result<()>> + where + 'a: 'd, + 'b: 'd, + 'c: 'd, + Self: 'd, + { + async move { + let mut downloads = JoinSet::new(); + + for (idx, input) in inputs.iter_mut().enumerate() { + match input.path() { + EvaluationPath::Local(path) => { + let location = Location::Path(path.clone().into()); + let guest_path = location + .to_str() + .with_context(|| { + format!("path `{path}` is not UTF-8", path = path.display()) + })? + .to_string(); + input.set_location(location.into_owned()); + input.set_guest_path(guest_path); + } + EvaluationPath::Remote(url) => { + let downloader = downloader.clone(); + let url = url.clone(); + downloads.spawn(async move { + let location_result = downloader.download(&url).await; + + match location_result { + Ok(location) => Ok((idx, location.into_owned())), + Err(e) => bail!("failed to localize `{url}`: {e:?}"), + } + }); + } + } + } + + while let Some(result) = downloads.join_next().await { + match result { + Ok(Ok((idx, location))) => { + let guest_path = location + .to_str() + .with_context(|| { + format!( + "downloaded path `{path}` is not UTF-8", + path = location.display() + ) + })? 
+ .to_string(); + + let input = inputs.get_mut(idx).expect("index should be valid"); + input.set_location(location); + input.set_guest_path(guest_path); + } + Ok(Err(e)) => { + // Futures are aborted when the `JoinSet` is dropped. + bail!(e); + } + Err(e) => { + // Futures are aborted when the `JoinSet` is dropped. + bail!("download task failed: {e}"); + } + } + } + + Ok(()) + } + .boxed() + } + + fn spawn( + &self, + request: TaskSpawnRequest, + token: CancellationToken, + ) -> Result { + let (spawned_tx, spawned_rx) = oneshot::channel(); + let (completed_tx, completed_rx) = oneshot::channel(); + + let requirements = request.requirements(); + let mut cpu = crate::v1::cpu(requirements); + if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior { + cpu = std::cmp::min(cpu.ceil() as u64, self.cpu) as f64; + } + let mut memory = crate::v1::memory(requirements)? as u64; + if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior { + memory = std::cmp::min(memory, self.memory); + } + + self.manager.send( + GenericTaskRequest { + config: self.config.clone(), + backend_config: self.backend_config.clone(), + inner: request, + cpu, + memory, + token, + }, + spawned_tx, + completed_tx, + ); + + Ok(TaskExecutionEvents { + spawned: spawned_rx, + completed: completed_rx, + }) + } +} diff --git a/wdl-engine/src/config.rs b/wdl-engine/src/config.rs index 5af487c51..dba13c05d 100644 --- a/wdl-engine/src/config.rs +++ b/wdl-engine/src/config.rs @@ -15,6 +15,7 @@ use tracing::warn; use url::Url; use crate::DockerBackend; +use crate::GenericBackend; use crate::LocalBackend; use crate::SYSTEM; use crate::TaskExecutionBackend; @@ -139,6 +140,9 @@ impl Config { BackendConfig::Tes(config) => { Ok(Arc::new(TesBackend::new(self.clone(), config).await?)) } + BackendConfig::Generic(config) => { + Ok(Arc::new(GenericBackend::new(self.clone(), config)?)) + } } } } @@ -426,6 +430,8 @@ pub enum BackendConfig { Docker(DockerBackendConfig), 
/// Use the TES task execution backend. Tes(Box), + /// Use the generic task execution backend. + Generic(GenericBackendConfig), } impl Default for BackendConfig { @@ -441,6 +447,7 @@ impl BackendConfig { Self::Local(config) => config.validate(), Self::Docker(config) => config.validate(), Self::Tes(config) => config.validate(), + Self::Generic(config) => config.validate(), } } @@ -541,6 +548,71 @@ impl LocalBackendConfig { } } +/// Represents configuration for the generic task execution backend. +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub struct GenericBackendConfig { + /// The Crankshaft generic backend config. + // TODO ACF 2025-08-25: this maybe should be the entire struct? + #[serde(default)] + pub backend_config: crankshaft::config::backend::generic::Config, + /// Set the number of CPUs available for task execution. + /// + /// Defaults to the number of logical CPUs for the host. + /// + /// The value cannot be zero or exceed the host's number of CPUs. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub cpu: Option, + + /// Set the total amount of memory for task execution as a unit string (e.g. + /// `2 GiB`). + /// + /// Defaults to the total amount of memory for the host. + /// + /// The value cannot be zero or exceed the host's total amount of memory. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub memory: Option, +} + +impl GenericBackendConfig { + /// Validates the local task execution backend configuration. 
+ pub fn validate(&self) -> Result<()> { + if let Some(cpu) = self.cpu { + if cpu == 0 { + bail!("local backend configuration value `cpu` cannot be zero"); + } + + let total = SYSTEM.cpus().len() as u64; + if cpu > total { + bail!( + "local backend configuration value `cpu` cannot exceed the virtual CPUs \ + available to the host ({total})" + ); + } + } + + if let Some(memory) = &self.memory { + let memory = convert_unit_string(memory).with_context(|| { + format!("local backend configuration value `memory` has invalid value `{memory}`") + })?; + + if memory == 0 { + bail!("local backend configuration value `memory` cannot be zero"); + } + + let total = SYSTEM.total_memory(); + if memory > total { + bail!( + "local backend configuration value `memory` cannot exceed the total memory of \ + the host ({total} bytes)" + ); + } + } + + Ok(()) + } +} + /// Gets the default value for the docker `cleanup` field. const fn cleanup_default() -> bool { true diff --git a/wdl-engine/src/eval/v1/task.rs b/wdl-engine/src/eval/v1/task.rs index c390ebefe..409047cd6 100644 --- a/wdl-engine/src/eval/v1/task.rs +++ b/wdl-engine/src/eval/v1/task.rs @@ -963,6 +963,8 @@ impl TaskEvaluator { let result = events .completed .await + // TODO ACF 2025-08-01: this can reasonably hit if a backend error occurs and + // shouldn't just be a panic .expect("failed to receive response from spawned task"); progress(ProgressKind::TaskExecutionCompleted { diff --git a/wdl-engine/src/lib.rs b/wdl-engine/src/lib.rs index 58c69dd88..e20252685 100644 --- a/wdl-engine/src/lib.rs +++ b/wdl-engine/src/lib.rs @@ -38,6 +38,11 @@ use wdl_ast::TreeNode; /// This is defined as a constant as it's a commonly performed conversion. const ONE_GIBIBYTE: f64 = 1024.0 * 1024.0 * 1024.0; +/// One megabyte (MB) as a float. +/// +/// This is defined as a constant as it's a commonly performed conversion. +const ONE_MEGABYTE: f64 = 1000.0 * 1000.0; + /// Resolves a type name from a document. 
/// /// This function will import the type into the type cache if not already diff --git a/wdl-engine/tests/tasks.rs b/wdl-engine/tests/tasks.rs index a72687621..7e612b41b 100644 --- a/wdl-engine/tests/tasks.rs +++ b/wdl-engine/tests/tasks.rs @@ -43,6 +43,7 @@ use wdl_engine::EvaluatedTask; use wdl_engine::EvaluationError; use wdl_engine::Inputs; use wdl_engine::config::BackendConfig; +use wdl_engine::config::GenericBackendConfig; use wdl_engine::config::{self}; use wdl_engine::v1::TaskEvaluator; @@ -130,37 +131,43 @@ fn configs(path: &Path) -> Result, config::Config)>, anyh let config = toml::from_str(&std::fs::read_to_string(file.path())?)?; configs_on_disk.push((config_name, config)); } + let generic_backend_config = GenericBackendConfig { + backend_config: crankshaft::config::backend::generic::Config::builder() + .driver( + crankshaft::config::backend::generic::driver::Config::builder() + .locale(crankshaft::config::backend::generic::driver::Locale::Local) + .shell(crankshaft::config::backend::generic::driver::Shell::Bash) + .build(), + ) + .submit( + r#"( cd ~{cwd}; bsub -Ne -oo ~{stdout} -eo ~{stderr} ~{command} | sed -n -e 's/Job <\([[:digit:]]\+\)>.*/\1/p' )"#, + ) + .job_id_regex(r#"(\d+)"#) + .monitor(r#"(stat=$(bjobs -noheader -o "stat" ~{job_id}); [ $stat == "DONE" ] || [ $stat == "EXIT" ])"#) + .get_exit_code(r#"if [ $(bjobs -noheader -o "stat" ~{job_id}) == "DONE" ]; then echo 0; else bjobs -noheader -o "exit_code" ~{job_id}; fi"#) + .kill("bkill ~{job_id}") + .build(), + cpu: None, + memory: None, + }; + // std::fs::write( + // "/tmp/generic_config.toml", + // toml::to_string_pretty(&generic_backend_config).unwrap(), + // ) + // .unwrap(); if !configs_on_disk.is_empty() || any_config_toml_found { Ok(configs_on_disk) } else { - Ok(vec![ - ("local".into(), { - config::Config { - backends: [( - "default".to_string(), - BackendConfig::Local(Default::default()), - )] - .into(), - suppress_env_specific_output: true, - ..Default::default() - } - }), - // 
Currently we limit running the Docker backend to Linux as GitHub does not have - // Docker installed on macOS hosted runners and the Windows hosted runners - // are configured to use Windows containers - #[cfg(target_os = "linux")] - ("docker".into(), { - config::Config { - backends: [( - "default".to_string(), - BackendConfig::Docker(Default::default()), - )] - .into(), - suppress_env_specific_output: true, - ..Default::default() - } - }), - ]) + Ok(vec![("generic_lsf".into(), { + config::Config { + backends: [( + "default".to_string(), + BackendConfig::Generic(generic_backend_config), + )] + .into(), + ..Default::default() + } + })]) } } @@ -296,7 +303,9 @@ async fn run_test(test: &Path, config: config::Config) -> Result<()> { inputs.join_paths(task, |_| Ok(&test_dir))?; let evaluator = TaskEvaluator::new(config, CancellationToken::new()).await?; - let dir = TempDir::new().context("failed to create temporary directory")?; + let mut dir = TempDir::new_in("/home/afoltzer/faketmp") + .context("failed to create temporary directory")?; + // dir.disable_cleanup(true); match evaluator .evaluate(result.document(), task, &inputs, dir.path(), |_| async {}) .await diff --git a/wdl-engine/tests/tasks/task-with-comments/config.toml b/wdl-engine/tests/tasks/task-with-comments/config.toml new file mode 100644 index 000000000..7f8100c92 --- /dev/null +++ b/wdl-engine/tests/tasks/task-with-comments/config.toml @@ -0,0 +1,20 @@ +backends.default.type = "generic" +backends.default.backend_config.locale.kind = "Local" +backends.default.backend_config.shell = "bash" +backends.default.backend_config.submit = """\ +( \ +cd ~{cwd}; \ +bsub -Ne -oo ~{stdout} -eo ~{stderr} 'apptainer -q exec docker://~{container} ~{command}' | \ +sed -n -e 's/Job <\\([[:digit:]]\\+\\)>.*/\\1/p' \ +) +""" +backends.default.backend_config.job-id-regex = '(\d+)' +backends.default.backend_config.monitor = "(stat=$(bjobs -noheader -o \"stat\" ~{job_id}); [ $stat == \"DONE\" ] || [ $stat == \"EXIT\" ])" 
+backends.default.backend_config.get-exit-code = """\ +if [ $(bjobs -noheader -o \"stat\" ~{job_id}) == \"DONE\" ]; then \ +echo 0; \ +else \ +bjobs -noheader -o \"exit_code\" ~{job_id}; +fi +""" +backends.default.backend_config.kill = "bkill ~{job_id}"