|
2 | 2 | // SPDX-License-Identifier: Apache-2.0
|
3 | 3 |
|
4 | 4 | #![no_std]
|
5 |
| - |
6 | 5 | // Cargo tries to detect configs that have typos in them. Unfortunately, the Zephyr Kconfig system
|
7 | 6 | // uses a large number of Kconfigs and there is no easy way to know which ones might conceivably be
|
8 | 7 | // valid. This prevents a warning about each cfg that is used.
|
9 | 8 | #![allow(unexpected_cfgs)]
|
10 | 9 |
|
11 | 10 | extern crate alloc;
|
12 | 11 |
|
13 |
| -use alloc::{boxed::Box, ffi::CString, format}; |
14 |
| -use zephyr::{kobj_define, printkln, sync::Arc, sys::sync::Semaphore, time::{Duration, Forever}, work::{futures::sleep, Signal, WorkQueue, WorkQueueBuilder}}; |
| 12 | +use zephyr::{ |
| 13 | + kio::spawn, |
| 14 | + kobj_define, printkln, |
| 15 | + sync::Arc, |
| 16 | + sys::uptime_get, |
| 17 | + time::{Duration, Tick}, |
| 18 | + work::WorkQueueBuilder, |
| 19 | +}; |
| 20 | + |
| 21 | +mod async_sem; |
15 | 22 |
|
16 | 23 | /// How many philosophers. There will be the same number of forks.
|
17 |
| -const _NUM_PHIL: usize = 6; |
| 24 | +const NUM_PHIL: usize = 6; |
18 | 25 |
|
19 | 26 | /// Size of the stack for the work queue.
|
20 | 27 | const WORK_STACK_SIZE: usize = 2048;
|
21 | 28 |
|
22 | 29 | // The dining philosophers problem is a simple example of cooperation between multiple threads.
|
23 |
| -// This implementation use one of several different underlying mechanism to support this cooperation. |
24 |
| - |
25 |
| -// This example uses dynamic dispatch to allow multiple implementations. The intent is to be able |
26 |
| -// to periodically shut down all of the philosphers and start them up with a differernt sync |
27 |
| -// mechanism. This isn't implemented yet. |
28 |
| - |
29 |
| -/// The philosophers use a fork synchronization mechanism. Essentially, this is 6 locks, and will be |
30 |
| -/// implemented in a few different ways to demonstrate/test different mechanmism in Rust. All of |
31 |
| -/// them implement The ForkSync trait which provides this mechanism. |
32 |
| -/* |
33 |
| -trait ForkSync: core::fmt::Debug + Sync + Send { |
34 |
| - /// Take the given fork. The are indexed the same as the philosopher index number. This will |
35 |
| - /// block until the fork is released. |
36 |
| - fn take(&self, index: usize); |
37 |
| -
|
38 |
| - /// Release the given fork. Index is the same as take. |
39 |
| - fn release(&self, index: usize); |
40 |
| -} |
41 |
| -*/ |
| 30 | +// This implementation demonstrates a few ways that Zephyr's work-queues can be used to simulate |
| 31 | +// this problem. |
42 | 32 |
|
43 | 33 | #[no_mangle]
|
44 | 34 | extern "C" fn rust_main() {
|
45 |
| - printkln!("Hello world from Rust on {}", |
46 |
| - zephyr::kconfig::CONFIG_BOARD); |
| 35 | + printkln!( |
| 36 | + "Async/work-queue dining philosophers{}", |
| 37 | + zephyr::kconfig::CONFIG_BOARD |
| 38 | + ); |
47 | 39 | printkln!("Time tick: {}", zephyr::time::SYS_FREQUENCY);
|
48 | 40 |
|
49 | 41 | // Create the work queue to run this.
|
50 |
| - let worker = Box::new(WorkQueueBuilder::new() |
51 |
| - .set_priority(1) |
52 |
| - .start(WORK_STACK.init_once(()).unwrap())); |
| 42 | + let worker = Arc::new( |
| 43 | + WorkQueueBuilder::new() |
| 44 | + .set_priority(1) |
| 45 | + .start(WORK_STACK.init_once(()).unwrap()), |
| 46 | + ); |
53 | 47 |
|
54 | 48 | // In addition, create a lower priority worker.
|
55 |
| - let lower_worker = Arc::new(WorkQueueBuilder::new() |
56 |
| - .set_priority(5) |
57 |
| - .start(LOWER_WORK_STACK.init_once(()).unwrap())); |
| 49 | + let lower_worker = Arc::new( |
| 50 | + WorkQueueBuilder::new() |
| 51 | + .set_priority(5) |
| 52 | + .start(LOWER_WORK_STACK.init_once(()).unwrap()), |
| 53 | + ); |
58 | 54 |
|
59 | 55 | // It is important that work queues are not dropped, as they are persistent objects in the
|
60 | 56 | // Zephyr world.
|
61 | 57 | let _ = Arc::into_raw(lower_worker.clone());
|
| 58 | + let _ = Arc::into_raw(worker.clone()); |
62 | 59 |
|
63 |
| - // Create a semaphore the phil thread can wait for, part way through its work. |
64 |
| - let sem = Arc::new(Semaphore::new(0, 1).unwrap()); |
65 |
| - |
66 |
| - let th = phil_thread(0, sem.clone(), lower_worker.clone()); |
67 |
| - printkln!("Size of phil thread: {}", core::mem::size_of_val(&th)); |
68 |
| - // TODO: How to do as much on the stack as we can, for now, don't worry too much. |
69 |
| - // let mut th = pin!(th); |
70 |
| - |
71 |
| - let th = zephyr::kio::spawn(th, &worker, c"phil-worker"); |
72 |
| - |
73 |
| - // As we no longer use the worker, it is important that it never be dropped. Leaking the box |
74 |
| - // ensures that it will not be deallocated. |
75 |
| - Box::leak(worker); |
76 |
| - |
77 |
| - // Sleep a bit to allow the worker to run to where it is waiting. |
78 |
| - zephyr::time::sleep(Duration::millis_at_least(1_000)); |
79 |
| - printkln!("Giving the semaphore"); |
80 |
| - sem.give(); |
81 |
| - |
82 |
| - let result = th.join(); |
83 |
| - printkln!("th result: {:?}", result); |
| 60 | + // First run the async semaphore based one. |
| 61 | + printkln!("Running 'async-sem' test"); |
| 62 | + let handle = spawn(async_sem::phil(), &worker, c"async-sem"); |
| 63 | + let stats = handle.join(); |
| 64 | + printkln!("Done with 'async-sem' test"); |
| 65 | + stats.show(); |
84 | 66 |
|
85 |
| - /* |
86 |
| - // TODO: The allocated Arc doesn't work on all Zephyr platforms, but need to make our own copy |
87 |
| - // of alloc::task |
88 |
| - let waker = Arc::new(PWaker).into(); |
89 |
| - let mut cx = Context::from_waker(&waker); |
90 |
| -
|
91 |
| - // Run the future to completion. |
92 |
| - loop { |
93 |
| - match th.as_mut().poll(&mut cx) { |
94 |
| - Poll::Pending => todo!(), |
95 |
| - Poll::Ready(_) => break, |
96 |
| - } |
97 |
| - } |
98 |
| - */ |
99 | 67 | printkln!("All threads done");
|
100 |
| - |
101 |
| - /* |
102 |
| - let stats = Arc::new(Mutex::new_from(Stats::default(), STAT_MUTEX.init_once(()).unwrap())); |
103 |
| -
|
104 |
| - let syncers = get_syncer(); |
105 |
| -
|
106 |
| - printkln!("Pre fork"); |
107 |
| -
|
108 |
| - for (i, syncer) in (0..NUM_PHIL).zip(syncers.into_iter()) { |
109 |
| - let child_stat = stats.clone(); |
110 |
| - let thread = PHIL_THREADS[i].init_once(PHIL_STACKS[i].init_once(()).unwrap()).unwrap(); |
111 |
| - thread.spawn(move || { |
112 |
| - phil_thread(i, syncer, child_stat); |
113 |
| - }); |
114 |
| - } |
115 |
| -
|
116 |
| - let delay = Duration::secs_at_least(10); |
117 |
| - loop { |
118 |
| - // Periodically, printout the stats. |
119 |
| - zephyr::time::sleep(delay); |
120 |
| - stats.lock().unwrap().show(); |
121 |
| - } |
122 |
| - */ |
123 |
| -} |
124 |
| - |
125 |
| -async fn phil_thread(n: usize, sem: Arc<Semaphore>, lower_thread: Arc<WorkQueue>) -> usize { |
126 |
| - printkln!("Child {} started", n); |
127 |
| - show_it(&sem).await; |
128 |
| - sleep(Duration::millis_at_least(1000)).await; |
129 |
| - printkln!("Child {} done sleeping", n); |
130 |
| - |
131 |
| - // Lastly fire off something on the other worker thread. |
132 |
| - let sig = Arc::new(Signal::new().unwrap()); |
133 |
| - let sig2 = sig.clone(); |
134 |
| - |
135 |
| - // To help with debugging, we can name these, but it must be a `&'static Cstr`. We'll build a |
136 |
| - // CString (on the heap), and then leak it to be owned by the work. The leaking while keeping |
137 |
| - // the `&'static CStr` is a bit messy. |
138 |
| - let name = CString::new(format!("phil-{}", n)).unwrap(); |
139 |
| - let name = Box::leak(name.into_boxed_c_str()); |
140 |
| - |
141 |
| - // It _should_ be safe to drop the child, as it uses an internal clone of the Arc to keep it |
142 |
| - // around as long as there is a worker. |
143 |
| - let _child = Box::new(zephyr::kio::spawn(async move { |
144 |
| - sleep(Duration::millis_at_least(800)).await; |
145 |
| - sig2.raise(12345).unwrap(); |
146 |
| - }, &lower_thread, name)); |
147 |
| - |
148 |
| - // Wait for the signal. |
149 |
| - let num = sig.wait_async(Forever).await.unwrap(); |
150 |
| - printkln!("Signaled value: {}", num); |
151 |
| - |
152 |
| - 42 |
153 |
| -} |
154 |
| - |
155 |
| -async fn show_it(sem: &Semaphore) { |
156 |
| - for i in 0..10 { |
157 |
| - sleep(Duration::millis_at_least(1)).await; |
158 |
| - printkln!("Tick: {i}"); |
159 |
| - |
160 |
| - // Wait after 5 for the semaphore to be signaled. |
161 |
| - if i == 4 { |
162 |
| - printkln!("Waiting for semaphore"); |
163 |
| - sem.take_async(Forever).await.unwrap(); |
164 |
| - printkln!("Done waiting"); |
165 |
| - } |
166 |
| - } |
167 | 68 | }
|
168 | 69 |
|
169 | 70 | kobj_define! {
|
170 | 71 | static WORK_STACK: ThreadStack<WORK_STACK_SIZE>;
|
171 | 72 | static LOWER_WORK_STACK: ThreadStack<WORK_STACK_SIZE>;
|
172 | 73 | }
|
173 | 74 |
|
174 |
| -/* |
175 |
| -fn phil_thread(n: usize, syncer: Arc<dyn ForkSync>, stats: Arc<Mutex<Stats>>) { |
176 |
| - printkln!("Child {} started: {:?}", n, syncer); |
177 |
| -
|
178 |
| - // Determine our two forks. |
179 |
| - let forks = if n == NUM_PHIL - 1 { |
180 |
| - // Per Dijkstra, the last phyilosopher needs to reverse forks, or we deadlock. |
181 |
| - (0, n) |
182 |
| - } else { |
183 |
| - (n, n+1) |
184 |
| - }; |
185 |
| -
|
186 |
| - loop { |
187 |
| - { |
188 |
| - // printkln!("Child {} hungry", n); |
189 |
| - // printkln!("Child {} take left fork", n); |
190 |
| - syncer.take(forks.0); |
191 |
| - // printkln!("Child {} take right fork", n); |
192 |
| - syncer.take(forks.1); |
193 |
| -
|
194 |
| - let delay = get_random_delay(n, 25); |
195 |
| - // printkln!("Child {} eating ({} ms)", n, delay); |
196 |
| - sleep(delay); |
197 |
| - stats.lock().unwrap().record_eat(n, delay); |
198 |
| -
|
199 |
| - // Release the forks. |
200 |
| - // printkln!("Child {} giving up forks", n); |
201 |
| - syncer.release(forks.1); |
202 |
| - syncer.release(forks.0); |
203 |
| -
|
204 |
| - let delay = get_random_delay(n, 25); |
205 |
| - // printkln!("Child {} thinking ({} ms)", n, delay); |
206 |
| - sleep(delay); |
207 |
| - stats.lock().unwrap().record_think(n, delay); |
208 |
| - } |
209 |
| - } |
210 |
| -} |
211 |
| -
|
212 | 75 | /// Get a random delay, based on the ID of this user, and the current uptime.
|
213 | 76 | fn get_random_delay(id: usize, period: usize) -> Duration {
|
214 | 77 | let tick = (uptime_get() & (usize::MAX as i64)) as usize;
|
@@ -241,14 +104,11 @@ impl Stats {
|
241 | 104 | }
|
242 | 105 |
|
243 | 106 | fn show(&self) {
|
244 |
| - printkln!("c:{:?}, e:{:?}, t:{:?}", self.count, self.eating, self.thinking); |
| 107 | + printkln!( |
| 108 | + "c:{:?}, e:{:?}, t:{:?}", |
| 109 | + self.count, |
| 110 | + self.eating, |
| 111 | + self.thinking |
| 112 | + ); |
245 | 113 | }
|
246 | 114 | }
|
247 |
| -
|
248 |
| -kobj_define! { |
249 |
| - static PHIL_THREADS: [StaticThread; NUM_PHIL]; |
250 |
| - static PHIL_STACKS: [ThreadStack<PHIL_STACK_SIZE>; NUM_PHIL]; |
251 |
| -
|
252 |
| - static STAT_MUTEX: StaticMutex; |
253 |
| -} |
254 |
| -*/ |
0 commit comments