Skip to content

Commit 53105d7

Browse files
committed
samples: bench: Replace old async with new
Remove the sample that depended on the old non-standard executor built around work queues, and use the zephyr-executor. Signed-off-by: David Brown <[email protected]>
1 parent 0a78a5e commit 53105d7

File tree

4 files changed

+511
-325
lines changed

4 files changed

+511
-325
lines changed

samples/bench/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,18 @@ license = "Apache-2.0 or MIT"
1313
crate-type = ["staticlib"]
1414

1515
[dependencies]
16-
zephyr = "0.1.0"
16+
zephyr = { version = "0.1.0", features = ["time-driver", "executor-zephyr"] }
1717
critical-section = "1.1.2"
1818
heapless = "0.8"
1919
static_cell = "2.1"
2020

21+
embassy-executor = { version = "0.7.0", features = ["log", "task-arena-size-2048"] }
22+
embassy-sync = "0.6.2"
23+
embassy-futures = "0.1.1"
24+
25+
# Hard code the tick rate.
26+
embassy-time = { version = "0.4.0", features = ["tick-hz-10_000"] }
27+
2128
# Dependencies that are used by build.rs.
2229
[build-dependencies]
2330
zephyr-build = "0.1.0"

samples/bench/prj.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,16 @@ CONFIG_MAIN_STACK_SIZE=8192
88
CONFIG_POLL=y
99

1010
# CONFIG_USERSPACE=y
11+
# CONFIG_DEBUG=y
12+
# CONFIG_ASSERT=y
13+
# CONFIG_STACK_SENTINEL=y
14+
# CONFIG_STACK_USAGE=y
15+
# CONFIG_STACK_CANARIES=y
1116

1217
# Some debugging
1318
CONFIG_THREAD_MONITOR=y
1419
CONFIG_THREAD_ANALYZER=y
1520
CONFIG_THREAD_ANALYZER_USE_PRINTK=y
1621
CONFIG_THREAD_ANALYZER_AUTO=n
22+
CONFIG_DEBUG_THREAD_INFO=y
1723
# CONFIG_THREAD_ANALYZER_AUTO_INTERVAL=15

samples/bench/src/executor.rs

