Skip to content

Commit 13899e5

Browse files
authored
Turbopack: Restore watchers for children when using non-recursive FS watcher (#82130)
**This should be merged alongside #82258, as this introduces a potentially high time complexity operation that #82258 solves.** With this PR and the previous one, my fuzzer shows that we capture every (currently tested) file change correctly: ``` rm -rf /tmp/fuzz && cargo run --release -p turbo-tasks-fuzz -- fs-watcher --fs-root /tmp/fuzz ``` However, this does add an `O(n)` iteration of the `watching` map that I'm working on cleaning up (why this is still a draft). I'll do something similar to #82133, but it's a bit more complicated to switch to a `BTreeSet` because it's currently using a `DashSet`, and I don't want to introduce more locking than I have to.
1 parent 2d3cdb0 commit 13899e5

File tree

1 file changed

+82
-46
lines changed

1 file changed

+82
-46
lines changed

turbopack/crates/turbo-tasks-fs/src/watcher.rs

Lines changed: 82 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ use std::{
44
mem::take,
55
path::{Path, PathBuf},
66
sync::{
7-
Arc, LazyLock, Mutex, MutexGuard,
7+
Arc, LazyLock, Mutex,
88
mpsc::{Receiver, TryRecvError, channel},
99
},
1010
time::Duration,
1111
};
1212

13-
use anyhow::{Context, Result, anyhow};
13+
use anyhow::{Context, Result};
1414
use dashmap::DashSet;
1515
use notify::{
1616
Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
@@ -78,6 +78,8 @@ impl DiskWatcherInternal {
7878

7979
#[derive(Serialize, Deserialize)]
8080
pub(crate) struct DiskWatcher {
81+
/// This value is [`None`] when the watcher has been stopped (see
82+
/// [`DiskWatcher::stop_watching`]).
8183
#[serde(skip)]
8284
internal: Mutex<Option<DiskWatcherInternal>>,
8385

@@ -118,10 +120,17 @@ impl NonRecursiveDiskWatcherState {
118120

119121
/// Called after a rescan in case a previously watched-but-deleted directory was recreated.
120122
pub(crate) fn restore_all_watching(&self, watcher: &DiskWatcher, root_path: &Path) {
121-
let mut internal = watcher.internal.lock().unwrap();
123+
let mut internal_guard = watcher.internal.lock().unwrap();
124+
let Some(internal) = &mut *internal_guard else {
125+
return;
126+
};
122127
for dir_path in self.watching.iter() {
123128
// TODO: Report diagnostics if this error happens
124-
let _ = self.start_watching_dir(&mut internal, &dir_path, root_path);
129+
//
130+
// Don't watch the parents, because those are already included in `self.watching` (so
131+
// it'd be redundant), but also because this could deadlock, since we'd try to modify
132+
// `self.watching` while iterating over it (write lock overlapping with a read lock).
133+
let _ = self.start_watching_dir(internal, &dir_path, root_path);
125134
}
126135
}
127136

@@ -136,9 +145,24 @@ impl NonRecursiveDiskWatcherState {
136145
if dir_path == root_path || !self.watching.contains(dir_path) {
137146
return Ok(());
138147
}
139-
let mut internal = watcher.internal.lock().unwrap();
140-
// TODO: Also restore any watchers for children of this directory
141-
self.start_watching_dir(&mut internal, dir_path, root_path)
148+
let mut internal_guard = watcher.internal.lock().unwrap();
149+
let Some(internal) = &mut *internal_guard else {
150+
return Ok(());
151+
};
152+
153+
// watch the new directory
154+
self.start_watching_dir(internal, dir_path, root_path)?;
155+
156+
// Also try to restore any watchers for children of this directory
157+
for child_path in self
158+
.watching
159+
.iter()
160+
.filter(|p| p.key().starts_with(dir_path) && **p != dir_path)
161+
{
162+
// Don't watch the parents -- see the comment on `restore_all_watching`
163+
self.start_watching_dir(internal, child_path.key(), root_path)?;
164+
}
165+
Ok(())
142166
}
143167

144168
/// Called when a file in `dir_path` or `dir_path` itself is read or written. Adds a new watcher
@@ -152,60 +176,72 @@ impl NonRecursiveDiskWatcherState {
152176
if dir_path == root_path || self.watching.contains(dir_path) {
153177
return Ok(());
154178
}
155-
let mut internal = watcher.internal.lock().unwrap();
179+
let mut internal_guard = watcher.internal.lock().unwrap();
180+
let Some(internal) = &mut *internal_guard else {
181+
return Ok(());
182+
};
156183
if self.watching.insert(dir_path.to_path_buf()) {
157-
self.start_watching_dir(&mut internal, dir_path, root_path)?;
184+
self.start_watching_dir_and_parents(internal, dir_path, root_path)?;
158185
}
159186
Ok(())
160187
}
161188

162-
/// Private helper, assumes that the path has already been added to `self.watching`.
189+
/// Private helper, assumes that `dir_path` has already been added to `self.watching`.
190+
///
191+
/// This does not watch any of the parent directories. For that, use
192+
/// [`start_watching_dir_and_parents`]. Use this method when iterating over previously-watched
193+
/// values in `self.watching`.
163194
fn start_watching_dir(
164195
&self,
165-
watcher_internal_guard: &mut MutexGuard<Option<DiskWatcherInternal>>,
196+
watcher_internal: &mut DiskWatcherInternal,
166197
dir_path: &Path,
167198
root_path: &Path,
168199
) -> Result<()> {
169200
debug_assert_ne!(dir_path, root_path);
170-
let Some(watcher_internal_guard) = watcher_internal_guard.as_mut() else {
171-
return Ok(());
172-
};
173201

174-
let mut path = dir_path;
175-
let err_with_context = |err: anyhow::Error| {
176-
return Err(err).context(format!(
177-
"Unable to watch {} (tried up to {})",
178-
dir_path.display(),
179-
path.display()
180-
));
181-
};
202+
match watcher_internal.watch(dir_path, RecursiveMode::NonRecursive) {
203+
Ok(())
204+
| Err(notify::Error {
205+
// The path was probably deleted before we could process the event, but the parent
206+
// should still be watched. The codepaths that care about this either call
207+
// `start_watching_dir_and_parents` or handle the parents themselves.
208+
kind: notify::ErrorKind::PathNotFound,
209+
..
210+
}) => Ok(()),
211+
Err(err) => {
212+
return Err(err).context(format!("Unable to watch {}", dir_path.display(),));
213+
}
214+
}
215+
}
182216

183-
// watch every parent: https://docs.rs/notify/latest/notify/#parent-folder-deletion
217+
/// Private helper, assumes that `dir_path` has already been added to `self.watching`.
218+
///
219+
/// Watches the given `dir_path` and every parent up to `root_path`. Parents must be recursively
220+
/// watched in case any of them change:
221+
/// https://docs.rs/notify/latest/notify/#parent-folder-deletion
222+
fn start_watching_dir_and_parents(
223+
&self,
224+
watcher_internal: &mut DiskWatcherInternal,
225+
dir_path: &Path,
226+
root_path: &Path,
227+
) -> Result<()> {
228+
let mut cur_path = dir_path;
184229
loop {
185-
match watcher_internal_guard.watch(path, RecursiveMode::NonRecursive) {
186-
res @ Ok(())
187-
| res @ Err(notify::Error {
188-
// The path was probably deleted before we could process the event. That's
189-
// okay, just make sure we're watching the parent directory, so we can know
190-
// if it gets recreated.
191-
kind: notify::ErrorKind::PathNotFound,
192-
..
193-
}) => {
194-
let Some(parent_path) = path.parent() else {
195-
// this should never happen as we break before we reach the root path
196-
return err_with_context(res.err().map_or_else(
197-
|| anyhow!("failed to compute parent path"),
198-
|err| err.into(),
199-
));
200-
};
201-
if parent_path == root_path || !self.watching.insert(parent_path.to_path_buf())
202-
{
203-
break;
204-
}
205-
path = parent_path;
206-
}
207-
Err(err) => return err_with_context(err.into()),
230+
self.start_watching_dir(watcher_internal, cur_path, root_path)?;
231+
232+
let Some(parent_path) = cur_path.parent() else {
233+
// this should never happen as we break before we reach the root path
234+
anyhow::bail!(
235+
"failed to compute parent path of {cur_path:?} while watching {dir_path:?} in \
236+
root {root_path:?}"
237+
);
238+
};
239+
240+
if parent_path == root_path || !self.watching.insert(parent_path.to_path_buf()) {
241+
break;
208242
}
243+
244+
cur_path = parent_path;
209245
}
210246

211247
Ok(())

0 commit comments

Comments
 (0)