Skip to content

Commit 9d251f9

Browse files
committed
Closing connections properly on Windows
The current closing logic was causing a deadlock because an unused pipe was being returned when the close was called on the listener. This sets a flag so we can detect that we should not use that pipe connection (which wasn't actually connected to a client but returned from the async call becuase we told it too return early). Signed-off-by: James Sturtevant <[email protected]>
1 parent 2396cf3 commit 9d251f9

File tree

1 file changed

+96
-0
lines changed

1 file changed

+96
-0
lines changed

src/sync/sys/windows/net.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ const WAIT_FOR_EVENT: i32 = 1;
3737

3838
pub struct PipeListener {
3939
first_instance: AtomicBool,
40+
shutting_down: AtomicBool,
4041
address: String,
4142
connection_event: isize,
4243
}
@@ -65,6 +66,7 @@ impl PipeListener {
6566
let connection_event = create_event()?;
6667
Ok(PipeListener {
6768
first_instance: AtomicBool::new(true),
69+
shutting_down: AtomicBool::new(false),
6870
address: sockaddr.to_string(),
6971
connection_event
7072
})
@@ -98,6 +100,13 @@ impl PipeListener {
98100
trace!("listening for connection");
99101
let result = unsafe { ConnectNamedPipe(np.named_pipe, ol.as_mut_ptr())};
100102
if result != 0 {
103+
if self.shutting_down.load(Ordering::SeqCst) {
104+
np.close().unwrap_or_else(|err| trace!("Failed to close the pipe {:?}", err));
105+
return Err(io::Error::new(
106+
io::ErrorKind::Other,
107+
"closing pipe",
108+
));
109+
}
101110
return Err(io::Error::last_os_error());
102111
}
103112

@@ -110,11 +119,25 @@ impl PipeListener {
110119
return Err(io::Error::last_os_error());
111120
}
112121
_ => {
122+
if self.shutting_down.load(Ordering::SeqCst) {
123+
np.close().unwrap_or_else(|err| trace!("Failed to close the pipe {:?}", err));
124+
return Err(io::Error::new(
125+
io::ErrorKind::Other,
126+
"closing pipe",
127+
));
128+
}
113129
Ok(Some(np))
114130
}
115131
}
116132
}
117133
e if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => {
134+
if self.shutting_down.load(Ordering::SeqCst) {
135+
np.close().unwrap_or_else(|err| trace!("Failed to close the pipe {:?}", err));
136+
return Err(io::Error::new(
137+
io::ErrorKind::Other,
138+
"closing pipe",
139+
));
140+
}
118141
Ok(Some(np))
119142
}
120143
e => {
@@ -153,6 +176,7 @@ impl PipeListener {
153176

154177
pub fn close(&self) -> Result<()> {
155178
// release the ConnectNamedPipe thread by signaling the event and clean up event handle
179+
self.shutting_down.store(true, Ordering::SeqCst);
156180
set_event(self.connection_event)?;
157181
close_handle(self.connection_event)
158182
}
@@ -359,4 +383,76 @@ mod test {
359383
}
360384
}
361385
}
386+
387+
#[test]
388+
fn should_accept_new_client() {
389+
let listener = Arc::new(PipeListener::new(r"\\.\pipe\ttrpc-test-accept").unwrap());
390+
391+
let listener_server = listener.clone();
392+
let thread = std::thread::spawn(move || {
393+
let quit_flag = Arc::new(AtomicBool::new(false));
394+
match listener_server.accept(&quit_flag) {
395+
Ok(Some(_)) => {
396+
// pipe is working
397+
}
398+
Ok(None) => {
399+
assert!(false, "should get a working pipe")
400+
}
401+
Err(e) => {
402+
assert!(false, "should not get error {}", e.to_string())
403+
}
404+
}
405+
});
406+
407+
wait_socket_working(r"\\.\pipe\ttrpc-test-accept", 10, 5).unwrap();
408+
thread.join().unwrap();
409+
}
410+
411+
#[test]
412+
fn close_should_cancel_accept() {
413+
let listener = Arc::new(PipeListener::new(r"\\.\pipe\ttrpc-test-close").unwrap());
414+
415+
let listener_server = listener.clone();
416+
let thread = std::thread::spawn(move || {
417+
let quit_flag = Arc::new(AtomicBool::new(false));
418+
match listener_server.accept(&quit_flag) {
419+
Ok(_) => {
420+
assert!(false, "should not get pipe on close")
421+
}
422+
Err(e) => {
423+
assert_eq!(e.to_string(), "closing pipe")
424+
}
425+
}
426+
});
427+
428+
// sleep for a moment to allow the pipe to start initialize and be ready to accept new connection.
429+
// this simulates scenario where the thread is asleep and awaiting a connection
430+
std::thread::sleep(std::time::Duration::from_millis(500));
431+
listener.close().unwrap();
432+
thread.join().unwrap();
433+
}
434+
435+
fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> {
436+
for _i in 0..count {
437+
let client = match ClientConnection::client_connect(address) {
438+
Ok(c) => {
439+
c
440+
}
441+
Err(_) => {
442+
std::thread::sleep(std::time::Duration::from_millis(interval_in_ms));
443+
continue;
444+
}
445+
};
446+
447+
match client.get_pipe_connection() {
448+
Ok(_) => {
449+
return Ok(());
450+
}
451+
Err(_) => {
452+
std::thread::sleep(std::time::Duration::from_millis(interval_in_ms));
453+
}
454+
}
455+
}
456+
Err(Error::Others("timed out".to_string()))
457+
}
362458
}

0 commit comments

Comments
 (0)