Skip to content

Commit eadc963

Browse files
author
bors-servo
authored
Auto merge of #197 - antrik:fix-notification-tests, r=jdm
Fix notification handling tests and `unix` implementation The first commit attempts to make notification tests more robust. (Hopefully fixing intermittent failures on Linux.) The other commits add some additional related tests, and fix a (minor) issue uncovered by them.
2 parents f2723d0 + 1c01539 commit eadc963

File tree

2 files changed

+110
-15
lines changed

2 files changed

+110
-15
lines changed

src/platform/test.rs

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -706,11 +706,64 @@ fn no_receiver_notification() {
706706
let (sender, receiver) = platform::channel().unwrap();
707707
drop(receiver);
708708
let data: &[u8] = b"1234567";
709+
loop {
710+
if let Err(err) = sender.send(data, vec![], vec![]) {
711+
// We don't have an actual method for distinguishing a "broken pipe" error --
712+
// but at least it's not supposed to signal the same condition as closing the sender.
713+
assert!(!err.channel_is_closed());
714+
break;
715+
}
716+
}
717+
}
718+
719+
/// Checks for broken pipe notification when receiver is closed
720+
/// while there are pending unreceived messages.
721+
///
722+
/// This can result in a different error condition
723+
/// than dropping the receiver before a send is attempted.
724+
/// (Linux reports `ECONNRESET` instead of `EPIPE` in this case.)
725+
#[test]
726+
fn no_receiver_notification_pending() {
727+
let (sender, receiver) = platform::channel().unwrap();
728+
let data: &[u8] = b"1234567";
729+
709730
let result = sender.send(data, vec![], vec![]);
710-
assert!(result.is_err());
711-
// We don't have an actual method for distinguishing a "broken pipe" error --
712-
// but at least it's not supposed to signal the same condition as closing the sender.
713-
assert!(!result.unwrap_err().channel_is_closed());
731+
assert!(result.is_ok());
732+
733+
drop(receiver);
734+
loop {
735+
if let Err(err) = sender.send(data, vec![], vec![]) {
736+
// We don't have an actual method for distinguishing a "broken pipe" error --
737+
// but at least it's not supposed to signal the same condition as closing the sender.
738+
assert!(!err.channel_is_closed());
739+
break;
740+
}
741+
}
742+
}
743+
744+
/// Checks for broken pipe notification when receiver is closed after a delay.
745+
///
746+
/// This might uncover some timing-related issues.
747+
#[test]
748+
fn no_receiver_notification_delayed() {
749+
let (sender, receiver) = platform::channel().unwrap();
750+
751+
let thread = thread::spawn(move || {
752+
thread::sleep(Duration::from_millis(42));
753+
drop(receiver);
754+
});
755+
756+
let data: &[u8] = b"1234567";
757+
loop {
758+
if let Err(err) = sender.send(data, vec![], vec![]) {
759+
// We don't have an actual method for distinguishing a "broken pipe" error --
760+
// but at least it's not supposed to signal the same condition as closing the sender.
761+
assert!(!err.channel_is_closed());
762+
break;
763+
}
764+
}
765+
766+
thread.join().unwrap();
714767
}
715768

716769
#[test]
@@ -756,9 +809,39 @@ fn no_senders_notification_try_recv() {
756809
assert!(result.is_err());
757810
assert!(!result.unwrap_err().channel_is_closed());
758811
drop(sender);
812+
loop {
813+
let result = receiver.try_recv();
814+
assert!(result.is_err());
815+
if result.unwrap_err().channel_is_closed() {
816+
break;
817+
}
818+
}
819+
}
820+
821+
/// Checks for channel closed notification when receiver is closed after a delay.
822+
///
823+
/// This might uncover some timing-related issues.
824+
#[test]
825+
fn no_senders_notification_try_recv_delayed() {
826+
let (sender, receiver) = platform::channel().unwrap();
759827
let result = receiver.try_recv();
760828
assert!(result.is_err());
761-
assert!(result.unwrap_err().channel_is_closed());
829+
assert!(!result.unwrap_err().channel_is_closed());
830+
831+
let thread = thread::spawn(move || {
832+
thread::sleep(Duration::from_millis(42));
833+
drop(sender);
834+
});
835+
836+
loop {
837+
let result = receiver.try_recv();
838+
assert!(result.is_err());
839+
if result.unwrap_err().channel_is_closed() {
840+
break;
841+
}
842+
}
843+
844+
thread.join().unwrap();
762845
}
763846

