Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 33 additions & 51 deletions src/input/composite_device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use std::{
};

use evdev::InputEvent;
use tokio::{sync::mpsc, task::JoinSet, time::Duration};
use tokio::{
sync::mpsc,
task::{JoinHandle, JoinSet},
time::Duration,
};
use zbus::Connection;

use crate::{
Expand Down Expand Up @@ -94,7 +98,7 @@ pub struct CompositeDevice {
/// release events
emitted_mappings: HashMap<String, CapabilityMapping>,
/// The DBus path this [CompositeDevice] is listening on
dbus_path: Option<String>,
dbus_path: String,
/// Mode defining how inputs should be routed
intercept_mode: InterceptMode,
/// Transmit channel for sending commands to this composite device
Expand Down Expand Up @@ -157,6 +161,7 @@ impl CompositeDevice {
manager: mpsc::Sender<ManagerCommand>,
config: CompositeDeviceConfig,
device_info: UdevDevice,
dbus_path: String,
capability_map: Option<CapabilityMap>,
) -> Result<Self, Box<dyn Error>> {
log::info!("Creating CompositeDevice with config: {}", config.name);
Expand All @@ -175,7 +180,7 @@ impl CompositeDevice {
translatable_active_inputs: Vec::new(),
translated_recent_events: HashSet::new(),
emitted_mappings: HashMap::new(),
dbus_path: None,
dbus_path,
intercept_mode: InterceptMode::None,
tx,
rx,
Expand Down Expand Up @@ -231,22 +236,25 @@ impl CompositeDevice {
Ok(device)
}

/// Return the DBus path of the composite device
pub fn dbus_path(&self) -> &str {
self.dbus_path.as_str()
}

/// Creates a new instance of the composite device interface on DBus.
pub async fn listen_on_dbus(&mut self, path: String) -> Result<(), Box<dyn Error>> {
pub async fn listen_on_dbus(&self) -> Result<JoinHandle<()>, Box<dyn Error>> {
let conn = self.conn.clone();
let client = self.client();
self.dbus_path = Some(path.clone());
tokio::spawn(async move {
let path = String::from(self.dbus_path());
Ok(tokio::spawn(async move {
log::debug!("Starting dbus interface: {path}");
let iface = CompositeDeviceInterface::new(client);
if let Err(e) = conn.object_server().at(path.clone(), iface).await {
log::debug!("Failed to start dbus interface {path}: {e:?}");
} else {
log::debug!("Started dbus interface: {path}");
log::debug!("Started listening on dbus interface: {path}");
}
});
log::info!("Started listening on {}", self.dbus_path.as_ref().unwrap());
Ok(())
}))
}

/// Starts the [CompositeDevice] and listens for events from all source
Expand All @@ -257,6 +265,8 @@ impl CompositeDevice {
) -> Result<(), Box<dyn Error>> {
log::debug!("Starting composite device");

let dbus_path = self.dbus_path.clone();

// Start all source devices
self.run_source_devices().await?;

Expand Down Expand Up @@ -371,8 +381,7 @@ impl CompositeDevice {
}
if self.source_devices_used.is_empty() {
log::debug!(
"No source devices remain. Stopping CompositeDevice {:?}",
self.dbus_path
"No source devices remain. Stopping CompositeDevice {dbus_path}"
);
break 'main;
}
Expand Down Expand Up @@ -471,27 +480,18 @@ impl CompositeDevice {
self.set_intercept_activation(activation_caps, target_cap)
}
CompositeCommand::Stop => {
log::debug!(
"Got STOP signal. Stopping CompositeDevice: {:?}",
self.dbus_path
);
log::debug!("Got STOP signal. Stopping CompositeDevice: {dbus_path}");
break 'main;
}
CompositeCommand::Suspend(sender) => {
log::info!(
"Preparing for system suspend for: {}",
self.dbus_path.as_ref().unwrap_or(&"".to_string())
);
log::info!("Preparing for system suspend for: {dbus_path}");
self.handle_suspend().await;
if let Err(e) = sender.send(()).await {
log::error!("Failed to send suspend response: {e:?}");
}
}
CompositeCommand::Resume(sender) => {
log::info!(
"Preparing for system resume for: {}",
self.dbus_path.as_ref().unwrap_or(&"".to_string())
);
log::info!("Preparing for system resume for: {dbus_path}");
self.handle_resume().await;
if let Err(e) = sender.send(()).await {
log::error!("Failed to send resume response: {e:?}");
Expand All @@ -503,17 +503,11 @@ impl CompositeDevice {
// If no source devices remain after processing the queue, stop
// the device.
if devices_removed && self.source_devices_used.is_empty() {
log::debug!(
"No source devices remain. Stopping CompositeDevice {:?}",
self.dbus_path
);
log::debug!("No source devices remain. Stopping CompositeDevice {dbus_path}");
break 'main;
}
}
log::info!(
"CompositeDevice stopping: {}",
self.dbus_path.as_ref().unwrap()
);
log::info!("CompositeDevice stopping: {dbus_path}");

// Stop all target devices
log::debug!("Stopping target devices");
Expand Down Expand Up @@ -556,10 +550,7 @@ impl CompositeDevice {
res?;
}

log::info!(
"CompositeDevice stopped: {}",
self.dbus_path.as_ref().unwrap()
);
log::info!("CompositeDevice stopped: {dbus_path}");

Ok(())
}
Expand Down Expand Up @@ -1800,9 +1791,7 @@ impl CompositeDevice {
}
}

let Some(composite_path) = self.dbus_path.clone() else {
return Err("No composite device DBus path found".into());
};
let composite_path = self.dbus_path.clone();

// Create new target devices using the input manager
for kind in device_types_to_start {
Expand Down Expand Up @@ -1909,6 +1898,8 @@ impl CompositeDevice {
&mut self,
targets: HashMap<String, TargetDeviceClient>,
) -> Result<(), Box<dyn Error>> {
let dbus_path = self.dbus_path.clone();

// Keep track of all target devices
for (path, target) in targets.into_iter() {
log::debug!("Attaching target device: {path}");
Expand All @@ -1917,10 +1908,7 @@ impl CompositeDevice {
format!("Failed to set composite device for target device: {:?}", e).into(),
);
}
log::debug!(
"Attached device {path} to {:?}",
self.dbus_path.as_ref().unwrap_or(&"".to_string())
);
log::debug!("Attached device {path} to {dbus_path}");

// Query the target device for its capabilities
let caps = match target.get_capabilities().await {
Expand Down Expand Up @@ -1956,10 +1944,7 @@ impl CompositeDevice {

/// Emit a DBus signal when target devices change
async fn signal_targets_changed(&self) {
let Some(dbus_path) = self.dbus_path.clone() else {
log::error!("No DBus path for composite device exists to emit signal!");
return;
};
let dbus_path = self.dbus_path.clone();
let conn = self.conn.clone();

tokio::task::spawn(async move {
Expand Down Expand Up @@ -1991,10 +1976,7 @@ impl CompositeDevice {

/// Emit a DBus signal when source devices change
async fn signal_sources_changed(&self) {
let Some(dbus_path) = self.dbus_path.clone() else {
log::error!("No DBus path for composite device exists to emit signal!");
return;
};
let dbus_path = self.dbus_path.clone();
let conn = self.conn.clone();

tokio::task::spawn(async move {
Expand Down
77 changes: 37 additions & 40 deletions src/input/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use mio::{Events, Interest, Poll, Token};
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::task;
use tokio::task::JoinHandle;
use zbus::fdo::ManagedObjects;
use zbus::zvariant::ObjectPath;
use zbus::Connection;
Expand Down Expand Up @@ -467,6 +468,7 @@ impl Manager {
self.tx.clone(),
config,
device,
self.next_composite_dbus_path()?,
capability_map,
)?;

Expand Down Expand Up @@ -580,22 +582,20 @@ impl Manager {
config: CompositeDeviceConfig,
target_types: Option<Vec<String>>,
source_device: SourceDevice,
) -> Result<(), Box<dyn Error>> {
// Generate the DBus tree path for this composite device
let path = self.next_composite_dbus_path();

) -> Result<JoinHandle<()>, Box<dyn Error>> {
// Keep track of the source devices that this composite device is
// using.
let source_device_ids = device.get_source_devices_used();
let composite_path = String::from(device.dbus_path());
log::debug!(
"Starting CompositeDevice at {path} with the following sources: {source_device_ids:?}"
"Starting CompositeDevice at {composite_path} with the following sources: {source_device_ids:?}"
);
for id in source_device_ids {
self.source_devices_used.insert(id.clone(), path.clone());
self.source_devices_used
.insert(id.clone(), composite_path.clone());
self.source_devices.insert(id, source_device.clone());
}

let composite_path = path.clone();
if !self.composite_device_sources.contains_key(&composite_path) {
self.composite_device_sources
.insert(composite_path.clone(), Vec::new());
Expand All @@ -606,8 +606,7 @@ impl Manager {
.unwrap();
sources.push(source_device);

// Create a DBus interface for the device
device.listen_on_dbus(path.clone()).await?;
device.listen_on_dbus().await?;

// Get a handle to the device
let client = device.client();
Expand All @@ -616,7 +615,7 @@ impl Manager {
let mut target_device_paths = Vec::new();

// Create a DBus target device
log::debug!("Creating target devices for {path}");
log::debug!("Creating target devices for {composite_path}");
let dbus_device = self.create_target_device("dbus").await?;
let dbus_devices = self.start_target_devices(vec![dbus_device]).await?;
let dbus_paths = dbus_devices.keys();
Expand All @@ -641,33 +640,36 @@ impl Manager {
target_device_paths.push(target_path.clone());
}

// Add the device to our maps
self.composite_devices
.insert(composite_path.clone(), client);
log::trace!("Managed source devices: {:?}", self.source_devices_used);
self.used_configs.insert(composite_path.clone(), config);
log::trace!("Used configs: {:?}", self.used_configs);
self.composite_device_targets
.insert(composite_path.clone(), target_device_paths);
log::trace!("Used target devices: {:?}", self.composite_device_targets);

// Run the device
let dbus_path = path.clone();
let composite_path = String::from(device.dbus_path());
let tx = self.tx.clone();
tokio::spawn(async move {
Ok(tokio::spawn(async move {
if let Err(e) = device.run(targets).await {
log::error!("Error running {dbus_path}: {e}");
log::error!("Error running {composite_path}: {}", e.to_string());
}
log::debug!("Composite device stopped running: {dbus_path}");
log::debug!("Composite device stopped running: {composite_path}");
if let Err(e) = tx
.send(ManagerCommand::CompositeDeviceStopped(dbus_path))
.send(ManagerCommand::CompositeDeviceStopped(
composite_path.clone(),
))
.await
{
log::error!("Error sending composite device stopped: {e}");
log::error!(
"Error sending to composite device {composite_path} the stopped signal: {}",
e.to_string()
);
}
});
let comp_path = path.clone();

// Add the device to our maps
self.composite_devices.insert(comp_path, client);
log::trace!("Managed source devices: {:?}", self.source_devices_used);
self.used_configs.insert(path, config);
log::trace!("Used configs: {:?}", self.used_configs);
self.composite_device_targets
.insert(composite_path.clone(), target_device_paths);
log::trace!("Used target devices: {:?}", self.composite_device_targets);

Ok(())
}))
}

/// Called when a composite device stops running
Expand Down Expand Up @@ -1275,20 +1277,15 @@ impl Manager {
}

/// Returns the next available composite device dbus path
fn next_composite_dbus_path(&self) -> String {
let max = 2048;
let mut i = 0;
loop {
if i > max {
return "Devices exceeded".to_string();
}
fn next_composite_dbus_path(&self) -> Result<String, Box<dyn Error>> {
for i in 0u64.. {
let path = format!("{}/CompositeDevice{}", BUS_PREFIX, i);
if self.composite_devices.contains_key(&path) {
i += 1;
continue;
if !self.composite_devices.contains_key(&path) {
return Ok(path);
}
return path;
}

Err(Box::from("No available dbus path left"))
}

/// Watch for IIO device events
Expand Down
Loading