Skip to content

Commit 6dc9d6f

Browse files
committed
samples: bench: Replace old async with new
Remove the sample that depended on the old non-standard executor built around work queues, and use the zephyr-executor. Signed-off-by: David Brown <[email protected]>
1 parent 54a1aee commit 6dc9d6f

File tree

4 files changed

+540
-347
lines changed

4 files changed

+540
-347
lines changed

samples/bench/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,18 @@ license = "Apache-2.0 or MIT"
1313
crate-type = ["staticlib"]
1414

1515
[dependencies]
16-
zephyr = "0.1.0"
16+
zephyr = { version = "0.1.0", features = ["time-driver", "executor-zephyr"] }
1717
critical-section = "1.1.2"
1818
heapless = "0.8"
1919
static_cell = "2.1"
2020

21+
embassy-executor = { version = "0.7.0", features = ["log", "task-arena-size-2048"] }
22+
embassy-sync = "0.6.2"
23+
embassy-futures = "0.1.1"
24+
25+
# Hard code the tick rate.
26+
embassy-time = { version = "0.4.0", features = ["tick-hz-10_000"] }
27+
2128
# Dependencies that are used by build.rs.
2229
[build-dependencies]
2330
zephyr-build = "0.1.0"

samples/bench/prj.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,16 @@ CONFIG_MAIN_STACK_SIZE=8192
88
CONFIG_POLL=y
99

1010
# CONFIG_USERSPACE=y
11+
# CONFIG_DEBUG=y
12+
# CONFIG_ASSERT=y
13+
# CONFIG_STACK_SENTINEL=y
14+
# CONFIG_STACK_USAGE=y
15+
# CONFIG_STACK_CANARIES=y
1116

1217
# Some debugging
1318
CONFIG_THREAD_MONITOR=y
1419
CONFIG_THREAD_ANALYZER=y
1520
CONFIG_THREAD_ANALYZER_USE_PRINTK=y
1621
CONFIG_THREAD_ANALYZER_AUTO=n
22+
CONFIG_DEBUG_THREAD_INFO=y
1723
# CONFIG_THREAD_ANALYZER_AUTO_INTERVAL=15

samples/bench/src/executor.rs

