Skip to content

Commit b93f4d0

Browse files
fix: update event handling to support batch processing of FsEvents (#11460)
1 parent eb0352a commit b93f4d0

File tree

4 files changed

+65
-50
lines changed

4 files changed

+65
-50
lines changed

crates/rspack_fs/src/watcher/executor.rs

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ use tokio::sync::{
99
mpsc::{self, UnboundedReceiver, UnboundedSender},
1010
};
1111

12-
use super::{EventAggregateHandler, EventHandler, FsEvent, FsEventKind};
12+
use super::{EventAggregateHandler, EventHandler, FsEventKind};
13+
use crate::watcher::EventBatch;
1314

1415
type ThreadSafetyReceiver<T> = ThreadSafety<UnboundedReceiver<T>>;
1516
type ThreadSafety<T> = Arc<Mutex<T>>;
@@ -32,7 +33,7 @@ impl FilesData {
3233
/// deleted files, and coordinates the event handling logic.
3334
pub struct Executor {
3435
aggregate_timeout: u32,
35-
rx: ThreadSafety<UnboundedReceiver<FsEvent>>,
36+
rx: ThreadSafetyReceiver<EventBatch>,
3637
files_data: ThreadSafety<FilesData>,
3738
exec_aggregate_tx: UnboundedSender<ExecAggregateEvent>,
3839
exec_aggregate_rx: ThreadSafetyReceiver<ExecAggregateEvent>,
@@ -59,13 +60,13 @@ enum ExecAggregateEvent {
5960
}
6061

6162
enum ExecEvent {
62-
Execute(FsEvent),
63+
Execute(EventBatch),
6364
Close,
6465
}
6566

6667
impl Executor {
6768
/// Create a new `WatcherExecutor` with the given receiver and optional aggregate timeout.
68-
pub fn new(rx: UnboundedReceiver<FsEvent>, aggregate_timeout: Option<u32>) -> Self {
69+
pub fn new(rx: UnboundedReceiver<EventBatch>, aggregate_timeout: Option<u32>) -> Self {
6970
let (exec_aggregate_tx, exec_aggregate_rx) = mpsc::unbounded_channel::<ExecAggregateEvent>();
7071
let (exec_tx, exec_rx) = mpsc::unbounded_channel::<ExecEvent>();
7172

@@ -134,24 +135,27 @@ impl Executor {
134135
let aggregate_running = Arc::clone(&self.aggregate_running);
135136

136137
let future = async move {
137-
while let Some(event) = rx.lock().await.recv().await {
138-
let path = event.path.to_string_lossy().to_string();
139-
match event.kind {
140-
FsEventKind::Change => {
141-
files_data.lock().await.changed.insert(path);
142-
}
143-
FsEventKind::Remove => {
144-
files_data.lock().await.deleted.insert(path);
145-
}
146-
FsEventKind::Create => {
147-
files_data.lock().await.changed.insert(path);
138+
while let Some(events) = rx.lock().await.recv().await {
139+
for event in &events {
140+
let path = event.path.to_string_lossy().to_string();
141+
match event.kind {
142+
FsEventKind::Change => {
143+
files_data.lock().await.changed.insert(path);
144+
}
145+
FsEventKind::Remove => {
146+
files_data.lock().await.deleted.insert(path);
147+
}
148+
FsEventKind::Create => {
149+
files_data.lock().await.changed.insert(path);
150+
}
148151
}
149-
};
152+
}
150153

151154
if !paused.load(Ordering::Relaxed) && !aggregate_running.load(Ordering::Relaxed) {
152155
let _ = exec_aggregate_tx.send(ExecAggregateEvent::Execute);
153156
}
154-
let _ = exec_tx.send(ExecEvent::Execute(event));
157+
158+
let _ = exec_tx.send(ExecEvent::Execute(events));
155159
}
156160

157161
let _ = exec_aggregate_tx.send(ExecAggregateEvent::Close);
@@ -194,19 +198,22 @@ fn create_execute_task(
194198
exec_rx: ThreadSafetyReceiver<ExecEvent>,
195199
) -> tokio::task::JoinHandle<()> {
196200
let future = async move {
197-
while let Some(event) = exec_rx.lock().await.recv().await {
198-
match event {
199-
ExecEvent::Execute(event) => {
200-
let path = event.path.to_string_lossy().to_string();
201-
match event.kind {
202-
super::FsEventKind::Change | super::FsEventKind::Create => {
203-
if event_handler.on_change(path).is_err() {
204-
break;
201+
while let Some(exec_event) = exec_rx.lock().await.recv().await {
202+
match exec_event {
203+
ExecEvent::Execute(batch_events) => {
204+
for event in batch_events {
205+
// Handle each event based on its kind
206+
let path = event.path.to_string_lossy().to_string();
207+
match event.kind {
208+
super::FsEventKind::Change | super::FsEventKind::Create => {
209+
if event_handler.on_change(path).is_err() {
210+
break;
211+
}
205212
}
206-
}
207-
super::FsEventKind::Remove => {
208-
if event_handler.on_delete(path).is_err() {
209-
break;
213+
super::FsEventKind::Remove => {
214+
if event_handler.on_delete(path).is_err() {
215+
break;
216+
}
210217
}
211218
}
212219
}

crates/rspack_fs/src/watcher/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ pub(crate) struct FsEvent {
3939
pub kind: FsEventKind,
4040
}
4141

42+
pub(crate) type EventBatch = Vec<FsEvent>;
43+
4244
/// `EventAggregateHandler` is a trait for handling aggregated file system events.
4345
/// It provides methods to handle changes and deletions of files, as well as errors.
4446
/// Implementors of this trait can define custom behavior for these events.

crates/rspack_fs/src/watcher/scanner.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,17 @@ use std::{ops::Deref, sync::Arc};
33
use tokio::sync::mpsc::UnboundedSender;
44

55
use super::{FsEvent, FsEventKind, PathManager};
6+
use crate::watcher::EventBatch;
67

78
// Scanner will scann the path whether it is exist or not in disk on initialization
89
pub struct Scanner {
910
path_manager: Arc<PathManager>,
10-
tx: Option<UnboundedSender<FsEvent>>,
11+
tx: Option<UnboundedSender<EventBatch>>,
1112
}
1213

1314
impl Scanner {
1415
/// Creates a new `Scanner` that will send events to the provided sender when paths are scanned.
15-
pub fn new(tx: UnboundedSender<FsEvent>, path_manager: Arc<PathManager>) -> Self {
16+
pub fn new(tx: UnboundedSender<EventBatch>, path_manager: Arc<PathManager>) -> Self {
1617
Self {
1718
path_manager,
1819
tx: Some(tx),
@@ -33,10 +34,10 @@ impl Scanner {
3334
&& let Some(tx) = &_tx
3435
{
3536
// If the file does not exist, send a delete event
36-
let _ = tx.send(FsEvent {
37+
let _ = tx.send(vec![FsEvent {
3738
path: filepath.clone(),
3839
kind: FsEventKind::Remove,
39-
});
40+
}]);
4041
}
4142
}
4243
});
@@ -50,10 +51,10 @@ impl Scanner {
5051
&& let Some(tx) = &_tx
5152
{
5253
// If the directory does not exist, send a delete event
53-
let _ = tx.send(FsEvent {
54+
let _ = tx.send(vec![FsEvent {
5455
path: dirpath.clone(),
5556
kind: FsEventKind::Remove,
56-
});
57+
}]);
5758
}
5859
}
5960
});
@@ -110,13 +111,13 @@ mod tests {
110111
let collected_events = collector.await.unwrap();
111112
assert_eq!(collected_events.len(), 2);
112113

113-
assert!(collected_events.contains(&FsEvent {
114+
assert!(collected_events.contains(&vec![FsEvent {
114115
path: ArcPath::from(current_dir.join("___test_file.txt")),
115116
kind: FsEventKind::Remove
116-
}));
117-
assert!(collected_events.contains(&FsEvent {
117+
}]));
118+
assert!(collected_events.contains(&vec![FsEvent {
118119
path: ArcPath::from(current_dir.join("___test_dir/a/b/c")),
119120
kind: FsEventKind::Remove,
120-
}));
121+
}]));
121122
}
122123
}

crates/rspack_fs/src/watcher/trigger.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use rspack_paths::ArcPath;
55
use tokio::sync::mpsc::UnboundedSender;
66

77
use super::{FsEvent, FsEventKind};
8-
use crate::watcher::paths::PathManager;
8+
use crate::watcher::{EventBatch, paths::PathManager};
99
/// `DependencyFinder` provides references to sets of files, directories, and missing paths,
1010
/// allowing efficient lookup and dependency resolution for a given path.
1111
///
@@ -91,12 +91,12 @@ pub struct Trigger {
9191
/// Shared reference to the path register, which tracks watched files/directories/missing.
9292
path_manager: Arc<PathManager>,
9393
/// Sender for communicating file system events to the watcher executor.
94-
tx: UnboundedSender<FsEvent>,
94+
tx: UnboundedSender<EventBatch>,
9595
}
9696

9797
impl Trigger {
9898
/// Create a new `Trigger` with the given path register and event sender.
99-
pub fn new(path_manager: Arc<PathManager>, tx: UnboundedSender<FsEvent>) -> Self {
99+
pub fn new(path_manager: Arc<PathManager>, tx: UnboundedSender<EventBatch>) -> Self {
100100
Self { path_manager, tx }
101101
}
102102

@@ -116,9 +116,7 @@ impl Trigger {
116116
pub fn on_event(&self, path: &ArcPath, kind: FsEventKind) {
117117
let finder = self.finder();
118118
let associated_event = finder.find_associated_event(path, kind);
119-
for (path, kind) in associated_event {
120-
self.trigger_event(path, kind);
121-
}
119+
self.trigger_events(associated_event);
122120
}
123121

124122
/// Helper to construct a `DependencyFinder` for the current path register state.
@@ -136,11 +134,18 @@ impl Trigger {
136134
}
137135
}
138136

139-
/// Sends a file system event for the given path and event kind.
140-
/// Ignores any error if the receiver has been dropped.
141-
fn trigger_event(&self, path: ArcPath, kind: FsEventKind) {
142-
let event = FsEvent { path, kind };
143-
_ = self.tx.send(event);
137+
/// Sends a group of file system events for the given path and event kind.
138+
/// If the event is successfully sent, it returns true; otherwise, it returns false.
139+
fn trigger_events(&self, events: Vec<(ArcPath, FsEventKind)>) -> bool {
140+
self
141+
.tx
142+
.send(
143+
events
144+
.into_iter()
145+
.map(|(path, kind)| FsEvent { path, kind })
146+
.collect(),
147+
)
148+
.is_ok()
144149
}
145150
}
146151
#[cfg(test)]

0 commit comments

Comments
 (0)