Skip to content

Commit 00aa2ab

Browse files
authored
Make uv cache clean parallel process safe (#15888)
Currently, `uv cache clean` and `uv cache prune` can cause crashes in other uv processes running in parallel by removing their in-use files. We can solve this by using a shared (read) lock on the cache directory, while the `uv cache` operations use an exclusive (write) lock. The drawback is that this is always one extra lock, and that we assume that all platforms support shared locks. Once Rust 1.89 fulfills our N-2 policy, we can add support for these methods in fs_err and switch to https://doc.rust-lang.org/std/fs/struct.File.html#platform-specific-behavior-2. **Test Plan** Open one terminal, run: ``` uv venv -c -p 3.13 UV_CACHE_DIR=cache uv cache clean UV_CACHE_DIR=cache uv pip install numpy==2.0.0 ``` Open another terminal, run: ``` UV_CACHE_DIR=cache uv cache clean ``` Fixes #15704 Part of #13883
1 parent 0889d53 commit 00aa2ab

File tree

11 files changed

+236
-51
lines changed

11 files changed

+236
-51
lines changed

crates/uv-cache/src/lib.rs

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@ use std::str::FromStr;
77
use std::sync::Arc;
88

99
use rustc_hash::FxHashMap;
10-
use tracing::debug;
10+
use tracing::{debug, warn};
1111

12-
pub use archive::ArchiveId;
1312
use uv_cache_info::Timestamp;
14-
use uv_fs::{LockedFile, cachedir, directories};
13+
use uv_fs::{LockedFile, Simplified, cachedir, directories};
1514
use uv_normalize::PackageName;
1615
use uv_pypi_types::ResolutionMetadata;
1716

@@ -22,6 +21,7 @@ use crate::removal::Remover;
2221
pub use crate::removal::{Removal, rm_rf};
2322
pub use crate::wheel::WheelCache;
2423
use crate::wheel::WheelCacheKind;
24+
pub use archive::ArchiveId;
2525

2626
mod archive;
2727
mod by_timestamp;
@@ -135,6 +135,8 @@ impl Deref for CacheShard {
135135
}
136136

137137
/// The main cache abstraction.
138+
///
139+
/// While the cache is active, it holds a read (shared) lock that prevents cache cleaning
138140
#[derive(Debug, Clone)]
139141
pub struct Cache {
140142
/// The cache directory.
@@ -146,6 +148,9 @@ pub struct Cache {
146148
/// Included to ensure that the temporary directory exists for the length of the operation, but
147149
/// is dropped at the end as appropriate.
148150
temp_dir: Option<Arc<tempfile::TempDir>>,
151+
/// Ensure that `uv cache` operations don't remove items from the cache that are used by another
152+
/// uv process.
153+
lock_file: Option<Arc<LockedFile>>,
149154
}
150155

151156
impl Cache {
@@ -155,6 +160,7 @@ impl Cache {
155160
root: root.into(),
156161
refresh: Refresh::None(Timestamp::now()),
157162
temp_dir: None,
163+
lock_file: None,
158164
}
159165
}
160166

@@ -165,6 +171,7 @@ impl Cache {
165171
root: temp_dir.path().to_path_buf(),
166172
refresh: Refresh::None(Timestamp::now()),
167173
temp_dir: Some(Arc::new(temp_dir)),
174+
lock_file: None,
168175
})
169176
}
170177

@@ -174,6 +181,34 @@ impl Cache {
174181
Self { refresh, ..self }
175182
}
176183

