Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 46 additions & 22 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -981,33 +981,57 @@ mod tests {
while !handle.is_finished() {
events.clear();

let count = sources.poll(&mut events, Timeout::Never).unwrap();
if count > 0 {
let event = events.pop().unwrap();
assert_eq!(event.key, "waker");
assert!(events.is_empty());

// There's always a message on the channel if we got woken up.
rx.recv().unwrap();
received += 1;

// We may get additional messages on the channel, if the sending is
// faster than the waking.
while rx.try_recv().is_ok() {
received += 1;
match sources.poll(&mut events, Timeout::from_millis(1)) {
Ok(0) => {
panic!("Got Ok(0) from sources.poll()");
}
Ok(_) => {
let event = events.pop().unwrap();
assert_eq!(event.key, "waker");
assert!(events.is_empty());

// * If the waker has already been dropped in the other
// thread, we'll get a "bad file descriptor" error.
// * If the waker hasn't been dropped, we'll get `Ok(())`.
if let Err(e) = Waker::reset(event.source) {
// Currently a "bad file descriptor" error uses
// ErrorKind::Uncategorized, which is undocumented and
// isn’t supposed to be matched against.
assert_eq!(Some(9), e.raw_os_error());

// The loop will terminate on its own next iteration.
}

if received == iterations {
// Error: "bad file descriptor", as the waker handle gets
// dropped by the other thread.
Waker::reset(event.source).unwrap_err();
break;
}
// It is possible to get 0 or more messages on the channel
// after waking, since the last time we woke we might have
// gotten the the message that we just received the wake
// for. Or, we might get multiple messages if the sender is
// fast enough.
while rx.try_recv().is_ok() {
received += 1;
}

Waker::reset(event.source).ok(); // We might get the "bad file descriptor" error here.
wakes += 1;
wakes += 1;
}
Err(e) if e.kind() == io::ErrorKind::TimedOut => {
// The other thread has probably finished and dropped the
// waker, so there was nothing left to wake the poll() call.
// Alternatively, something might have just take a bit.
//
// On the next loop we'll check if the thread is finished.
}
Err(e) => {
panic!("Error {e:?} in sources.poll()");
}
}
}

// It is possible to get messages on the channel after the thread is
// finished if things are timed just right.
while rx.try_recv().is_ok() {
received += 1;
}

handle.join().unwrap();

assert_eq!(received, iterations);
Expand Down