@@ -7,7 +7,6 @@ use std::{
77} ;
88
99use anyhow:: anyhow;
10- use futures:: lock:: Mutex ;
1110use itertools:: Itertools ;
1211use regex:: Regex ;
1312use serde_json:: value:: RawValue ;
@@ -36,8 +35,6 @@ use windmill_common::variables::get_secret_value_as_admin;
3635use windmill_queue:: { append_logs, CanceledBy } ;
3736
3837lazy_static:: lazy_static! {
39- static ref BUSY_WITH_UV_INSTALL : Mutex <( ) > = Mutex :: new( ( ) ) ;
40-
4138 static ref PYTHON_PATH : String =
4239 std:: env:: var( "PYTHON_PATH" ) . unwrap_or_else( |_| "/usr/local/bin/python3" . to_string( ) ) ;
4340
@@ -1248,6 +1245,8 @@ async fn spawn_uv_install(
12481245 "--target" ,
12491246 venv_p,
12501247 "--no-cache" ,
1248+ // If we invoke uv pip install, then we want to overwrite existing data
1249+ "--reinstall" ,
12511250 ]
12521251 } ;
12531252
@@ -1356,7 +1355,6 @@ pub async fn handle_python_reqs(
13561355 mut no_uv_install : bool ,
13571356 is_ansible : bool ,
13581357) -> error:: Result < Vec < String > > {
1359- let lock = BUSY_WITH_UV_INSTALL . lock ( ) . await ;
13601358 let counter_arc = Arc :: new ( tokio:: sync:: Mutex :: new ( 0 ) ) ;
13611359 // Append logs with line like this:
13621360 // [9/21] + requests==2.32.3 << (S3) | in 57ms
@@ -1486,10 +1484,11 @@ pub async fn handle_python_reqs(
14861484 "{py_prefix}/{}" ,
14871485 req. replace( ' ' , "" ) . replace( '/' , "" ) . replace( ':' , "" )
14881486 ) ;
1489- if metadata ( & venv_p) . await . is_ok ( ) {
1487+ if metadata ( venv_p. clone ( ) + "/.valid.windmill" ) . await . is_ok ( ) {
14901488 req_paths. push ( venv_p) ;
14911489 in_cache. push ( req. to_string ( ) ) ;
14921490 } else {
1491+ // There is no valid-marker file, or no wheel at all. Regardless of whether there is content, we will overwrite it with the --reinstall flag
14931492 req_with_penv. push ( ( req. to_string ( ) , venv_p) ) ;
14941493 }
14951494 }
@@ -1520,12 +1519,6 @@ pub async fn handle_python_reqs(
15201519 let pids = Arc :: new ( tokio:: sync:: Mutex :: new ( vec ! [ None ; total_to_install] ) ) ;
15211520 let mem_peak_thread_safe = Arc :: new ( tokio:: sync:: Mutex :: new ( 0 ) ) ;
15221521 {
1523- // when we cancel the job, it has up to 1 second window before actually getting cancelled
1524- // Thus the directory with wheel in windmill's cache cleaned only after that.
1525- // If we manage to start new job during that period windmill might see that wanted wheel is already there (because we have not cleaned it yet)
1526- // and write it to installed wheels, meanwhile previous job will clean that wheel.
1527- // To fix that we create lock, which will pipeline all uv installs on worker
1528- let _lock = lock;
15291522 let pids = pids. clone ( ) ;
15301523 let mem_peak_thread_safe = mem_peak_thread_safe. clone ( ) ;
15311524 tokio:: spawn ( async move {
@@ -1866,6 +1859,17 @@ pub async fn handle_python_reqs(
18661859 ) ;
18671860
18681861 pids. lock ( ) . await . get_mut ( i) . and_then ( |e| e. take ( ) ) ;
1862+ // Create a file to indicate that installation was successful
1863+ let valid_path = venv_p. clone ( ) + "/.valid.windmill" ;
1864+ // This is an atomic operation, meaning that either it completes and the wheel is valid,
1865+ // or it does not and the wheel is invalid and will be reinstalled on the next run
1866+ if let Err ( e) = File :: create ( & valid_path) . await {
1867+ tracing:: error!(
1868+ workspace_id = %w_id,
1869+ job_id = %job_id,
1870+ "Failed to create {}!\n {e}\n
1871+ This file needed for python jobs to function" , valid_path)
1872+ } ;
18691873 Ok ( ( ) )
18701874 } ) ) ;
18711875 }
@@ -1882,13 +1886,6 @@ pub async fn handle_python_reqs(
18821886 "Env installation failed: {:?}" ,
18831887 e
18841888 ) ;
1885- if let Err ( e) = fs:: remove_dir_all ( & venv_p) {
1886- tracing:: warn!(
1887- workspace_id = %w_id,
1888- "Failed to remove cache dir: {:?}" ,
1889- e
1890- ) ;
1891- }
18921889 } else {
18931890 req_paths. push ( venv_p) ;
18941891 }
0 commit comments