Skip to content

Commit b7b0e6b

Browse files
committed
fix: restore threads examples and add backward-compatible start() API
1 parent a5ef112 commit b7b0e6b

File tree

23 files changed

+560
-31
lines changed

23 files changed

+560
-31
lines changed

Cargo.lock

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ members = [
55
"rt",
66
"concurrency",
77
"examples/bank",
8+
"examples/bank_threads",
89
"examples/name_server",
910
"examples/ping_pong",
11+
"examples/ping_pong_threads",
1012
"examples/updater",
13+
"examples/updater_threads",
1114
"examples/blocking_genserver",
1215
"examples/busy_genserver_warning",
1316
]

concurrency/src/tasks/gen_server.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,19 @@ pub trait GenServer: Send + Sized {
182182
type OutMsg: Send + Sized;
183183
type Error: Debug + Send;
184184

185+
/// Start the GenServer with the default backend (Async).
186+
fn start(self) -> GenServerHandle<Self> {
187+
self.start_with_backend(Backend::default())
188+
}
189+
185190
/// Start the GenServer with the specified backend.
186191
///
187192
/// # Arguments
188193
/// * `backend` - The execution backend to use:
189194
/// - `Backend::Async` - Run on tokio async runtime (default, best for non-blocking workloads)
190195
/// - `Backend::Blocking` - Run on tokio's blocking thread pool (for blocking operations)
191196
/// - `Backend::Thread` - Run on a dedicated OS thread (for long-running blocking services)
192-
fn start(self, backend: Backend) -> GenServerHandle<Self> {
197+
fn start_with_backend(self, backend: Backend) -> GenServerHandle<Self> {
193198
match backend {
194199
Backend::Async => GenServerHandle::new(self),
195200
Backend::Blocking => GenServerHandle::new_blocking(self),
@@ -496,16 +501,15 @@ mod tests {
496501
}
497502
}
498503

499-
const ASYNC: Backend = Backend::Async;
500504
const BLOCKING: Backend = Backend::Blocking;
501505

502506
#[test]
503507
pub fn badly_behaved_thread_non_blocking() {
504508
let runtime = rt::Runtime::new().unwrap();
505509
runtime.block_on(async move {
506-
let mut badboy = BadlyBehavedTask.start(ASYNC);
510+
let mut badboy = BadlyBehavedTask.start();
507511
let _ = badboy.cast(Unused).await;
508-
let mut goodboy = WellBehavedTask { count: 0 }.start(ASYNC);
512+
let mut goodboy = WellBehavedTask { count: 0 }.start();
509513
let _ = goodboy.cast(Unused).await;
510514
rt::sleep(Duration::from_secs(1)).await;
511515
let count = goodboy.call(InMessage::GetCount).await.unwrap();
@@ -523,9 +527,9 @@ mod tests {
523527
pub fn badly_behaved_thread() {
524528
let runtime = rt::Runtime::new().unwrap();
525529
runtime.block_on(async move {
526-
let mut badboy = BadlyBehavedTask.start(BLOCKING);
530+
let mut badboy = BadlyBehavedTask.start_with_backend(BLOCKING);
527531
let _ = badboy.cast(Unused).await;
528-
let mut goodboy = WellBehavedTask { count: 0 }.start(ASYNC);
532+
let mut goodboy = WellBehavedTask { count: 0 }.start();
529533
let _ = goodboy.cast(Unused).await;
530534
rt::sleep(Duration::from_secs(1)).await;
531535
let count = goodboy.call(InMessage::GetCount).await.unwrap();
@@ -580,7 +584,7 @@ mod tests {
580584
pub fn unresolving_task_times_out() {
581585
let runtime = rt::Runtime::new().unwrap();
582586
runtime.block_on(async move {
583-
let mut unresolving_task = SomeTask.start(ASYNC);
587+
let mut unresolving_task = SomeTask.start();
584588

585589
let result = unresolving_task
586590
.call_with_timeout(SomeTaskCallMsg::FastOperation, TIMEOUT_DURATION)
@@ -630,7 +634,7 @@ mod tests {
630634
runtime.block_on(async move {
631635
let (rx, tx) = mpsc::channel::<u8>();
632636
let sender_channel = Arc::new(Mutex::new(tx));
633-
let _task = SomeTaskThatFailsOnInit::new(sender_channel).start(ASYNC);
637+
let _task = SomeTaskThatFailsOnInit::new(sender_channel).start();
634638

635639
// Wait a while to ensure the task has time to run and fail
636640
rt::sleep(Duration::from_secs(1)).await;

concurrency/src/tasks/stream_tests.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::tasks::{
2-
send_after, stream::spawn_listener, Backend, CallResponse, CastResponse, GenServer,
3-
GenServerHandle,
2+
send_after, stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle,
43
};
54
use futures::{stream, StreamExt};
65
use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream};
@@ -68,7 +67,7 @@ impl GenServer for Summatory {
6867
pub fn test_sum_numbers_from_stream() {
6968
let runtime = rt::Runtime::new().unwrap();
7069
runtime.block_on(async move {
71-
let mut summatory_handle = Summatory::new(0).start(Backend::Async);
70+
let mut summatory_handle = Summatory::new(0).start();
7271
let stream = stream::iter(vec![1u16, 2, 3, 4, 5].into_iter().map(Ok::<u16, ()>));
7372

7473
spawn_listener(
@@ -88,7 +87,7 @@ pub fn test_sum_numbers_from_stream() {
8887
pub fn test_sum_numbers_from_channel() {
8988
let runtime = rt::Runtime::new().unwrap();
9089
runtime.block_on(async move {
91-
let mut summatory_handle = Summatory::new(0).start(Backend::Async);
90+
let mut summatory_handle = Summatory::new(0).start();
9291
let (tx, rx) = spawned_rt::tasks::mpsc::channel::<Result<u16, ()>>();
9392

9493
// Spawn a task to send numbers to the channel
@@ -116,7 +115,7 @@ pub fn test_sum_numbers_from_channel() {
116115
pub fn test_sum_numbers_from_broadcast_channel() {
117116
let runtime = rt::Runtime::new().unwrap();
118117
runtime.block_on(async move {
119-
let mut summatory_handle = Summatory::new(0).start(Backend::Async);
118+
let mut summatory_handle = Summatory::new(0).start();
120119
let (tx, rx) = tokio::sync::broadcast::channel::<u16>(5);
121120

122121
// Spawn a task to send numbers to the channel
@@ -146,7 +145,7 @@ pub fn test_stream_cancellation() {
146145

147146
let runtime = rt::Runtime::new().unwrap();
148147
runtime.block_on(async move {
149-
let mut summatory_handle = Summatory::new(0).start(Backend::Async);
148+
let mut summatory_handle = Summatory::new(0).start();
150149
let (tx, rx) = spawned_rt::tasks::mpsc::channel::<Result<u16, ()>>();
151150

152151
// Spawn a task to send numbers to the channel
@@ -193,7 +192,7 @@ pub fn test_stream_cancellation() {
193192
pub fn test_halting_on_stream_error() {
194193
let runtime = rt::Runtime::new().unwrap();
195194
runtime.block_on(async move {
196-
let mut summatory_handle = Summatory::new(0).start(Backend::Async);
195+
let mut summatory_handle = Summatory::new(0).start();
197196
let stream = tokio_stream::iter(vec![Ok(1u16), Ok(2), Ok(3), Err(()), Ok(4), Ok(5)]);
198197
let msg_stream = stream.filter_map(|value| async move {
199198
match value {
@@ -217,7 +216,7 @@ pub fn test_halting_on_stream_error() {
217216
pub fn test_skipping_on_stream_error() {
218217
let runtime = rt::Runtime::new().unwrap();
219218
runtime.block_on(async move {
220-
let mut summatory_handle = Summatory::new(0).start(Backend::Async);
219+
let mut summatory_handle = Summatory::new(0).start();
221220
let stream = tokio_stream::iter(vec![Ok(1u16), Ok(2), Ok(3), Err(()), Ok(4), Ok(5)]);
222221
let msg_stream = stream.filter_map(|value| async move {
223222
match value {

concurrency/src/tasks/timer_tests.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::{
2-
send_after, send_interval, Backend, CallResponse, CastResponse, GenServer, GenServerHandle,
3-
InitResult, InitResult::Success,
2+
send_after, send_interval, CallResponse, CastResponse, GenServer, GenServerHandle, InitResult,
3+
InitResult::Success,
44
};
55
use spawned_rt::tasks::{self as rt, CancellationToken};
66
use std::time::Duration;
@@ -102,7 +102,7 @@ pub fn test_send_interval_and_cancellation() {
102102
let runtime = rt::Runtime::new().unwrap();
103103
runtime.block_on(async move {
104104
// Start a Repeater
105-
let mut repeater = Repeater::new(0).start(Backend::Async);
105+
let mut repeater = Repeater::new(0).start();
106106

107107
// Wait for 1 second
108108
rt::sleep(Duration::from_secs(1)).await;
@@ -209,7 +209,7 @@ pub fn test_send_after_and_cancellation() {
209209
let runtime = rt::Runtime::new().unwrap();
210210
runtime.block_on(async move {
211211
// Start a Delayed
212-
let mut repeater = Delayed::new(0).start(Backend::Async);
212+
let mut repeater = Delayed::new(0).start();
213213

214214
// Set a just once timed message
215215
let _ = send_after(
@@ -253,7 +253,7 @@ pub fn test_send_after_gen_server_teardown() {
253253
let runtime = rt::Runtime::new().unwrap();
254254
runtime.block_on(async move {
255255
// Start a Delayed
256-
let mut repeater = Delayed::new(0).start(Backend::Async);
256+
let mut repeater = Delayed::new(0).start();
257257

258258
// Set a just once timed message
259259
let _ = send_after(

examples/bank/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ mod server;
2424

2525
use messages::{BankError, BankOutMessage};
2626
use server::Bank;
27-
use spawned_concurrency::tasks::{Backend, GenServer as _};
27+
use spawned_concurrency::tasks::GenServer as _;
2828
use spawned_rt::tasks as rt;
2929

3030
fn main() {
3131
rt::run(async {
3232
// Starting the bank
33-
let mut name_server = Bank::new().start(Backend::Async);
33+
let mut name_server = Bank::new().start();
3434

3535
// Testing initial balance for "main" account
3636
let result = Bank::withdraw(&mut name_server, "main".to_string(), 15).await;

examples/bank_threads/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "bank_threads"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
spawned-rt = { workspace = true }
8+
spawned-concurrency = { workspace = true }
9+
tracing = { workspace = true }
10+
11+
[[bin]]
12+
name = "bank_threads"
13+
path = "src/main.rs"

examples/bank_threads/src/main.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
//! Simple example to test concurrency/Process abstraction.
2+
//!
3+
//! Based on Joe's Armstrong book: Programming Erlang, Second edition
4+
//! Section 22.1 - The Road to the Generic Server
5+
//!
6+
//! Erlang usage example:
7+
//! 1> my_bank:start().
8+
//! {ok,<0.33.0>}
9+
//! 2> my_bank:deposit("joe", 10).
10+
//! not_a_customer
11+
//! 3> my_bank:new_account("joe").
12+
//! {welcome,"joe"}
13+
//! 4> my_bank:deposit("joe", 10).
14+
//! {thanks,"joe",your_balance_is,10}
15+
//! 5> my_bank:deposit("joe", 30).
16+
//! {thanks,"joe",your_balance_is,40}
17+
//! 6> my_bank:withdraw("joe", 15).
18+
//! {thanks,"joe",your_balance_is,25}
19+
//! 7> my_bank:withdraw("joe", 45).
20+
//! {sorry,"joe",you_only_have,25,in_the_bank
21+
22+
mod messages;
23+
mod server;
24+
25+
use messages::{BankError, BankOutMessage};
26+
use server::Bank;
27+
use spawned_concurrency::threads::GenServer as _;
28+
use spawned_rt::threads as rt;
29+
30+
fn main() {
31+
rt::run(|| {
32+
// Starting the bank
33+
let mut name_server = Bank::new().start();
34+
35+
// Testing initial balance for "main" account
36+
let result = Bank::withdraw(&mut name_server, "main".to_string(), 15);
37+
tracing::info!("Withdraw result {result:?}");
38+
assert_eq!(
39+
result,
40+
Ok(BankOutMessage::WidrawOk {
41+
who: "main".to_string(),
42+
amount: 985
43+
})
44+
);
45+
46+
let joe = "Joe".to_string();
47+
48+
// Error on deposit for an unexistent account
49+
let result = Bank::deposit(&mut name_server, joe.clone(), 10);
50+
tracing::info!("Deposit result {result:?}");
51+
assert_eq!(result, Err(BankError::NotACustomer { who: joe.clone() }));
52+
53+
// Account creation
54+
let result = Bank::new_account(&mut name_server, "Joe".to_string());
55+
tracing::info!("New account result {result:?}");
56+
assert_eq!(result, Ok(BankOutMessage::Welcome { who: joe.clone() }));
57+
58+
// Deposit
59+
let result = Bank::deposit(&mut name_server, "Joe".to_string(), 10);
60+
tracing::info!("Deposit result {result:?}");
61+
assert_eq!(
62+
result,
63+
Ok(BankOutMessage::Balance {
64+
who: joe.clone(),
65+
amount: 10
66+
})
67+
);
68+
69+
// Deposit
70+
let result = Bank::deposit(&mut name_server, "Joe".to_string(), 30);
71+
tracing::info!("Deposit result {result:?}");
72+
assert_eq!(
73+
result,
74+
Ok(BankOutMessage::Balance {
75+
who: joe.clone(),
76+
amount: 40
77+
})
78+
);
79+
80+
// Withdrawal
81+
let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 15);
82+
tracing::info!("Withdraw result {result:?}");
83+
assert_eq!(
84+
result,
85+
Ok(BankOutMessage::WidrawOk {
86+
who: joe.clone(),
87+
amount: 25
88+
})
89+
);
90+
91+
// Withdrawal with not enough balance
92+
let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 45);
93+
tracing::info!("Withdraw result {result:?}");
94+
assert_eq!(
95+
result,
96+
Err(BankError::InsufficientBalance {
97+
who: joe.clone(),
98+
amount: 25
99+
})
100+
);
101+
102+
// Full withdrawal
103+
let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 25);
104+
tracing::info!("Withdraw result {result:?}");
105+
assert_eq!(
106+
result,
107+
Ok(BankOutMessage::WidrawOk {
108+
who: joe,
109+
amount: 0
110+
})
111+
);
112+
113+
// Stopping the bank
114+
let result = Bank::stop(&mut name_server);
115+
tracing::info!("Stop result {result:?}");
116+
assert_eq!(result, Ok(BankOutMessage::Stopped));
117+
})
118+
}

0 commit comments

Comments
 (0)