184+
/// Acquire a lock that allows removing entries from the cache.
185+
pub fn with_exclusive_lock(self) -> Result<Self, io::Error> {
186+
let Self {
187+
root,
188+
refresh,
189+
temp_dir,
190+
lock_file,
191+
} = self;
192+
193+
// Release the existing lock, avoid deadlocks from a cloned cache.
194+
if let Some(lock_file) = lock_file {
195+
drop(
196+
Arc::try_unwrap(lock_file).expect(
197+
"cloning the cache before acquiring an exclusive lock causes a deadlock",
198+
),
199+
);
200+
}
201+
let lock_file =
202+
LockedFile::acquire_blocking(root.join(".lock"), root.simplified_display())?;
203+
204+
Ok(Self {
205+
root,
206+
refresh,
207+
temp_dir,
208+
lock_file: Some(Arc::new(lock_file)),
209+
})
210+
}
211+
177212
/// Return the root of the cache.
178213
pub fn root(&self) -> &Path {
179214
&self.root
@@ -359,15 +394,43 @@ impl Cache {
359394
.join(".git"),
360395
)?;
361396

397+
// Block cache removal operations from interfering.
398+
let lock_file = match LockedFile::acquire_shared_blocking(
399+
root.join(".lock"),
400+
root.simplified_display(),
401+
) {
402+
Ok(lock_file) => Some(Arc::new(lock_file)),
403+
Err(err) if err.kind() == io::ErrorKind::Unsupported => {
404+
warn!(
405+
"Shared locking is not supported by the current platform or filesystem, \
406+
reduced parallel process safety with `uv cache clean` and `uv cache prune`."
407+
);
408+
None
409+
}
410+
Err(err) => return Err(err),
411+
};
412+
362413
Ok(Self {
363414
root: std::path::absolute(root)?,
415+
lock_file,
364416
..self
365417
})
366418
}
367419

