diff --git a/Cargo.toml b/Cargo.toml index 198b103..5d6e9f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ rustix = { version = "1.0.1", features = ["fs", "event", "net", "time", "mm"] } linux-raw-sys = { version = ">= 0.11, <= 0.12", features = ["ioctl"] } [target.'cfg(target_os="windows")'.dependencies] -windows-sys = { version = ">= 0.60, <= 0.61", features = ["Win32_Devices_Usb", "Win32_Devices_DeviceAndDriverInstallation", "Win32_Foundation", "Win32_Devices_Properties", "Win32_Storage_FileSystem", "Win32_Security", "Win32_System_IO", "Win32_System_Registry", "Win32_System_Com"] } +windows-sys = { version = ">= 0.60, <= 0.61", features = ["Win32_Devices_Usb", "Win32_Devices_DeviceAndDriverInstallation", "Win32_Foundation", "Win32_Devices_Properties", "Win32_Storage_FileSystem", "Win32_Security", "Win32_System_IO", "Win32_System_Registry", "Win32_System_Threading", "Win32_System_Com"] } [target.'cfg(target_os="macos")'.dependencies] core-foundation = "0.10.1" diff --git a/src/device.rs b/src/device.rs index f68907b..5e78916 100644 --- a/src/device.rs +++ b/src/device.rs @@ -426,8 +426,6 @@ impl Interface { /// byte of `index` must match the interface number, or /// `TransferError::InvalidArgument` will be returned. This is a WinUSB /// limitation. - /// * On Windows, the timeout is currently fixed to 5 seconds and the - /// timeout argument is ignored. pub fn control_in( &self, data: ControlIn, @@ -467,8 +465,6 @@ impl Interface { /// byte of `index` must match the interface number, or /// `TransferError::InvalidArgument` will be returned. This is a WinUSB /// limitation. - /// * On Windows, the timeout is currently fixed to 5 seconds and the - /// timeout argument is ignored. pub fn control_out( &self, data: ControlOut, diff --git a/src/platform/windows_winusb/device.rs b/src/platform/windows_winusb/device.rs index fd5b79d..ee2967f 100644 --- a/src/platform/windows_winusb/device.rs +++ b/src/platform/windows_winusb/device.rs @@ -2,11 +2,8 @@ use std::{ collections::{btree_map::Entry, BTreeMap, VecDeque}, ffi::c_void, io, - mem::{size_of_val, transmute}, - os::windows::{ - io::{AsRawHandle, RawHandle}, - prelude::OwnedHandle, - }, + mem::{self, size_of_val, transmute}, + os::windows::io::{AsRawHandle, OwnedHandle, RawHandle}, ptr::{self, null_mut}, sync::{Arc, Mutex}, task::{Context, Poll}, @@ -24,9 +21,16 @@ use windows_sys::Win32::{ Foundation::{ GetLastError, ERROR_BAD_COMMAND, ERROR_DEVICE_NOT_CONNECTED, ERROR_FILE_NOT_FOUND, ERROR_IO_PENDING, ERROR_NOT_FOUND, ERROR_NO_MORE_ITEMS, ERROR_NO_SUCH_DEVICE, FALSE, - HANDLE, TRUE, + FILETIME, HANDLE, TRUE, + }, + System::{ + Threading::{ + CancelThreadpoolIo, CloseThreadpoolIo, CreateThreadpoolIo, CreateThreadpoolTimer, + SetThreadpoolTimer, StartThreadpoolIo, WaitForThreadpoolTimerCallbacks, + PTP_CALLBACK_INSTANCE, PTP_IO, PTP_TIMER, + }, + IO::{CancelIoEx, OVERLAPPED}, }, - System::IO::{CancelIoEx, OVERLAPPED}, }; use crate::{ @@ -263,6 +267,7 @@ impl BitSet256 { pub(crate) struct WinusbFileHandle { first_interface: u8, handle: OwnedHandle, + threadpool_io: PTP_IO, winusb_handle: WINUSB_INTERFACE_HANDLE, claimed_interfaces: BitSet256, } @@ -275,11 +280,27 @@ impl WinusbFileHandle { fn new(path: &WCStr, first_interface: u8) -> Result { let handle = create_file(path) .map_err(|e| Error::new_os(ErrorKind::Other, "failed to open device", e).log_debug())?; - super::events::register(&handle)?; + + let threadpool_io = unsafe { + CreateThreadpoolIo( + raw_handle(&handle), + Some(io_callback), + null_mut(), + null_mut(), + ) + }; + + if threadpool_io == 0 { + let err = unsafe { GetLastError() }; + return Err( + Error::new_os(ErrorKind::Other, "CreateThreadpoolIo failed", err).log_error(), + ); + } let winusb_handle = unsafe { let mut h = ptr::null_mut(); if WinUsb_Initialize(raw_handle(&handle), &mut h) == FALSE { + CloseThreadpoolIo(threadpool_io); return Err(Error::new_os( ErrorKind::Other, "failed to initialize WinUSB", @@ -292,9 +313,27 @@ impl WinusbFileHandle { debug!("Opened WinUSB handle for {path} (interface {first_interface})"); + unsafe { + // Disable WinUSB's default control transfer timeout so we can do our own + // per-request timeout handling by cancelling the transfer with a timer. + let timeout: u32 = 0; + let r = WinUsb_SetPipePolicy( + winusb_handle, + 0x00, + Usb::PIPE_TRANSFER_TIMEOUT, + size_of_val(&timeout) as u32, + &timeout as *const _ as *const c_void, + ); + if r != TRUE { + let err = GetLastError(); + warn!("Failed to disable default timeout on control endpoint, error {err}"); + } + } + Ok(WinusbFileHandle { first_interface, handle, + threadpool_io, winusb_handle, claimed_interfaces: BitSet256::new(), }) @@ -349,6 +388,7 @@ impl WinusbFileHandle { Ok(Arc::new(WindowsInterface { handle: self.handle.as_raw_handle(), + threadpool_io: self.threadpool_io, device: device.clone(), interface_number, first_interface_number: self.first_interface, @@ -365,16 +405,78 @@ impl Drop for WinusbFileHandle { self.first_interface ); unsafe { + CloseThreadpoolIo(self.threadpool_io); WinUsb_Free(self.winusb_handle); } } } +unsafe extern "system" fn io_callback( + _instance: PTP_CALLBACK_INSTANCE, + _context: *mut c_void, + overlapped: *mut c_void, + result: u32, + bytes_transferred: usize, + _io: PTP_IO, +) { + let t = overlapped as *mut TransferData; + { + let transfer = unsafe { &*t }; + + debug!( + "Transfer {t:?} on endpoint {:02x} complete: status {}, {} bytes", + transfer.endpoint, result, bytes_transferred, + ); + + if let Some(timer) = transfer.timeout { + // Cancel the timeout and wait for any callback to complete that may be concurrently + // accessing `transfer`. + unsafe { + SetThreadpoolTimer(timer, ptr::null_mut(), 0, 0); + WaitForThreadpoolTimerCallbacks(timer, 1); + } + } + } + unsafe { notify_completion::(t) } +} + +unsafe extern "system" fn timer_callback( + _instance: PTP_CALLBACK_INSTANCE, + context: *mut c_void, + _timer: PTP_TIMER, +) { + let transfer_data = &*(context as *const TransferData); + debug!( + "Transfer {context:?} timeout on endpoint 0x{:02X}", + transfer_data.endpoint + ); + unsafe { + CancelIoEx(transfer_data.handle, &transfer_data.overlapped); + } +} + +fn duration_to_filetime(duration: Duration) -> FILETIME { + let time = i64::try_from(duration.as_micros()) + .unwrap_or(i64::MAX) + .saturating_mul(-10); // in 100-nanosecond intervals, negative for relative time + FILETIME { + dwLowDateTime: (time & 0xFFFFFFFF) as u32, + dwHighDateTime: (time >> 32) as u32, + } +} + pub(crate) struct WindowsInterface { + /// Owned by the `WinUSBFileHandle` pub(crate) handle: RawHandle, + + /// Owned by the `WinUSBFileHandle` + pub(crate) threadpool_io: PTP_IO, + pub(crate) device: Arc, pub(crate) first_interface_number: u8, pub(crate) interface_number: u8, + + /// Owned by this object if `first_interface_number != interface_number`, otherwise owned by the `WinUSBFileHandle` pub(crate) winusb_handle: WINUSB_INTERFACE_HANDLE, state: Mutex, } @@ -430,7 +532,7 @@ impl WindowsInterface { data: ControlIn, timeout: Duration, ) -> impl MaybeFuture, TransferError>> { - let mut t = TransferData::new(0x80); + let mut t = TransferData::new(self.handle, 0x80); t.set_buffer(Buffer::new(data.length as usize)); let pkt = WINUSB_SETUP_PACKET { @@ -443,7 +545,7 @@ impl WindowsInterface { let intf = self.clone(); - TransferFuture::new(t, |t| self.submit_control(t, pkt)).map(move |mut t| { + TransferFuture::new(t, |t| self.submit_control(t, pkt, timeout)).map(move |mut t| { let c = t.take_completion(&intf); c.status?; Ok(c.buffer.into_vec()) @@ -455,7 +557,7 @@ impl WindowsInterface { data: ControlOut, timeout: Duration, ) -> impl MaybeFuture> { - let mut t = TransferData::new(0x00); + let mut t = TransferData::new(self.handle, 0x00); t.set_buffer(Buffer::from(data.data.to_vec())); let pkt = WINUSB_SETUP_PACKET { @@ -468,7 +570,7 @@ impl WindowsInterface { let intf = self.clone(); - TransferFuture::new(t, |t| self.submit_control(t, pkt)).map(move |mut t| { + TransferFuture::new(t, |t| self.submit_control(t, pkt, timeout)).map(move |mut t| { let c = t.take_completion(&intf); c.status }) @@ -560,7 +662,7 @@ impl WindowsInterface { let dir = Direction::from_address(endpoint); let len = t.request_len; let buf = t.buf; - t.overlapped.InternalHigh = 0; + t.overlapped = unsafe { mem::zeroed() }; t.error_from_submit = Ok(()); let t = t.pre_submit(); @@ -568,6 +670,10 @@ impl WindowsInterface { debug!("Submit transfer {ptr:?} on endpoint {endpoint:02X} for {len} bytes {dir:?}"); + unsafe { + StartThreadpoolIo(self.threadpool_io); + } + let r = unsafe { match dir { Direction::Out => WinUsb_WritePipe( @@ -596,12 +702,13 @@ impl WindowsInterface { &self, mut t: Idle, pkt: WINUSB_SETUP_PACKET, + timeout: Duration, ) -> Pending { let endpoint = t.endpoint; let dir = Direction::from_address(endpoint); let len = t.request_len; let buf = t.buf; - t.overlapped.InternalHigh = 0; + t.overlapped = unsafe { mem::zeroed() }; t.error_from_submit = Ok(()); if pkt.RequestType & 0x1f == Recipient::Interface as u8 @@ -612,11 +719,35 @@ impl WindowsInterface { return t.simulate_complete(); } - let t = t.pre_submit(); let ptr = t.as_ptr(); + let timer = unsafe { + CreateThreadpoolTimer( + Some(timer_callback), + ptr as *mut core::ffi::c_void, + ptr::null_mut(), + ) + }; + if timer == 0 { + let e = unsafe { GetLastError() }; + log::error!("CreateThreadpoolTimer failed with error {e}"); + } else { + t.timeout = Some(timer); + } + + let t = t.pre_submit(); + + // Start the timeout, now that we're no longer borrowing the transfer, but also before submitting the transfer: + // this guarantees that the IO callback trying to clear the timer can't occur before `SetThreadpoolTimer` + // enables it. + let tm = duration_to_filetime(timeout); + unsafe { SetThreadpoolTimer(timer, &tm, 0, 0) }; debug!("Submit control {dir:?} transfer {ptr:?} for {len} bytes"); + unsafe { + StartThreadpoolIo(self.threadpool_io); + } + let r = unsafe { WinUsb_ControlTransfer( self.winusb_handle, @@ -641,6 +772,10 @@ impl WindowsInterface { if err != ERROR_IO_PENDING { error!("submit failed: {}", io::Error::from_raw_os_error(err as _)); + unsafe { + CancelThreadpoolIo(self.threadpool_io); + } + // Safety: Transfer was not submitted, so we still own it // and must complete it in place of the event thread. unsafe { @@ -711,7 +846,10 @@ impl WindowsEndpoint { fn make_transfer(&mut self, buffer: Buffer) -> Idle { let mut t = self.idle_transfer.take().unwrap_or_else(|| { - Idle::new(self.inner.clone(), TransferData::new(self.inner.address)) + Idle::new( + self.inner.clone(), + TransferData::new(self.inner.interface.handle, self.inner.address), + ) }); t.set_buffer(buffer); t diff --git a/src/platform/windows_winusb/events.rs b/src/platform/windows_winusb/events.rs deleted file mode 100644 index d3e1567..0000000 --- a/src/platform/windows_winusb/events.rs +++ /dev/null @@ -1,109 +0,0 @@ -use once_cell::sync::OnceCell; -use std::{ - os::windows::{ - io::HandleOrNull, - prelude::{OwnedHandle, RawHandle}, - }, - ptr, thread, -}; -use windows_sys::Win32::{ - Foundation::{GetLastError, FALSE, INVALID_HANDLE_VALUE}, - System::IO::{CreateIoCompletionPort, GetQueuedCompletionStatusEx, OVERLAPPED_ENTRY}, -}; - -use crate::Error; - -use super::util::raw_handle; - -struct IoCompletionPort(OwnedHandle); - -impl IoCompletionPort { - fn new() -> Result { - unsafe { - let port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, ptr::null_mut(), 0, 0); - match HandleOrNull::from_raw_handle(port as RawHandle).try_into() { - Ok(handle) => Ok(IoCompletionPort(handle)), - Err(_) => Err(Error::new_os( - crate::ErrorKind::Other, - "failed to create IO completion port", - GetLastError(), - ) - .log_error()), - } - } - } - - fn register(&self, handle: &OwnedHandle) -> Result<(), Error> { - unsafe { - let r = CreateIoCompletionPort(raw_handle(handle), raw_handle(&self.0), 0, 0); - if r.is_null() { - Err(Error::new_os( - crate::ErrorKind::Other, - "failed to register IO completion port", - GetLastError(), - ) - .log_error()) - } else { - Ok(()) - } - } - } - - fn wait(&self, events: &mut Vec) -> Result<(), Error> { - unsafe { - let mut event_count = 0; - let r = GetQueuedCompletionStatusEx( - raw_handle(&self.0), - events.as_mut_ptr(), - events - .capacity() - .try_into() - .expect("events capacity should fit in u32"), - &mut event_count, - u32::MAX, - 0, - ); - - if r == FALSE { - Err(Error::new_os( - crate::ErrorKind::Other, - "failed to get events from IO completion port", - GetLastError(), - ) - .log_error()) - } else { - events.set_len(event_count as usize); - Ok(()) - } - } - } -} - -static IOCP_HANDLE: OnceCell = OnceCell::new(); - -pub(super) fn register(usb_fd: &OwnedHandle) -> Result<(), Error> { - let mut start_thread = false; - let iocp = IOCP_HANDLE.get_or_try_init(|| { - start_thread = true; - IoCompletionPort::new() - })?; - - if start_thread { - thread::spawn(event_loop); - } - - iocp.register(usb_fd) -} - -fn event_loop() { - let iocp = IOCP_HANDLE.get().unwrap(); - let mut event_list = Vec::with_capacity(8); - loop { - event_list.clear(); - iocp.wait(&mut event_list).unwrap(); - - for event in &event_list { - super::transfer::handle_event(event.lpOverlapped); - } - } -} diff --git a/src/platform/windows_winusb/mod.rs b/src/platform/windows_winusb/mod.rs index f7861d0..0b5c90f 100644 --- a/src/platform/windows_winusb/mod.rs +++ b/src/platform/windows_winusb/mod.rs @@ -3,8 +3,6 @@ use std::num::NonZeroU32; pub use enumeration::{list_buses, list_devices}; -mod events; - mod device; pub(crate) use device::WindowsDevice as Device; pub(crate) use device::WindowsEndpoint as Endpoint; diff --git a/src/platform/windows_winusb/transfer.rs b/src/platform/windows_winusb/transfer.rs index e63b325..0a4e1cd 100644 --- a/src/platform/windows_winusb/transfer.rs +++ b/src/platform/windows_winusb/transfer.rs @@ -1,16 +1,21 @@ -use std::mem::{self, ManuallyDrop}; +use std::{ + mem::{self, ManuallyDrop}, + os::windows::io::RawHandle, +}; -use log::debug; use windows_sys::Win32::{ Foundation::{ GetLastError, ERROR_DEVICE_NOT_CONNECTED, ERROR_FILE_NOT_FOUND, ERROR_GEN_FAILURE, ERROR_NO_SUCH_DEVICE, ERROR_OPERATION_ABORTED, ERROR_REQUEST_ABORTED, ERROR_SEM_TIMEOUT, ERROR_SUCCESS, ERROR_TIMEOUT, }, - System::IO::{GetOverlappedResult, OVERLAPPED}, + System::{ + Threading::{CloseThreadpoolTimer, PTP_TIMER}, + IO::{GetOverlappedResult, OVERLAPPED}, + }, }; -use crate::transfer::{internal::notify_completion, Buffer, Completion, Direction, TransferError}; +use crate::transfer::{Buffer, Completion, Direction, TransferError}; use super::Interface; @@ -20,35 +25,36 @@ pub struct TransferData { // overlapped.Internal contains the status // overlapped.InternalHigh contains the number of bytes transferred pub(crate) overlapped: OVERLAPPED, + + // Owned by the `WinusbFileHandle` + pub(crate) handle: RawHandle, pub(crate) buf: *mut u8, pub(crate) capacity: u32, pub(crate) request_len: u32, pub(crate) endpoint: u8, pub(crate) error_from_submit: Result<(), TransferError>, + pub(crate) timeout: Option, } unsafe impl Send for TransferData {} unsafe impl Sync for TransferData {} impl TransferData { - pub(crate) fn new(endpoint: u8) -> TransferData { + pub(crate) fn new(handle: RawHandle, endpoint: u8) -> TransferData { let mut empty = ManuallyDrop::new(Vec::with_capacity(0)); TransferData { overlapped: unsafe { mem::zeroed() }, + handle, buf: empty.as_mut_ptr(), capacity: 0, request_len: 0, endpoint, + timeout: None, error_from_submit: Ok(()), } } - #[inline] - pub fn actual_len(&self) -> usize { - self.overlapped.InternalHigh - } - pub fn set_buffer(&mut self, buf: Buffer) { debug_assert!(self.capacity == 0); let buf = ManuallyDrop::new(buf); @@ -109,21 +115,10 @@ impl Drop for TransferData { fn drop(&mut self) { unsafe { drop(Vec::from_raw_parts(self.buf, 0, self.capacity as usize)); - } - } -} -pub(super) fn handle_event(completion: *mut OVERLAPPED) { - let t = completion as *mut TransferData; - { - let transfer = unsafe { &mut *t }; - - debug!( - "Transfer {t:?} on endpoint {:02x} complete: status {}, {} bytes", - transfer.endpoint, - transfer.overlapped.Internal, - transfer.actual_len(), - ); + if let Some(timer) = self.timeout { + CloseThreadpoolTimer(timer); + } + } } - unsafe { notify_completion::(t) } } diff --git a/src/transfer/internal.rs b/src/transfer/internal.rs index 8fb7190..007250a 100644 --- a/src/transfer/internal.rs +++ b/src/transfer/internal.rs @@ -115,16 +115,38 @@ const STATE_PENDING: u8 = 1; const STATE_ABANDONED: u8 = 2; /// Handle to a transfer that is known to be idle. -pub(crate) struct Idle

(Box>); +pub(crate) struct Idle

{ + ptr: NonNull>, +} impl

Idle

{ /// Create a new transfer and get a handle. pub(crate) fn new(notify: Arc + Send + Sync>, inner: P) -> Idle

{ - Idle(Box::new(TransferInner { + let b = Box::new(TransferInner { platform_data: inner, state: AtomicU8::new(STATE_IDLE), notify, - })) + }); + Idle { + // SAFETY: Box pointer is non-null + ptr: unsafe { NonNull::new_unchecked(Box::into_raw(b)) }, + } + } + + fn inner(&self) -> &TransferInner

{ + // SAFETY: Idle state means there is no concurrent access + unsafe { self.ptr.as_ref() } + } + + fn inner_mut(&mut self) -> &mut TransferInner

{ + // SAFETY: Idle state means there is no concurrent access + unsafe { self.ptr.as_mut() } + } + + #[allow(unused)] + pub fn as_ptr(&self) -> *mut P { + // first member of repr(C) struct + self.ptr.as_ptr().cast() } /// Mark the transfer as pending. The caller must submit the transfer to the kernel @@ -132,30 +154,38 @@ impl

Idle

{ pub(crate) fn pre_submit(self) -> Pending

{ // It's the syscall that submits the transfer that actually performs the // release ordering. - let prev = self.0.state.swap(STATE_PENDING, Ordering::Relaxed); + let prev = self.inner().state.swap(STATE_PENDING, Ordering::Relaxed); assert_eq!(prev, STATE_IDLE, "Transfer should be idle when submitted"); - Pending { - ptr: unsafe { NonNull::new_unchecked(Box::into_raw(self.0)) }, - } + let transfer = ManuallyDrop::new(self); + Pending { ptr: transfer.ptr } } pub(crate) fn simulate_complete(self) -> Pending

{ - Pending { - ptr: unsafe { NonNull::new_unchecked(Box::into_raw(self.0)) }, - } + let transfer = ManuallyDrop::new(self); + Pending { ptr: transfer.ptr } } } +unsafe impl Send for Idle

{} +unsafe impl Sync for Idle

{} + impl

Deref for Idle

{ type Target = P; fn deref(&self) -> &Self::Target { - &self.0.platform_data + &self.inner().platform_data } } impl

DerefMut for Idle

{ fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0.platform_data + &mut self.inner_mut().platform_data + } +} + +impl

Drop for Idle

{ + fn drop(&mut self) { + // SAFETY: state means there is no concurrent access + unsafe { drop(Box::from_raw(self.ptr.as_ptr())) } } } @@ -191,7 +221,7 @@ impl

Pending

{ pub unsafe fn into_idle(self) -> Idle

{ debug_assert!(self.is_complete()); let transfer = ManuallyDrop::new(self); - Idle(Box::from_raw(transfer.ptr.as_ptr())) + Idle { ptr: transfer.ptr } } }