764847
#[test]

src/platform/unix/mod.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ impl OsIpcSender {
343343
// using a reduced send buffer size.
344344
//
345345
// Any other errors we might get here are non-recoverable.
346-
if !(error.0 == libc::ENOBUFS
346+
if !(error == UnixError::Errno(libc::ENOBUFS)
347347
&& downsize(&mut sendbuf_size, data.len()).is_ok()) {
348348
return Err(error)
349349
}
@@ -382,7 +382,7 @@ impl OsIpcSender {
382382
};
383383

384384
if let Err(error) = result {
385-
if error.0 == libc::ENOBUFS
385+
if error == UnixError::Errno(libc::ENOBUFS)
386386
&& downsize(&mut sendbuf_size, end_byte_position - byte_position).is_ok() {
387387
// If the kernel failed to allocate a buffer large enough for the packet,
388388
// retry with a smaller size (if possible).
@@ -807,17 +807,20 @@ impl OsIpcSharedMemory {
807807
}
808808
}
809809

810-
#[derive(Copy, Clone, Debug)]
811-
pub struct UnixError(c_int);
810+
#[derive(Copy, Clone, Debug, PartialEq)]
811+
pub enum UnixError {
812+
Errno(c_int),
813+
ChannelClosed,
814+
}
812815

813816
impl UnixError {
814817
fn last() -> UnixError {
815-
UnixError(Error::last_os_error().raw_os_error().unwrap())
818+
UnixError::Errno(Error::last_os_error().raw_os_error().unwrap())
816819
}
817820

818821
#[allow(dead_code)]
819822
pub fn channel_is_closed(&self) -> bool {
820-
self.0 == libc::ECONNRESET
823+
*self == UnixError::ChannelClosed
821824
}
822825
}
823826

@@ -829,13 +832,22 @@ impl From<UnixError> for bincode::Error {
829832

830833
impl From<UnixError> for Error {
831834
fn from(unix_error: UnixError) -> Error {
832-
Error::from_raw_os_error(unix_error.0)
835+
match unix_error {
836+
UnixError::Errno(errno) => Error::from_raw_os_error(errno),
837+
UnixError::ChannelClosed => Error::new(ErrorKind::ConnectionReset,
838+
"All senders for this socket closed"),
839+
}
833840
}
834841
}
835842

836843
impl From<Error> for UnixError {
837844
fn from(e: Error) -> UnixError {
838-
UnixError(e.raw_os_error().unwrap())
845+
if let Some(errno) = e.raw_os_error() {
846+
UnixError::Errno(errno)
847+
} else {
848+
assert!(e.kind() == ErrorKind::ConnectionReset);
849+
UnixError::ChannelClosed
850+
}
839851
}
840852
}
841853

@@ -932,7 +944,7 @@ fn recv(fd: c_int, blocking_mode: BlockingMode)
932944
};
933945

934946
if result == 0 {
935-
return Err(UnixError(libc::ECONNRESET))
947+
return Err(UnixError::ChannelClosed)
936948
} else if result < 0 {
937949
return Err(UnixError::last())
938950
};
@@ -1011,7 +1023,7 @@ impl UnixCmsg {
10111023
let result = if result > 0 {
10121024
Ok(result as usize)
10131025
} else if result == 0 {
1014-
Err(UnixError(libc::ECONNRESET))
1026+
Err(UnixError::ChannelClosed)
10151027
} else {
10161028
Err(UnixError::last())
10171029
};

0 commit comments

Comments
 (0)