Skip to content

Commit 5b2608a

Browse files
committed
init futures module
1 parent a08e02d commit 5b2608a

File tree

2 files changed

+328
-0
lines changed

2 files changed

+328
-0
lines changed

glib/src/futures/mod.rs

Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
use futures_core::Stream;
2+
use futures_task::FutureObj;
3+
use futures_task::LocalFutureObj;
4+
use futures_task::LocalSpawn;
5+
use futures_task::Poll;
6+
use futures_task::Spawn;
7+
use futures_util::future::Either;
8+
use futures_util::future::Select;
9+
use futures_util::pin_mut;
10+
11+
use crate::FutureWithTimeoutError;
12+
use crate::JoinHandle;
13+
use crate::MainContext;
14+
use crate::Priority;
15+
use crate::SpawnWithinJoinHandle;
16+
use std::future::{Future, IntoFuture};
17+
use std::{pin::Pin, time::Duration};
18+
19+
#[derive(Default, Copy, Clone, Debug, Eq, PartialEq)]
20+
pub enum SchedulingPrecision {
21+
#[default]
22+
Millisecond,
23+
Second,
24+
}
25+
26+
#[derive(Default, Clone, Debug, Eq, PartialEq)]
27+
pub struct Sleep {
28+
duration: Duration,
29+
priority: Priority,
30+
precision: SchedulingPrecision,
31+
}
32+
33+
impl IntoFuture for Sleep {
34+
type Output = ();
35+
36+
type IntoFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
37+
38+
fn into_future(self) -> Self::IntoFuture {
39+
use SchedulingPrecision::*;
40+
match self.precision {
41+
Millisecond => crate::timeout_future_with_priority(self.priority, self.duration),
42+
Second => crate::timeout_future_seconds_with_priority(
43+
self.priority,
44+
self.duration.as_secs() as u32,
45+
),
46+
}
47+
}
48+
}
49+
50+
impl Sleep {
51+
pub fn priority(mut self, priority: Priority) -> Self {
52+
self.priority = priority;
53+
self
54+
}
55+
pub fn precision(mut self, precision: SchedulingPrecision) -> Self {
56+
self.precision = precision;
57+
self
58+
}
59+
}
60+
61+
pub fn sleep(duration: Duration) -> Sleep {
62+
Sleep {
63+
priority: crate::PRIORITY_DEFAULT,
64+
duration,
65+
precision: SchedulingPrecision::Millisecond,
66+
}
67+
}
68+
69+
// rustdoc-stripper-ignore-next
70+
/// Options to build a future that will run until the specified `duration` passes.
71+
#[derive(Default, Debug, Eq, PartialEq)]
72+
pub struct Timeout<F: Future> {
73+
duration: Duration,
74+
priority: Priority,
75+
precision: SchedulingPrecision,
76+
future: F,
77+
}
78+
pub struct TimeoutFuture<F> {
79+
select: Select<F, Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
80+
}
81+
82+
impl<F: Future + Unpin> Future for TimeoutFuture<F> {
83+
type Output = Result<F::Output, FutureWithTimeoutError>;
84+
85+
fn poll(
86+
mut self: Pin<&mut Self>,
87+
cx: &mut futures_task::Context<'_>,
88+
) -> futures_task::Poll<Self::Output> {
89+
let select = &mut self.as_mut().select;
90+
pin_mut!(select);
91+
match select.poll(cx) {
92+
Poll::Ready(res) => match res {
93+
Either::Left(value) => Poll::Ready(Ok(value.0)),
94+
Either::Right(_timedout) => Poll::Ready(Err(FutureWithTimeoutError)),
95+
},
96+
Poll::Pending => Poll::Pending,
97+
}
98+
}
99+
}
100+
101+
impl<F: Future + std::marker::Unpin + 'static> IntoFuture for Timeout<F> {
102+
type Output = Result<F::Output, FutureWithTimeoutError>;
103+
104+
type IntoFuture = TimeoutFuture<F>;
105+
106+
fn into_future(self) -> Self::IntoFuture {
107+
let sleep = Sleep {
108+
duration: self.duration,
109+
precision: self.precision,
110+
priority: self.priority,
111+
};
112+
TimeoutFuture {
113+
select: futures_util::future::select(self.future, sleep.into_future()),
114+
}
115+
}
116+
}
117+
118+
impl<F: Future> Timeout<F> {
119+
pub fn priority(mut self, priority: Priority) -> Self {
120+
self.priority = priority;
121+
self
122+
}
123+
pub fn precision(mut self, precision: SchedulingPrecision) -> Self {
124+
self.precision = precision;
125+
self
126+
}
127+
}
128+
129+
pub fn timeout<F: Future>(
130+
duration: Duration,
131+
future: F,
132+
) -> Timeout<impl Future<Output = F::Output>> {
133+
Timeout {
134+
duration,
135+
priority: crate::PRIORITY_DEFAULT,
136+
future: Box::pin(future),
137+
precision: SchedulingPrecision::Millisecond,
138+
}
139+
}
140+
141+
#[derive(Default, Clone, Debug, Eq, PartialEq)]
142+
pub struct IntervalOptions {
143+
priority: Priority,
144+
precision: SchedulingPrecision,
145+
duration: Duration,
146+
}
147+
148+
impl IntervalOptions {
149+
pub fn new(duration: Duration) -> Self {
150+
Self {
151+
priority: crate::PRIORITY_DEFAULT,
152+
precision: SchedulingPrecision::Millisecond,
153+
duration,
154+
}
155+
}
156+
pub fn precision(&mut self, precision: SchedulingPrecision) -> &mut Self {
157+
self.precision = precision;
158+
self
159+
}
160+
pub fn priority(&mut self, priority: Priority) -> &mut Self {
161+
self.priority = priority;
162+
self
163+
}
164+
pub fn stream(&self) -> impl Stream<Item = ()> + Send {
165+
use SchedulingPrecision::*;
166+
match self.precision {
167+
Second => crate::interval_stream_seconds_with_priority(
168+
self.priority,
169+
self.duration.as_secs() as u32,
170+
),
171+
Millisecond => crate::interval_stream_with_priority(self.priority, self.duration),
172+
}
173+
}
174+
}
175+
176+
pub fn interval(duration: Duration) -> impl Stream<Item = ()> + Send {
177+
IntervalOptions::new(duration).stream()
178+
}
179+
180+
#[derive(Default, Clone, Debug, Eq, PartialEq)]
181+
pub struct SpawnOptions {
182+
priority: Priority,
183+
context: Option<crate::MainContext>,
184+
}
185+
186+
impl SpawnOptions {
187+
pub fn new() -> Self {
188+
Self {
189+
priority: crate::PRIORITY_DEFAULT,
190+
context: None,
191+
}
192+
}
193+
pub fn priority(&mut self, priority: Priority) -> &mut Self {
194+
self.priority = priority;
195+
self
196+
}
197+
pub fn context(&mut self, context: MainContext) -> &mut Self {
198+
self.context = Some(context);
199+
self
200+
}
201+
pub fn spawn_local<F: Future + 'static>(&self, future: F) -> JoinHandle<<F as Future>::Output> {
202+
self.context
203+
.as_ref()
204+
.unwrap_or(&MainContext::default())
205+
.spawn_local_with_priority(self.priority, future)
206+
}
207+
pub fn spawn<R: Send + 'static, F: Future<Output = R> + Send + 'static>(
208+
&self,
209+
future: F,
210+
) -> JoinHandle<R> {
211+
self.context
212+
.as_ref()
213+
.unwrap_or(&MainContext::default())
214+
.spawn_with_priority(self.priority, future)
215+
}
216+
pub fn spawn_from_within<F: Future + 'static>(
217+
&self,
218+
func: impl FnOnce() -> F + Send + 'static,
219+
) -> SpawnWithinJoinHandle<<F as Future>::Output> {
220+
self.context
221+
.as_ref()
222+
.unwrap_or(&MainContext::default())
223+
.spawn_from_within_with_priority(self.priority, func)
224+
}
225+
}
226+
227+
impl From<MainContext> for SpawnOptions {
228+
fn from(value: MainContext) -> Self {
229+
let mut opts = SpawnOptions::new();
230+
opts.context(value);
231+
opts
232+
}
233+
}
234+
235+
// The following trait implementations will reuse the methods from `SpawnOptions`, so the spawned
236+
// futures will have the correct priority chosen by the user.
237+
// This is an improvement compared to `MainContext::spawn_obj`, which doesn't let you specify the
238+
// priority.
239+
impl Spawn for SpawnOptions {
240+
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), futures_task::SpawnError> {
241+
self.spawn(future);
242+
Ok(())
243+
}
244+
}
245+
impl LocalSpawn for SpawnOptions {
246+
fn spawn_local_obj(
247+
&self,
248+
future: LocalFutureObj<'static, ()>,
249+
) -> Result<(), futures_task::SpawnError> {
250+
self.spawn_local(future);
251+
Ok(())
252+
}
253+
}
254+
255+
#[test]
256+
fn test_sleep() {
257+
use crate::MainContext;
258+
259+
let c = MainContext::new();
260+
261+
c.block_on(async {
262+
sleep(Duration::from_millis(10)).await;
263+
sleep(Duration::from_secs(1))
264+
.priority(crate::PRIORITY_HIGH)
265+
.precision(SchedulingPrecision::Second)
266+
.await;
267+
});
268+
}
269+
270+
#[test]
271+
fn test_timeout() {
272+
use crate::{MainContext, MainLoop};
273+
use std::future::ready;
274+
275+
let c = MainContext::new();
276+
let l = MainLoop::new(Some(&c), false);
277+
278+
let tt = timeout(Duration::from_millis(10), ready(()));
279+
let l_clone = l.clone();
280+
c.spawn_local(async move {
281+
tt.await.unwrap();
282+
l_clone.quit();
283+
});
284+
l.run();
285+
286+
let tt = timeout(Duration::from_millis(10), async move { 2 }).priority(crate::PRIORITY_HIGH);
287+
let l_clone = l.clone();
288+
c.spawn(async move {
289+
tt.await.unwrap();
290+
l_clone.quit();
291+
});
292+
l.run();
293+
}
294+
295+
#[test]
296+
fn test_interval() {
297+
let _stream = interval(Duration::from_millis(1));
298+
let _stream = IntervalOptions::new(Duration::from_secs(1))
299+
.priority(crate::PRIORITY_HIGH)
300+
.precision(SchedulingPrecision::Second)
301+
.stream();
302+
}
303+
304+
#[test]
305+
fn test_spawn() {
306+
use crate::{MainContext, MainLoop};
307+
308+
let c = MainContext::new();
309+
let l = MainLoop::new(Some(&c), false);
310+
311+
let l_clone = l.clone();
312+
SpawnOptions::new().spawn(async move {
313+
2;
314+
l_clone.quit();
315+
});
316+
l.run();
317+
318+
let l_clone = l.clone();
319+
SpawnOptions::new()
320+
.context(c)
321+
.priority(crate::PRIORITY_HIGH)
322+
.spawn_local(async move {
323+
2;
324+
l_clone.quit();
325+
});
326+
l.run();
327+
}

glib/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ mod convert;
130130
pub use self::convert::*;
131131
mod enums;
132132
mod functions;
133+
pub mod futures;
133134
pub use self::functions::*;
134135
mod key_file;
135136
pub mod prelude;

0 commit comments

Comments
 (0)