Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions src/input/composite_device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,16 +334,7 @@ impl CompositeDevice {
}
}
CompositeCommand::GetTargetCapabilities(sender) => {
let target_caps = match self.targets.get_capabilities().await {
Ok(caps) => caps,
Err(e) => {
log::error!("Failed to get target capabilities: {e:?}");
continue;
}
};
if let Err(e) = sender.send(target_caps).await {
log::error!("Failed to send target capabilities: {:?}", e);
}
self.targets.send_capabilities(sender);
}
CompositeCommand::SetInterceptMode(mode) => self.set_intercept_mode(mode).await,
CompositeCommand::GetInterceptMode(sender) => {
Expand Down
229 changes: 131 additions & 98 deletions src/input/composite_device/targets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use std::{
time::Duration,
};

use tokio::sync::mpsc::{self, Sender};
use tokio::{
sync::mpsc::{self, Sender},
task::JoinSet,
};
use zbus::Connection;

use crate::{
Expand Down Expand Up @@ -151,8 +154,11 @@ impl CompositeDeviceTargets {

// Identify which target devices are new
let mut device_types_to_start: Vec<TargetDeviceTypeId> = vec![];
let device_types_map = self.target_kinds_running().await?;
let device_types_running: Vec<TargetDeviceTypeId> =
device_types_map.values().cloned().collect();
for kind in device_types.iter() {
if self.target_kind_running(kind).await? {
if device_types_running.contains(kind) {
log::debug!("[{dbus_path}] Target device {kind} already running, nothing to do.");
continue;
}
Expand All @@ -162,85 +168,100 @@ impl CompositeDeviceTargets {

// Identify the targets that need to close
let mut targets_to_stop: HashMap<String, TargetDeviceClient> = HashMap::new();
for (path, target) in self.target_devices.clone().into_iter() {
let target_type = match target.get_type().await {
Ok(value) => value,
Err(e) => {
return Err(format!("Failed to request target type: {e:?}").into());
}
};
if !device_types.contains(&target_type) {
log::debug!("[{dbus_path}] Target device {path} not in new devices list. Adding to stop list.");
targets_to_stop.insert(path, target);
for (path, target_type) in device_types_map {
if device_types.contains(&target_type) {
continue;
}
let Some(target) = self.target_devices.get(&path) else {
continue;
};
log::debug!(
"[{dbus_path}] Target device {path} not in new devices list. Adding to stop list."
);
targets_to_stop.insert(path, target.clone());
}

// Stop all old target devices that aren't going to persist
let mut stop_tasks = JoinSet::new();
for (path, target) in targets_to_stop.clone().into_iter() {
log::debug!("[{dbus_path}] Stopping old target device: {path}");
self.target_devices.remove(&path);
for (_, target_devices) in self.target_devices_by_capability.iter_mut() {
target_devices.remove(&path);
}
if let Err(e) = target.stop().await {
log::error!("[{dbus_path}] Failed to stop old target device: {e:?}");
stop_tasks.spawn(async move { target.stop().await });
}
for result in stop_tasks.join_all().await {
if let Err(e) = result {
log::error!("[{dbus_path}] Failed to stop old target device: {e}");
}
}

let composite_path = self.path.clone();

// Create new target devices using the input manager
// Create new target devices using the input manager. Spawn a task for
// each create request so they can be performed simultaneously.
let mut tasks: JoinSet<Result<String, Box<dyn Error + Send + Sync>>> = JoinSet::new();
for kind in device_types_to_start {
// Ask the input manager to create a target device
log::debug!("[{dbus_path}] Requesting to create device: {kind}");
let (sender, mut receiver) = mpsc::channel(1);
self.manager
.send(ManagerCommand::CreateTargetDevice { kind, sender })
.await?;
let Some(response) = receiver.recv().await else {
log::warn!("[{dbus_path}] Channel closed waiting for response from input manager");
continue;
};
let target_path = match response {
Ok(path) => path,
Err(e) => {
let err = format!("Failed to create target: {e:?}");
log::error!("[{dbus_path}] {err}");
continue;
}
};

// Ask the input manager to attach the target device to this composite
// device. Note that this *must* be run in an async task to prevent
// deadlocking.
log::debug!("[{dbus_path}] Requesting to attach target device {target_path} to {composite_path}");
let dbus_path = dbus_path.to_owned();
let manager = self.manager.clone();
let target_path_clone = target_path.clone();
let composite_path_clone = composite_path.clone();
let dbus_path = dbus_path.to_string();
tokio::task::spawn(async move {
let composite_path = self.path.clone();

// Spawn a task that requests creating a new target device.
tasks.spawn(async move {
// Ask the input manager to create a target device
log::debug!("[{dbus_path}] Requesting to create device: {kind}");
let (sender, mut receiver) = mpsc::channel(1);
let result = manager
.send(ManagerCommand::AttachTargetDevice {
target_path: target_path_clone,
composite_path: composite_path_clone,
sender,
})
.await;
if let Err(e) = result {
log::warn!("[{dbus_path}] Failed to send attach request to input manager: {e}");
return;
}
let create_cmd = ManagerCommand::CreateTargetDevice { kind, sender };
manager.send(create_cmd).await?;
let Some(response) = receiver.recv().await else {
log::warn!(
"[{dbus_path}] Channel closed waiting for response from input manager"
);
return;
log::warn!("[{dbus_path}] Channel closed waiting for response from input manager");
return Err("Channel closed waiting for response from input manager".into());
};
if let Err(e) = response {
log::error!("[{dbus_path}] Failed to attach target device: {e:?}");
}
let target_path = response?;

// Ask the input manager to attach the target device to this composite
// device. Note that this *must* be run in an async task to prevent
// deadlocking.
log::debug!("[{dbus_path}] Requesting to attach target device {target_path} to {composite_path}");
let target_path_clone = target_path.clone();
let composite_path_clone = composite_path.clone();
let dbus_path = dbus_path.to_string();
tokio::task::spawn(async move {
let (sender, mut receiver) = mpsc::channel(1);
let result = manager
.send(ManagerCommand::AttachTargetDevice {
target_path: target_path_clone,
composite_path: composite_path_clone,
sender,
})
.await;
if let Err(e) = result {
log::warn!("[{dbus_path}] Failed to send attach request to input manager: {e}");
return;
}
let Some(response) = receiver.recv().await else {
log::warn!(
"[{dbus_path}] Channel closed waiting for response from input manager"
);
return;
};
if let Err(e) = response {
log::error!("[{dbus_path}] Failed to attach target device: {e:?}");
}
});

Ok(target_path)
});
}

// Wait for all target devices to be created
for result in tasks.join_all().await {
let target_path = match result {
Ok(path) => path,
Err(e) => {
log::error!("[{dbus_path}] Failed to create target device: {e}");
continue;
}
};

// Enqueue the target device to wait for the attachment message from
// the input manager to prevent multiple calls to set_target_devices()
Expand All @@ -253,24 +274,28 @@ impl CompositeDeviceTargets {
//self.signal_targets_changed().await;

Ok(())

//
}

// Deterimines if a given target device kind is already running
async fn target_kind_running(&self, kind: &TargetDeviceTypeId) -> Result<bool, Box<dyn Error>> {
for target in self.target_devices.values() {
let target_type = match target.get_type().await {
Ok(value) => value,
Err(e) => {
return Err(format!("Failed to request target type: {e:?}").into());
}
};
if *kind == target_type {
return Ok(true);
}
/// Return a list of currently running target devices
async fn target_kinds_running(
&self,
) -> Result<HashMap<String, TargetDeviceTypeId>, Box<dyn Error>> {
// Use a JoinSet to query all targets simultaneously
let mut tasks = JoinSet::new();
for (path, target) in self.target_devices.iter() {
let path = path.clone();
let target = target.clone();
tasks.spawn(async move { (path, target.get_type().await) });
}

// Collect the results
let mut kinds_running = HashMap::with_capacity(self.target_devices.len());
let results = tasks.join_all().await;
for (path, kind) in results {
kinds_running.insert(path, kind?);
}
Ok(false)

Ok(kinds_running)
}

/// Write the given event to all target devices that are capable of emitting
Expand Down Expand Up @@ -388,33 +413,41 @@ impl CompositeDeviceTargets {
}
}

// Get the capabilities of all target devices
pub async fn get_capabilities(&self) -> Result<HashSet<Capability>, Box<dyn Error>> {
let mut target_caps = HashSet::new();
/// Get the capabilities of all target devices and send the result to the
/// given channel.
pub fn send_capabilities(&self, sender: Sender<HashSet<Capability>>) {
// Use a JoinSet to query all targets simultaneously
let mut tasks = JoinSet::new();
for target in self.target_devices.values() {
let caps = match target.get_capabilities().await {
Ok(caps) => caps,
Err(e) => {
return Err(format!("Failed to get target capabilities: {e:?}").into());
}
};
for cap in caps {
target_caps.insert(cap);
}
let target = target.clone();
tasks.spawn(async move { target.get_capabilities().await });
}
for target in self.target_dbus_devices.values() {
let caps = match target.get_capabilities().await {
Ok(caps) => caps,
Err(e) => {
return Err(format!("Failed to get target capabilities: {e:?}").into());
let target = target.clone();
tasks.spawn(async move { target.get_capabilities().await });
}

// Collect the results in a task and send them to the given channel
tokio::task::spawn(async move {
let mut target_caps = HashSet::new();
let results = tasks.join_all().await;
for result in results {
let caps = match result {
Ok(caps) => caps,
Err(e) => {
log::warn!("Failed to get target capabilities: {e}");
continue;
}
};
for cap in caps {
target_caps.insert(cap);
}
};
for cap in caps {
target_caps.insert(cap);
}
}

Ok(target_caps)
if let Err(e) = sender.send(target_caps).await {
log::error!("Failed to send target capabilities: {e}");
}
});
}

/// Update the target capabilities of the given target device
Expand Down
2 changes: 1 addition & 1 deletion src/input/target/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub enum ClientError {
#[error("failed to try to send command to device: {0}")]
TrySendError(TrySendError<TargetCommand>),
#[error("service encountered an error processing the request: {0}")]
ServiceError(Box<dyn std::error::Error>),
ServiceError(Box<dyn std::error::Error + Send + Sync>),
#[error("device no longer exists")]
ChannelClosed,
}
Expand Down