Skip to content

Commit b39e8d5

Browse files
committed
Added send_interval and cancellation behavior
1 parent 52963da commit b39e8d5

File tree

15 files changed

+137
-43
lines changed

15 files changed

+137
-43
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ members = [
88
"examples/ping_pong",
99
"examples/ping_pong_threads",
1010
"examples/updater",
11-
"examples/updater_threads", "examples/blocking_genserver",
11+
"examples/updater_threads",
12+
"examples/blocking_genserver",
1213
]
1314

1415
[workspace.dependencies]

concurrency/src/tasks/gen_server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ pub trait GenServer
105105
where
106106
Self: Send + Sized,
107107
{
108-
type CallMsg: Send + Sized;
109-
type CastMsg: Send + Sized;
108+
type CallMsg: Clone + Send + Sized + Sync;
109+
type CastMsg: Clone + Send + Sized + Sync;
110110
type OutMsg: Send + Sized;
111111
type State: Clone + Send;
112112
type Error: Debug + Send;

concurrency/src/tasks/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ mod time;
99
pub use error::GenServerError;
1010
pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
1111
pub use process::{send, Process, ProcessInfo};
12-
pub use time::send_after;
12+
pub use time::{send_after, send_interval};

concurrency/src/tasks/time.rs

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,72 @@
1+
use futures::future::select;
12
use std::time::Duration;
23

3-
use spawned_rt::tasks::{self as rt, JoinHandle};
4+
use spawned_rt::tasks::{self as rt, CancellationToken, JoinHandle};
45

56
use super::{GenServer, GenServerHandle};
67

8+
pub struct TimerHandle {
9+
pub join_handle: JoinHandle<()>,
10+
pub cancellation_token: CancellationToken,
11+
}
12+
713
// Sends a message after a given period to the specified GenServer. The task terminates
814
// once the send has completed
915
pub fn send_after<T>(
1016
period: Duration,
1117
mut handle: GenServerHandle<T>,
1218
message: T::CastMsg,
13-
) -> JoinHandle<()>
19+
) -> TimerHandle
20+
where
21+
T: GenServer + 'static,
22+
{
23+
let cancellation_token = CancellationToken::new();
24+
let cloned_token = cancellation_token.clone();
25+
let join_handle = rt::spawn(async move {
26+
let _ = select(
27+
Box::pin(cloned_token.cancelled()),
28+
Box::pin(async {
29+
rt::sleep(period).await;
30+
let _ = handle.cast(message.clone()).await;
31+
}),
32+
)
33+
.await;
34+
});
35+
TimerHandle {
36+
join_handle,
37+
cancellation_token,
38+
}
39+
}
40+
41+
// Sends a message to the specified GenServe repeatedly after `Time` milliseconds.
42+
pub fn send_interval<T>(
43+
period: Duration,
44+
mut handle: GenServerHandle<T>,
45+
message: T::CastMsg,
46+
) -> TimerHandle
1447
where
1548
T: GenServer + 'static,
1649
{
17-
rt::spawn(async move {
18-
rt::sleep(period).await;
19-
let _ = handle.cast(message).await;
20-
})
50+
let cancellation_token = CancellationToken::new();
51+
let cloned_token = cancellation_token.clone();
52+
let join_handle = rt::spawn(async move {
53+
loop {
54+
let result = select(
55+
Box::pin(cloned_token.cancelled()),
56+
Box::pin(async {
57+
rt::sleep(period).await;
58+
let _ = handle.cast(message.clone()).await;
59+
}),
60+
)
61+
.await;
62+
match result {
63+
futures::future::Either::Left(_) => break,
64+
futures::future::Either::Right(_) => (),
65+
}
66+
}
67+
});
68+
TimerHandle {
69+
join_handle,
70+
cancellation_token,
71+
}
2172
}

concurrency/src/threads/gen_server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ pub trait GenServer
8383
where
8484
Self: Send + Sized,
8585
{
86-
type CallMsg: Send + Sized;
87-
type CastMsg: Send + Sized;
86+
type CallMsg: Clone + Send + Sized;
87+
type CastMsg: Clone + Send + Sized;
8888
type OutMsg: Send + Sized;
8989
type State: Clone + Send;
9090
type Error: Debug;

concurrency/src/threads/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ mod time;
88

99
pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
1010
pub use process::{send, Process, ProcessInfo};
11-
pub use time::send_after;
11+
pub use time::{send_after, send_interval};

concurrency/src/threads/time.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
use std::time::Duration;
22

3-
use spawned_rt::threads::{self as rt, JoinHandle};
3+
use spawned_rt::threads::{self as rt, CancellationToken, JoinHandle};
44

55
use super::{GenServer, GenServerHandle};
66

7+
pub struct TimerHandle {
8+
pub join_handle: JoinHandle<()>,
9+
pub cancellation_token: CancellationToken,
10+
}
11+
712
// Sends a message after a given period to the specified GenServer. The task terminates
813
// once the send has completed
914
pub fn send_after<T>(
@@ -19,3 +24,28 @@ where
1924
let _ = handle.cast(message);
2025
})
2126
}
27+
28+
// Sends a message to the specified GenServe repeatedly after `Time` milliseconds.
29+
pub fn send_interval<T>(
30+
period: Duration,
31+
mut handle: GenServerHandle<T>,
32+
message: T::CastMsg,
33+
) -> TimerHandle
34+
where
35+
T: GenServer + 'static,
36+
{
37+
let cancellation_token = CancellationToken::new();
38+
let mut cloned_token = cancellation_token.clone();
39+
let join_handle = rt::spawn(move || loop {
40+
rt::sleep(period);
41+
if cloned_token.is_cancelled() {
42+
break;
43+
} else {
44+
let _ = handle.cast(message.clone());
45+
};
46+
});
47+
TimerHandle {
48+
join_handle,
49+
cancellation_token,
50+
}
51+
}

concurrency/src/time.rs

Lines changed: 0 additions & 21 deletions
This file was deleted.

examples/updater/src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,15 @@ use spawned_rt::tasks as rt;
1414

1515
fn main() {
1616
rt::run(async {
17+
tracing::info!("Starting Updater");
1718
UpdaterServer::start(UpdateServerState {
1819
url: "https://httpbin.org/ip".to_string(),
1920
periodicity: Duration::from_millis(1000),
21+
timer_token: None,
2022
});
2123

2224
// giving it some time before ending
2325
thread::sleep(Duration::from_secs(10));
26+
tracing::info!("Updater stopped");
2427
})
2528
}

0 commit comments

Comments
 (0)