Skip to content

Commit 753cde2

Browse files
committed
zephyr: sync: channel: Implement bounded channels
In addition to unbounded channels, also implement bounded channels. These use pre-allocated message buffers, where send as well as recv can block. They are implemented by using two `z_fifo`s to hold the items, treating one as a free list. Signed-off-by: David Brown <[email protected]>
1 parent b8d92ed commit 753cde2

File tree

1 file changed

+106
-1
lines changed

1 file changed

+106
-1
lines changed

zephyr/src/sync/channel.rs

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ extern crate alloc;
1515

1616
use alloc::boxed::Box;
1717

18+
use core::cell::UnsafeCell;
1819
use core::ffi::c_void;
1920
use core::fmt;
2021
use core::marker::PhantomData;
22+
use core::mem::MaybeUninit;
2123

2224
use crate::sys::queue::Queue;
2325

@@ -62,6 +64,29 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
6264
unbounded_from(Queue::new().unwrap())
6365
}
6466

67+
/// Create a multi-producer multi-consumer channel with bounded capacity.
68+
///
69+
/// The messages are allocated at channel creation time. If there are no messages at `send` time,
70+
/// send will block (possibly waiting for a timeout).
71+
///
72+
/// At this time, Zephyr does not support crossbeam's 0 capacity queues, which are also called
73+
/// a rendezvous, where both threads wait until in the same region. `bounded` will panic if called
74+
/// with a capacity of zero.
75+
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
76+
if cap == 0 {
77+
panic!("Zero capacity queues no supported on Zephyr");
78+
}
79+
80+
let (s, r) = counter::new(Bounded::new(cap));
81+
let s = Sender {
82+
flavor: SenderFlavor::Bounded(s),
83+
};
84+
let r = Receiver {
85+
flavor: ReceiverFlavor::Bounded(r),
86+
};
87+
(s, r)
88+
}
89+
6590
/// The underlying type for Messages through Zephyr's [`Queue`].
6691
///
6792
/// This wrapper is used internally to wrap user messages through the queue. It is not useful in
@@ -103,6 +128,15 @@ impl<T> Sender<T> {
103128
queue.send(msg as *mut c_void);
104129
}
105130
}
131+
SenderFlavor::Bounded(chan) => {
132+
// Retrieve a message buffer from the free list.
133+
let buf = unsafe { chan.free.recv() };
134+
let buf = buf as *mut Message<T>;
135+
unsafe {
136+
buf.write(Message::new(msg));
137+
chan.chan.send(buf as *mut c_void);
138+
}
139+
}
106140
}
107141
Ok(())
108142
}
@@ -119,6 +153,13 @@ impl<T> Drop for Sender<T> {
119153
})
120154
}
121155
}
156+
SenderFlavor::Bounded(chan) => {
157+
unsafe {
158+
chan.release(|_| {
159+
panic!("Bounded queues cannot be dropped");
160+
})
161+
}
162+
}
122163
}
123164
}
124165
}
@@ -132,6 +173,9 @@ impl<T> Clone for Sender<T> {
132173
_phantom: PhantomData,
133174
}
134175
}
176+
SenderFlavor::Bounded(chan) => {
177+
SenderFlavor::Bounded(chan.acquire())
178+
}
135179
};
136180

137181
Sender { flavor }
@@ -144,7 +188,8 @@ enum SenderFlavor<T> {
144188
Unbounded {
145189
queue: counter::Sender<Queue>,
146190
_phantom: PhantomData<T>,
147-
}
191+
},
192+
Bounded(counter::Sender<Bounded<T>>),
148193
}
149194

150195
impl<T: fmt::Debug> fmt::Debug for Sender<T> {
@@ -178,6 +223,17 @@ impl<T> Receiver<T> {
178223
let msg = unsafe { Box::from_raw(msg) };
179224
Ok(msg.data)
180225
}
226+
ReceiverFlavor::Bounded(chan) => {
227+
let rawbuf = unsafe {
228+
chan.chan.recv()
229+
};
230+
let buf = rawbuf as *mut Message<T>;
231+
let msg: Message<T> = unsafe { buf.read() };
232+
unsafe {
233+
chan.free.send(buf as *mut c_void);
234+
}
235+
Ok(msg.data)
236+
}
181237
}
182238
}
183239
}
@@ -193,6 +249,13 @@ impl<T> Drop for Receiver<T> {
193249
})
194250
}
195251
}
252+
ReceiverFlavor::Bounded(chan) => {
253+
unsafe {
254+
chan.release(|_| {
255+
panic!("Bounded channels cannot be dropped");
256+
})
257+
}
258+
}
196259
}
197260
}
198261
}
@@ -206,6 +269,9 @@ impl<T> Clone for Receiver<T> {
206269
_phantom: PhantomData,
207270
}
208271
}
272+
ReceiverFlavor::Bounded(chan) => {
273+
ReceiverFlavor::Bounded(chan.acquire())
274+
}
209275
};
210276

211277
Receiver { flavor }
@@ -224,6 +290,45 @@ enum ReceiverFlavor<T> {
224290
Unbounded {
225291
queue: counter::Receiver<Queue>,
226292
_phantom: PhantomData<T>,
293+
},
294+
Bounded(counter::Receiver<Bounded<T>>),
295+
}
296+
297+
type Slot<T> = UnsafeCell<MaybeUninit<Message<T>>>;
298+
299+
/// Bounded channel implementation.
300+
struct Bounded<T> {
301+
/// The messages themselves. This Box owns the allocation of the messages, although it is
302+
/// unsafe to drop this with any messages stored in either of the Zephyr queues.
303+
///
304+
/// The UnsafeCell is needed to indicate that this data is handled outside of what Rust is aware
305+
/// of. MaybeUninit allows us to create this without allocation.
306+
_slots: Box<[Slot<T>]>,
307+
/// The free queue, holds messages that aren't be used.
308+
free: Queue,
309+
/// The channel queue. These are messages that have been sent and are waiting to be received.
310+
chan: Queue,
311+
}
312+
313+
impl<T> Bounded<T> {
314+
fn new(cap: usize) -> Self {
315+
let slots: Box<[Slot<T>]> = (0..cap)
316+
.map(|_| {
317+
UnsafeCell::new(MaybeUninit::uninit())
318+
})
319+
.collect();
320+
321+
let free = Queue::new().unwrap();
322+
let chan = Queue::new().unwrap();
323+
324+
// Add each of the boxes to the free list.
325+
for chan in &slots {
326+
unsafe {
327+
free.send(chan.get() as *mut c_void);
328+
}
329+
}
330+
331+
Bounded { _slots: slots, free, chan }
227332
}
228333
}
229334

0 commit comments

Comments
 (0)