diff --git a/crates/rspack_watcher/src/executor.rs b/crates/rspack_watcher/src/executor.rs
index 1bd8e26aee98..675a0b6074d5 100644
--- a/crates/rspack_watcher/src/executor.rs
+++ b/crates/rspack_watcher/src/executor.rs
@@ -1,6 +1,9 @@
-use std::sync::{
-  Arc,
-  atomic::{AtomicBool, Ordering},
+use std::{
+  sync::{
+    Arc,
+    atomic::{AtomicBool, Ordering},
+  },
+  time::Instant,
 };
 
 use rspack_util::fx_hash::FxHashSet as HashSet;
@@ -44,26 +47,24 @@ pub struct Executor {
   start_waiting: bool,
   execute_handle: Option<tokio::task::JoinHandle<()>>,
   execute_aggregate_handle: Option<tokio::task::JoinHandle<()>>,
+  /// Tracks the last time an event was triggered.
+  ///
+  /// The aggregate event handlers are only triggered after the aggregate timeout has passed since the last event.
+  ///
+  /// For example, if the last event was triggered at time T, and the aggregate timeout is 100ms,
+  /// the event handler will only be executed if no new events arrive before time T + 100ms.
+  last_changed: Arc<Mutex<Option<Instant>>>,
 }
 
 const DEFAULT_AGGREGATE_TIMEOUT: u32 = 50; // Default timeout in milliseconds
 
-/// `ExecEvent` represents control events for the watcher executor loop.
-/// - `Execute`: Indicates that an event (change or delete) has occurred and the handler should be triggered.
-/// - `Close`: Indicates that the event receiver has been closed and the executor should stop.
-#[derive(Debug)]
-enum ExecAggregateEvent {
-  /// Trigger the execution of the event handler (e.g., after a file change or delete).
-  Execute,
-  /// Signal to close the executor loop (e.g., when the receiver is closed).
-  Close,
-}
-
 enum ExecEvent {
   Execute(EventBatch),
   Close,
 }
 
+type ExecAggregateEvent = ();
+
 impl Executor {
   /// Create a new `WatcherExecutor` with the given receiver and optional aggregate timeout.
   pub fn new(rx: UnboundedReceiver<EventBatch>, aggregate_timeout: Option<u32>) -> Self {
@@ -83,6 +84,7 @@ impl Executor {
       execute_aggregate_handle: None,
       execute_handle: None,
       aggregate_timeout: aggregate_timeout.unwrap_or(DEFAULT_AGGREGATE_TIMEOUT),
+      last_changed: Default::default(),
     }
   }
 
@@ -132,7 +134,7 @@ impl Executor {
     let exec_aggregate_tx = self.exec_aggregate_tx.clone();
     let exec_tx = self.exec_tx.clone();
     let paused = Arc::clone(&self.paused);
-    let aggregate_running = Arc::clone(&self.aggregate_running);
+    let last_changed = Arc::clone(&self.last_changed);
 
     let future = async move {
       while let Some(events) = rx.lock().await.recv().await {
@@ -151,14 +153,17 @@
          }
        }
 
-        if !paused.load(Ordering::Relaxed) && !aggregate_running.load(Ordering::Relaxed) {
-          let _ = exec_aggregate_tx.send(ExecAggregateEvent::Execute);
+        let paused = paused.load(Ordering::Relaxed);
+
+        if !paused {
+          last_changed.lock().await.replace(Instant::now());
        }
 
        let _ = exec_tx.send(ExecEvent::Execute(events));
      }
 
-      let _ = exec_aggregate_tx.send(ExecAggregateEvent::Close);
+      // Send close signal to both executors when the main receiver is closed
+      let _ = exec_aggregate_tx.send(());
      let _ = exec_tx.send(ExecEvent::Close);
    };
 
@@ -184,6 +189,7 @@ impl Executor {
       Arc::clone(&self.files_data),
       self.aggregate_timeout as u64,
       Arc::clone(&self.aggregate_running),
+      Arc::clone(&self.last_changed),
     ));
 
     self.execute_handle = Some(create_execute_task(
@@ -233,38 +239,64 @@ fn create_execute_aggregate_task(
   files: ThreadSafety,
   aggregate_timeout: u64,
   running: Arc<AtomicBool>,
+  last_changed: Arc<Mutex<Option<Instant>>>,
 ) -> tokio::task::JoinHandle<()> {
   let future = async move {
     loop {
-      let aggregate_rx = {
-        // release the lock on exec_aggregate_rx
-        // and wait for the next event
-        let mut exec_aggregate_rx_guard = exec_aggregate_rx.lock().await;
-        match exec_aggregate_rx_guard.recv().await {
-          Some(event) => event,
-          None => return,
-        }
+      // Wait for the signal to terminate the executor
+      if exec_aggregate_rx.lock().await.try_recv().is_ok() {
+        break;
+      }
+
+      let mut last_changed = last_changed.lock().await;
+      let time_elapsed_since_last_change = last_changed.map(|t| t.elapsed().as_millis() as u64);
+
+      let on_timeout = if let Some(elapsed) = time_elapsed_since_last_change {
+        elapsed >= aggregate_timeout
+      } else {
+        false
       };
 
-      if let ExecAggregateEvent::Execute = aggregate_rx {
-        running.store(true, Ordering::Relaxed);
-        // Wait for the aggregate timeout before executing the handler
-        tokio::time::sleep(tokio::time::Duration::from_millis(aggregate_timeout)).await;
-
-        // Get the files to process
-        let files = {
-          let mut files = files.lock().await;
-          if files.is_empty() {
-            running.store(false, Ordering::Relaxed);
-            continue;
-          }
-          std::mem::take(&mut *files)
-        };
+      if !on_timeout {
+        // Not yet timed out, wait a bit and check again
+        if let Some(time_elapsed_since_last_change) = time_elapsed_since_last_change {
+          debug_assert!(time_elapsed_since_last_change < aggregate_timeout);
+          let wait_duration = aggregate_timeout - time_elapsed_since_last_change;
+          tokio::time::sleep(tokio::time::Duration::from_millis(wait_duration)).await;
+        } else {
+          // No changes have been recorded yet. The minimum wait is the aggregate timeout.
+          tokio::time::sleep(tokio::time::Duration::from_millis(aggregate_timeout)).await;
+        }
 
-        // Call the event handler with the changed and deleted files
-        event_handler.on_event_handle(files.changed, files.deleted);
-        running.store(false, Ordering::Relaxed);
+        continue;
       }
+
+      running.store(true, Ordering::Relaxed);
+
+      // The lock on `last_changed` must be held until we reset it to `None`,
+      // to avoid race conditions where new events arrive just before the reset.
+      //
+      // Failing to do so may cause missed aggregate events. For example (the bad case):
+      // - Event A arrives, `last_changed` is set to T1
+      // - Timeout occurs, we are about to process events
+      // - Before we reset `last_changed`, Event B arrives, `last_changed` is set to T2
+      // - We reset `last_changed` to None, losing the information about Event B
+      // - Event B is never processed because `last_changed` is None, so `on_timeout` stays `false`.
+      *last_changed = None;
+
+      // Get the files to process
+      let files = {
+        let mut files = files.lock().await;
+        if files.is_empty() {
+          running.store(false, Ordering::Relaxed);
+          continue;
+        }
+        std::mem::take(&mut *files)
+      };
+
+      // Call the event handler with the changed and deleted files
+      event_handler.on_event_handle(files.changed, files.deleted);
+      running.store(false, Ordering::Relaxed);
     }
   };
diff --git a/crates/rspack_watcher/tests/helpers/macros.rs b/crates/rspack_watcher/tests/helpers/macros.rs
new file mode 100644
index 000000000000..d36abd1759b7
--- /dev/null
+++ b/crates/rspack_watcher/tests/helpers/macros.rs
@@ -0,0 +1,92 @@
+#[macro_export]
+macro_rules! empty {
+  () => {
+    (std::iter::empty(), std::iter::empty())
+  };
+}
+
+#[macro_export]
+macro_rules! paths {
+  ($($path:expr),*) => {
+    (vec![$(rspack_paths::ArcPath::from($path)),*].into_iter(), std::iter::empty())
+  };
+}
+
+#[macro_export]
+macro_rules! helper {
+  ($options:expr) => {
+    helper!($options, Default::default())
+  };
+  ($options:expr, $ignore:expr) => {
+    $crate::helpers::TestHelper::new(|| rspack_watcher::FsWatcher::new($options, $ignore))
+  };
+}
+
+#[macro_export]
+macro_rules! counter {
+  () => {
+    std::sync::atomic::AtomicU8::new(0)
+  };
+}
+
+#[macro_export]
+macro_rules! arc_counter {
+  () => {
+    std::sync::Arc::new(std::sync::atomic::AtomicU8::new(0))
+  };
+}
+
+#[macro_export]
+macro_rules! add {
+  ($c:expr) => {
+    $c.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+  };
+}
+
+#[macro_export]
+macro_rules! load {
+  ($c:expr) => {
+    $c.load(std::sync::atomic::Ordering::SeqCst)
+  };
+}
+
+#[macro_export]
+macro_rules! watch {
+  ($helper:expr, $($files:expr),*) => {
+    watch!(files @ $helper, $($files),*)
+  };
+  ($helper:expr, _, $($dirs:expr),*) => {
+    watch!(dirs @ $helper, $($dirs),*)
+  };
+  ($helper:expr, _, _, $($missing:expr),*) => {
+    watch!(missing @ $helper, $($missing),*)
+  };
+
+  (files @ $helper:expr, $($files:expr),*) => {
+    $helper.watch(paths!($($files),*), empty!(), empty!())
+  };
+  (dirs @ $helper:expr, $($dirs:expr),*) => {
+    $helper.watch(empty!(), paths!($($dirs),*), empty!())
+  };
+  (missing @ $helper:expr, $($missing:expr),*) => {
+    $helper.watch(empty!(), empty!(), paths!($($missing),*))
+  };
+}
+
+#[macro_export]
+macro_rules! assert_no_events {
+  ($change_events:expr, $aggregated_events:expr) => {
+    assert_eq!(
+      load!($change_events),
+      0,
+      "Expected no change events but got {}",
+      load!($change_events)
+    );
+    assert_eq!(
+      load!($aggregated_events),
+      0,
+      "Expected no aggregated events but got {}",
+      load!($aggregated_events)
+    );
+  };
+}
diff --git a/crates/rspack_watcher/tests/helpers/mod.rs b/crates/rspack_watcher/tests/helpers/mod.rs
index 5e7c2a23e9b8..d62ce85e5e54 100644
--- a/crates/rspack_watcher/tests/helpers/mod.rs
+++ b/crates/rspack_watcher/tests/helpers/mod.rs
@@ -1,5 +1,7 @@
 #![allow(clippy::unwrap_used)]
 
+pub mod macros;
+
 use std::{
   mem::ManuallyDrop,
   path::PathBuf,
@@ -16,6 +18,8 @@ use rspack_watcher::{EventAggregateHandler, EventHandler, FsWatcher};
 use tempfile::TempDir;
 use tokio::sync::RwLock;
 
+const TEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
+
 pub struct TestHelper {
   /// Temporary directory for testing
   temp_dir: ManuallyDrop<TempDir>,
@@ -157,19 +161,25 @@ impl TestHelper {
         SystemTime::now()
           .duration_since(SystemTime::UNIX_EPOCH)
           .unwrap()
-          .as_millis()
+          .as_micros()
       ),
     )
     .unwrap();
   }
 
-  pub fn collect_events(
+  /// Collects events from the receiver in a blocking manner.
+  ///
+  /// The `on_changed` and `on_aggregated` callbacks are invoked for each respective event.
+  /// If either callback sets the `abort` flag to `true`, the collection stops.
+  ///
+  /// The function will time out after a predefined duration (10s) if no events are received.
+  pub fn collect_events_blocking(
     &self,
     rx: Receiver<Event>,
     mut on_changed: impl FnMut(&ChangedEvent, &mut bool),
     mut on_aggregated: impl FnMut(&AggregatedEvent, &mut bool),
   ) {
-    while let Ok(event) = rx.recv_timeout(std::time::Duration::from_secs(10)) {
+    while let Ok(event) = rx.recv_timeout(TEST_TIMEOUT) {
       match event {
         Event::Aggregated(agg_event) => {
           let mut abort = false;
@@ -189,11 +199,50 @@ impl TestHelper {
     }
   }
 
+  /// Collects events from the receiver in a separate thread.
+  ///
+  /// The `on_changed` and `on_aggregated` callbacks are invoked for each respective event.
+  /// If either callback sets the `abort` flag to `true`, the collection stops.
+  ///
+  /// The function will time out after a predefined duration (10s) if no events are received.
+  pub fn collect_events(
+    &self,
+    rx: Receiver<Event>,
+    on_changed: impl Fn(&ChangedEvent, &mut bool) + Send + 'static,
+    on_aggregated: impl Fn(&AggregatedEvent, &mut bool) + Send + 'static,
+  ) {
+    std::thread::spawn(move || {
+      while let Ok(event) = rx.recv_timeout(TEST_TIMEOUT) {
+        match event {
+          Event::Aggregated(agg_event) => {
+            let mut abort = false;
+            on_aggregated(&agg_event, &mut abort);
+            if abort {
+              break;
+            }
+          }
+          Event::Changed(chg_event) => {
+            let mut abort = false;
+            on_changed(&chg_event, &mut abort);
+            if abort {
+              break;
+            }
+          }
+        }
+      }
+    });
+  }
+
   pub fn tick(&self, f: impl FnOnce()) {
     std::thread::sleep(std::time::Duration::from_millis(200));
     f();
   }
 
+  pub fn tick_ms(&self, duration: u64, f: impl FnOnce()) {
+    std::thread::sleep(std::time::Duration::from_millis(duration));
+    f();
+  }
+
   /// Watches the specified files, directories, and missing paths.
   ///
   /// All paths are relative to the temporary directory.
@@ -270,10 +319,12 @@ impl TestHelper {
     changed_files: FxHashSet<String>,
     deleted_files: FxHashSet<String>,
   ) {
-    let _ = self.0.send(Event::Aggregated(AggregatedEvent {
+    if let Err(e) = self.0.send(Event::Aggregated(AggregatedEvent {
       changed_files,
       deleted_files,
-    }));
+    })) {
+      eprintln!("Failed to send aggregated event: {}", e);
+    }
   }
 }
 
@@ -281,16 +332,22 @@ impl TestHelper {
 impl EventHandler for ChangeHandler {
   fn on_change(&self, changed_file: String) -> rspack_error::Result<()> {
-    let _ = self
+    if let Err(e) = self
       .0
-      .send(Event::Changed(ChangedEvent::Changed(changed_file)));
+      .send(Event::Changed(ChangedEvent::Changed(changed_file)))
+    {
+      eprintln!("Failed to send change event: {}", e);
+    };
     Ok(())
   }
 
   fn on_delete(&self, deleted_file: String) -> rspack_error::Result<()> {
-    let _ = self
+    if let Err(e) = self
       .0
-      .send(Event::Changed(ChangedEvent::Deleted(deleted_file)));
+      .send(Event::Changed(ChangedEvent::Deleted(deleted_file)))
+    {
+      eprintln!("Failed to send delete event: {}", e);
+    };
     Ok(())
   }
 }
diff --git a/crates/rspack_watcher/tests/watcher.rs b/crates/rspack_watcher/tests/watcher.rs
index 5a4fdf32abfb..01772d8bb510 100644
--- a/crates/rspack_watcher/tests/watcher.rs
+++ b/crates/rspack_watcher/tests/watcher.rs
@@ -1,97 +1,190 @@
 #![allow(dead_code)]
 
-use std::sync::atomic::AtomicU8;
-
-use rspack_paths::ArcPath;
-use rspack_watcher::{FsWatcher, FsWatcherOptions};
+use rspack_regex::RspackRegex;
+use rspack_watcher::{FsWatcherIgnored, FsWatcherOptions};
 
 mod helpers;
 
-macro_rules! e {
-  () => {
-    (std::iter::empty(), std::iter::empty())
-  };
-}
+#[test]
+fn should_watch_a_single_file() {
+  let mut h = helper!(FsWatcherOptions {
+    aggregate_timeout: Some(1000),
+    ..Default::default()
+  });
 
-macro_rules! f {
-  ($($file:expr),*) => {
-    (vec![$(ArcPath::from($file)),*].into_iter(), std::iter::empty())
-  };
-}
+  let rx = watch!(h, "a");
 
-macro_rules! h {
-  ($options:expr) => {
-    h!($options, Default::default())
-  };
-  ($options:expr, $ignore:expr) => {
-    helpers::TestHelper::new(|| FsWatcher::new($options, $ignore))
-  };
-}
+  h.tick(|| {
+    h.file("a");
+  });
+
+  let change_events = counter!();
 
-macro_rules! c {
-  () => {
-    AtomicU8::new(0)
-  };
+  h.collect_events_blocking(
+    rx,
+    |file, _| {
+      file.assert_path(h.join("a"));
+      add!(change_events);
+    },
+    |changes, abort| {
+      changes.assert_changed(h.join("a"));
+      assert!(load!(change_events) > 0);
+      *abort = true;
+    },
+  );
 }
 
-macro_rules! add {
-  ($c:expr) => {
-    $c.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
-  };
+#[test]
+fn should_not_watch_a_single_ignored_file_glob() {
+  let mut h = helper!(
+    FsWatcherOptions {
+      aggregate_timeout: Some(300),
+      ..Default::default()
+    },
+    FsWatcherIgnored::Path("**/a".to_string())
+  );
+
+  let rx = watch!(h, "a");
+
+  let change_events = arc_counter!();
+  let aggregated_events = arc_counter!();
+
+  h.tick(|| {
+    h.file("a");
+    h.tick_ms(1000, || {
+      {
+        let change_events = change_events.clone();
+        let aggregated_events = aggregated_events.clone();
+
+        h.collect_events(
+          rx,
+          move |_file, _abort| {
+            add!(change_events);
+          },
+          move |_changes, _abort| {
+            add!(aggregated_events);
+          },
+        );
+      }
+      h.tick_ms(1000, || {
+        assert_no_events!(change_events, aggregated_events);
+      });
+    });
+  });
 }
 
-macro_rules! load {
-  ($c:expr) => {
-    $c.load(std::sync::atomic::Ordering::SeqCst)
-  };
+#[test]
+fn should_not_watch_a_single_ignored_file_regexp() {
+  let mut h = helper!(
+    FsWatcherOptions {
+      aggregate_timeout: Some(300),
+      ..Default::default()
+    },
+    FsWatcherIgnored::Regex(RspackRegex::new(r"/a$").unwrap())
+  );
+
+  let rx = watch!(h, "a");
+
+  let change_events = arc_counter!();
+  let aggregated_events = arc_counter!();
+
+  h.tick(|| {
+    h.file("a");
+    h.tick_ms(1000, || {
+      {
+        let change_events = change_events.clone();
+        let aggregated_events = aggregated_events.clone();
+
+        h.collect_events(
+          rx,
+          move |_file, _abort| {
+            add!(change_events);
+          },
+          move |_changes, _abort| {
+            add!(aggregated_events);
+          },
+        );
+      }
+      h.tick_ms(1000, || {
+        assert_no_events!(change_events, aggregated_events);
+      });
+    });
+  });
 }
 
-macro_rules! watch {
-  ($helper:expr, $($files:expr),*) => {
-    watch!(files @ $helper, $($files),*)
-  };
-  ($helper:expr, _, $($dirs:expr),*) => {
-    watch!(dirs @ $helper, $($dirs),*)
-  };
-  ($helper:expr, _, _, $($missing:expr),*) => {
-    watch!(missing @ $helper, $($missing),*)
-  };
-
-  (files @ $helper:expr, $($files:expr),*) => {
-    $helper.watch(f!($($files),*), e!(), e!())
-  };
-  (dirs @ $helper:expr, $($dirs:expr),*) => {
-    $helper.watch(e!(), f!($($dirs),*), e!())
-  };
-  (missing @ $helper:expr, $($missing:expr),*) => {
-    $helper.watch(e!(), e!(), f!($($missing),*))
-  };
+#[test]
+#[ignore]
+fn should_not_watch_a_single_ignored_file_function() {
+  todo!("FsWatcherIgnored::Function")
 }
 
 #[test]
-fn should_watch_a_single_file() {
-  let mut helper = h!(FsWatcherOptions {
+fn should_watch_multiple_files() {
+  let mut h = helper!(FsWatcherOptions {
     aggregate_timeout: Some(1000),
     ..Default::default()
   });
 
-  let rx = watch!(helper, "a");
+  let rx = watch!(h, "a", "b");
 
-  helper.tick(|| {
-    helper.file("a");
+  let change_events = arc_counter!();
+  let changed_files = std::sync::Arc::new(std::sync::Mutex::new(Vec::<String>::new()));
+
+  h.tick_ms(400, || {
+    h.file("a");
+    h.tick_ms(400, || {
+      h.file("b");
+      h.tick_ms(400, || {
+        h.file("a");
+        h.tick_ms(400, || {
+          h.file("b");
+          h.tick_ms(400, || {
+            h.file("a");
+          });
+        });
+      });
+    });
   });
 
-  let change_events = c!();
-  helper.collect_events(
+  h.collect_events_blocking(
     rx,
-    |file, _| {
-      file.assert_path(helper.join("a"));
-      add!(change_events);
+    {
+      let change_events = change_events.clone();
+      let changed_files = changed_files.clone();
+      move |file, _| {
+        let mut files = changed_files.lock().unwrap();
+        let file_str = match file {
+          helpers::ChangedEvent::Changed(path) => path.clone(),
+          _ => return,
+        };
+        // Dedupe consecutive duplicates
+        if files.is_empty() || files.last().unwrap() != &file_str {
+          files.push(file_str);
+        }
+        add!(change_events);
+      }
     },
-    |changes, abort| {
-      changes.assert_changed(helper.join("a"));
-      assert!(load!(change_events) > 0);
-      *abort = true;
+    {
+      let change_events = change_events.clone();
+      let changed_files = changed_files.clone();
+      let a = h.join("a").to_string();
+      let b = h.join("b").to_string();
+      move |changes, abort| {
+        // Verify aggregated changes contain both files
+        let mut sorted_changes: Vec<String> = changes.changed_files.iter().cloned().collect();
+        sorted_changes.sort();
+        assert_eq!(sorted_changes, vec![a.clone(), b.clone()]);
+
+        // Verify the change event sequence
+        let files = changed_files.lock().unwrap();
+        assert_eq!(
+          *files,
+          vec![a.clone(), b.clone(), a.clone(), b.clone(), a.clone()]
+        );
+
+        assert!(load!(change_events) > 0);
+        *abort = true;
+      }
     },
   );
 }
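For reference, a minimal standalone sketch of the debounce loop that the reworked `create_execute_aggregate_task` implements: producers stamp `last_changed` on every event, and the consumer fires the aggregate handler only once `aggregate_timeout` of quiet has elapsed, resetting the stamp under the same lock that observed it (the race described in the comment in the patch). Everything outside the patch here is an assumption for illustration: the names `pending` and `TIMEOUT_MS`, the `main` harness, and a `tokio = { version = "1", features = ["full"] }` dependency. Unlike the patch, the sketch drops the guard before sleeping so producers are not blocked during the wait; the essential invariant is only the check-then-reset under one lock.

```rust
use std::{sync::Arc, time::Instant};

use tokio::sync::Mutex;

const TIMEOUT_MS: u64 = 50; // illustrative aggregate timeout

#[tokio::main]
async fn main() {
    let last_changed: Arc<Mutex<Option<Instant>>> = Arc::default();
    let pending: Arc<Mutex<Vec<String>>> = Arc::default();

    // Producer: record a few events, stamping the debounce clock each time.
    let producer = {
        let (last_changed, pending) = (Arc::clone(&last_changed), Arc::clone(&pending));
        tokio::spawn(async move {
            for name in ["a", "b", "c"] {
                pending.lock().await.push(name.to_string());
                last_changed.lock().await.replace(Instant::now());
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            }
        })
    };

    // Consumer: fire the aggregate handler only after TIMEOUT_MS of quiet.
    loop {
        let mut last = last_changed.lock().await;
        match last.map(|t| t.elapsed().as_millis() as u64) {
            Some(elapsed) if elapsed >= TIMEOUT_MS => {
                // Reset under the same lock that observed the timeout, so an
                // event stamped in between cannot be silently cleared.
                *last = None;
                drop(last);
                let batch = std::mem::take(&mut *pending.lock().await);
                println!("aggregate fired with {batch:?}");
                break;
            }
            Some(elapsed) => {
                // Quiet period not over yet: sleep only the remaining time.
                drop(last);
                tokio::time::sleep(tokio::time::Duration::from_millis(TIMEOUT_MS - elapsed)).await;
            }
            None => {
                // No event recorded yet: the minimum wait is the full timeout.
                drop(last);
                tokio::time::sleep(tokio::time::Duration::from_millis(TIMEOUT_MS)).await;
            }
        }
    }

    let _ = producer.await;
}
```

Sleeping only the remaining `TIMEOUT_MS - elapsed` rather than a fixed interval is what lets the loop wake up exactly when the quiet period can first have expired, which is the same design choice the patch makes with `wait_duration`.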