Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 75 additions & 43 deletions crates/rspack_watcher/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Instant,
};

use rspack_util::fx_hash::FxHashSet as HashSet;
Expand Down Expand Up @@ -44,26 +47,24 @@ pub struct Executor {
start_waiting: bool,
execute_handle: Option<tokio::task::JoinHandle<()>>,
execute_aggregate_handle: Option<tokio::task::JoinHandle<()>>,
/// Tracks the last time an event was triggered.
///
/// The aggregate event handlers are only triggered after the aggregate timeout has passed from the last event.
///
/// For example, if the last event was triggered at time T, and the aggregate timeout is 100ms,
/// the event handler will only be executed if no new events are received until time T + 100ms.
last_changed: Arc<Mutex<Option<Instant>>>,
}

const DEFAULT_AGGREGATE_TIMEOUT: u32 = 50; // Default timeout in milliseconds

/// `ExecEvent` represents control events for the watcher executor loop.
/// - `Execute`: Indicates that an event (change or delete) has occurred and the handler should be triggered.
/// - `Close`: Indicates that the event receiver has been closed and the executor should stop.
#[derive(Debug)]
enum ExecAggregateEvent {
/// Trigger the execution of the event handler (e.g., after a file change or delete).
Execute,
/// Signal to close the executor loop (e.g., when the receiver is closed).
Close,
}

enum ExecEvent {
Execute(EventBatch),
Close,
}

type ExecAggregateEvent = ();

