
Commit 0dba021

samples: work-philosophers: Async Semaphore-based solution
Implement the async Semaphore-based solution to the philosopher problem. This demonstrates a few things:

- The use of multiple async tasks created with `spawn` and `spawn_local`.
- How to use `spawn_local` to allow a group of tasks all running on the same worker to use Rc instead of Arc for sharing.
- A check that completed work (from a spawn, after calling join or join_async) drops all of its data properly.

Signed-off-by: David Brown <[email protected]>
1 parent 4d8ab92 commit 0dba021
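The distinction the commit message relies on: `spawn` requires a `Send` future and can be queued to any work queue, while `spawn_local` keeps the future on the current worker, so a group of tasks on that worker can share `Rc`/`RefCell` state instead of needing `Arc`. Below is a minimal sketch of the wrapper pattern used by `async_sem::phil()` in this diff, using only calls that appear in the commit; the `*_demo` names are hypothetical:

    use alloc::rc::Rc;
    use core::cell::RefCell;
    use zephyr::kio::spawn_local;

    // Send wrapper: captures nothing that is !Send, so the future it returns
    // can be handed to `spawn` on any work queue, as rust_main does below.
    async fn outer_demo() -> u32 {
        spawn_local(inner_demo(), c"demo_wrap").join_async().await
    }

    // Stays on the worker it was spawned on; tasks spawned here with
    // `spawn_local` can share this Rc without needing Arc or Send.
    async fn inner_demo() -> u32 {
        let shared = Rc::new(RefCell::new(0u32));
        let child = spawn_local(bump_demo(shared.clone()), c"demo_child");
        child.join_async().await;
        *shared.borrow()
    }

    async fn bump_demo(counter: Rc<RefCell<u32>>) {
        // Keep RefCell borrows short and never hold one across an .await.
        *counter.borrow_mut() += 1;
    }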

2 files changed: 153 additions, 180 deletions
samples/work-philosophers/src/async_sem.rs (new file)

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
+//! Async Semaphore based demo
+//!
+//! This implementation of the dining philosopher problem uses Zephyr semaphores to represent the
+//! forks. Each philosopher dines as per the algorithm a number of times, and when they are all
+//! finished, the test is considered successful. Deadlock will result in the primary thread not
+//! completing.
+//!
+//! Notably, this uses Rc and RefCell along with spawn_local to demonstrate that multiple async
+//! tasks run on the same worker do not need Send. It is just important that write operations on
+//! the RefCell do not `.await` or a panic is likely.
+
+use core::cell::RefCell;
+
+use alloc::{rc::Rc, vec::Vec};
+use zephyr::{
+    kio::{sleep, spawn_local},
+    printkln,
+    sys::sync::Semaphore,
+    time::Forever,
+};
+
+use crate::{get_random_delay, Stats, NUM_PHIL};
+
+/// Number of iterations of each philosopher.
+///
+/// Should be long enough to exercise the test, but too
+/// long and the test will time out. The delay calculated will randomly be between 25 and 775, and
+/// there are two waits, so typically, each "eat" will take about a second.
+const EAT_COUNT: usize = 10;
+
+pub async fn phil() -> Stats {
+    // It is a little tricky to be able to use local workers. We have to have this nested thread
+    // that waits. This is because the Future from `local_phil()` does not implement Send, since it
+    // waits for the philosophers, which are not Send. However, this outer async function does not
+    // hold onto any data that is not Send, and therefore will be Send. Fortunately, this extra
+    // Future is very lightweight.
+    spawn_local(local_phil(), c"phil_wrap").join_async().await
+}
+
+async fn local_phil() -> Stats {
+    // Our overall stats.
+    let stats = Rc::new(RefCell::new(Stats::default()));
+
+    // One fork for each philosopher.
+    let forks: Vec<_> = (0..NUM_PHIL)
+        .map(|_| Rc::new(Semaphore::new(1, 1).unwrap()))
+        .collect();
+
+    // Create all of the philosophers.
+    let phils: Vec<_> = (0..NUM_PHIL)
+        .map(|i| {
+            // Determine the two forks. The forks are paired with each philosopher taking the fork of
+            // their number, and the next one, modulo the size of the ring. However, for the last case,
+            // we need to swap the forks used, as it is necessary to obey a strict ordering of the locks
+            // to avoid deadlocks.
+            let forks = if i == NUM_PHIL - 1 {
+                [forks[0].clone(), forks[i].clone()]
+            } else {
+                [forks[i].clone(), forks[i + 1].clone()]
+            };
+
+            spawn_local(one_phil(forks, i, stats.clone()), c"phil")
+        })
+        .collect();
+
+    // Wait for them all to finish.
+    for p in phils {
+        p.join_async().await;
+    }
+
+    // Leak the stats as a test.
+    // Uncomment this to test that the expect below does truly detect a missed drop.
+    // let _ = Rc::into_raw(stats.clone());
+
+    // At this point, all of the philosophers should have dropped their stats ref, and we should be
+    // able to turn stats back into its value.
+    // This tests that completed work does drop the future.
+    Rc::into_inner(stats)
+        .expect("Failure: a philospher didn't drop it's future")
+        .into_inner()
+}
+
+/// Simulate a single philosopher.
+///
+/// The forks must be ordered with the first fork having the lowest number, otherwise this will
+/// likely deadlock.
+///
+/// This will run for EAT_COUNT times, and then return.
+async fn one_phil(forks: [Rc<Semaphore>; 2], n: usize, stats: Rc<RefCell<Stats>>) {
+    for i in 0..EAT_COUNT {
+        // Acquire the forks.
+        // printkln!("Child {n} take left fork");
+        forks[0].take_async(Forever).await.unwrap();
+        // printkln!("Child {n} take right fork");
+        forks[1].take_async(Forever).await.unwrap();
+
+        // printkln!("Child {n} eating");
+        let delay = get_random_delay(n, 25);
+        sleep(delay).await;
+        stats.borrow_mut().record_eat(n, delay);
+
+        // Release the forks.
+        // printkln!("Child {n} giving up forks");
+        forks[1].give();
+        forks[0].give();
+
+        let delay = get_random_delay(n, 25);
+        sleep(delay).await;
+        stats.borrow_mut().record_think(n, delay);
+
+        printkln!("Philospher {n} finished eating time {i}");
+    }
+}
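The fork pairing in `local_phil` above is the standard resource-ordering fix for dining philosophers: every task acquires its lower-numbered fork first, so the wait-for graph cannot form a cycle. Restated as a standalone helper (plain Rust; the function name is hypothetical, `NUM_PHIL` is the constant from lib.rs):

    /// Forks used by philosopher `i`: normally `i` and `i + 1`, but the last
    /// philosopher wraps around to fork 0 and lists it first, so every task
    /// still takes its forks in ascending index order.
    fn fork_indices(i: usize, num_phil: usize) -> [usize; 2] {
        if i == num_phil - 1 {
            [0, i]
        } else {
            [i, i + 1]
        }
    }

For `num_phil = 6` this yields [0, 1], [1, 2], [2, 3], [3, 4], [4, 5], and [0, 5], so the acquisitions can never form a circular wait.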

samples/work-philosophers/src/lib.rs

Lines changed: 40 additions & 180 deletions
@@ -2,213 +2,76 @@
 // SPDX-License-Identifier: Apache-2.0
 
 #![no_std]
-
 // Cargo tries to detect configs that have typos in them. Unfortunately, the Zephyr Kconfig system
 // uses a large number of Kconfigs and there is no easy way to know which ones might conceivably be
 // valid. This prevents a warning about each cfg that is used.
 #![allow(unexpected_cfgs)]
 
 extern crate alloc;
 
-use alloc::{boxed::Box, ffi::CString, format};
-use zephyr::{kobj_define, printkln, sync::Arc, sys::sync::Semaphore, time::{Duration, Forever}, work::{futures::sleep, Signal, WorkQueue, WorkQueueBuilder}};
+use zephyr::{
+    kio::spawn,
+    kobj_define, printkln,
+    sync::Arc,
+    sys::uptime_get,
+    time::{Duration, Tick},
+    work::WorkQueueBuilder,
+};
+
+mod async_sem;
 
 /// How many philosophers. There will be the same number of forks.
-const _NUM_PHIL: usize = 6;
+const NUM_PHIL: usize = 6;
 
 /// Size of the stack for the work queue.
 const WORK_STACK_SIZE: usize = 2048;
 
 // The dining philosophers problem is a simple example of cooperation between multiple threads.
-// This implementation use one of several different underlying mechanism to support this cooperation.
-
-// This example uses dynamic dispatch to allow multiple implementations. The intent is to be able
-// to periodically shut down all of the philosphers and start them up with a differernt sync
-// mechanism. This isn't implemented yet.
-
-/// The philosophers use a fork synchronization mechanism. Essentially, this is 6 locks, and will be
-/// implemented in a few different ways to demonstrate/test different mechanmism in Rust. All of
-/// them implement The ForkSync trait which provides this mechanism.
-/*
-trait ForkSync: core::fmt::Debug + Sync + Send {
-    /// Take the given fork. The are indexed the same as the philosopher index number. This will
-    /// block until the fork is released.
-    fn take(&self, index: usize);
-
-    /// Release the given fork. Index is the same as take.
-    fn release(&self, index: usize);
-}
-*/
+// This implementation demonstrates a few ways that Zephyr's work-queues can be used to simulate
+// this problem.
 
 #[no_mangle]
 extern "C" fn rust_main() {
-    printkln!("Hello world from Rust on {}",
-              zephyr::kconfig::CONFIG_BOARD);
+    printkln!(
+        "Async/work-queue dining philosophers{}",
+        zephyr::kconfig::CONFIG_BOARD
+    );
     printkln!("Time tick: {}", zephyr::time::SYS_FREQUENCY);
 
     // Create the work queue to run this.
-    let worker = Box::new(WorkQueueBuilder::new()
-        .set_priority(1)
-        .start(WORK_STACK.init_once(()).unwrap()));
+    let worker = Arc::new(
+        WorkQueueBuilder::new()
+            .set_priority(1)
+            .start(WORK_STACK.init_once(()).unwrap()),
+    );
 
     // In addition, create a lower priority worker.
-    let lower_worker = Arc::new(WorkQueueBuilder::new()
-        .set_priority(5)
-        .start(LOWER_WORK_STACK.init_once(()).unwrap()));
+    let lower_worker = Arc::new(
+        WorkQueueBuilder::new()
+            .set_priority(5)
+            .start(LOWER_WORK_STACK.init_once(()).unwrap()),
+    );
 
     // It is important that work queues are not dropped, as they are persistent objects in the
     // Zephyr world.
     let _ = Arc::into_raw(lower_worker.clone());
+    let _ = Arc::into_raw(worker.clone());
 
-    // Create a semaphore the phil thread can wait for, part way through its work.
-    let sem = Arc::new(Semaphore::new(0, 1).unwrap());
-
-    let th = phil_thread(0, sem.clone(), lower_worker.clone());
-    printkln!("Size of phil thread: {}", core::mem::size_of_val(&th));
-    // TODO: How to do as much on the stack as we can, for now, don't worry too much.
-    // let mut th = pin!(th);
-
-    let th = zephyr::kio::spawn(th, &worker, c"phil-worker");
-
-    // As we no longer use the worker, it is important that it never be dropped. Leaking the box
-    // ensures that it will not be deallocated.
-    Box::leak(worker);
-
-    // Sleep a bit to allow the worker to run to where it is waiting.
-    zephyr::time::sleep(Duration::millis_at_least(1_000));
-    printkln!("Giving the semaphore");
-    sem.give();
-
-    let result = th.join();
-    printkln!("th result: {:?}", result);
+    // First run the async semaphore based one.
+    printkln!("Running 'async-sem' test");
+    let handle = spawn(async_sem::phil(), &worker, c"async-sem");
+    let stats = handle.join();
+    printkln!("Done with 'async-sem' test");
+    stats.show();
 
-    /*
-    // TODO: The allocated Arc doesn't work on all Zephyr platforms, but need to make our own copy
-    // of alloc::task
-    let waker = Arc::new(PWaker).into();
-    let mut cx = Context::from_waker(&waker);
-
-    // Run the future to completion.
-    loop {
-        match th.as_mut().poll(&mut cx) {
-            Poll::Pending => todo!(),
-            Poll::Ready(_) => break,
-        }
-    }
-    */
     printkln!("All threads done");
-
-    /*
-    let stats = Arc::new(Mutex::new_from(Stats::default(), STAT_MUTEX.init_once(()).unwrap()));
-
-    let syncers = get_syncer();
-
-    printkln!("Pre fork");
-
-    for (i, syncer) in (0..NUM_PHIL).zip(syncers.into_iter()) {
-        let child_stat = stats.clone();
-        let thread = PHIL_THREADS[i].init_once(PHIL_STACKS[i].init_once(()).unwrap()).unwrap();
-        thread.spawn(move || {
-            phil_thread(i, syncer, child_stat);
-        });
-    }
-
-    let delay = Duration::secs_at_least(10);
-    loop {
-        // Periodically, printout the stats.
-        zephyr::time::sleep(delay);
-        stats.lock().unwrap().show();
-    }
-    */
-}
-
-async fn phil_thread(n: usize, sem: Arc<Semaphore>, lower_thread: Arc<WorkQueue>) -> usize {
-    printkln!("Child {} started", n);
-    show_it(&sem).await;
-    sleep(Duration::millis_at_least(1000)).await;
-    printkln!("Child {} done sleeping", n);
-
-    // Lastly fire off something on the other worker thread.
-    let sig = Arc::new(Signal::new().unwrap());
-    let sig2 = sig.clone();
-
-    // To help with debugging, we can name these, but it must be a `&'static Cstr`. We'll build a
-    // CString (on the heap), and then leak it to be owned by the work. The leaking while keeping
-    // the `&'static CStr` is a bit messy.
-    let name = CString::new(format!("phil-{}", n)).unwrap();
-    let name = Box::leak(name.into_boxed_c_str());
-
-    // It _should_ be safe to drop the child, as it uses an internal clone of the Arc to keep it
-    // around as long as there is a worker.
-    let _child = Box::new(zephyr::kio::spawn(async move {
-        sleep(Duration::millis_at_least(800)).await;
-        sig2.raise(12345).unwrap();
-    }, &lower_thread, name));
-
-    // Wait for the signal.
-    let num = sig.wait_async(Forever).await.unwrap();
-    printkln!("Signaled value: {}", num);
-
-    42
-}
-
-async fn show_it(sem: &Semaphore) {
-    for i in 0..10 {
-        sleep(Duration::millis_at_least(1)).await;
-        printkln!("Tick: {i}");
-
-        // Wait after 5 for the semaphore to be signaled.
-        if i == 4 {
-            printkln!("Waiting for semaphore");
-            sem.take_async(Forever).await.unwrap();
-            printkln!("Done waiting");
-        }
-    }
 }
 
 kobj_define! {
     static WORK_STACK: ThreadStack<WORK_STACK_SIZE>;
     static LOWER_WORK_STACK: ThreadStack<WORK_STACK_SIZE>;
 }
 
-/*
-fn phil_thread(n: usize, syncer: Arc<dyn ForkSync>, stats: Arc<Mutex<Stats>>) {
-    printkln!("Child {} started: {:?}", n, syncer);
-
-    // Determine our two forks.
-    let forks = if n == NUM_PHIL - 1 {
-        // Per Dijkstra, the last phyilosopher needs to reverse forks, or we deadlock.
-        (0, n)
-    } else {
-        (n, n+1)
-    };
-
-    loop {
-        {
-            // printkln!("Child {} hungry", n);
-            // printkln!("Child {} take left fork", n);
-            syncer.take(forks.0);
-            // printkln!("Child {} take right fork", n);
-            syncer.take(forks.1);
-
-            let delay = get_random_delay(n, 25);
-            // printkln!("Child {} eating ({} ms)", n, delay);
-            sleep(delay);
-            stats.lock().unwrap().record_eat(n, delay);
-
-            // Release the forks.
-            // printkln!("Child {} giving up forks", n);
-            syncer.release(forks.1);
-            syncer.release(forks.0);
-
-            let delay = get_random_delay(n, 25);
-            // printkln!("Child {} thinking ({} ms)", n, delay);
-            sleep(delay);
-            stats.lock().unwrap().record_think(n, delay);
-        }
-    }
-}
-
 /// Get a random delay, based on the ID of this user, and the current uptime.
 fn get_random_delay(id: usize, period: usize) -> Duration {
     let tick = (uptime_get() & (usize::MAX as i64)) as usize;
@@ -241,14 +104,11 @@ impl Stats {
     }
 
     fn show(&self) {
-        printkln!("c:{:?}, e:{:?}, t:{:?}", self.count, self.eating, self.thinking);
+        printkln!(
+            "c:{:?}, e:{:?}, t:{:?}",
+            self.count,
+            self.eating,
+            self.thinking
+        );
     }
 }
-
-kobj_define! {
-    static PHIL_THREADS: [StaticThread; NUM_PHIL];
-    static PHIL_STACKS: [ThreadStack<PHIL_STACK_SIZE>; NUM_PHIL];
-
-    static STAT_MUTEX: StaticMutex;
-}
-*/
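The drop check called out in the commit message is the `Rc::into_inner(stats)` call in async_sem.rs: once the work queue has dropped every philosopher's completed future, the stats cell has exactly one strong reference left and can be unwrapped. The same pattern in isolation (plain Rust, hypothetical names; `Rc::into_inner` returns `Some` only when no other strong references remain):

    use alloc::rc::Rc; // std::rc::Rc outside of no_std

    fn drop_check_demo() {
        let stats = Rc::new(42u32);

        // Hand a clone to some piece of work, then let that work finish and
        // release its capture, as a joined future does.
        let held_by_work = stats.clone();
        drop(held_by_work);

        // With every clone gone, the value comes back out; a leaked clone
        // (like the commented-out Rc::into_raw test above) makes this panic.
        let value = Rc::into_inner(stats).expect("a clone was leaked");
        assert_eq!(value, 42);
    }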