Lines changed: 353 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,353 @@
1+
//! Executor-based tests
2+
//!
3+
//! These tests try to be as similar as possible to the thread-based tests, but instead are built
4+
//! around async.
5+
6+
use core::{ffi::c_int, sync::atomic::Ordering};
7+
8+
use alloc::format;
9+
use embassy_executor::SendSpawner;
10+
use embassy_futures::yield_now;
11+
use embassy_sync::{
12+
blocking_mutex::raw::CriticalSectionRawMutex,
13+
channel::Channel,
14+
semaphore::Semaphore,
15+
signal::Signal,
16+
};
17+
#[allow(unused_imports)]
18+
use embassy_sync::semaphore::{GreedySemaphore, FairSemaphore};
19+
use static_cell::StaticCell;
20+
use zephyr::{embassy::Executor, sync::atomic::AtomicBool};
21+
22+
use crate::{BenchTimer, Command, TestResult, NUM_THREADS, THREAD_STACK_SIZE};
23+
24+
/// As the tests do exercise different executors at different priorities, use the critical section
25+
/// variant.
26+
// type ASemaphore = GreedySemaphore<CriticalSectionRawMutex>;
27+
type ASemaphore = FairSemaphore<CriticalSectionRawMutex, {NUM_THREADS + 2}>;
28+
29+
/// A Signal for reporting back spawners.
30+
type SpawnerSignal = Signal<CriticalSectionRawMutex, SendSpawner>;
31+
32+
/// Async-based test runner.
33+
pub struct AsyncTests {
34+
/// Each test thread gets a semaphore, to use as appropriate for that test.
35+
sems: &'static [ASemaphore],
36+
37+
/// A back sem for the reverse direction.
38+
back_sem: &'static ASemaphore,
39+
40+
/// The spawners, of the three priorities.
41+
spawners: [SendSpawner; 3],
42+
43+
/// Tests results are communicated back via this channels.
44+
results: Channel<CriticalSectionRawMutex, TestResult, 1>,
45+
46+
/// Main started, allows main to be started lazily, after we've become static.
47+
main_started: AtomicBool,
48+
49+
/// Indicates back to the main thread that the given test is finished.
50+
end_wait: zephyr::sys::sync::Semaphore,
51+
}
52+
53+
/// The executors for the tests. The values are low, medium, and high priority.
54+
static EXECUTORS: [StaticCell<Executor>; 3] = [const { StaticCell::new() }; 3];
55+
56+
/// Each executor returns a SendSpawner to span tasks on that executor.
57+
static SPAWNERS: [Signal<CriticalSectionRawMutex, SendSpawner>; 3] = [const { Signal::new() }; 3];
58+
59+
/// Static semaphores for the above.
60+
static SEMS: [ASemaphore; NUM_THREADS] = [const { ASemaphore::new(0) }; NUM_THREADS];
61+
62+
static BACK_SEM: ASemaphore = ASemaphore::new(0);
63+
64+
/// Main command
65+
static MAIN_CMD: Channel<CriticalSectionRawMutex, Command, 1> = Channel::new();
66+
67+
impl AsyncTests {
68+
/// Construct a new set of tests, firing up the threads for the differernt priority executors.
69+
pub fn new(count: usize) -> Self {
70+
// printkln!("Starting executors");
71+
let spawners = [0, 1, 2].map(|id| {
72+
let thread = executor_thread(&EXECUTORS[id], &SPAWNERS[id]);
73+
thread.set_priority(id as c_int + 5);
74+
thread.start();
75+
76+
zephyr::time::sleep(zephyr::time::Duration::millis_at_least(1));
77+
if let Some(sp) = SPAWNERS[id].try_take() {
78+
sp
79+
} else {
80+
panic!("Executor thread did not initialize properly");
81+
}
82+
});
83+
84+
Self {
85+
sems: &SEMS[0..count],
86+
back_sem: &BACK_SEM,
87+
spawners,
88+
results: Channel::new(),
89+
main_started: AtomicBool::new(false),
90+
end_wait: zephyr::sys::sync::Semaphore::new(0, 1),
91+
}
92+
}
93+
94+
/// Run one of the given tests.
95+
///
96+
/// Fires off the appropriate workers for the given test. Tests follow a basic pattern:
97+
/// There are NUM_THREADS + 2 tasks spawned. These communicate as appropriate for the given
98+
/// test, and the results are then collected, waiting for everything to finish, and reported.
99+
pub fn run(&'static self, command: Command) {
100+
if !self.main_started.load(Ordering::Relaxed) {
101+
self.spawners[1].spawn(main_run(self)).unwrap();
102+
self.main_started.store(true, Ordering::Relaxed);
103+
}
104+
105+
if MAIN_CMD.try_send(command).is_err() {
106+
panic!("Main queue filled up");
107+
}
108+
109+
// Wait for the Zephyr semaphore indicating the test is finished.
110+
self.end_wait.take(zephyr::time::Forever).unwrap();
111+
}
112+
113+
/// A simple semaphore test. Tests the time to use the semaphore with no blocking.
114+
async fn simple_sem(&self, id: usize, count: usize) -> usize {
115+
let sem = &self.sems[id];
116+
117+
for _ in 0..count {
118+
sem.release(1);
119+
let rel = sem.acquire(1).await.unwrap();
120+
rel.disarm();
121+
}
122+
123+
count
124+
}
125+
126+
/// A simple semaphore test, with yield. Tests the time to use the semaphore with no blocking.
127+
async fn simple_sem_yield(&self, id: usize, count: usize) -> usize {
128+
let sem = &self.sems[id];
129+
130+
for _ in 0..count {
131+
sem.release(1);
132+
let rel = sem.acquire(1).await.unwrap();
133+
rel.disarm();
134+
yield_now().await;
135+
}
136+
137+
count
138+
}
139+
140+
/// The taker side of the SemWait test.
141+
async fn sem_wait_taker(&self, id: usize, count: usize) -> usize {
142+
let sem = &self.sems[id];
143+
for _ in 0..count {
144+
let rel = sem.acquire(1).await.unwrap();
145+
rel.disarm();
146+
}
147+
count
148+
}
149+
150+
/// The giver side of the SemWait test.
151+
async fn sem_wait_giver(&self, count: usize) {
152+
for _ in 0..count {
153+
for sem in self.sems {
154+
sem.release(1);
155+
}
156+
}
157+
}
158+
159+
/// The taker side of the SemPingPong test.
160+
async fn sem_ping_pong_taker(&self, count: usize) -> usize {
161+
let sem = &self.sems[0];
162+
let back_sem = self.back_sem;
163+
164+
// zephyr::printkln!("Taking {count} sems");
165+
for _ in 0..count {
166+
//zephyr::printkln!("acquire1");
167+
let rel = sem.acquire(1).await.unwrap();
168+
//zephyr::printkln!("acquired1");
169+
rel.disarm();
170+
//zephyr::printkln!("release2");
171+
back_sem.release(1);
172+
}
173+
// zephyr::printkln!("Taking sems done");
174+
175+
count
176+
}
177+
178+
/// The giver side of the SemPingPong test. This uses the first sem and back sem to ping pong
179+
/// across all of the workers.
180+
async fn sem_ping_pong_giver(&self, count: usize) {
181+
let sem = &self.sems[0];
182+
let back_sem = self.back_sem;
183+
184+
// zephyr::printkln!("Giving {},{} sems", count, self.sems.len());
185+
for _ in 0..count {
186+
for _ in 0..self.sems.len() {
187+
//zephyr::printkln!("release1");
188+
sem.release(1);
189+
//zephyr::printkln!("acquire2");
190+
let rel = back_sem.acquire(1).await.unwrap();
191+
//zephyr::printkln!("acquired2");
192+
rel.disarm();
193+
}
194+
}
195+
// zephyr::printkln!("Giving sems done");
196+
}
197+
}
198+
199+
/// The low priority worker, depending on the test, performs some operations at a lower priority
200+
/// than the main threads.
201+
#[embassy_executor::task]
202+
async fn low_worker(this: &'static AsyncTests, command: Command) {
203+
match command {
204+
Command::Empty => (),
205+
Command::SimpleSem(_) => (),
206+
Command::SimpleSemYield(_) => (),
207+
Command::SemWait(count) => this.sem_wait_giver(count).await,
208+
Command::SemWaitSame(count) => this.sem_wait_giver(count).await,
209+
Command::SemPingPong(count) => this.sem_ping_pong_giver(count).await,
210+
Command::SemOnePingPong(count) => this.sem_ping_pong_giver(count).await,
211+
command => panic!("Not implemented: {:?}", command),
212+
}
213+
214+
this.results.send(TestResult::Low).await;
215+
}
216+
217+
/// The high priority worker, performs some operations at a higher priority than the main threads.
218+
#[embassy_executor::task]
219+
async fn high_worker(this: &'static AsyncTests, command: Command) {
220+
// printkln!("high_worker started");
221+
match command {
222+
Command::Empty => (),
223+
Command::SimpleSem(_) => (),
224+
Command::SimpleSemYield(_) => (),
225+
Command::SemWait(_) => (),
226+
Command::SemWaitSame(_) => (),
227+
Command::SemPingPong(_) => (),
228+
Command::SemOnePingPong(_) => (),
229+
command => panic!("Not implemented: {:?}", command),
230+
}
231+
232+
this.results.send(TestResult::High).await;
233+
}
234+
235+
/// The main worker threads.
236+
///
237+
/// These perform the main work of the test, generally communicating with either the main test task,
238+
/// which is in the same executor, or with the higher or lower priority worker.
239+
#[embassy_executor::task(pool_size = NUM_THREADS)]
240+
async fn worker(this: &'static AsyncTests, command: Command, id: usize) {
241+
let total;
242+
match command {
243+
Command::Empty => total = 1,
244+
Command::SimpleSem(count) => total = this.simple_sem(id, count).await,
245+
Command::SimpleSemYield(count) => total = this.simple_sem_yield(id, count).await,
246+
Command::SemWait(count) => total = this.sem_wait_taker(id, count).await,
247+
Command::SemWaitSame(count) => total = this.sem_wait_taker(id, count).await,
248+
Command::SemPingPong(count) => total = this.sem_ping_pong_taker(count).await,
249+
Command::SemOnePingPong(count) => total = this.sem_ping_pong_taker(count).await,
250+
command => panic!("Not implemented: {:?}", command),
251+
}
252+
253+
this.results
254+
.send(TestResult::Worker { id, count: total })
255+
.await;
256+
}
257+
258+
/// Main worker.
259+
///
260+
/// This task performs the main work.
261+
#[embassy_executor::task]
262+
async fn main_run(this: &'static AsyncTests) {
263+
loop {
264+
let command = MAIN_CMD.receive().await;
265+
let msg = format!("async {:?}", command);
266+
267+
let mut timer = BenchTimer::new(&msg, 1);
268+
269+
let ntasks = this.sems.len();
270+
271+
// Before starting, reset the semaphores.
272+
for sem in this.sems {
273+
sem.set(0);
274+
}
275+
this.back_sem.set(0);
276+
277+
if true {
278+
this.spawners[2].spawn(high_worker(this, command)).unwrap();
279+
if command.is_same_priority() {
280+
this.spawners[1].spawn(low_worker(this, command)).unwrap();
281+
} else {
282+
this.spawners[0].spawn(low_worker(this, command)).unwrap();
283+
}
284+
285+
} else {
286+
this.spawners[1].spawn(high_worker(this, command)).unwrap();
287+
this.spawners[1].spawn(low_worker(this, command)).unwrap();
288+
}
289+
290+
for id in 0..ntasks {
291+
this.spawners[1].spawn(worker(this, command, id)).unwrap();
292+
}
293+
294+
let mut results: heapless::Vec<Option<usize>, NUM_THREADS> = heapless::Vec::new();
295+
let mut low = false;
296+
let mut high = false;
297+
let mut msg_count = (2 + ntasks) as isize;
298+
299+
for _ in 0..ntasks {
300+
results.push(None).unwrap();
301+
}
302+
303+
loop {
304+
match this.results.receive().await {
305+
TestResult::Worker { id, count } => {
306+
if results[id].replace(count).is_some() {
307+
panic!("Multiple results from worker {}", id);
308+
}
309+
}
310+
TestResult::Low => {
311+
if low {
312+
panic!("Multiple results from 'low' worker");
313+
}
314+
low = true;
315+
}
316+
TestResult::High => {
317+
if high {
318+
panic!("Multiple results from 'high' worker");
319+
}
320+
high = true;
321+
}
322+
}
323+
msg_count -= 1;
324+
if msg_count <= 0 {
325+
break;
326+
}
327+
}
328+
329+
let count: usize = results.iter().map(|x| x.unwrap_or(0)).sum();
330+
timer.adjust_count(count);
331+
332+
timer.stop();
333+
this.end_wait.give();
334+
}
335+
}
336+
337+
/// Main for each executor.
338+
///
339+
/// This thread starts up an executor, and then places that executor into a 'signal' so the
340+
/// non-async code can get the value.
341+
#[zephyr::thread(stack_size = THREAD_STACK_SIZE, pool_size = 3)]
342+
fn executor_thread(exec: &'static StaticCell<Executor>, spawner_sig: &'static SpawnerSignal) -> ! {
343+
let exec = exec.init(Executor::new());
344+
exec.run(|spawner| {
345+
// TODO: Should we try to have a local spawner as well?
346+
spawner_sig.signal(spawner.make_send());
347+
})
348+
}
349+
350+
// For debugging
351+
#[unsafe(no_mangle)]
352+
extern "C" fn invalid_spinlock(_l: *mut zephyr::raw::k_spinlock, _thread: u32, _id: u32) {
353+
}

0 commit comments

Comments
 (0)