Skip to content

Commit bb0879e

Browse files
authored
Merge pull request #901 from sdroege/main-context-spawn-within
glib: Add `MainContext::spawn_from_within()` for spawning non-`Send` …
2 parents e44e65b + e0e1685 commit bb0879e

File tree

2 files changed

+135
-1
lines changed

2 files changed

+135
-1
lines changed

glib/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ pub use self::bridged_logging::{rust_log_handler, GlibLogger, GlibLoggerDomain,
199199
pub mod subclass;
200200

201201
mod main_context_futures;
202-
pub use main_context_futures::{JoinError, JoinHandle};
202+
pub use main_context_futures::{JoinError, JoinHandle, SpawnWithinJoinHandle};
203203
mod source_futures;
204204
pub use self::source_futures::*;
205205

glib/src/main_context_futures.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,87 @@ impl<T: 'static> futures_core::FusedFuture for JoinHandle<T> {
353353
}
354354
}
355355

356+
unsafe impl<T> Send for JoinHandle<T> {}
357+
358+
// rustdoc-stripper-ignore-next
359+
/// Variant of [`JoinHandle`] that is returned from [`MainContext::spawn_from_within`].
360+
#[derive(Debug)]
361+
pub struct SpawnWithinJoinHandle<T> {
362+
rx: Option<oneshot::Receiver<JoinHandle<T>>>,
363+
join_handle: Option<JoinHandle<T>>,
364+
}
365+
366+
impl<T> SpawnWithinJoinHandle<T> {
367+
// rustdoc-stripper-ignore-next
368+
/// Waits until the task is spawned and returns the [`JoinHandle`].
369+
pub async fn into_inner(self) -> Result<JoinHandle<T>, JoinError> {
370+
if let Some(join_handle) = self.join_handle {
371+
return Ok(join_handle);
372+
}
373+
374+
if let Some(rx) = self.rx {
375+
match rx.await {
376+
Ok(join_handle) => return Ok(join_handle),
377+
Err(_) => return Err(JoinErrorInner::Cancelled.into()),
378+
}
379+
}
380+
381+
Err(JoinErrorInner::Cancelled.into())
382+
}
383+
}
384+
385+
impl<T: 'static> Future for SpawnWithinJoinHandle<T> {
386+
type Output = Result<T, JoinError>;
387+
#[inline]
388+
fn poll(
389+
mut self: std::pin::Pin<&mut Self>,
390+
cx: &mut std::task::Context<'_>,
391+
) -> std::task::Poll<Self::Output> {
392+
if let Some(ref mut rx) = self.rx {
393+
match std::pin::Pin::new(rx).poll(cx) {
394+
std::task::Poll::Pending => return std::task::Poll::Pending,
395+
std::task::Poll::Ready(Err(_)) => {
396+
self.rx = None;
397+
return std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into()));
398+
}
399+
std::task::Poll::Ready(Ok(join_handle)) => {
400+
self.rx = None;
401+
self.join_handle = Some(join_handle);
402+
}
403+
}
404+
}
405+
406+
if let Some(ref mut join_handle) = self.join_handle {
407+
match std::pin::Pin::new(join_handle).poll(cx) {
408+
std::task::Poll::Pending => return std::task::Poll::Pending,
409+
std::task::Poll::Ready(Err(e)) => {
410+
self.join_handle = None;
411+
return std::task::Poll::Ready(Err(e));
412+
}
413+
std::task::Poll::Ready(Ok(r)) => {
414+
self.join_handle = None;
415+
return std::task::Poll::Ready(Ok(r));
416+
}
417+
}
418+
}
419+
420+
std::task::Poll::Ready(Err(JoinErrorInner::Cancelled.into()))
421+
}
422+
}
423+
424+
impl<T: 'static> futures_core::FusedFuture for SpawnWithinJoinHandle<T> {
425+
#[inline]
426+
fn is_terminated(&self) -> bool {
427+
if let Some(ref rx) = self.rx {
428+
rx.is_terminated()
429+
} else if let Some(ref join_handle) = self.join_handle {
430+
join_handle.is_terminated()
431+
} else {
432+
true
433+
}
434+
}
435+
}
436+
356437
// rustdoc-stripper-ignore-next
357438
/// Task failure from awaiting a [`JoinHandle`].
358439
#[derive(Debug)]
@@ -491,6 +572,42 @@ impl MainContext {
491572
JoinHandle::new(self, source, rx)
492573
}
493574

575+
// rustdoc-stripper-ignore-next
576+
/// Spawn a new infallible `Future` on the main context from inside the main context.
577+
///
578+
/// The given `Future` does not have to be `Send` but the closure to spawn it has to be.
579+
///
580+
/// This can be called only from any thread.
581+
pub fn spawn_from_within<R: 'static, F: Future<Output = R> + 'static>(
582+
&self,
583+
func: impl FnOnce() -> F + Send + 'static,
584+
) -> SpawnWithinJoinHandle<R> {
585+
self.spawn_from_within_with_priority(crate::PRIORITY_DEFAULT, func)
586+
}
587+
588+
// rustdoc-stripper-ignore-next
589+
/// Spawn a new infallible `Future` on the main context from inside the main context.
590+
///
591+
/// The given `Future` does not have to be `Send` but the closure to spawn it has to be.
592+
///
593+
/// This can be called only from any thread.
594+
pub fn spawn_from_within_with_priority<R: 'static, F: Future<Output = R> + 'static>(
595+
&self,
596+
priority: Priority,
597+
func: impl FnOnce() -> F + Send + 'static,
598+
) -> SpawnWithinJoinHandle<R> {
599+
let ctx = self.clone();
600+
let (tx, rx) = oneshot::channel();
601+
self.invoke_with_priority(priority, move || {
602+
let _ = tx.send(ctx.spawn_local(func()));
603+
});
604+
605+
SpawnWithinJoinHandle {
606+
rx: Some(rx),
607+
join_handle: None,
608+
}
609+
}
610+
494611
// rustdoc-stripper-ignore-next
495612
/// Runs a new, infallible `Future` on the main context and block until it finished, returning
496613
/// the result of the `Future`.
@@ -627,6 +744,23 @@ mod tests {
627744
.unwrap();
628745
}
629746

747+
#[test]
748+
fn test_spawn_from_within() {
749+
let c = MainContext::new();
750+
let l = crate::MainLoop::new(Some(&c), false);
751+
752+
std::thread::spawn({
753+
let l_clone = l.clone();
754+
move || {
755+
c.spawn_from_within(move || async move {
756+
l_clone.quit();
757+
});
758+
}
759+
});
760+
761+
l.run();
762+
}
763+
630764
#[test]
631765
fn test_block_on() {
632766
let c = MainContext::new();

0 commit comments

Comments
 (0)