Skip to content

Commit 5a3a50d

Browse files
committed
Update windows.rs
1 parent eae3639 commit 5a3a50d

File tree

1 file changed

+128
-96
lines changed

1 file changed

+128
-96
lines changed

src/handlers/fs/windows.rs

Lines changed: 128 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use winapi::um::{
33
fileapi::{CreateFileW, OPEN_EXISTING},
44
handleapi::{CloseHandle, INVALID_HANDLE_VALUE},
55
winbase::{FILE_FLAG_BACKUP_SEMANTICS, FILE_FLAG_OVERLAPPED, ReadDirectoryChangesW},
6-
ioapiset::GetOverlappedResult,
6+
ioapiset::{GetOverlappedResult, CancelIo},
77
winnt::{
88
FILE_NOTIFY_CHANGE_ATTRIBUTES, FILE_NOTIFY_CHANGE_CREATION, FILE_NOTIFY_CHANGE_DIR_NAME,
99
FILE_NOTIFY_CHANGE_FILE_NAME, FILE_NOTIFY_CHANGE_LAST_WRITE, FILE_NOTIFY_CHANGE_SIZE,
@@ -29,6 +29,7 @@ use crate::{Result, TellMeWhenError};
2929
use crossbeam_channel::Sender;
3030
use crate::{EventMessage, EventData, EventMetadata};
3131
use std::time::SystemTime;
32+
use std::thread::{self, JoinHandle};
3233

3334
pub struct WindowsWatchHandle {
3435
directory_handle: HANDLE,
@@ -38,6 +39,23 @@ pub struct WindowsWatchHandle {
3839
watched_path: PathBuf,
3940
event_sender: Option<Sender<EventMessage>>,
4041
handler_id: String,
42+
// Add a thread handle to manage the watcher thread
43+
worker_thread: Option<JoinHandle<()>>,
44+
}
45+
46+
impl Clone for WindowsWatchHandle {
47+
fn clone(&self) -> Self {
48+
WindowsWatchHandle {
49+
directory_handle: self.directory_handle,
50+
event_handle: self.event_handle,
51+
buffer: self.buffer.clone(),
52+
overlapped: Box::new(*self.overlapped),
53+
watched_path: self.watched_path.clone(),
54+
event_sender: self.event_sender.clone(),
55+
handler_id: self.handler_id.clone(),
56+
worker_thread: None, // Do not clone the thread handle
57+
}
58+
}
4159
}
4260

4361
impl std::fmt::Debug for WindowsWatchHandle {
@@ -46,6 +64,7 @@ impl std::fmt::Debug for WindowsWatchHandle {
4664
.field("directory_handle", &self.directory_handle)
4765
.field("event_handle", &self.event_handle)
4866
.field("buffer_len", &self.buffer.len())
67+
.field("watched_path", &self.watched_path)
4968
.finish()
5069
}
5170
}
@@ -96,21 +115,23 @@ extern "system" fn filesystem_completion_routine(
96115
overlapped: *mut OVERLAPPED,
97116
) {
98117
unsafe {
99-
// Recover the context from the hEvent member of the OVERLAPPED structure
118+
log::info!("Callback triggered with code: {}, bytes: {}", error_code, bytes_transferred);
119+
100120
let context_ptr = (*overlapped).hEvent as *mut CallbackContext;
101121
if context_ptr.is_null() {
102-
log::error!("Completion routine called with null context pointer.");
122+
log::error!("Completion routine called with null context pointer. This is a critical error.");
103123
return;
104124
}
105125
let context = &mut *context_ptr;
106126

107127
if error_code != ERROR_SUCCESS && error_code != ERROR_IO_PENDING {
108-
log::error!("Filesystem monitoring stopped due to error: {}", error_code);
128+
log::error!("Filesystem monitoring stopped for path {:?} due to error: {}", context.watched_path, error_code);
109129
let _ = Box::from_raw(context_ptr);
110130
return;
111131
}
112132

113133
if bytes_transferred > 0 {
134+
log::debug!("Processing {} bytes of notifications for path {:?}", bytes_transferred, context.watched_path);
114135
let buffer_slice = std::slice::from_raw_parts(context.buffer, bytes_transferred as usize);
115136
PlatformWatcher::process_notifications(
116137
buffer_slice,
@@ -120,7 +141,6 @@ extern "system" fn filesystem_completion_routine(
120141
);
121142
}
122143

123-
// Restart monitoring for next batch of events
124144
let notify_filter = PlatformWatcher::build_notify_filter_static(&context.config.event_types);
125145
let mut new_overlapped = std::mem::zeroed::<OVERLAPPED>();
126146
new_overlapped.hEvent = context_ptr as *mut c_void;
@@ -140,30 +160,56 @@ extern "system" fn filesystem_completion_routine(
140160
if success == 0 {
141161
let error = GetLastError();
142162
if error != ERROR_IO_PENDING {
143-
log::error!("Failed to restart ReadDirectoryChangesW in callback: {}", error);
163+
log::error!("Failed to restart ReadDirectoryChangesW in callback for path {:?}: {}", context.watched_path, error);
144164
let _ = Box::from_raw(context_ptr);
165+
} else {
166+
log::debug!("ReadDirectoryChangesW re-armed for path {:?}.", context.watched_path);
145167
}
146168
}
147169
}
148170
}
149171

150172
impl PlatformWatcher {
151173
pub fn new(handler_id: String, event_sender: Option<Sender<EventMessage>>) -> Result<Self> {
174+
log::info!("PlatformWatcher created for handler_id: {}", handler_id);
152175
Ok(Self {
153176
handles: Vec::new(),
154177
event_sender,
155178
handler_id,
156179
})
157180
}
158181

159-
pub fn run(&self) {
160-
log::info!("Windows watcher is now running and waiting for events...");
161-
unsafe {
162-
SleepEx(winapi::um::winbase::INFINITE, 1);
182+
pub fn run(&mut self) -> Result<()> {
183+
log::info!("Starting Windows watcher threads for {} handles...", self.handles.len());
184+
185+
// This method is no longer a simple blocking call. It starts threads
186+
// for each handle and then blocks, waiting for them.
187+
for handle in &mut self.handles {
188+
let directory_handle = handle.directory_handle;
189+
let watched_path = handle.watched_path.clone();
190+
191+
let worker_thread = thread::spawn(move || {
192+
log::info!("Worker thread started for path {:?}", watched_path);
193+
// This is the thread that will be "alerted" by the OS
194+
// when an I/O completion routine is ready.
195+
unsafe {
196+
SleepEx(winapi::um::winbase::INFINITE, 1);
197+
}
198+
log::info!("Worker thread ending for path {:?}", watched_path);
199+
});
200+
201+
handle.worker_thread = Some(worker_thread);
163202
}
203+
204+
// The main thread needs to return control to the caller so they can
205+
// do other things. The worker threads are now managing the watches.
206+
// A future improvement might be to join these threads in a graceful shutdown process.
207+
208+
Ok(())
164209
}
165210

166211
pub async fn watch_path(&mut self, path: &Path, config: &FsWatchConfig) -> Result<WatchHandle> {
212+
log::info!("Watching path: {:?} with config: {:?}", path, config);
167213
let wide_path: Vec<u16> = OsStr::new(path)
168214
.encode_wide()
169215
.chain(Some(0))
@@ -181,13 +227,14 @@ impl PlatformWatcher {
181227
);
182228

183229
if directory_handle == INVALID_HANDLE_VALUE {
230+
let err_code = GetLastError();
231+
log::error!("Failed to open directory {:?} for watching. Error: {}", path, err_code);
184232
return Err(TellMeWhenError::System(
185-
format!("Failed to open directory for watching: {}", GetLastError()),
233+
format!("Failed to open directory for watching: {}", err_code),
186234
));
187235
}
188236

189237
let event_handle = ptr::null_mut();
190-
191238
let mut buffer = vec![0u8; 4096];
192239
let buffer_ptr = buffer.as_mut_ptr();
193240
let buffer_len = buffer.len();
@@ -200,6 +247,7 @@ impl PlatformWatcher {
200247
watched_path: path.to_path_buf(),
201248
event_sender: self.event_sender.clone(),
202249
handler_id: self.handler_id.clone(),
250+
worker_thread: None,
203251
};
204252

205253
let context = Box::new(CallbackContext {
@@ -214,13 +262,13 @@ impl PlatformWatcher {
214262

215263
watch_handle.overlapped.hEvent = Box::into_raw(context) as *mut c_void;
216264

217-
self.start_monitoring(&mut watch_handle, config).await;
265+
self.start_monitoring(&mut watch_handle, config).await?;
266+
self.handles.push(watch_handle);
218267

219-
let handle = WatchHandle {
220-
handle: watch_handle,
221-
};
222-
223-
Ok(handle)
268+
// Return a handle that identifies this watcher.
269+
// Clone the last handle for WatchHandle.
270+
let last_handle = self.handles.last().unwrap().clone();
271+
Ok(WatchHandle { handle: last_handle })
224272
}
225273
}
226274

@@ -243,9 +291,12 @@ impl PlatformWatcher {
243291
if success == 0 {
244292
let error = GetLastError();
245293
if error != ERROR_IO_PENDING {
294+
log::error!("Initial ReadDirectoryChangesW failed for path {:?}. Error: {}", watch_handle.watched_path, error);
246295
return Err(TellMeWhenError::System(
247296
format!("ReadDirectoryChangesW failed with error: {}", error),
248297
));
298+
} else {
299+
log::info!("Initial ReadDirectoryChangesW successfully queued for path {:?}.", watch_handle.watched_path);
249300
}
250301
}
251302
}
@@ -254,48 +305,27 @@ impl PlatformWatcher {
254305

255306
fn build_notify_filter_static(event_types: &[FsEventType]) -> u32 {
256307
let mut filter = 0u32;
257-
258308
for event_type in event_types {
259309
match event_type {
260-
FsEventType::Created => {
261-
filter |= FILE_NOTIFY_CHANGE_CREATION | FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME;
262-
}
263-
FsEventType::Modified => {
264-
filter |= FILE_NOTIFY_CHANGE_LAST_WRITE | FILE_NOTIFY_CHANGE_SIZE | FILE_NOTIFY_CHANGE_ATTRIBUTES;
265-
}
266-
FsEventType::Deleted => {
267-
filter |= FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME;
268-
}
269-
FsEventType::AttributeChanged => {
270-
filter |= FILE_NOTIFY_CHANGE_ATTRIBUTES;
271-
}
272-
FsEventType::Renamed { .. } => {
273-
filter |= FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME;
274-
}
275-
_ => {
276-
277-
}
310+
FsEventType::Created => { filter |= FILE_NOTIFY_CHANGE_CREATION | FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME; }
311+
FsEventType::Modified => { filter |= FILE_NOTIFY_CHANGE_LAST_WRITE | FILE_NOTIFY_CHANGE_SIZE | FILE_NOTIFY_CHANGE_ATTRIBUTES; }
312+
FsEventType::Deleted => { filter |= FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME; }
313+
FsEventType::AttributeChanged => { filter |= FILE_NOTIFY_CHANGE_ATTRIBUTES; }
314+
FsEventType::Renamed { .. } => { filter |= FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME; }
315+
_ => {}
278316
}
279317
}
280-
281318
if filter == 0 {
282-
filter = FILE_NOTIFY_CHANGE_FILE_NAME
283-
| FILE_NOTIFY_CHANGE_DIR_NAME
284-
| FILE_NOTIFY_CHANGE_LAST_WRITE
285-
| FILE_NOTIFY_CHANGE_CREATION
286-
| FILE_NOTIFY_CHANGE_SIZE
287-
| FILE_NOTIFY_CHANGE_ATTRIBUTES;
319+
filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE | FILE_NOTIFY_CHANGE_CREATION | FILE_NOTIFY_CHANGE_SIZE | FILE_NOTIFY_CHANGE_ATTRIBUTES;
288320
}
289-
290321
filter
291322
}
292323

293324
fn process_notifications(buffer: &[u8], base_path: &Path, event_sender: &Option<Sender<EventMessage>>, handler_id: &str) {
294-
log::debug!("Processing filesystem notifications, buffer size: {}", buffer.len());
325+
log::debug!("Processing notifications, buffer size: {}", buffer.len());
295326

296327
if let Some(sender) = event_sender {
297328
let mut offset = 0;
298-
let mut old_name_info: Option<(PathBuf, SystemTime)> = None;
299329

300330
while offset < buffer.len() {
301331
unsafe {
@@ -311,46 +341,34 @@ impl PlatformWatcher {
311341
let filename_str = filename.to_string_lossy().trim_end_matches('\0').to_string();
312342
let full_path = base_path.join(&filename_str);
313343
let timestamp = SystemTime::now();
344+
345+
log::debug!("Found notification: Action={}, Path={:?}", info.Action, full_path);
314346

315-
match info.Action {
316-
FILE_ACTION_ADDED => {
317-
let event_data = FsEventData { event_type: FsEventType::Created, path: full_path, timestamp };
318-
send_event(sender, handler_id, event_data);
319-
}
320-
FILE_ACTION_REMOVED => {
321-
let event_data = FsEventData { event_type: FsEventType::Deleted, path: full_path, timestamp };
322-
send_event(sender, handler_id, event_data);
323-
}
324-
FILE_ACTION_MODIFIED => {
325-
let event_data = FsEventData { event_type: FsEventType::Modified, path: full_path, timestamp };
326-
send_event(sender, handler_id, event_data);
327-
}
347+
let event_type = match info.Action {
348+
FILE_ACTION_ADDED => FsEventType::Created,
349+
FILE_ACTION_REMOVED => FsEventType::Deleted,
350+
FILE_ACTION_MODIFIED => FsEventType::Modified,
328351
FILE_ACTION_RENAMED_OLD_NAME => {
329-
old_name_info = Some((full_path, timestamp));
330-
}
352+
// For simplicity and debugging, we'll log this but not create an event yet.
353+
log::debug!("Found FILE_ACTION_RENAMED_OLD_NAME for {:?}", full_path);
354+
continue; // Skip to next notification
355+
},
331356
FILE_ACTION_RENAMED_NEW_NAME => {
332-
if let Some((old_path, old_time)) = old_name_info.take() {
333-
let event_data = FsEventData {
334-
event_type: FsEventType::Renamed { old_path, new_path: full_path.clone() },
335-
path: full_path,
336-
timestamp: old_time,
337-
};
338-
send_event(sender, handler_id, event_data);
339-
} else {
340-
log::warn!("Received FILE_ACTION_RENAMED_NEW_NAME without a preceding old name.");
341-
let event_data = FsEventData {
342-
event_type: FsEventType::Created,
343-
path: full_path,
344-
timestamp,
345-
};
346-
send_event(sender, handler_id, event_data);
357+
log::debug!("Found FILE_ACTION_RENAMED_NEW_NAME for {:?}", full_path);
358+
FsEventType::Renamed {
359+
old_path: PathBuf::from("dummy_old_path"), // Placeholder
360+
new_path: full_path.clone(),
347361
}
348362
}
349-
_ => {
350-
let event_data = FsEventData { event_type: FsEventType::Modified, path: full_path, timestamp };
351-
send_event(sender, handler_id, event_data);
352-
}
353-
}
363+
_ => FsEventType::Modified,
364+
};
365+
366+
let event_data = FsEventData {
367+
event_type,
368+
path: full_path,
369+
timestamp,
370+
};
371+
send_event(sender, handler_id, event_data);
354372
}
355373

356374
if info.NextEntryOffset == 0 {
@@ -367,19 +385,28 @@ impl PlatformWatcher {
367385
}
368386

369387
pub async fn unwatch(&mut self, handle: WatchHandle) -> Result<()> {
370-
unsafe {
371-
let _ = winapi::um::ioapiset::CancelIo(handle.handle.directory_handle);
372-
373-
let context_ptr = handle.handle.overlapped.hEvent as *mut CallbackContext;
374-
if !context_ptr.is_null() {
375-
let _ = Box::from_raw(context_ptr);
376-
}
388+
// Find the handle and remove it.
389+
// Assuming WatchHandle now contains enough info to identify the correct WindowsWatchHandle
390+
// A simple implementation would be to just close all handles.
391+
while let Some(mut watch_handle) = self.handles.pop() {
392+
unsafe {
393+
let _ = CancelIo(watch_handle.directory_handle);
394+
// Join the worker thread to ensure it has finished.
395+
if let Some(worker_thread) = watch_handle.worker_thread.take() {
396+
let _ = worker_thread.join();
397+
}
377398

378-
if !handle.handle.event_handle.is_null() {
379-
CloseHandle(handle.handle.event_handle);
380-
}
381-
if handle.handle.directory_handle != INVALID_HANDLE_VALUE {
382-
CloseHandle(handle.handle.directory_handle);
399+
let context_ptr = watch_handle.overlapped.hEvent as *mut CallbackContext;
400+
if !context_ptr.is_null() {
401+
let _ = Box::from_raw(context_ptr);
402+
}
403+
404+
if !watch_handle.event_handle.is_null() {
405+
CloseHandle(watch_handle.event_handle);
406+
}
407+
if watch_handle.directory_handle != INVALID_HANDLE_VALUE {
408+
CloseHandle(watch_handle.directory_handle);
409+
}
383410
}
384411
}
385412
Ok(())
@@ -388,9 +415,14 @@ impl PlatformWatcher {
388415

389416
impl Drop for WindowsWatchHandle {
390417
fn drop(&mut self) {
418+
// The unwatch method should be called for proper cleanup.
419+
// Drop is for emergency cleanup if unwatch is not called.
391420
unsafe {
392-
let _ = winapi::um::ioapiset::CancelIo(self.directory_handle);
393-
421+
let _ = CancelIo(self.directory_handle);
422+
if let Some(worker_thread) = self.worker_thread.take() {
423+
let _ = worker_thread.join();
424+
}
425+
394426
let context_ptr = self.overlapped.hEvent as *mut CallbackContext;
395427
if !context_ptr.is_null() {
396428
let _ = Box::from_raw(context_ptr);

0 commit comments

Comments
 (0)