Commit 7e9d994

teburd authored and fabiobaltieri committed
samples: rtio: Simple producer consumer sample
A simple sample showing how an interrupt (or thread) could produce data that a
thread (potentially a user mode thread) could then consume. Thanks to some great
suggestions from Luis Ubieda <[email protected]>, the sample also shows multishot
submissions in use, which avoid extra syscalls in the tight processing loop of the
consumer.

Signed-off-by: Tom Burdick <[email protected]>
1 parent 9158d90 commit 7e9d994

5 files changed, 209 additions and 0 deletions
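
As the commit message notes, multishot is what keeps the consumer's hot loop down to a
single syscall per value. For contrast, here is a minimal sketch of what a one-shot
consumer would have to do, using only calls that already appear in src/main.c below;
the function name consumer_loop_oneshot is hypothetical and this variant is not part
of the commit:

#include <zephyr/kernel.h>
#include <zephyr/rtio/rtio.h>

/* Hypothetical one-shot variant of the sample's consumer loop, for contrast.
 * Without RTIO_SQE_MULTISHOT the read request must be copied in and submitted
 * again on every iteration, so each consumed value costs three syscalls.
 */
static int consumer_loop_oneshot(struct rtio *consumer, struct rtio_iodev *producer)
{
	uint64_t cycle_count;
	struct rtio_sqe read_sqe;
	struct rtio_cqe read_cqe;
	struct rtio_sqe *read_sqe_handle;

	while (true) {
		/* Describe a read directly into cycle_count, no MULTISHOT flag */
		rtio_sqe_prep_read(&read_sqe, producer, RTIO_PRIO_NORM,
				   (uint8_t *)&cycle_count, sizeof(cycle_count), NULL);

		/* Three syscalls per value: copy in, submit, consume */
		rtio_sqe_copy_in_get_handles(consumer, &read_sqe, &read_sqe_handle, 1);
		rtio_submit(consumer, 0);
		rtio_cqe_copy_out(consumer, &read_cqe, 1, K_FOREVER);

		printk("read result %d, cycle count is %llu\n", read_cqe.result, cycle_count);
	}

	return 0;
}

With RTIO_SQE_MULTISHOT set, as consumer_loop() in the commit does, the copy-in and
submit happen once up front, the kernel re-queues the read after every completion, and
only the rtio_cqe_copy_out() call remains in the loop.
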

samples/subsys/rtio/producer_consumer/CMakeLists.txt: 11 additions & 0 deletions

@@ -0,0 +1,11 @@
# Copyright (c) 2025 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0

cmake_minimum_required(VERSION 3.20.0)

find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
project(rtio_producer_consumer)

FILE(GLOB app_sources src/*.c)
target_sources(app PRIVATE ${app_sources})

samples/subsys/rtio/producer_consumer/README.rst: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
.. zephyr:code-sample:: producer_consumer
   :name: Producer Consumer
   :relevant-api: rtio

   Implement a producer->consumer pipe using RTIO.

Overview
********

A simple sample that can be used with any :ref:`supported board <boards>` showing a
producer and consumer pattern implemented using RTIO. In this case the producer
is a k_timer generating cycle accurate timestamps of when the timer callback ran
in an ISR.

Building and Running
********************

The sample can be built for the native_sim target and run as follows:

.. zephyr-app-commands::
   :zephyr-app: samples/subsys/rtio/producer_consumer
   :board: native_sim
   :goals: build run
   :compact:

When running, the output on the console shows the operations of submitting for,
consuming, and processing the data.
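
The Overview above describes the pattern as a producer iodev feeding a consuming
thread. The sample's consumer (in src/main.c below) deliberately uses the copy-based
rtio_sqe_copy_in_get_handles()/rtio_cqe_copy_out() calls so the same loop can later
run in user mode. A kernel-mode-only consumer could instead work on the queues in
place; a rough sketch, assuming the standard rtio_sqe_acquire(),
rtio_cqe_consume_block() and rtio_cqe_release() RTIO APIs (not used in this commit)
and a hypothetical function name:

#include <zephyr/kernel.h>
#include <zephyr/rtio/rtio.h>

/* Hypothetical kernel-mode-only consumer: it manipulates the submission and
 * completion queues in place instead of copying requests and completions
 * across a syscall boundary.
 */
static int consumer_loop_kernel_only(struct rtio *consumer, struct rtio_iodev *producer)
{
	uint64_t cycle_count;

	while (true) {
		/* Grab a free submission slot and set it up as a read request */
		struct rtio_sqe *sqe = rtio_sqe_acquire(consumer);

		if (sqe == NULL) {
			return -ENOMEM;
		}
		rtio_sqe_prep_read(sqe, producer, RTIO_PRIO_NORM,
				   (uint8_t *)&cycle_count, sizeof(cycle_count), NULL);

		/* Hand the request to the producer iodev */
		rtio_submit(consumer, 0);

		/* Block until the producer completes the read, then recycle the CQE */
		struct rtio_cqe *cqe = rtio_cqe_consume_block(consumer);
		int result = cqe->result;

		rtio_cqe_release(consumer, cqe);

		printk("read result %d, cycle count is %llu\n", result, cycle_count);
	}

	return 0;
}

The trade-off is that these in-place calls touch the rtio context's queues directly
and so are not usable from a user mode thread, which is why the sample takes the
syscall-based route instead.
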

samples/subsys/rtio/producer_consumer/prj.conf: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
CONFIG_LOG=y
CONFIG_LOG_MODE_MINIMAL=y
CONFIG_RTIO=y
CONFIG_RTIO_SYS_MEM_BLOCKS=y

samples/subsys/rtio/producer_consumer/sample.yaml: 14 additions & 0 deletions

@@ -0,0 +1,14 @@
sample:
  name: RTIO producer consumer sample
tests:
  sample.rtio.producer_consumer:
    tags: rtio
    integration_platforms:
      - native_sim
    harness: console
    harness_config:
      type: multi_line
      regex:
        - "(.*)producer (.*) trigger"
        - "(.*)buf (.*), len 8"
        - "(.*)read result 0, cycle count is (.*)"

samples/subsys/rtio/producer_consumer/src/main.c: 153 additions & 0 deletions

@@ -0,0 +1,153 @@
/*
 * Copyright (c) 2025 Intel Corporation
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#include <zephyr/kernel.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/logging/log.h>

LOG_MODULE_REGISTER(main, LOG_LEVEL_DBG);

/* Our producer is a timer (interrupt) but could be a thread as well */
struct producer {
	struct mpsc io_q;
};

/* Our producer function, could be done in a thread or interrupt, here
 * we are producing cycle count values from a timer interrupt.
 */
static void producer_periodic(struct k_timer *timer)
{
	struct producer *p = timer->user_data;

	LOG_INF("producer %p trigger", (void *)p);

	struct mpsc_node *n = mpsc_pop(&p->io_q);

	if (n == NULL) {
		LOG_WRN("producer overflowed consumers");
		return;
	}

	struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(n, struct rtio_iodev_sqe, q);

	/* Only accept read/rx requests */
	if (iodev_sqe->sqe.op != RTIO_OP_RX) {
		rtio_iodev_sqe_err(iodev_sqe, -EINVAL);
		return;
	}

	/* Get the rx buffer with a minimum/maximum size pair to fill with the current time */
	uint8_t *buf = NULL;
	uint32_t buf_len = 0;
	int rc = rtio_sqe_rx_buf(iodev_sqe, sizeof(uint64_t), sizeof(uint64_t), &buf, &buf_len);

	LOG_INF("buf %p, len %u", (void *)buf, buf_len);

	if (rc < 0) {
		/* buffer wasn't available or too small */
		rtio_iodev_sqe_err(iodev_sqe, -ENOMEM);
		return;
	}

	/* We now have a buffer we can produce data into and then signal back to the consumer */
	uint64_t *cycle_count = (uint64_t *)buf;

	/* "Produce" a timestamp */
	*cycle_count = k_cycle_get_64();

	/* Signal read has completed */
	rtio_iodev_sqe_ok(iodev_sqe, 0);
}

/* Accept incoming commands (e.g. read requests), could come from multiple sources
 * so the only real safe thing to do here is put it into the lock free queue
 */
static void producer_submit(struct rtio_iodev_sqe *iodev_sqe)
{
	struct mpsc *producer_ioq = iodev_sqe->sqe.iodev->data;

	mpsc_push(producer_ioq, &iodev_sqe->q);
}

K_TIMER_DEFINE(producer_tmr, producer_periodic, NULL);
static struct producer producer_data;
const struct rtio_iodev_api producer_api = {.submit = producer_submit};

/* Setup our i/o device, akin to a file handle we can read from */
RTIO_IODEV_DEFINE(producer_iodev, &producer_api, &producer_data);

/* Setup our pair of queues for our consumer, with 1 submission and 1 completion available */
RTIO_DEFINE(rconsumer, 1, 1);

int consumer_loop(struct rtio *consumer, struct rtio_iodev *producer)
{
	/* We can share memory with kernel space without any work, to share
	 * between usermode threads we'd need a k_mem_partition added to
	 * both domains instead
	 */
	uint64_t cycle_count;

	/* Our read submission and completion pair */
	struct rtio_sqe read_sqe;
	struct rtio_cqe read_cqe;
	struct rtio_sqe *read_sqe_handle;

	/* Helper that sets up the submission to be a read request, reading *directly*
	 * into the given buffer pointer without copying
	 */
	rtio_sqe_prep_read(&read_sqe, producer, RTIO_PRIO_NORM, (uint8_t *)&cycle_count,
			   sizeof(cycle_count), NULL);

	/* We can automatically have this read request resubmitted for us */
	read_sqe.flags |= RTIO_SQE_MULTISHOT;

	/* A syscall to copy the control structure (sqe) into kernel mode, and get a handle out
	 * so we can cancel it later if we want
	 */
	rtio_sqe_copy_in_get_handles(consumer, &read_sqe, &read_sqe_handle, 1);

	/* A syscall to submit the queued up requests (there could be many) to all iodevs */
	rtio_submit(consumer, 0);

	/* A consumer loop that waits for read completions in a single syscall
	 *
	 * This never ends but to end the loop we'd cancel the requests to read.
	 *
	 * NOTE: There could be multiple read requests out to multiple producers we could
	 * be waiting on!
	 */
	while (true) {
		/* A syscall to consume a completion, waiting forever for it to arrive */
		rtio_cqe_copy_out(consumer, &read_cqe, 1, K_FOREVER);

		/* The read has been completed, and it's safe to read the value until
		 * we attach and submit a request to read into it again
		 */
		LOG_INF("read result %d, cycle count is %llu", read_cqe.result, cycle_count);
	}

	return 0;
}

int main(void)
{
	/* init stuff */
	mpsc_init(&producer_data.io_q);
	producer_tmr.user_data = &producer_data;

	/* Start our producer task (timer interrupt based) */
	k_timer_start(&producer_tmr, K_MSEC(100), K_MSEC(100));

	/* We can enter usermode here with a little work to setup k_objects for the iodev
	 * and struct rtio context
	 * E.g.
	 * k_object_access_grant(&producer, k_current_get());
	 * k_object_access_grant(&consumer, k_current_get());
	 * k_thread_user_mode_enter(consumer_loop, &producer, &consumer, NULL);
	 */

	return consumer_loop(&rconsumer, &producer_iodev);
}
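
The closing comment in main() above sketches how the consumer could run as a user mode
thread. Expanded a little, and assuming CONFIG_USERSPACE=y, it could look roughly like
the following if dropped into the sample's src/main.c in place of the existing main();
the consumer_entry trampoline is an illustrative helper (consumer_loop() returns int
and takes typed pointers, while k_thread_user_mode_enter() wants a k_thread_entry_t),
and none of this is part of the commit:

#ifdef CONFIG_USERSPACE
/* Trampoline matching k_thread_entry_t (illustrative helper, not in the commit) */
static void consumer_entry(void *p1, void *p2, void *p3)
{
	ARG_UNUSED(p3);

	consumer_loop(p1, p2);
}
#endif

int main(void)
{
	mpsc_init(&producer_data.io_q);
	producer_tmr.user_data = &producer_data;

	/* Start our producer task (timer interrupt based) */
	k_timer_start(&producer_tmr, K_MSEC(100), K_MSEC(100));

#ifdef CONFIG_USERSPACE
	/* Grant the current thread access to the kernel objects it keeps using
	 * after dropping to user mode, then never return.
	 */
	k_object_access_grant(&rconsumer, k_current_get());
	k_object_access_grant(&producer_iodev, k_current_get());
	k_thread_user_mode_enter(consumer_entry, &rconsumer, &producer_iodev, NULL);
#else
	return consumer_loop(&rconsumer, &producer_iodev);
#endif
}

Everything else stays the same: the timer keeps producing in ISR context, and because
the request was copied into kernel space, the user mode loop only touches its own
stack buffer and the two granted objects. The read_sqe_handle saved in consumer_loop()
remains the hook for cancelling the multishot read if the loop ever needed to end.
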