Lines changed: 349 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,349 @@
1+
//! Executor-based tests
2+
//!
3+
//! These tests try to be as similar as possible to the thread-based tests, but instead are built
4+
//! around async.
5+
6+
use core::{ffi::c_int, sync::atomic::Ordering};
7+
8+
use alloc::format;
9+
use embassy_executor::SendSpawner;
10+
use embassy_futures::yield_now;
11+
#[allow(unused_imports)]
12+
use embassy_sync::semaphore::{FairSemaphore, GreedySemaphore};
13+
use embassy_sync::{
14+
blocking_mutex::raw::CriticalSectionRawMutex, channel::Channel, semaphore::Semaphore,
15+
signal::Signal,
16+
};
17+
use static_cell::StaticCell;
18+
use zephyr::{embassy::Executor, sync::atomic::AtomicBool};
19+
20+
use crate::{BenchTimer, Command, TestResult, NUM_THREADS, THREAD_STACK_SIZE};
21+
22+
/// As the tests do exercise different executors at different priorities, use the critical section
23+
/// variant.
24+
// type ASemaphore = GreedySemaphore<CriticalSectionRawMutex>;
25+
type ASemaphore = FairSemaphore<CriticalSectionRawMutex, { NUM_THREADS + 2 }>;
26+
27+
/// A Signal for reporting back spawners.
28+
type SpawnerSignal = Signal<CriticalSectionRawMutex, SendSpawner>;
29+
30+
/// Async-based test runner.
31+
pub struct AsyncTests {
32+
/// Each test thread gets a semaphore, to use as appropriate for that test.
33+
sems: &'static [ASemaphore],
34+
35+
/// A back sem for the reverse direction.
36+
back_sem: &'static ASemaphore,
37+
38+
/// The spawners, of the three priorities.
39+
spawners: [SendSpawner; 3],
40+
41+
/// Tests results are communicated back via this channels.
42+
results: Channel<CriticalSectionRawMutex, TestResult, 1>,
43+
44+
/// Main started, allows main to be started lazily, after we've become static.
45+
main_started: AtomicBool,
46+
47+
/// Indicates back to the main thread that the given test is finished.
48+
end_wait: zephyr::sys::sync::Semaphore,
49+
}
50+
51+
/// The executors for the tests. The values are low, medium, and high priority.
52+
static EXECUTORS: [StaticCell<Executor>; 3] = [const { StaticCell::new() }; 3];
53+
54+
/// Each executor returns a SendSpawner to span tasks on that executor.
55+
static SPAWNERS: [Signal<CriticalSectionRawMutex, SendSpawner>; 3] = [const { Signal::new() }; 3];
56+
57+
/// Static semaphores for the above.
58+
static SEMS: [ASemaphore; NUM_THREADS] = [const { ASemaphore::new(0) }; NUM_THREADS];
59+
60+
static BACK_SEM: ASemaphore = ASemaphore::new(0);
61+
62+
/// Main command
63+
static MAIN_CMD: Channel<CriticalSectionRawMutex, Command, 1> = Channel::new();
64+
65+
impl AsyncTests {
66+
/// Construct a new set of tests, firing up the threads for the differernt priority executors.
67+
pub fn new(count: usize) -> Self {
68+
// printkln!("Starting executors");
69+
let spawners = [0, 1, 2].map(|id| {
70+
let thread = executor_thread(&EXECUTORS[id], &SPAWNERS[id]);
71+
thread.set_priority(id as c_int + 5);
72+
thread.start();
73+
74+
zephyr::time::sleep(zephyr::time::Duration::millis_at_least(1));
75+
if let Some(sp) = SPAWNERS[id].try_take() {
76+
sp
77+
} else {
78+
panic!("Executor thread did not initialize properly");
79+
}
80+
});
81+
82+
Self {
83+
sems: &SEMS[0..count],
84+
back_sem: &BACK_SEM,
85+
spawners,
86+
results: Channel::new(),
87+
main_started: AtomicBool::new(false),
88+
end_wait: zephyr::sys::sync::Semaphore::new(0, 1),
89+
}
90+
}
91+
92+
/// Run one of the given tests.
93+
///
94+
/// Fires off the appropriate workers for the given test. Tests follow a basic pattern:
95+
/// There are NUM_THREADS + 2 tasks spawned. These communicate as appropriate for the given
96+
/// test, and the results are then collected, waiting for everything to finish, and reported.
97+
pub fn run(&'static self, command: Command) {
98+
if !self.main_started.load(Ordering::Relaxed) {
99+
self.spawners[1].spawn(main_run(self)).unwrap();
100+
self.main_started.store(true, Ordering::Relaxed);
101+
}
102+
103+
if MAIN_CMD.try_send(command).is_err() {
104+
panic!("Main queue filled up");
105+
}
106+
107+
// Wait for the Zephyr semaphore indicating the test is finished.
108+
self.end_wait.take(zephyr::time::Forever).unwrap();
109+
}
110+
111+
/// A simple semaphore test. Tests the time to use the semaphore with no blocking.
112+
async fn simple_sem(&self, id: usize, count: usize) -> usize {
113+
let sem = &self.sems[id];
114+
115+
for _ in 0..count {
116+
sem.release(1);
117+
let rel = sem.acquire(1).await.unwrap();
118+
rel.disarm();
119+
}
120+
121+
count
122+
}
123+
124+
/// A simple semaphore test, with yield. Tests the time to use the semaphore with no blocking.
125+
async fn simple_sem_yield(&self, id: usize, count: usize) -> usize {
126+
let sem = &self.sems[id];
127+
128+
for _ in 0..count {
129+
sem.release(1);
130+
let rel = sem.acquire(1).await.unwrap();
131+
rel.disarm();
132+
yield_now().await;
133+
}
134+
135+
count
136+
}
137+
138+
/// The taker side of the SemWait test.
139+
async fn sem_wait_taker(&self, id: usize, count: usize) -> usize {
140+
let sem = &self.sems[id];
141+
for _ in 0..count {
142+
let rel = sem.acquire(1).await.unwrap();
143+
rel.disarm();
144+
}
145+
count
146+
}
147+
148+
/// The giver side of the SemWait test.
149+
async fn sem_wait_giver(&self, count: usize) {
150+
for _ in 0..count {
151+
for sem in self.sems {
152+
sem.release(1);
153+
}
154+
}
155+
}
156+
157+
/// The taker side of the SemPingPong test.
158+
async fn sem_ping_pong_taker(&self, count: usize) -> usize {
159+
let sem = &self.sems[0];
160+
let back_sem = self.back_sem;
161+
162+
// zephyr::printkln!("Taking {count} sems");
163+
for _ in 0..count {
164+
//zephyr::printkln!("acquire1");
165+
let rel = sem.acquire(1).await.unwrap();
166+
//zephyr::printkln!("acquired1");
167+
rel.disarm();
168+
//zephyr::printkln!("release2");
169+
back_sem.release(1);
170+
}
171+
// zephyr::printkln!("Taking sems done");
172+
173+
count
174+
}
175+
176+
/// The giver side of the SemPingPong test. This uses the first sem and back sem to ping pong
177+
/// across all of the workers.
178+
async fn sem_ping_pong_giver(&self, count: usize) {
179+
let sem = &self.sems[0];
180+
let back_sem = self.back_sem;
181+
182+
// zephyr::printkln!("Giving {},{} sems", count, self.sems.len());
183+
for _ in 0..count {
184+
for _ in 0..self.sems.len() {
185+
//zephyr::printkln!("release1");
186+
sem.release(1);
187+
//zephyr::printkln!("acquire2");
188+
let rel = back_sem.acquire(1).await.unwrap();
189+
//zephyr::printkln!("acquired2");
190+
rel.disarm();
191+
}
192+
}
193+
// zephyr::printkln!("Giving sems done");
194+
}
195+
}
196+
197+
/// The low priority worker, depending on the test, performs some operations at a lower priority
198+
/// than the main threads.
199+
#[embassy_executor::task]
200+
async fn low_worker(this: &'static AsyncTests, command: Command) {
201+
match command {
202+
Command::Empty => (),
203+
Command::SimpleSem(_) => (),
204+
Command::SimpleSemYield(_) => (),
205+
Command::SemWait(count) => this.sem_wait_giver(count).await,
206+
Command::SemWaitSame(count) => this.sem_wait_giver(count).await,
207+
Command::SemPingPong(count) => this.sem_ping_pong_giver(count).await,
208+
Command::SemOnePingPong(count) => this.sem_ping_pong_giver(count).await,
209+
command => panic!("Not implemented: {:?}", command),
210+
}
211+
212+
this.results.send(TestResult::Low).await;
213+
}
214+
215+
/// The high priority worker, performs some operations at a higher priority than the main threads.
216+
#[embassy_executor::task]
217+
async fn high_worker(this: &'static AsyncTests, command: Command) {
218+
// printkln!("high_worker started");
219+
match command {
220+
Command::Empty => (),
221+
Command::SimpleSem(_) => (),
222+
Command::SimpleSemYield(_) => (),
223+
Command::SemWait(_) => (),
224+
Command::SemWaitSame(_) => (),
225+
Command::SemPingPong(_) => (),
226+
Command::SemOnePingPong(_) => (),
227+
command => panic!("Not implemented: {:?}", command),
228+
}
229+
230+
this.results.send(TestResult::High).await;
231+
}
232+
233+
/// The main worker threads.
234+
///
235+
/// These perform the main work of the test, generally communicating with either the main test task,
236+
/// which is in the same executor, or with the higher or lower priority worker.
237+
#[embassy_executor::task(pool_size = NUM_THREADS)]
238+
async fn worker(this: &'static AsyncTests, command: Command, id: usize) {
239+
let total;
240+
match command {
241+
Command::Empty => total = 1,
242+
Command::SimpleSem(count) => total = this.simple_sem(id, count).await,
243+
Command::SimpleSemYield(count) => total = this.simple_sem_yield(id, count).await,
244+
Command::SemWait(count) => total = this.sem_wait_taker(id, count).await,
245+
Command::SemWaitSame(count) => total = this.sem_wait_taker(id, count).await,
246+
Command::SemPingPong(count) => total = this.sem_ping_pong_taker(count).await,
247+
Command::SemOnePingPong(count) => total = this.sem_ping_pong_taker(count).await,
248+
command => panic!("Not implemented: {:?}", command),
249+
}
250+
251+
this.results
252+
.send(TestResult::Worker { id, count: total })
253+
.await;
254+
}
255+
256+
/// Main worker.
257+
///
258+
/// This task performs the main work.
259+
#[embassy_executor::task]
260+
async fn main_run(this: &'static AsyncTests) {
261+
loop {
262+
let command = MAIN_CMD.receive().await;
263+
let msg = format!("async {:?}", command);
264+
265+
let mut timer = BenchTimer::new(&msg, 1);
266+
267+
let ntasks = this.sems.len();
268+
269+
// Before starting, reset the semaphores.
270+
for sem in this.sems {
271+
sem.set(0);
272+
}
273+
this.back_sem.set(0);
274+
275+
if true {
276+
this.spawners[2].spawn(high_worker(this, command)).unwrap();
277+
if command.is_same_priority() {
278+
this.spawners[1].spawn(low_worker(this, command)).unwrap();
279+
} else {
280+
this.spawners[0].spawn(low_worker(this, command)).unwrap();
281+
}
282+
} else {
283+
this.spawners[1].spawn(high_worker(this, command)).unwrap();
284+
this.spawners[1].spawn(low_worker(this, command)).unwrap();
285+
}
286+
287+
for id in 0..ntasks {
288+
this.spawners[1].spawn(worker(this, command, id)).unwrap();
289+
}
290+
291+
let mut results: heapless::Vec<Option<usize>, NUM_THREADS> = heapless::Vec::new();
292+
let mut low = false;
293+
let mut high = false;
294+
let mut msg_count = (2 + ntasks) as isize;
295+
296+
for _ in 0..ntasks {
297+
results.push(None).unwrap();
298+
}
299+
300+
loop {
301+
match this.results.receive().await {
302+
TestResult::Worker { id, count } => {
303+
if results[id].replace(count).is_some() {
304+
panic!("Multiple results from worker {}", id);
305+
}
306+
}
307+
TestResult::Low => {
308+
if low {
309+
panic!("Multiple results from 'low' worker");
310+
}
311+
low = true;
312+
}
313+
TestResult::High => {
314+
if high {
315+
panic!("Multiple results from 'high' worker");
316+
}
317+
high = true;
318+
}
319+
}
320+
msg_count -= 1;
321+
if msg_count <= 0 {
322+
break;
323+
}
324+
}
325+
326+
let count: usize = results.iter().map(|x| x.unwrap_or(0)).sum();
327+
timer.adjust_count(count);
328+
329+
timer.stop();
330+
this.end_wait.give();
331+
}
332+
}
333+
334+
/// Main for each executor.
335+
///
336+
/// This thread starts up an executor, and then places that executor into a 'signal' so the
337+
/// non-async code can get the value.
338+
#[zephyr::thread(stack_size = THREAD_STACK_SIZE, pool_size = 3)]
339+
fn executor_thread(exec: &'static StaticCell<Executor>, spawner_sig: &'static SpawnerSignal) -> ! {
340+
let exec = exec.init(Executor::new());
341+
exec.run(|spawner| {
342+
// TODO: Should we try to have a local spawner as well?
343+
spawner_sig.signal(spawner.make_send());
344+
})
345+
}
346+
347+
// For debugging
348+
#[unsafe(no_mangle)]
349+
extern "C" fn invalid_spinlock(_l: *mut zephyr::raw::k_spinlock, _thread: u32, _id: u32) {}

0 commit comments

Comments
 (0)