Skip to content

Commit dec5377

Browse files
committed
fix(Target Devices): set target devices in parallel instead of serially
1 parent a91dab6 commit dec5377

File tree

1 file changed

+98
-76
lines changed

1 file changed

+98
-76
lines changed

src/input/composite_device/targets.rs

Lines changed: 98 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,11 @@ impl CompositeDeviceTargets {
154154

155155
// Identify which target devices are new
156156
let mut device_types_to_start: Vec<TargetDeviceTypeId> = vec![];
157+
let device_types_map = self.target_kinds_running().await?;
158+
let device_types_running: Vec<TargetDeviceTypeId> =
159+
device_types_map.values().cloned().collect();
157160
for kind in device_types.iter() {
158-
if self.target_kind_running(kind).await? {
161+
if device_types_running.contains(kind) {
159162
log::debug!("[{dbus_path}] Target device {kind} already running, nothing to do.");
160163
continue;
161164
}
@@ -165,85 +168,100 @@ impl CompositeDeviceTargets {
165168

166169
// Identify the targets that need to close
167170
let mut targets_to_stop: HashMap<String, TargetDeviceClient> = HashMap::new();
168-
for (path, target) in self.target_devices.clone().into_iter() {
169-
let target_type = match target.get_type().await {
170-
Ok(value) => value,
171-
Err(e) => {
172-
return Err(format!("Failed to request target type: {e:?}").into());
173-
}
174-
};
175-
if !device_types.contains(&target_type) {
176-
log::debug!("[{dbus_path}] Target device {path} not in new devices list. Adding to stop list.");
177-
targets_to_stop.insert(path, target);
171+
for (path, target_type) in device_types_map {
172+
if device_types.contains(&target_type) {
173+
continue;
178174
}
175+
let Some(target) = self.target_devices.get(&path) else {
176+
continue;
177+
};
178+
log::debug!(
179+
"[{dbus_path}] Target device {path} not in new devices list. Adding to stop list."
180+
);
181+
targets_to_stop.insert(path, target.clone());
179182
}
180183

181184
// Stop all old target devices that aren't going to persist
185+
let mut stop_tasks = JoinSet::new();
182186
for (path, target) in targets_to_stop.clone().into_iter() {
183187
log::debug!("[{dbus_path}] Stopping old target device: {path}");
184188
self.target_devices.remove(&path);
185189
for (_, target_devices) in self.target_devices_by_capability.iter_mut() {
186190
target_devices.remove(&path);
187191
}
188-
if let Err(e) = target.stop().await {
189-
log::error!("[{dbus_path}] Failed to stop old target device: {e:?}");
192+
stop_tasks.spawn(async move { target.stop().await });
193+
}
194+
for result in stop_tasks.join_all().await {
195+
if let Err(e) = result {
196+
log::error!("[{dbus_path}] Failed to stop old target device: {e}");
190197
}
191198
}
192199

193-
let composite_path = self.path.clone();
194-
195-
// Create new target devices using the input manager
200+
// Create new target devices using the input manager. Spawn a task for
201+
// each create request so they can be performed simultaneously.
202+
let mut tasks: JoinSet<Result<String, Box<dyn Error + Send + Sync>>> = JoinSet::new();
196203
for kind in device_types_to_start {
197-
// Ask the input manager to create a target device
198-
log::debug!("[{dbus_path}] Requesting to create device: {kind}");
199-
let (sender, mut receiver) = mpsc::channel(1);
200-
self.manager
201-
.send(ManagerCommand::CreateTargetDevice { kind, sender })
202-
.await?;
203-
let Some(response) = receiver.recv().await else {
204-
log::warn!("[{dbus_path}] Channel closed waiting for response from input manager");
205-
continue;
206-
};
207-
let target_path = match response {
208-
Ok(path) => path,
209-
Err(e) => {
210-
let err = format!("Failed to create target: {e:?}");
211-
log::error!("[{dbus_path}] {err}");
212-
continue;
213-
}
214-
};
215-
216-
// Ask the input manager to attach the target device to this composite
217-
// device. Note that this *must* be run in an async task to prevent
218-
// deadlocking.
219-
log::debug!("[{dbus_path}] Requesting to attach target device {target_path} to {composite_path}");
204+
let dbus_path = dbus_path.to_owned();
220205
let manager = self.manager.clone();
221-
let target_path_clone = target_path.clone();
222-
let composite_path_clone = composite_path.clone();
223-
let dbus_path = dbus_path.to_string();
224-
tokio::task::spawn(async move {
206+
let composite_path = self.path.clone();
207+
208+
// Spawn a task that requests creating a new target device.
209+
tasks.spawn(async move {
210+
// Ask the input manager to create a target device
211+
log::debug!("[{dbus_path}] Requesting to create device: {kind}");
225212
let (sender, mut receiver) = mpsc::channel(1);
226-
let result = manager
227-
.send(ManagerCommand::AttachTargetDevice {
228-
target_path: target_path_clone,
229-
composite_path: composite_path_clone,
230-
sender,
231-
})
232-
.await;
233-
if let Err(e) = result {
234-
log::warn!("[{dbus_path}] Failed to send attach request to input manager: {e}");
235-
return;
236-
}
213+
let create_cmd = ManagerCommand::CreateTargetDevice { kind, sender };
214+
manager.send(create_cmd).await?;
237215
let Some(response) = receiver.recv().await else {
238-
log::warn!(
239-
"[{dbus_path}] Channel closed waiting for response from input manager"
240-
);
241-
return;
216+
log::warn!("[{dbus_path}] Channel closed waiting for response from input manager");
217+
return Err("Channel closed waiting for response from input manager".into());
242218
};
243-
if let Err(e) = response {
244-
log::error!("[{dbus_path}] Failed to attach target device: {e:?}");
245-
}
219+
let target_path = response?;
220+
221+
// Ask the input manager to attach the target device to this composite
222+
// device. Note that this *must* be run in an async task to prevent
223+
// deadlocking.
224+
log::debug!("[{dbus_path}] Requesting to attach target device {target_path} to {composite_path}");
225+
let target_path_clone = target_path.clone();
226+
let composite_path_clone = composite_path.clone();
227+
let dbus_path = dbus_path.to_string();
228+
tokio::task::spawn(async move {
229+
let (sender, mut receiver) = mpsc::channel(1);
230+
let result = manager
231+
.send(ManagerCommand::AttachTargetDevice {
232+
target_path: target_path_clone,
233+
composite_path: composite_path_clone,
234+
sender,
235+
})
236+
.await;
237+
if let Err(e) = result {
238+
log::warn!("[{dbus_path}] Failed to send attach request to input manager: {e}");
239+
return;
240+
}
241+
let Some(response) = receiver.recv().await else {
242+
log::warn!(
243+
"[{dbus_path}] Channel closed waiting for response from input manager"
244+
);
245+
return;
246+
};
247+
if let Err(e) = response {
248+
log::error!("[{dbus_path}] Failed to attach target device: {e:?}");
249+
}
250+
});
251+
252+
Ok(target_path)
246253
});
254+
}
255+
256+
// Wait for all target devices to be created
257+
for result in tasks.join_all().await {
258+
let target_path = match result {
259+
Ok(path) => path,
260+
Err(e) => {
261+
log::error!("[{dbus_path}] Failed to create target device: {e}");
262+
continue;
263+
}
264+
};
247265

248266
// Enqueue the target device to wait for the attachment message from
249267
// the input manager to prevent multiple calls to set_target_devices()
@@ -256,24 +274,28 @@ impl CompositeDeviceTargets {
256274
//self.signal_targets_changed().await;
257275

258276
Ok(())
259-
260-
//
261277
}
262278

263-
// Deterimines if a given target device kind is already running
264-
async fn target_kind_running(&self, kind: &TargetDeviceTypeId) -> Result<bool, Box<dyn Error>> {
265-
for target in self.target_devices.values() {
266-
let target_type = match target.get_type().await {
267-
Ok(value) => value,
268-
Err(e) => {
269-
return Err(format!("Failed to request target type: {e:?}").into());
270-
}
271-
};
272-
if *kind == target_type {
273-
return Ok(true);
274-
}
279+
/// Return a list of currently running target devices
280+
async fn target_kinds_running(
281+
&self,
282+
) -> Result<HashMap<String, TargetDeviceTypeId>, Box<dyn Error>> {
283+
// Use a JoinSet to query all targets simultaneously
284+
let mut tasks = JoinSet::new();
285+
for (path, target) in self.target_devices.iter() {
286+
let path = path.clone();
287+
let target = target.clone();
288+
tasks.spawn(async move { (path, target.get_type().await) });
275289
}
276-
Ok(false)
290+
291+
// Collect the results
292+
let mut kinds_running = HashMap::with_capacity(self.target_devices.len());
293+
let results = tasks.join_all().await;
294+
for (path, kind) in results {
295+
kinds_running.insert(path, kind?);
296+
}
297+
298+
Ok(kinds_running)
277299
}
278300

279301
/// Write the given event to all target devices that are capable of emitting

0 commit comments

Comments
 (0)