diff --git a/backend/windmill-worker/src/ansible_executor.rs b/backend/windmill-worker/src/ansible_executor.rs index 3da3fa619d8da..5795c1ccd7d37 100644 --- a/backend/windmill-worker/src/ansible_executor.rs +++ b/backend/windmill-worker/src/ansible_executor.rs @@ -410,6 +410,7 @@ async fn handle_ansible_python_deps( worker_dir, &mut Some(occupancy_metrics), PyVAlias::default().into(), + None, ) .await?; additional_python_paths.append(&mut venv_path); diff --git a/backend/windmill-worker/src/python_executor.rs b/backend/windmill-worker/src/python_executor.rs index 3f64641813033..a2919fcfce3bf 100644 --- a/backend/windmill-worker/src/python_executor.rs +++ b/backend/windmill-worker/src/python_executor.rs @@ -7,6 +7,9 @@ use std::{ sync::Arc, }; +#[cfg(unix)] +use std::os::unix::process::ExitStatusExt; + use anyhow::anyhow; use itertools::Itertools; use regex::Regex; @@ -1299,6 +1302,7 @@ Returned from server: py_version - {:?}, py_version_v2 - {:?} worker_dir, occupancy_metrics, pyv.clone(), + None, ) .await?; additional_python_paths.append(&mut venv_path); @@ -1532,6 +1536,7 @@ pub async fn handle_python_reqs( worker_dir: &str, _occupancy_metrics: &mut Option<&mut OccupancyMetrics>, py_version: PyV, + reduced_concurrent_downloads: Option, ) -> error::Result> { let worker_dir = worker_dir.to_string(); @@ -1585,8 +1590,10 @@ pub async fn handle_python_reqs( } // Parallelism level (N) - let parallel_limit = // Semaphore will panic if value less then 1 - PY_CONCURRENT_DOWNLOADS.clamp(1, 30); + let parallel_limit = reduced_concurrent_downloads + .unwrap_or(*PY_CONCURRENT_DOWNLOADS) + .clamp(1, 30); + // Semaphore will panic if value less then 1 tracing::info!( workspace_id = %w_id, @@ -1616,7 +1623,7 @@ pub async fn handle_python_reqs( // Find out if there is already cached dependencies // If so, skip them let mut in_cache = vec![]; - for req in requirements { + for req in &requirements { // Ignore python version annotation backed into lockfile if req.starts_with('#') || req.starts_with('-') || req.trim().is_empty() { continue; @@ -1650,8 +1657,8 @@ pub async fn handle_python_reqs( .map(|_| kill_tx.subscribe()) .collect(); - // ________ Read comments at the end of the function to get more context - let (_done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1); + // _______ Read comments at the end of the function to get more context + let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1); let job_id_2 = job_id.clone(); let conn_2 = conn.clone(); @@ -1739,7 +1746,6 @@ pub async fn handle_python_reqs( }; if canceled { - tracing::info!( // If there is listener on other side, workspace_id = %w_id_2, @@ -1871,7 +1877,7 @@ pub async fn handle_python_reqs( if let Some(os) = windmill_common::s3_helpers::get_object_store().await { tokio::select! { // Cancel was called on the job - _ = kill_rx.recv() => return Err(anyhow::anyhow!("S3 pull was canceled")), + _ = kill_rx.recv() => return Err(Error::from(anyhow::anyhow!("S3 pull was canceled"))), pull = pull_from_tar(os, venv_p.clone(), py_version.to_cache_dir_top_level(false), None, false) => { if let Err(e) = pull { tracing::info!( @@ -1932,7 +1938,7 @@ pub async fn handle_python_reqs( ) .await; pids.lock().await.get_mut(i).and_then(|e| e.take()); - return Err(e.into()); + return Err(Error::from(e)); } }; @@ -1954,6 +1960,20 @@ pub async fn handle_python_reqs( if let Some(pid) = pids.lock().await.get_mut(i) { *pid = uv_install_proccess.id(); + #[cfg(unix)] + if let Err(e) = uv_install_proccess + .id() + .ok_or(Error::InternalErr(format!( + "failed to get PID for python installation process: {}", + &req + ))) + .and_then(|pid| write_file(&format!("/proc/{pid}"), "oom_score_adj", "1000")) + { + tracing::error!( + req = %req, + "Failed to create oom_score_adj for python dependency installation process: {e}" + ); + } } else { tracing::error!( workspace_id = %w_id, @@ -1966,22 +1986,25 @@ pub async fn handle_python_reqs( _ = kill_rx.recv() => { Box::into_pin(uv_install_proccess.kill()).await?; pids.lock().await.get_mut(i).and_then(|e| e.take()); - return Err(anyhow::anyhow!("uv pip install was canceled")); + return Err(Error::from(anyhow::anyhow!("uv pip install was canceled"))); }, (_, _, exitstatus) = async { // See tokio::process::Child::wait_with_output() for more context - // Sometimes uv_install_proccess.wait() is not exiting if stderr is not awaited before it :/ + // Sometimes uv_install_proccess.wait() is not exiting if stderr is not awaited first (stderr_future.await, stdout_future.await, Box::into_pin(uv_install_proccess.wait()).await) } => match exitstatus { Ok(status) => if !status.success() { + #[cfg(unix)] + let code = status.signal(); + #[cfg(not(unix))] let code = status.code(); + tracing::warn!( workspace_id = %w_id, "uv install {} did not succeed, exit status: {:?}", &req, code ); - append_logs( &job_id, w_id, @@ -1994,7 +2017,7 @@ pub async fn handle_python_reqs( ) .await; pids.lock().await.get_mut(i).and_then(|e| e.take()); - return Err(anyhow!(stderr_buf)); + return Err(Error::ExitStatus(stderr_buf, code.unwrap_or(1))); }, Err(e) => { tracing::error!( @@ -2002,7 +2025,7 @@ pub async fn handle_python_reqs( "Cannot wait for uv_install_proccess, ExitStatus is Err: {e:?}", ); pids.lock().await.get_mut(i).and_then(|e| e.take()); - return Err(e.into()); + return Err(Error::from(e)); } } }; @@ -2070,13 +2093,19 @@ pub async fn handle_python_reqs( })); } - let mut failed = false; + let (mut failed, mut oom_killed) = (false, false); for (handle, (_, venv_p)) in handles.into_iter().zip(req_with_penv.into_iter()) { if let Err(e) = handle .await - .unwrap_or(Err(anyhow!("Problem by joining handle"))) + .unwrap_or(Err(Error::from(anyhow!("Problem by joining handle")))) { failed = true; + + // OOM code is 9 or 137 + if matches!(e, Error::ExitStatus(_, 9 | 137)) { + oom_killed = true; + } + append_logs( &job_id, w_id, @@ -2112,7 +2141,51 @@ pub async fn handle_python_reqs( // it will be triggered // If there is no listener, it will be dropped safely return if failed { - Err(anyhow!("Env installation did not succeed, check logs").into()) + if cfg!(unix) && oom_killed && parallel_limit > 1 { + // We want to drop it and stop monitor + // new invocation will create another one + drop(done_tx); + + let reduced_limit = parallel_limit / 2; + + append_logs( + &job_id, + w_id, + format!( + "\n + ====================== + ===== IMPORTANT! ===== + ====================== + +Some of installations have been killed by OOM, +restarting with reduced concurrency: {parallel_limit} -> {reduced_limit} + +This is not normal behavior, please make sure all workers have enough memory.\n +" + ), + conn, + ) + .await; + + // restart with half of concurrency + Box::pin(handle_python_reqs( + requirements, + job_id, + w_id, + mem_peak, + _canceled_by, + conn, + _worker_name, + job_dir, + &worker_dir, + _occupancy_metrics, + py_version, + Some(reduced_limit), + )) + .await + } else { + Err(anyhow!("Env installation did not succeed, check logs").into()) + } } else { Ok(req_paths) }; diff --git a/backend/windmill-worker/src/ruby_executor.rs b/backend/windmill-worker/src/ruby_executor.rs index 0039e6e8c26c1..5b12aa51898de 100644 --- a/backend/windmill-worker/src/ruby_executor.rs +++ b/backend/windmill-worker/src/ruby_executor.rs @@ -28,8 +28,7 @@ use crate::{ }, handle_child::{self}, universal_pkg_installer::{par_install_language_dependencies_seq, RequiredDependency}, - DISABLE_NSJAIL, DISABLE_NUSER, NSJAIL_PATH, PATH_ENV, PROXY_ENVS, - RUBY_CACHE_DIR, RUBY_REPOS, + DISABLE_NSJAIL, DISABLE_NUSER, NSJAIL_PATH, PATH_ENV, PROXY_ENVS, RUBY_CACHE_DIR, RUBY_REPOS, }; lazy_static::lazy_static! { static ref RUBY_CONCURRENT_DOWNLOADS: usize = std::env::var("RUBY_CONCURRENT_DOWNLOADS").ok().map(|flag| flag.parse().unwrap_or(20)).unwrap_or(20); diff --git a/backend/windmill-worker/src/worker_lockfiles.rs b/backend/windmill-worker/src/worker_lockfiles.rs index b7b11cf6af7c2..23768c07efce8 100644 --- a/backend/windmill-worker/src/worker_lockfiles.rs +++ b/backend/windmill-worker/src/worker_lockfiles.rs @@ -2447,6 +2447,7 @@ async fn python_dep( occupancy_metrics, // final_version, PyVAlias::default().into(), + None, ) .await; diff --git a/integration_tests/test/wmill_integration_test_utils.py b/integration_tests/test/wmill_integration_test_utils.py index 04c132db59c46..260986ef9b43d 100644 --- a/integration_tests/test/wmill_integration_test_utils.py +++ b/integration_tests/test/wmill_integration_test_utils.py @@ -56,7 +56,7 @@ def _init_client(self): return httpx.Client( base_url=self._url, headers=headers, - timeout=10.0, + timeout=60.0, # Go/Rust compilation can take 10+ seconds on first run ) def _set_license_key(self):