Skip to content

Commit 0b4d038

Browse files
committed
gio/cancellable-future: Use safer cancellable connection mechanism
Instead of connecting to the signal only when polling starts, and do potential disconnections and re-connections, do it just once so that there is no risk that a signal may happen during this phase, and disconnect only after the future has been dropped.
1 parent a3f5b75 commit 0b4d038

File tree

1 file changed

+36
-27
lines changed

1 file changed

+36
-27
lines changed

gio/src/cancellable_future.rs

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,19 @@ pin_project! {
7777
#[pin]
7878
future: F,
7979

80-
#[pin]
81-
waker_handler_cb: Option<CancelledHandlerId>,
82-
8380
cancellable: Cancellable,
81+
connection_id: Option<CancelledHandlerId>,
82+
83+
waker: std::sync::Arc<std::sync::Mutex<Option<std::task::Waker>>>,
84+
}
85+
86+
impl<F> PinnedDrop for CancellableFuture<F> {
87+
fn drop(this: Pin<&mut Self>) {
88+
let this = this.project();
89+
if let Some(connection_id) = this.connection_id.take() {
90+
this.cancellable.disconnect_cancelled(connection_id);
91+
}
92+
}
8493
}
8594
}
8695

@@ -92,10 +101,22 @@ impl<F> CancellableFuture<F> {
92101
/// immediately without making any further progress. In such a case, an error
93102
/// will be returned by this future (i.e., [`Cancelled`]).
94103
pub fn new(future: F, cancellable: Cancellable) -> Self {
104+
let waker = std::sync::Arc::new(std::sync::Mutex::new(None::<std::task::Waker>));
105+
let connection_id = cancellable.connect_cancelled(glib::clone!(
106+
#[strong]
107+
waker,
108+
move |_| {
109+
if let Some(waker) = waker.lock().unwrap().take() {
110+
waker.wake();
111+
}
112+
}
113+
));
114+
95115
Self {
96116
future,
97-
waker_handler_cb: None,
98117
cancellable,
118+
connection_id,
119+
waker,
99120
}
100121
}
101122

@@ -128,34 +149,22 @@ where
128149
type Output = Result<<F as Future>::Output, Cancelled>;
129150

130151
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
131-
if self.is_cancelled() {
132-
return Poll::Ready(Err(Cancelled));
152+
if self.cancellable.is_cancelled() {
153+
// XXX: Whenever we want to break the API, we should return here only
154+
// self.cancellable.set_error_if_cancelled() value.
155+
return std::task::Poll::Ready(Err(Cancelled));
133156
}
134157

135-
let mut this = self.as_mut().project();
158+
let mut waker = self.waker.lock().unwrap();
159+
if waker.is_none() {
160+
*waker = Some(cx.waker().clone());
161+
}
162+
drop(waker);
136163

164+
let this = self.as_mut().project();
137165
match this.future.poll(cx) {
138166
Poll::Ready(out) => Poll::Ready(Ok(out)),
139-
140-
Poll::Pending => {
141-
if let Some(prev_handler) = this.waker_handler_cb.take() {
142-
this.cancellable.disconnect_cancelled(prev_handler);
143-
}
144-
145-
let canceller_handler_id = this.cancellable.connect_cancelled({
146-
let w = cx.waker().clone();
147-
move |_| w.wake()
148-
});
149-
150-
match canceller_handler_id {
151-
Some(canceller_handler_id) => {
152-
*this.waker_handler_cb = Some(canceller_handler_id);
153-
Poll::Pending
154-
}
155-
156-
None => Poll::Ready(Err(Cancelled)),
157-
}
158-
}
167+
Poll::Pending => Poll::Pending,
159168
}
160169
}
161170
}

0 commit comments

Comments
 (0)