Skip to content

Commit 74ec8a8

Browse files
committed
zephyr: work: Change WorkQueue from dynamic to static
Change this declaration from a dynamic allocated type that must never be dropped to a static declaration. This is in preparation for a macro to help with the declaration. Signed-off-by: David Brown <[email protected]>
1 parent 3b0c2a1 commit 74ec8a8

File tree

1 file changed

+75
-115
lines changed

1 file changed

+75
-115
lines changed

zephyr/src/work.rs

Lines changed: 75 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -178,149 +178,105 @@
178178
extern crate alloc;
179179

180180
use core::{
181-
cell::UnsafeCell,
182-
ffi::{c_int, c_uint, CStr},
181+
cell::{RefCell, UnsafeCell},
182+
ffi::{c_int, c_uint},
183183
future::Future,
184184
mem,
185185
pin::Pin,
186-
ptr,
186+
sync::atomic::Ordering,
187187
task::Poll,
188188
};
189189

190+
use critical_section::Mutex;
191+
use portable_atomic::AtomicBool;
192+
use portable_atomic_util::Arc;
190193
use zephyr_sys::{
191194
k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise,
192195
k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init,
193-
k_work_queue_start, k_work_submit, k_work_submit_to_queue, ETIMEDOUT,
196+
k_work_queue_start, k_work_submit, k_work_submit_to_queue, z_thread_stack_element, ETIMEDOUT,
194197
};
195198

196199
use crate::{
197-
error::to_result_void,
198-
kio::ContextExt,
199-
object::Fixed,
200-
simpletls::SimpleTls,
201-
sync::{Arc, Mutex},
202-
sys::thread::ThreadStack,
203-
time::Timeout,
200+
error::to_result_void, kio::ContextExt, object::Fixed, simpletls::SimpleTls, time::Timeout,
204201
};
205202

206203
pub mod futures;
207204

