1- use std:: sync:: {
2- Arc ,
3- atomic:: { AtomicBool , Ordering } ,
1+ use std:: {
2+ sync:: {
3+ Arc ,
4+ atomic:: { AtomicBool , Ordering } ,
5+ } ,
6+ time:: Instant ,
47} ;
58
69use rspack_util:: fx_hash:: FxHashSet as HashSet ;
710use tokio:: sync:: {
8- Mutex ,
11+ Mutex , RwLock ,
912 mpsc:: { self , UnboundedReceiver , UnboundedSender } ,
1013} ;
1114
@@ -44,26 +47,24 @@ pub struct Executor {
4447 start_waiting : bool ,
4548 execute_handle : Option < tokio:: task:: JoinHandle < ( ) > > ,
4649 execute_aggregate_handle : Option < tokio:: task:: JoinHandle < ( ) > > ,
50+ /// Tracks the last time an event was triggered.
51+ ///
52+ /// The aggregate event handlers are only triggered after the aggregate timeout has passed from the last event.
53+ ///
54+ /// For example, if the last event was triggered at time T, and the aggregate timeout is 100ms,
55+ /// the event handler will only be executed if no new events are received until time T + 100ms.
56+ last_changed : Arc < RwLock < Option < Instant > > > ,
4757}
4858
4959const DEFAULT_AGGREGATE_TIMEOUT : u32 = 50 ; // Default timeout in milliseconds
5060
51- /// `ExecEvent` represents control events for the watcher executor loop.
52- /// - `Execute`: Indicates that an event (change or delete) has occurred and the handler should be triggered.
53- /// - `Close`: Indicates that the event receiver has been closed and the executor should stop.
54- #[ derive( Debug ) ]
55- enum ExecAggregateEvent {
56- /// Trigger the execution of the event handler (e.g., after a file change or delete).
57- Execute ,
58- /// Signal to close the executor loop (e.g., when the receiver is closed).
59- Close ,
60- }
61-
6261enum ExecEvent {
6362 Execute ( EventBatch ) ,
6463 Close ,
6564}
6665
66+ type ExecAggregateEvent = ( ) ;
67+
6768impl Executor {
6869 /// Create a new `WatcherExecutor` with the given receiver and optional aggregate timeout.
6970 pub fn new ( rx : UnboundedReceiver < EventBatch > , aggregate_timeout : Option < u32 > ) -> Self {
@@ -83,6 +84,7 @@ impl Executor {
8384 execute_aggregate_handle : None ,
8485 execute_handle : None ,
8586 aggregate_timeout : aggregate_timeout. unwrap_or ( DEFAULT_AGGREGATE_TIMEOUT ) ,
87+ last_changed : Default :: default ( ) ,
8688 }
8789 }
8890
@@ -132,7 +134,8 @@ impl Executor {
132134 let exec_aggregate_tx = self . exec_aggregate_tx . clone ( ) ;
133135 let exec_tx = self . exec_tx . clone ( ) ;
134136 let paused = Arc :: clone ( & self . paused ) ;
135- let aggregate_running = Arc :: clone ( & self . aggregate_running ) ;
137+ // let aggregate_running = Arc::clone(&self.aggregate_running);
138+ let last_changed = Arc :: clone ( & self . last_changed ) ;
136139
137140 let future = async move {
138141 while let Some ( events) = rx. lock ( ) . await . recv ( ) . await {
@@ -151,14 +154,17 @@ impl Executor {
151154 }
152155 }
153156
154- if !paused. load ( Ordering :: Relaxed ) && !aggregate_running. load ( Ordering :: Relaxed ) {
155- let _ = exec_aggregate_tx. send ( ExecAggregateEvent :: Execute ) ;
157+ let paused = paused. load ( Ordering :: Relaxed ) ;
158+
159+ if !paused {
160+ last_changed. write ( ) . await . replace ( Instant :: now ( ) ) ;
156161 }
157162
158163 let _ = exec_tx. send ( ExecEvent :: Execute ( events) ) ;
159164 }
160165
161- let _ = exec_aggregate_tx. send ( ExecAggregateEvent :: Close ) ;
166+ // Send close signal to both executors when the main receiver is closed
167+ let _ = exec_aggregate_tx. send ( ( ) ) ;
162168 let _ = exec_tx. send ( ExecEvent :: Close ) ;
163169 } ;
164170
@@ -184,6 +190,7 @@ impl Executor {
184190 Arc :: clone ( & self . files_data ) ,
185191 self . aggregate_timeout as u64 ,
186192 Arc :: clone ( & self . aggregate_running ) ,
193+ Arc :: clone ( & self . last_changed ) ,
187194 ) ) ;
188195
189196 self . execute_handle = Some ( create_execute_task (
@@ -233,38 +240,56 @@ fn create_execute_aggregate_task(
233240 files : ThreadSafety < FilesData > ,
234241 aggregate_timeout : u64 ,
235242 running : Arc < AtomicBool > ,
243+ last_changed : Arc < RwLock < Option < Instant > > > ,
236244) -> tokio:: task:: JoinHandle < ( ) > {
237245 let future = async move {
238246 loop {
239- let aggregate_rx = {
240- // release the lock on exec_aggregate_rx
241- // and wait for the next event
242- let mut exec_aggregate_rx_guard = exec_aggregate_rx. lock ( ) . await ;
243- match exec_aggregate_rx_guard. recv ( ) . await {
244- Some ( event) => event,
245- None => return ,
246- }
247+ // Wait for the signal to terminate the executor
248+ if exec_aggregate_rx. lock ( ) . await . try_recv ( ) . is_ok ( ) {
249+ break ;
250+ }
251+
252+ let time_elapsed_since_last_change = last_changed
253+ . read ( )
254+ . await
255+ . map ( |t| t. elapsed ( ) . as_millis ( ) as u64 ) ;
256+
257+ let on_timeout = if let Some ( elapsed) = time_elapsed_since_last_change {
258+ elapsed >= aggregate_timeout
259+ } else {
260+ false
247261 } ;
248262
249- if let ExecAggregateEvent :: Execute = aggregate_rx {
250- running. store ( true , Ordering :: Relaxed ) ;
251- // Wait for the aggregate timeout before executing the handler
252- tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( aggregate_timeout) ) . await ;
253-
254- // Get the files to process
255- let files = {
256- let mut files = files. lock ( ) . await ;
257- if files. is_empty ( ) {
258- running. store ( false , Ordering :: Relaxed ) ;
259- continue ;
260- }
261- std:: mem:: take ( & mut * files)
262- } ;
263+ if !on_timeout {
264+ // Not yet timed out, wait a bit and check again
265+ if let Some ( time_elapsed_since_last_change) = time_elapsed_since_last_change {
266+ debug_assert ! ( time_elapsed_since_last_change < aggregate_timeout) ;
267+ let wait_duration = aggregate_timeout - time_elapsed_since_last_change;
268+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( wait_duration) ) . await ;
269+ } else {
270+ // No changes have been recorded yet. The minimum wait is the aggregate timeout.
271+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( aggregate_timeout) ) . await ;
272+ }
263273
264- // Call the event handler with the changed and deleted files
265- event_handler. on_event_handle ( files. changed , files. deleted ) ;
266- running. store ( false , Ordering :: Relaxed ) ;
274+ continue ;
267275 }
276+
277+ running. store ( true , Ordering :: Relaxed ) ;
278+ * last_changed. write ( ) . await = None ;
279+
280+ // Get the files to process
281+ let files = {
282+ let mut files = files. lock ( ) . await ;
283+ if files. is_empty ( ) {
284+ running. store ( false , Ordering :: Relaxed ) ;
285+ continue ;
286+ }
287+ std:: mem:: take ( & mut * files)
288+ } ;
289+
290+ // Call the event handler with the changed and deleted files
291+ event_handler. on_event_handle ( files. changed , files. deleted ) ;
292+ running. store ( false , Ordering :: Relaxed ) ;
268293 }
269294 } ;
270295
0 commit comments