1 change: 1 addition & 0 deletions backend/windmill-worker/src/ansible_executor.rs
@@ -410,6 +410,7 @@ async fn handle_ansible_python_deps(
worker_dir,
&mut Some(occupancy_metrics),
PyVAlias::default().into(),
None,
)
.await?;
additional_python_paths.append(&mut venv_path);
105 changes: 89 additions & 16 deletions backend/windmill-worker/src/python_executor.rs
@@ -7,6 +7,9 @@ use std::{
sync::Arc,
};

#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;

use anyhow::anyhow;
use itertools::Itertools;
use regex::Regex;
@@ -1299,6 +1302,7 @@ Returned from server: py_version - {:?}, py_version_v2 - {:?}
worker_dir,
occupancy_metrics,
pyv.clone(),
None,
)
.await?;
additional_python_paths.append(&mut venv_path);
@@ -1532,6 +1536,7 @@ pub async fn handle_python_reqs(
worker_dir: &str,
_occupancy_metrics: &mut Option<&mut OccupancyMetrics>,
py_version: PyV,
reduced_concurrent_downloads: Option<usize>,
) -> error::Result<Vec<String>> {
let worker_dir = worker_dir.to_string();

@@ -1585,8 +1590,10 @@
}

// Parallelism level (N)
let parallel_limit = // Semaphore will panic if value less then 1
PY_CONCURRENT_DOWNLOADS.clamp(1, 30);
let parallel_limit = reduced_concurrent_downloads
.unwrap_or(*PY_CONCURRENT_DOWNLOADS)
.clamp(1, 30);
// Semaphore will panic if value less than 1

tracing::info!(
workspace_id = %w_id,
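
A minimal sketch of the pattern above, assuming the clamped limit feeds a tokio::sync::Semaphore (the helper name and the Semaphore wiring are illustrative, not the repo's actual code):

use std::sync::Arc;
use tokio::sync::Semaphore;

fn download_limiter(reduced: Option<usize>, default_limit: usize) -> Arc<Semaphore> {
    // At least 1 permit so installs can make progress; at most 30 to bound
    // network and CPU pressure from parallel `uv` invocations.
    let parallel_limit = reduced.unwrap_or(default_limit).clamp(1, 30);
    Arc::new(Semaphore::new(parallel_limit))
}
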
@@ -1616,7 +1623,7 @@
// Find out if there are already cached dependencies
// If so, skip them
let mut in_cache = vec![];
for req in requirements {
for req in &requirements {
// Ignore python version annotation baked into lockfile
if req.starts_with('#') || req.starts_with('-') || req.trim().is_empty() {
continue;
@@ -1650,8 +1657,8 @@
.map(|_| kill_tx.subscribe())
.collect();

// ________ Read comments at the end of the function to get more context
let (_done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
// _______ Read comments at the end of the function to get more context
let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);

let job_id_2 = job_id.clone();
let conn_2 = conn.clone();
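
The done_tx/done_rx pair above is used purely as a drop signal: nothing is ever sent, and the monitor task treats the closed channel as "all installs finished". A self-contained sketch of that idiom (names are illustrative):

use tokio::sync::mpsc;

async fn drop_signal_demo() {
    let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
    let monitor = tokio::spawn(async move {
        // recv() yields None once every sender clone is dropped, which is
        // the only "message" this channel ever carries.
        while done_rx.recv().await.is_some() {}
    });
    drop(done_tx); // signal completion without sending anything
    monitor.await.unwrap();
}
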
@@ -1739,7 +1746,6 @@
};

if canceled {

tracing::info!(
// If there is a listener on the other side,
workspace_id = %w_id_2,
@@ -1871,7 +1877,7 @@
if let Some(os) = windmill_common::s3_helpers::get_object_store().await {
tokio::select! {
// Cancel was called on the job
_ = kill_rx.recv() => return Err(anyhow::anyhow!("S3 pull was canceled")),
_ = kill_rx.recv() => return Err(Error::from(anyhow::anyhow!("S3 pull was canceled"))),
pull = pull_from_tar(os, venv_p.clone(), py_version.to_cache_dir_top_level(false), None, false) => {
if let Err(e) = pull {
tracing::info!(
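
The select! above races the job's cancel signal against the S3 pull, so a canceled job never blocks on a download. A standalone sketch of the idiom, with a generic future standing in for pull_from_tar (the function name is illustrative):

use std::future::Future;
use tokio::sync::broadcast;

async fn race_against_cancel<F>(
    mut kill_rx: broadcast::Receiver<()>,
    pull: F,
) -> Result<(), String>
where
    F: Future<Output = Result<(), String>>,
{
    tokio::select! {
        // Whichever branch completes first wins; the loser is dropped.
        _ = kill_rx.recv() => Err("S3 pull was canceled".to_string()),
        res = pull => res,
    }
}
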
@@ -1932,7 +1938,7 @@
)
.await;
pids.lock().await.get_mut(i).and_then(|e| e.take());
return Err(e.into());
return Err(Error::from(e));
}
};

@@ -1954,6 +1960,20 @@

if let Some(pid) = pids.lock().await.get_mut(i) {
*pid = uv_install_proccess.id();
#[cfg(unix)]
if let Err(e) = uv_install_proccess
.id()
.ok_or(Error::InternalErr(format!(
"failed to get PID for python installation process: {}",
&req
)))
.and_then(|pid| write_file(&format!("/proc/{pid}"), "oom_score_adj", "1000"))
{
tracing::error!(
req = %req,
"Failed to create oom_score_adj for python dependency installation process: {e}"
);
}
} else {
tracing::error!(
workspace_id = %w_id,
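
The write_file call above targets /proc/<pid>/oom_score_adj (the helper appears to take a directory, a file name, and the contents). A standalone equivalent using only std, Linux-specific:

use std::{fs, io};

// A score of 1000 makes the kernel's OOM killer pick this PID first under
// memory pressure, sacrificing the `uv` install instead of the whole worker.
fn raise_oom_score(pid: u32) -> io::Result<()> {
    fs::write(format!("/proc/{pid}/oom_score_adj"), "1000")
}
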
@@ -1966,22 +1986,25 @@
_ = kill_rx.recv() => {
Box::into_pin(uv_install_proccess.kill()).await?;
pids.lock().await.get_mut(i).and_then(|e| e.take());
return Err(anyhow::anyhow!("uv pip install was canceled"));
return Err(Error::from(anyhow::anyhow!("uv pip install was canceled")));
},
(_, _, exitstatus) = async {
// See tokio::process::Child::wait_with_output() for more context
// Sometimes uv_install_proccess.wait() is not exiting if stderr is not awaited before it :/
// Sometimes uv_install_proccess.wait() does not exit if stderr is not awaited first
(stderr_future.await, stdout_future.await, Box::into_pin(uv_install_proccess.wait()).await)
} => match exitstatus {
Ok(status) => if !status.success() {
#[cfg(unix)]
let code = status.signal();
#[cfg(not(unix))]
let code = status.code();

tracing::warn!(
workspace_id = %w_id,
"uv install {} did not succeed, exit status: {:?}",
&req,
code
);

append_logs(
&job_id,
w_id,
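
A sketch of the classification this enables: on Unix the OOM killer delivers SIGKILL, which surfaces as signal 9 through ExitStatusExt, while shells report the same death as exit code 137 (128 + 9). The helper name is illustrative:

use std::process::ExitStatus;

#[cfg(unix)]
fn looks_oom_killed(status: ExitStatus) -> bool {
    use std::os::unix::process::ExitStatusExt;
    // signal() is Some(9) for a SIGKILL; code() is None in that case on Unix,
    // but 137 covers deaths reported through an intermediate shell.
    matches!(status.signal(), Some(9)) || matches!(status.code(), Some(137))
}
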
@@ -1994,15 +2017,15 @@
)
.await;
pids.lock().await.get_mut(i).and_then(|e| e.take());
return Err(anyhow!(stderr_buf));
return Err(Error::ExitStatus(stderr_buf, code.unwrap_or(1)));
},
Err(e) => {
tracing::error!(
workspace_id = %w_id,
"Cannot wait for uv_install_proccess, ExitStatus is Err: {e:?}",
);
pids.lock().await.get_mut(i).and_then(|e| e.take());
return Err(e.into());
return Err(Error::from(e));
}
}
};
@@ -2070,13 +2093,19 @@
}));
}

let mut failed = false;
let (mut failed, mut oom_killed) = (false, false);
for (handle, (_, venv_p)) in handles.into_iter().zip(req_with_penv.into_iter()) {
if let Err(e) = handle
.await
.unwrap_or(Err(anyhow!("Problem by joining handle")))
.unwrap_or(Err(Error::from(anyhow!("Problem joining handle"))))
{
failed = true;

// The OOM killer delivers SIGKILL: signal 9, reported as exit code 137 (128 + 9)
if matches!(e, Error::ExitStatus(_, 9 | 137)) {
oom_killed = true;
}

append_logs(
&job_id,
w_id,
@@ -2112,7 +2141,51 @@
// it will be triggered
// If there is no listener, it will be dropped safely
return if failed {
Err(anyhow!("Env installation did not succeed, check logs").into())
if cfg!(unix) && oom_killed && parallel_limit > 1 {
// Drop the sender so the monitor task stops;
// the new invocation will create another one
drop(done_tx);

let reduced_limit = parallel_limit / 2;

append_logs(
&job_id,
w_id,
format!(
"\n
======================
===== IMPORTANT! =====
======================

Some installations were killed by the OOM killer,
restarting with reduced concurrency: {parallel_limit} -> {reduced_limit}

This is not normal behavior; please make sure all workers have enough memory.\n
"
),
conn,
)
.await;

// restart with half the concurrency
Box::pin(handle_python_reqs(
requirements,
job_id,
w_id,
mem_peak,
_canceled_by,
conn,
_worker_name,
job_dir,
&worker_dir,
_occupancy_metrics,
py_version,
Some(reduced_limit),
))
.await
} else {
Err(anyhow!("Env installation did not succeed, check logs").into())
}
} else {
Ok(req_paths)
};
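
The Box::pin around the recursive call is needed because an async fn's future embeds the futures of everything it awaits, so direct recursion would give it an infinite compile-time size; boxing breaks the cycle. A minimal sketch of the shape (names and the error check are illustrative, not the repo's code):

async fn install_with_retry(limit: usize) -> Result<(), String> {
    match try_install(limit).await {
        // On an OOM failure, recurse with half the concurrency; the boxed
        // future has a fixed size, so the recursion type-checks.
        Err(e) if e == "oom" && limit > 1 => {
            Box::pin(install_with_retry(limit / 2)).await
        }
        other => other,
    }
}

async fn try_install(_limit: usize) -> Result<(), String> {
    Ok(()) // stand-in for the real installation pass
}
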
3 changes: 1 addition & 2 deletions backend/windmill-worker/src/ruby_executor.rs
@@ -28,8 +28,7 @@ use crate::{
},
handle_child::{self},
universal_pkg_installer::{par_install_language_dependencies_seq, RequiredDependency},
DISABLE_NSJAIL, DISABLE_NUSER, NSJAIL_PATH, PATH_ENV, PROXY_ENVS,
RUBY_CACHE_DIR, RUBY_REPOS,
DISABLE_NSJAIL, DISABLE_NUSER, NSJAIL_PATH, PATH_ENV, PROXY_ENVS, RUBY_CACHE_DIR, RUBY_REPOS,
};
lazy_static::lazy_static! {
static ref RUBY_CONCURRENT_DOWNLOADS: usize = std::env::var("RUBY_CONCURRENT_DOWNLOADS").ok().map(|flag| flag.parse().unwrap_or(20)).unwrap_or(20);
1 change: 1 addition & 0 deletions backend/windmill-worker/src/worker_lockfiles.rs
@@ -2447,6 +2447,7 @@ async fn python_dep(
occupancy_metrics,
// final_version,
PyVAlias::default().into(),
None,
)
.await;

2 changes: 1 addition & 1 deletion integration_tests/test/wmill_integration_test_utils.py
@@ -56,7 +56,7 @@ def _init_client(self):
return httpx.Client(
base_url=self._url,
headers=headers,
timeout=10.0,
timeout=60.0, # Go/Rust compilation can take 10+ seconds on first run
)

def _set_license_key(self):