208-
/// A builder for work queues themselves.
209-
///
210-
/// A work queue is a Zephyr thread that instead of directly running a piece of code, manages a work
211-
/// queue. Various types of `Work` can be submitted to these queues, along with various types of
212-
/// triggering conditions.
213-
pub struct WorkQueueBuilder {
214-
/// The "config" value passed in.
205+
/// A static declaration of a work-queue. This associates a work queue, with a stack, and an atomic
206+
/// to determine if it has been initialized.
207+
// TODO: Remove the pub on the fields, and make a constructor.
208+
pub struct WorkQueueDecl<const SIZE: usize> {
209+
queue: WorkQueue,
210+
stack: &'static crate::thread::ThreadStack<SIZE>,
215211
config: k_work_queue_config,
216-
/// Priority for the work queue thread.
217212
priority: c_int,
213+
started: AtomicBool,
218214
}
219215

220-
impl WorkQueueBuilder {
221-
/// Construct a new WorkQueueBuilder with default values.
222-
pub fn new() -> Self {
216+
// SAFETY: Sync is needed here to make a static declaration, despite the `*const i8` that is burried
217+
// in the config.
218+
unsafe impl<const SIZE: usize> Sync for WorkQueueDecl<SIZE> {}
219+
220+
impl<const SIZE: usize> WorkQueueDecl<SIZE> {
221+
/// Static constructor. Mostly for use by the macro.
222+
pub const fn new(
223+
stack: &'static crate::thread::ThreadStack<SIZE>,
224+
config: k_work_queue_config,
225+
priority: c_int,
226+
) -> Self {
223227
Self {
224-
config: k_work_queue_config {
225-
name: ptr::null(),
226-
no_yield: false,
227-
essential: false,
228-
},
229-
priority: 0,
228+
queue: unsafe { mem::zeroed() },
229+
stack,
230+
config,
231+
priority,
232+
started: AtomicBool::new(false),
230233
}
231234
}
232235

233-
/// Set the name for the WorkQueue thread.
234-
///
235-
/// This name shows up in debuggers and some analysis tools.
236-
pub fn set_name(&mut self, name: &'static CStr) -> &mut Self {
237-
self.config.name = name.as_ptr();
238-
self
239-
}
240-
241-
/// Set the "no yield" flag for the created worker.
242-
///
243-
/// If this is not set, the work queue will call `k_yield` between each enqueued work item. For
244-
/// non-preemptible threads, this will allow other threads to run. For preemptible threads,
245-
/// this will allow other threads at the same priority to run.
246-
///
247-
/// This method has a negative in the name, which goes against typical conventions. This is
248-
/// done to match the field in the Zephyr config.
249-
pub fn set_no_yield(&mut self, value: bool) -> &mut Self {
250-
self.config.no_yield = value;
251-
self
252-
}
253-
254-
/// Set the "essential" flag for the created worker.
255-
///
256-
/// This sets the essential flag on the running thread. The system considers the termination of
257-
/// an essential thread to be a fatal error.
258-
pub fn set_essential(&mut self, value: bool) -> &mut Self {
259-
self.config.essential = value;
260-
self
261-
}
262-
263-
/// Set the priority for the worker thread.
264-
///
265-
/// See the Zephyr docs for the meaning of priority.
266-
pub fn set_priority(&mut self, value: c_int) -> &mut Self {
267-
self.priority = value;
268-
self
269-
}
270-
271-
/// Start the given work queue thread.
272-
///
273-
/// TODO: Implement a 'start' that works from a static work queue.
274-
pub fn start(&self, stack: ThreadStack) -> WorkQueue {
275-
let item: Fixed<k_work_q> = Fixed::new(unsafe { mem::zeroed() });
276-
unsafe {
277-
// SAFETY: Initialize zeroed memory.
278-
k_work_queue_init(item.get());
279-
280-
// SAFETY: This associates the workqueue with the thread ID that runs it. The thread is
281-
// a pointer into this work item, which will not move, because of the Fixed.
282-
let this = &mut *item.get();
283-
WORK_QUEUES
284-
.lock()
285-
.unwrap()
286-
.insert(&this.thread, WorkQueueRef(item.get()));
287-
288-
// SAFETY: Start work queue thread. The main issue here is that the work queue cannot
289-
// be deallocated once the thread has started. We enforce this by making Drop panic.
290-
k_work_queue_start(
291-
item.get(),
292-
stack.base,
293-
stack.size,
294-
self.priority,
295-
&self.config,
296-
);
297-
}
298-
299-
WorkQueue { item }
236+
/// Start the work queue thread, if needed, and return a reference to it.
237+
pub fn start(&'static self) -> &'static WorkQueue {
238+
critical_section::with(|cs| {
239+
if self.started.load(Ordering::Relaxed) {
240+
// Already started, just return it.
241+
return &self.queue;
242+
}
243+
244+
// SAFETY: Starting is coordinated by the atomic, as well as being protected in a
245+
// critical section.
246+
unsafe {
247+
let this = &mut *self.queue.item.get();
248+
249+
k_work_queue_init(self.queue.item.get());
250+
251+
// Add to the WORK_QUEUES data. That needs to be changed to a critical
252+
// section Mutex from a Zephyr Mutex, as that would deadlock if called while in a
253+
// critrical section.
254+
let mut tls = WORK_QUEUES.borrow_ref_mut(cs);
255+
tls.insert(&this.thread, WorkQueueRef(self.queue.item.get()));
256+
257+
// Start the work queue thread.
258+
k_work_queue_start(
259+
self.queue.item.get(),
260+
self.stack.data.get() as *mut z_thread_stack_element,
261+
self.stack.size(),
262+
self.priority,
263+
&self.config,
264+
);
265+
}
266+
267+
&self.queue
268+
})
300269
}
301270
}
302271

303272
/// A running work queue thread.
304273
///
305-
/// # Panic
306-
///
307-
/// Allowing a work queue to drop will result in a panic. There are two ways to handle this,
308-
/// depending on whether the WorkQueue is in a Box, or an Arc:
309-
/// ```
310-
/// // Leak a work queue in an Arc.
311-
/// let wq = Arc::new(WorkQueueBuilder::new().start(...));
312-
/// // If the Arc is used after this:
313-
/// let _ = Arc::into_raw(wq.clone());
314-
/// // If the Arc is no longer needed:
315-
/// let _ = Arc::into_raw(wq);
316-
///
317-
/// // Leak a work queue in a Box.
318-
/// let wq = Box::new(WorkQueueBuilder::new().start(...));
319-
/// let _ = Box::leak(wq);
320-
/// ```
274+
/// This must be declared statically, and initialized once. Please see the macro
275+
/// [`define_work_queue`] which declares this with a [`StaticWorkQueue`] to help with the
276+
/// association with a stack, and making sure the queue is only started once.
321277
pub struct WorkQueue {
322278
#[allow(dead_code)]
323-
item: Fixed<k_work_q>,
279+
item: UnsafeCell<k_work_q>,
324280
}
325281

326282
/// Work queues can be referenced from multiple threads, and thus are Send and Sync.
@@ -348,7 +304,8 @@ impl Drop for WorkQueue {
348304
///
349305
/// This is a little bit messy as we don't have a lazy mechanism, so we have to handle this a bit
350306
/// manually right now.
351-
static WORK_QUEUES: Mutex<SimpleTls<WorkQueueRef>> = Mutex::new(SimpleTls::new());
307+
static WORK_QUEUES: Mutex<RefCell<SimpleTls<WorkQueueRef>>> =
308+
Mutex::new(RefCell::new(SimpleTls::new()));
352309

353310
/// For the queue mapping, we need a simple wrapper around the underlying pointer, one that doesn't
354311
/// implement stop.
@@ -361,7 +318,7 @@ unsafe impl Sync for WorkQueueRef {}
361318

362319
/// Retrieve the current work queue, if we are running within one.
363320
pub fn get_current_workq() -> Option<*mut k_work_q> {
364-
WORK_QUEUES.lock().unwrap().get().map(|wq| wq.0)
321+
critical_section::with(|cs| WORK_QUEUES.borrow_ref(cs).get().map(|wq| wq.0))
365322
}
366323

367324
/// A Rust wrapper for `k_poll_signal`.
@@ -628,7 +585,10 @@ impl<T: SimpleAction + Send> Work<T> {
628585
/// Submit this work to a specified work queue.
629586
///
630587
/// TODO: Change when we have better wrappers for work queues.
631-
pub fn submit_to_queue(this: Pin<Arc<Self>>, queue: &WorkQueue) -> crate::Result<SubmitResult> {
588+
pub fn submit_to_queue(
589+
this: Pin<Arc<Self>>,
590+
queue: &'static WorkQueue,
591+
) -> crate::Result<SubmitResult> {
632592
let work = this.work.get();
633593

634594
// "leak" the arc to give to C. We'll reconstruct it in the handler.

0 commit comments

Comments
 (0)