impl Executor {
/// Create a new `WatcherExecutor` with the given receiver and optional aggregate timeout.
pub fn new(rx: UnboundedReceiver<EventBatch>, aggregate_timeout: Option<u32>) -> Self {
Expand All @@ -83,6 +84,7 @@ impl Executor {
execute_aggregate_handle: None,
execute_handle: None,
aggregate_timeout: aggregate_timeout.unwrap_or(DEFAULT_AGGREGATE_TIMEOUT),
last_changed: Default::default(),
}
}

Expand Down Expand Up @@ -132,7 +134,7 @@ impl Executor {
let exec_aggregate_tx = self.exec_aggregate_tx.clone();
let exec_tx = self.exec_tx.clone();
let paused = Arc::clone(&self.paused);
let aggregate_running = Arc::clone(&self.aggregate_running);
let last_changed = Arc::clone(&self.last_changed);

let future = async move {
while let Some(events) = rx.lock().await.recv().await {
Expand All @@ -151,14 +153,17 @@ impl Executor {
}
}

if !paused.load(Ordering::Relaxed) && !aggregate_running.load(Ordering::Relaxed) {
let _ = exec_aggregate_tx.send(ExecAggregateEvent::Execute);
let paused = paused.load(Ordering::Relaxed);

if !paused {
last_changed.lock().await.replace(Instant::now());
}

let _ = exec_tx.send(ExecEvent::Execute(events));
}

let _ = exec_aggregate_tx.send(ExecAggregateEvent::Close);
// Send close signal to both executors when the main receiver is closed
let _ = exec_aggregate_tx.send(());
let _ = exec_tx.send(ExecEvent::Close);
};

Expand All @@ -184,6 +189,7 @@ impl Executor {
Arc::clone(&self.files_data),
self.aggregate_timeout as u64,
Arc::clone(&self.aggregate_running),
Arc::clone(&self.last_changed),
));

self.execute_handle = Some(create_execute_task(
Expand Down Expand Up @@ -233,38 +239,64 @@ fn create_execute_aggregate_task(
files: ThreadSafety<FilesData>,
aggregate_timeout: u64,
running: Arc<AtomicBool>,
last_changed: Arc<Mutex<Option<Instant>>>,
) -> tokio::task::JoinHandle<()> {
let future = async move {
loop {
let aggregate_rx = {
// release the lock on exec_aggregate_rx
// and wait for the next event
let mut exec_aggregate_rx_guard = exec_aggregate_rx.lock().await;
match exec_aggregate_rx_guard.recv().await {
Some(event) => event,
None => return,
}
// Wait for the signal to terminate the executor
if exec_aggregate_rx.lock().await.try_recv().is_ok() {
break;
}

let mut last_changed = last_changed.lock().await;
let time_elapsed_since_last_change = last_changed.map(|t| t.elapsed().as_millis() as u64);

let on_timeout = if let Some(elapsed) = time_elapsed_since_last_change {
elapsed >= aggregate_timeout
} else {
false
};

if let ExecAggregateEvent::Execute = aggregate_rx {
running.store(true, Ordering::Relaxed);
// Wait for the aggregate timeout before executing the handler
tokio::time::sleep(tokio::time::Duration::from_millis(aggregate_timeout)).await;

// Get the files to process
let files = {
let mut files = files.lock().await;
if files.is_empty() {
running.store(false, Ordering::Relaxed);
continue;
}
std::mem::take(&mut *files)
};
if !on_timeout {
// Not yet timed out, wait a bit and check again
if let Some(time_elapsed_since_last_change) = time_elapsed_since_last_change {
debug_assert!(time_elapsed_since_last_change < aggregate_timeout);
let wait_duration = aggregate_timeout - time_elapsed_since_last_change;
tokio::time::sleep(tokio::time::Duration::from_millis(wait_duration)).await;
} else {
// No changes have been recorded yet. The minimum wait is the aggregate timeout.
tokio::time::sleep(tokio::time::Duration::from_millis(aggregate_timeout)).await;
}

// Call the event handler with the changed and deleted files
event_handler.on_event_handle(files.changed, files.deleted);
running.store(false, Ordering::Relaxed);
continue;
}

running.store(true, Ordering::Relaxed);

// Lock to `last_changed` should be held until we reset it to `None`
// to avoid race conditions where new events arrive just before we reset it.
//
// Failed to do so may cause missing aggregate events. For example (the bad case):
// - Event A arrives, `last_changed` is set to T1
// - Timeout occurs, we are about to process events
// - Before we reset `last_changed`, Event B arrives, `last_changed` is set to T2
// - We reset `last_changed` to None, losing the information about Event B
// - Event B is never processed because `last_changed` is None and `on_timeout` is always a `false`.
*last_changed = None;

// Get the files to process
let files = {
let mut files = files.lock().await;
if files.is_empty() {
running.store(false, Ordering::Relaxed);
continue;
}
std::mem::take(&mut *files)
};

// Call the event handler with the changed and deleted files
event_handler.on_event_handle(files.changed, files.deleted);
running.store(false, Ordering::Relaxed);
}
};

Expand Down
92 changes: 92 additions & 0 deletions crates/rspack_watcher/tests/helpers/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#[macro_export]
macro_rules! empty {
() => {
(std::iter::empty(), std::iter::empty())
};
}

#[macro_export]
macro_rules! paths {
($($path:expr),*) => {
(vec![$(rspack_paths::ArcPath::from($path)),*].into_iter(), std::iter::empty())
};
}

#[macro_export]
macro_rules! helper {
($options:expr) => {
helper!($options, Default::default())
};
($options:expr, $ignore:expr) => {
$crate::helpers::TestHelper::new(|| rspack_watcher::FsWatcher::new($options, $ignore))
};
}

#[macro_export]
macro_rules! counter {
() => {
std::sync::atomic::AtomicU8::new(0)
};
}

#[macro_export]
macro_rules! arc_counter {
() => {
std::sync::Arc::new(std::sync::atomic::AtomicU8::new(0))
};
}

#[macro_export]
macro_rules! add {
($c:expr) => {
$c.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
};
}

#[macro_export]
macro_rules! load {
($c:expr) => {
$c.load(std::sync::atomic::Ordering::SeqCst)
};
}

#[macro_export]
macro_rules! watch {
($helper:expr, $($files:expr),*) => {
watch!(files @ $helper, $($files),*)
};
($helper:expr, _, $($dirs:expr),*) => {
watch!(dirs @ $helper, $($dirs),*)
};
($helper:expr, _, _, $($missing:expr),*) => {
watch!(missing @ $helper, $($missing),*)
};

(files @ $helper:expr, $($files:expr),*) => {
$helper.watch(paths!($($files),*), empty!(), empty!())
};
(dirs @ $helper:expr, $($dirs:expr),*) => {
$helper.watch(empty!(), paths!($($dirs),*), empty!())
};
(missing @ $helper:expr, $($missing:expr),*) => {
$helper.watch(empty!(), empty!(), paths!($($missing),*))
};
}

#[macro_export]
macro_rules! assert_no_events {
($change_events:expr, $aggregated_events:expr) => {
assert_eq!(
load!($change_events),
0,
"Expected no change events but got {}",
load!($change_events)
);
assert_eq!(
load!($aggregated_events),
0,
"Expected no aggregated events but got {}",
load!($aggregated_events)
);
};
}
Loading
Loading