368420
/// Clear the cache, removing all entries.
369-
pub fn clear(&self, reporter: Box<dyn CleanReporter>) -> Result<Removal, io::Error> {
370-
Remover::new(reporter).rm_rf(&self.root)
421+
pub fn clear(self, reporter: Box<dyn CleanReporter>) -> Result<Removal, io::Error> {
422+
// Remove everything but `.lock`, for Windows locked file special cases.
423+
let mut removal = Remover::new(reporter).rm_rf(&self.root, true)?;
424+
let Self {
425+
root, lock_file, ..
426+
} = self;
427+
// Unlock `.lock`
428+
drop(lock_file);
429+
fs_err::remove_file(root.join(".lock"))?;
430+
removal.num_files += 1;
431+
fs_err::remove_dir(root)?;
432+
removal.num_dirs += 1;
433+
Ok(removal)
371434
}
372435

373436
/// Remove a package from the cache.
@@ -407,6 +470,7 @@ impl Cache {
407470
if entry.file_name() == "CACHEDIR.TAG"
408471
|| entry.file_name() == ".gitignore"
409472
|| entry.file_name() == ".git"
473+
|| entry.file_name() == ".lock"
410474
{
411475
continue;
412476
}

crates/uv-cache/src/removal.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::CleanReporter;
1010
/// Remove a file or directory and all its contents, returning a [`Removal`] with
1111
/// the number of files and directories removed, along with a total byte count.
1212
pub fn rm_rf(path: impl AsRef<Path>) -> io::Result<Removal> {
13-
Remover::default().rm_rf(path)
13+
Remover::default().rm_rf(path, false)
1414
}
1515

1616
/// A builder for a [`Remover`] that can remove files and directories.
@@ -29,9 +29,13 @@ impl Remover {
2929

3030
/// Remove a file or directory and all its contents, returning a [`Removal`] with
3131
/// the number of files and directories removed, along with a total byte count.
32-
pub(crate) fn rm_rf(&self, path: impl AsRef<Path>) -> io::Result<Removal> {
32+
pub(crate) fn rm_rf(
33+
&self,
34+
path: impl AsRef<Path>,
35+
skip_locked_file: bool,
36+
) -> io::Result<Removal> {
3337
let mut removal = Removal::default();
34-
removal.rm_rf(path.as_ref(), self.reporter.as_deref())?;
38+
removal.rm_rf(path.as_ref(), self.reporter.as_deref(), skip_locked_file)?;
3539
Ok(removal)
3640
}
3741
}
@@ -52,7 +56,12 @@ pub struct Removal {
5256

5357
impl Removal {
5458
/// Recursively remove a file or directory and all its contents.
55-
fn rm_rf(&mut self, path: &Path, reporter: Option<&dyn CleanReporter>) -> io::Result<()> {
59+
fn rm_rf(
60+
&mut self,
61+
path: &Path,
62+
reporter: Option<&dyn CleanReporter>,
63+
skip_locked_file: bool,
64+
) -> io::Result<()> {
5665
let metadata = match fs_err::symlink_metadata(path) {
5766
Ok(metadata) => metadata,
5867
Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(()),
@@ -100,13 +109,25 @@ impl Removal {
100109
if set_readable(dir).unwrap_or(false) {
101110
// Retry the operation; if we _just_ `self.rm_rf(dir)` and continue,
102111
// `walkdir` may give us duplicate entries for the directory.
103-
return self.rm_rf(path, reporter);
112+
return self.rm_rf(path, reporter, skip_locked_file);
104113
}
105114
}
106115
}
107116
}
108117

109118
let entry = entry?;
119+
120+
// Remove the exclusive lock last.
121+
if skip_locked_file
122+
&& entry.file_name() == ".lock"
123+
&& entry
124+
.path()
125+
.strip_prefix(path)
126+
.is_ok_and(|suffix| suffix == Path::new(".lock"))
127+
{
128+
continue;
129+
}
130+
110131
if entry.file_type().is_symlink() && {
111132
#[cfg(windows)]
112133
{
@@ -121,6 +142,11 @@ impl Removal {
121142
self.num_files += 1;
122143
remove_dir(entry.path())?;
123144
} else if entry.file_type().is_dir() {
145+
// Remove the directory with the exclusive lock last.
146+
if skip_locked_file && entry.path() == path {
147+
continue;
148+
}
149+
124150
self.num_dirs += 1;
125151

126152
// The contents should have been removed by now, but sometimes a race condition is

crates/uv-fs/src/lib.rs

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -693,9 +693,50 @@ impl LockedFile {
693693
}
694694
}
695695

696-
/// The same as [`LockedFile::acquire`], but for synchronous contexts. Do not use from an async
697-
/// context, as this can block the runtime while waiting for another process to release the
698-
/// lock.
696+
/// Inner implementation for [`LockedFile::acquire_shared_blocking`] and
697+
/// [`LockedFile::acquire_blocking`].
698+
fn lock_file_shared_blocking(
699+
file: fs_err::File,
700+
resource: &str,
701+
) -> Result<Self, std::io::Error> {
702+
trace!(
703+
"Checking shared lock for `{resource}` at `{}`",
704+
file.path().user_display()
705+
);
706+
// TODO(konsti): Update fs_err to support this.
707+
match FileExt::try_lock_shared(file.file()) {
708+
Ok(()) => {
709+
debug!("Acquired shared lock for `{resource}`");
710+
Ok(Self(file))
711+
}
712+
Err(err) => {
713+
// Log error code and enum kind to help debugging more exotic failures.
714+
if err.kind() != std::io::ErrorKind::WouldBlock {
715+
debug!("Try lock error: {err:?}");
716+
}
717+
info!(
718+
"Waiting to acquire shared lock for `{resource}` at `{}`",
719+
file.path().user_display(),
720+
);
721+
FileExt::lock_shared(file.file()).map_err(|err| {
722+
// Not an fs_err method, we need to build our own path context
723+
std::io::Error::other(format!(
724+
"Could not acquire shared lock for `{resource}` at `{}`: {}",
725+
file.path().user_display(),
726+
err
727+
))
728+
})?;
729+
730+
debug!("Acquired shared lock for `{resource}`");
731+
Ok(Self(file))
732+
}
733+
}
734+
}
735+
736+
/// The same as [`LockedFile::acquire`], but for synchronous contexts.
737+
///
738+
/// Do not use from an async context, as this can block the runtime while waiting for another
739+
/// process to release the lock.
699740
pub fn acquire_blocking(
700741
path: impl AsRef<Path>,
701742
resource: impl Display,
@@ -705,6 +746,19 @@ impl LockedFile {
705746
Self::lock_file_blocking(file, &resource)
706747
}
707748

749+
/// The same as [`LockedFile::acquire_blocking`], but for synchronous contexts.
750+
///
751+
/// Do not use from an async context, as this can block the runtime while waiting for another
752+
/// process to release the lock.
753+
pub fn acquire_shared_blocking(
754+
path: impl AsRef<Path>,
755+
resource: impl Display,
756+
) -> Result<Self, std::io::Error> {
757+
let file = Self::create(path)?;
758+
let resource = resource.to_string();
759+
Self::lock_file_shared_blocking(file, &resource)
760+
}
761+
708762
/// Acquire a cross-process lock for a resource using a file at the provided path.
709763
#[cfg(feature = "tokio")]
710764
pub async fn acquire(
@@ -716,6 +770,18 @@ impl LockedFile {
716770
tokio::task::spawn_blocking(move || Self::lock_file_blocking(file, &resource)).await?
717771
}
718772

773+
/// Acquire a cross-process read lock for a shared resource using a file at the provided path.
774+
#[cfg(feature = "tokio")]
775+
pub async fn acquire_shared(
776+
path: impl AsRef<Path>,
777+
resource: impl Display,
778+
) -> Result<Self, std::io::Error> {
779+
let file = Self::create(path)?;
780+
let resource = resource.to_string();
781+
tokio::task::spawn_blocking(move || Self::lock_file_shared_blocking(file, &resource))
782+
.await?
783+
}
784+
719785
#[cfg(unix)]
720786
fn create(path: impl AsRef<Path>) -> Result<fs_err::File, std::io::Error> {
721787
use std::os::unix::fs::PermissionsExt;

crates/uv/src/commands/cache_clean.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::printer::Printer;
1414
/// Clear the cache, removing all entries or those linked to specific packages.
1515
pub(crate) fn cache_clean(
1616
packages: &[PackageName],
17-
cache: &Cache,
17+
cache: Cache,
1818
printer: Printer,
1919
) -> Result<ExitStatus> {
2020
if !cache.root().exists() {
@@ -25,6 +25,7 @@ pub(crate) fn cache_clean(
2525
)?;
2626
return Ok(ExitStatus::Success);
2727
}
28+
let cache = cache.with_exclusive_lock()?;
2829

2930
let summary = if packages.is_empty() {
3031
writeln!(
@@ -36,9 +37,10 @@ pub(crate) fn cache_clean(
3637
let num_paths = walkdir::WalkDir::new(cache.root()).into_iter().count();
3738
let reporter = CleaningDirectoryReporter::new(printer, num_paths);
3839

40+
let root = cache.root().to_path_buf();
3941
cache
4042
.clear(Box::new(reporter))
41-
.with_context(|| format!("Failed to clear cache at: {}", cache.root().user_display()))?
43+
.with_context(|| format!("Failed to clear cache at: {}", root.user_display()))?
4244
} else {
4345
let reporter = CleaningPackageReporter::new(printer, packages.len());
4446
let mut summary = Removal::default();

crates/uv/src/commands/cache_prune.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::commands::{ExitStatus, human_readable_bytes};
1010
use crate::printer::Printer;
1111

1212
/// Prune all unreachable objects from the cache.
13-
pub(crate) fn cache_prune(ci: bool, cache: &Cache, printer: Printer) -> Result<ExitStatus> {
13+
pub(crate) fn cache_prune(ci: bool, cache: Cache, printer: Printer) -> Result<ExitStatus> {
1414
if !cache.root().exists() {
1515
writeln!(
1616
printer.stderr(),
@@ -19,6 +19,7 @@ pub(crate) fn cache_prune(ci: bool, cache: &Cache, printer: Printer) -> Result<E
1919
)?;
2020
return Ok(ExitStatus::Success);
2121
}
22+
let cache = cache.with_exclusive_lock()?;
2223

2324
writeln!(
2425
printer.stderr(),
@@ -29,7 +30,7 @@ pub(crate) fn cache_prune(ci: bool, cache: &Cache, printer: Printer) -> Result<E
2930
let mut summary = Removal::default();
3031

3132
// Prune the source distribution cache, which is tightly coupled to the builder crate.
32-
summary += uv_distribution::prune(cache)
33+
summary += uv_distribution::prune(&cache)
3334
.with_context(|| format!("Failed to prune cache at: {}", cache.root().user_display()))?;
3435

3536
// Prune the remaining cache buckets.

0 commit comments

Comments
 (0)