From a91dab61a675ddc9dcf217b9e0fa44daf7b88011 Mon Sep 17 00:00:00 2001 From: William Edwards Date: Wed, 31 Dec 2025 15:20:42 -0800 Subject: [PATCH 1/2] fix(CompositeDevice): asyncronously fetch target capabilities instead of blocking --- src/input/composite_device/mod.rs | 11 +----- src/input/composite_device/targets.rs | 55 ++++++++++++++++----------- src/input/target/client.rs | 2 +- 3 files changed, 35 insertions(+), 33 deletions(-) diff --git a/src/input/composite_device/mod.rs b/src/input/composite_device/mod.rs index 83558671..8979f4a1 100644 --- a/src/input/composite_device/mod.rs +++ b/src/input/composite_device/mod.rs @@ -334,16 +334,7 @@ impl CompositeDevice { } } CompositeCommand::GetTargetCapabilities(sender) => { - let target_caps = match self.targets.get_capabilities().await { - Ok(caps) => caps, - Err(e) => { - log::error!("Failed to get target capabilities: {e:?}"); - continue; - } - }; - if let Err(e) = sender.send(target_caps).await { - log::error!("Failed to send target capabilities: {:?}", e); - } + self.targets.send_capabilities(sender); } CompositeCommand::SetInterceptMode(mode) => self.set_intercept_mode(mode).await, CompositeCommand::GetInterceptMode(sender) => { diff --git a/src/input/composite_device/targets.rs b/src/input/composite_device/targets.rs index 67e3eeb1..b9fbd320 100644 --- a/src/input/composite_device/targets.rs +++ b/src/input/composite_device/targets.rs @@ -4,7 +4,10 @@ use std::{ time::Duration, }; -use tokio::sync::mpsc::{self, Sender}; +use tokio::{ + sync::mpsc::{self, Sender}, + task::JoinSet, +}; use zbus::Connection; use crate::{ @@ -388,33 +391,41 @@ impl CompositeDeviceTargets { } } - // Get the capabilities of all target devices - pub async fn get_capabilities(&self) -> Result, Box> { - let mut target_caps = HashSet::new(); + /// Get the capabilities of all target devices and send the result to the + /// given channel. + pub fn send_capabilities(&self, sender: Sender>) { + // Use a JoinSet to query all targets simultaneously + let mut tasks = JoinSet::new(); for target in self.target_devices.values() { - let caps = match target.get_capabilities().await { - Ok(caps) => caps, - Err(e) => { - return Err(format!("Failed to get target capabilities: {e:?}").into()); - } - }; - for cap in caps { - target_caps.insert(cap); - } + let target = target.clone(); + tasks.spawn(async move { target.get_capabilities().await }); } for target in self.target_dbus_devices.values() { - let caps = match target.get_capabilities().await { - Ok(caps) => caps, - Err(e) => { - return Err(format!("Failed to get target capabilities: {e:?}").into()); + let target = target.clone(); + tasks.spawn(async move { target.get_capabilities().await }); + } + + // Collect the results in a task and send them to the given channel + tokio::task::spawn(async move { + let mut target_caps = HashSet::new(); + let results = tasks.join_all().await; + for result in results { + let caps = match result { + Ok(caps) => caps, + Err(e) => { + log::warn!("Failed to get target capabilities: {e}"); + continue; + } + }; + for cap in caps { + target_caps.insert(cap); } - }; - for cap in caps { - target_caps.insert(cap); } - } - Ok(target_caps) + if let Err(e) = sender.send(target_caps).await { + log::error!("Failed to send target capabilities: {e}"); + } + }); } /// Update the target capabilities of the given target device diff --git a/src/input/target/client.rs b/src/input/target/client.rs index a910efab..e45e9d98 100644 --- a/src/input/target/client.rs +++ b/src/input/target/client.rs @@ -29,7 +29,7 @@ pub enum ClientError { #[error("failed to try to send command to device: {0}")] TrySendError(TrySendError), #[error("service encountered an error processing the request: {0}")] - ServiceError(Box), + ServiceError(Box), #[error("device no longer exists")] ChannelClosed, } From dec537735348ca5ab6b5a41dfbb133deb8cdb5bf Mon Sep 17 00:00:00 2001 From: William Edwards Date: Wed, 31 Dec 2025 15:22:02 -0800 Subject: [PATCH 2/2] fix(Target Devices): set target devices in parallel instead of serially --- src/input/composite_device/targets.rs | 174 +++++++++++++++----------- 1 file changed, 98 insertions(+), 76 deletions(-) diff --git a/src/input/composite_device/targets.rs b/src/input/composite_device/targets.rs index b9fbd320..ab2c5b7c 100644 --- a/src/input/composite_device/targets.rs +++ b/src/input/composite_device/targets.rs @@ -154,8 +154,11 @@ impl CompositeDeviceTargets { // Identify which target devices are new let mut device_types_to_start: Vec = vec![]; + let device_types_map = self.target_kinds_running().await?; + let device_types_running: Vec = + device_types_map.values().cloned().collect(); for kind in device_types.iter() { - if self.target_kind_running(kind).await? { + if device_types_running.contains(kind) { log::debug!("[{dbus_path}] Target device {kind} already running, nothing to do."); continue; } @@ -165,85 +168,100 @@ impl CompositeDeviceTargets { // Identify the targets that need to close let mut targets_to_stop: HashMap = HashMap::new(); - for (path, target) in self.target_devices.clone().into_iter() { - let target_type = match target.get_type().await { - Ok(value) => value, - Err(e) => { - return Err(format!("Failed to request target type: {e:?}").into()); - } - }; - if !device_types.contains(&target_type) { - log::debug!("[{dbus_path}] Target device {path} not in new devices list. Adding to stop list."); - targets_to_stop.insert(path, target); + for (path, target_type) in device_types_map { + if device_types.contains(&target_type) { + continue; } + let Some(target) = self.target_devices.get(&path) else { + continue; + }; + log::debug!( + "[{dbus_path}] Target device {path} not in new devices list. Adding to stop list." + ); + targets_to_stop.insert(path, target.clone()); } // Stop all old target devices that aren't going to persist + let mut stop_tasks = JoinSet::new(); for (path, target) in targets_to_stop.clone().into_iter() { log::debug!("[{dbus_path}] Stopping old target device: {path}"); self.target_devices.remove(&path); for (_, target_devices) in self.target_devices_by_capability.iter_mut() { target_devices.remove(&path); } - if let Err(e) = target.stop().await { - log::error!("[{dbus_path}] Failed to stop old target device: {e:?}"); + stop_tasks.spawn(async move { target.stop().await }); + } + for result in stop_tasks.join_all().await { + if let Err(e) = result { + log::error!("[{dbus_path}] Failed to stop old target device: {e}"); } } - let composite_path = self.path.clone(); - - // Create new target devices using the input manager + // Create new target devices using the input manager. Spawn a task for + // each create request so they can be performed simultaneously. + let mut tasks: JoinSet>> = JoinSet::new(); for kind in device_types_to_start { - // Ask the input manager to create a target device - log::debug!("[{dbus_path}] Requesting to create device: {kind}"); - let (sender, mut receiver) = mpsc::channel(1); - self.manager - .send(ManagerCommand::CreateTargetDevice { kind, sender }) - .await?; - let Some(response) = receiver.recv().await else { - log::warn!("[{dbus_path}] Channel closed waiting for response from input manager"); - continue; - }; - let target_path = match response { - Ok(path) => path, - Err(e) => { - let err = format!("Failed to create target: {e:?}"); - log::error!("[{dbus_path}] {err}"); - continue; - } - }; - - // Ask the input manager to attach the target device to this composite - // device. Note that this *must* be run in an async task to prevent - // deadlocking. - log::debug!("[{dbus_path}] Requesting to attach target device {target_path} to {composite_path}"); + let dbus_path = dbus_path.to_owned(); let manager = self.manager.clone(); - let target_path_clone = target_path.clone(); - let composite_path_clone = composite_path.clone(); - let dbus_path = dbus_path.to_string(); - tokio::task::spawn(async move { + let composite_path = self.path.clone(); + + // Spawn a task that requests creating a new target device. + tasks.spawn(async move { + // Ask the input manager to create a target device + log::debug!("[{dbus_path}] Requesting to create device: {kind}"); let (sender, mut receiver) = mpsc::channel(1); - let result = manager - .send(ManagerCommand::AttachTargetDevice { - target_path: target_path_clone, - composite_path: composite_path_clone, - sender, - }) - .await; - if let Err(e) = result { - log::warn!("[{dbus_path}] Failed to send attach request to input manager: {e}"); - return; - } + let create_cmd = ManagerCommand::CreateTargetDevice { kind, sender }; + manager.send(create_cmd).await?; let Some(response) = receiver.recv().await else { - log::warn!( - "[{dbus_path}] Channel closed waiting for response from input manager" - ); - return; + log::warn!("[{dbus_path}] Channel closed waiting for response from input manager"); + return Err("Channel closed waiting for response from input manager".into()); }; - if let Err(e) = response { - log::error!("[{dbus_path}] Failed to attach target device: {e:?}"); - } + let target_path = response?; + + // Ask the input manager to attach the target device to this composite + // device. Note that this *must* be run in an async task to prevent + // deadlocking. + log::debug!("[{dbus_path}] Requesting to attach target device {target_path} to {composite_path}"); + let target_path_clone = target_path.clone(); + let composite_path_clone = composite_path.clone(); + let dbus_path = dbus_path.to_string(); + tokio::task::spawn(async move { + let (sender, mut receiver) = mpsc::channel(1); + let result = manager + .send(ManagerCommand::AttachTargetDevice { + target_path: target_path_clone, + composite_path: composite_path_clone, + sender, + }) + .await; + if let Err(e) = result { + log::warn!("[{dbus_path}] Failed to send attach request to input manager: {e}"); + return; + } + let Some(response) = receiver.recv().await else { + log::warn!( + "[{dbus_path}] Channel closed waiting for response from input manager" + ); + return; + }; + if let Err(e) = response { + log::error!("[{dbus_path}] Failed to attach target device: {e:?}"); + } + }); + + Ok(target_path) }); + } + + // Wait for all target devices to be created + for result in tasks.join_all().await { + let target_path = match result { + Ok(path) => path, + Err(e) => { + log::error!("[{dbus_path}] Failed to create target device: {e}"); + continue; + } + }; // Enqueue the target device to wait for the attachment message from // the input manager to prevent multiple calls to set_target_devices() @@ -256,24 +274,28 @@ impl CompositeDeviceTargets { //self.signal_targets_changed().await; Ok(()) - - // } - // Deterimines if a given target device kind is already running - async fn target_kind_running(&self, kind: &TargetDeviceTypeId) -> Result> { - for target in self.target_devices.values() { - let target_type = match target.get_type().await { - Ok(value) => value, - Err(e) => { - return Err(format!("Failed to request target type: {e:?}").into()); - } - }; - if *kind == target_type { - return Ok(true); - } + /// Return a list of currently running target devices + async fn target_kinds_running( + &self, + ) -> Result, Box> { + // Use a JoinSet to query all targets simultaneously + let mut tasks = JoinSet::new(); + for (path, target) in self.target_devices.iter() { + let path = path.clone(); + let target = target.clone(); + tasks.spawn(async move { (path, target.get_type().await) }); } - Ok(false) + + // Collect the results + let mut kinds_running = HashMap::with_capacity(self.target_devices.len()); + let results = tasks.join_all().await; + for (path, kind) in results { + kinds_running.insert(path, kind?); + } + + Ok(kinds_running) } /// Write the given event to all target devices that are capable of emitting