Skip to content

Commit c8d07cf

Browse files
authored
ensure the router proxy loop is broken out of on shutdown; join on thread handle (#413)
Signed-off-by: gterzian <[email protected]>
1 parent e65001f commit c8d07cf

File tree

2 files changed

+39
-25
lines changed

2 files changed

+39
-25
lines changed

src/router.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
1616
use std::collections::HashMap;
1717
use std::sync::{LazyLock, Mutex};
18-
use std::thread;
18+
use std::thread::{self, JoinHandle};
1919

20-
use crate::ipc::OpaqueIpcReceiver;
21-
use crate::ipc::{self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender};
2220
use crossbeam_channel::{self, Receiver, Sender};
2321
use serde::{Deserialize, Serialize};
2422

23+
use crate::ipc::{
24+
self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender, OpaqueIpcReceiver,
25+
};
26+
2527
/// Global object wrapping a `RouterProxy`.
2628
/// Add routes ([add_route](RouterProxy::add_route)), or convert IpcReceiver<T>
2729
/// to crossbeam channels (e.g. [route_ipc_receiver_to_new_crossbeam_receiver](RouterProxy::route_ipc_receiver_to_new_crossbeam_receiver))
@@ -44,7 +46,7 @@ impl RouterProxy {
4446
// Router proxy takes both sending ends.
4547
let (msg_sender, msg_receiver) = crossbeam_channel::unbounded();
4648
let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap();
47-
thread::Builder::new()
49+
let handle = thread::Builder::new()
4850
.name("router-proxy".to_string())
4951
.spawn(move || Router::new(msg_receiver, wakeup_receiver).run())
5052
.expect("Failed to spawn router proxy thread");
@@ -53,6 +55,7 @@ impl RouterProxy {
5355
msg_sender,
5456
wakeup_sender,
5557
shutdown: false,
58+
handle: Some(handle),
5659
}),
5760
}
5861
}
@@ -117,6 +120,11 @@ impl RouterProxy {
117120
ack_receiver.recv().unwrap();
118121
})
119122
.unwrap();
123+
comm.handle
124+
.take()
125+
.expect("Should have a join handle at shutdown")
126+
.join()
127+
.expect("Failed to join on the router proxy thread");
120128
}
121129

122130
/// A convenience function to route an `IpcReceiver<T>` to an existing `Sender<T>`.
@@ -152,6 +160,7 @@ struct RouterProxyComm {
152160
msg_sender: Sender<RouterMsg>,
153161
wakeup_sender: IpcSender<()>,
154162
shutdown: bool,
163+
handle: Option<JoinHandle<()>>,
155164
}
156165

157166
/// Router runs in its own thread listening for events. Adds events to its IpcReceiverSet
@@ -211,7 +220,7 @@ impl Router {
211220
sender
212221
.send(())
213222
.expect("Failed to send comfirmation of shutdown.");
214-
break;
223+
return;
215224
},
216225
}
217226
},

src/test.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@
77
// option. This file may not be copied, modified, or distributed
88
// except according to those terms.
99

10-
#[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))]
11-
use crate::ipc::IpcReceiver;
12-
use crate::ipc::{self, IpcReceiverSet, IpcSender, IpcSharedMemory};
13-
use crate::router::{RouterProxy, ROUTER};
14-
use crossbeam_channel::{self, Sender};
15-
use serde::{Deserialize, Deserializer, Serialize, Serializer};
1610
use std::cell::RefCell;
1711
#[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))]
1812
use std::env;
19-
use std::iter;
13+
#[cfg(not(any(
14+
feature = "force-inprocess",
15+
target_os = "android",
16+
target_os = "ios",
17+
target_os = "windows",
18+
)))]
19+
use std::io::Error;
2020
#[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios",)))]
2121
use std::process::{self, Command, Stdio};
2222
#[cfg(not(any(
@@ -27,7 +27,11 @@ use std::process::{self, Command, Stdio};
2727
)))]
2828
use std::ptr;
2929
use std::rc::Rc;
30-
use std::thread;
30+
use std::time::{Duration, Instant};
31+
use std::{iter, thread};
32+
33+
use crossbeam_channel::{self, Sender};
34+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
3135

3236
#[cfg(not(any(
3337
feature = "force-inprocess",
@@ -36,15 +40,10 @@ use std::thread;
3640
target_os = "windows"
3741
)))]
3842
use crate::ipc::IpcOneShotServer;
39-
40-
#[cfg(not(any(
41-
feature = "force-inprocess",
42-
target_os = "android",
43-
target_os = "ios",
44-
target_os = "windows",
45-
)))]
46-
use std::io::Error;
47-
use std::time::{Duration, Instant};
43+
#[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))]
44+
use crate::ipc::IpcReceiver;
45+
use crate::ipc::{self, IpcReceiverSet, IpcSender, IpcSharedMemory};
46+
use crate::router::{RouterProxy, ROUTER};
4847

4948
#[cfg(not(any(
5049
feature = "force-inprocess",
@@ -340,6 +339,12 @@ fn router_flood() {
340339
}
341340
}
342341

342+
#[test]
343+
fn router_shutdown() {
344+
let router = RouterProxy::new();
345+
router.shutdown();
346+
}
347+
343348
#[test]
344349
fn router_routing_to_new_crossbeam_receiver() {
345350
let person = ("Patrick Walton".to_owned(), 29);
@@ -722,10 +727,10 @@ fn transfer_closed_sender() {
722727
#[cfg(feature = "async")]
723728
#[test]
724729
fn test_receiver_stream() {
725-
use futures_core::task::Context;
726-
use futures_core::task::Poll;
727-
use futures_core::Stream;
728730
use std::pin::Pin;
731+
732+
use futures_core::task::{Context, Poll};
733+
use futures_core::Stream;
729734
let (tx, rx) = ipc::channel().unwrap();
730735
let (waker, count) = futures_test::task::new_count_waker();
731736
let mut ctx = Context::from_waker(&waker);

0 commit comments

Comments
 (0)