From 142eba8b14d4ca0fec2a08f3f81a7086a74e0006 Mon Sep 17 00:00:00 2001
From: Tom Burdick
Date: Mon, 27 Feb 2023 10:09:45 -0600
Subject: [PATCH 1/8] rtio: Clear flags in prep helpers

Tests were failing on some architectures when the sqe was allocated on
the stack. Unlike static data, variables on the stack are not
guaranteed to be zero initialized, which caused undefined behavior.
Secondly, if an sqe is recycled with a flag still set, there would be
unexpected behavior as well.

Signed-off-by: Tom Burdick
---
 include/zephyr/rtio/rtio.h | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/include/zephyr/rtio/rtio.h b/include/zephyr/rtio/rtio.h
index 516072a9e204f..8a40b867eea18 100644
--- a/include/zephyr/rtio/rtio.h
+++ b/include/zephyr/rtio/rtio.h
@@ -318,6 +318,7 @@ static inline void rtio_sqe_prep_nop(struct rtio_sqe *sqe,
				      void *userdata)
 {
 	sqe->op = RTIO_OP_NOP;
+	sqe->flags = 0;
 	sqe->iodev = iodev;
 	sqe->userdata = userdata;
 }
@@ -334,6 +335,7 @@ static inline void rtio_sqe_prep_read(struct rtio_sqe *sqe,
 {
 	sqe->op = RTIO_OP_RX;
 	sqe->prio = prio;
+	sqe->flags = 0;
 	sqe->iodev = iodev;
 	sqe->buf_len = len;
 	sqe->buf = buf;
@@ -352,6 +354,7 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
 {
 	sqe->op = RTIO_OP_TX;
 	sqe->prio = prio;
+	sqe->flags = 0;
 	sqe->iodev = iodev;
 	sqe->buf_len = len;
 	sqe->buf = buf;

From fb07f543cedaf4ea0adebee423410990c74c1925 Mon Sep 17 00:00:00 2001
From: Tom Burdick
Date: Mon, 27 Feb 2023 10:12:42 -0600
Subject: [PATCH 2/8] tests: rtio: Fix potential race in iodev test

A race was possible, though very unlikely, between the atomic CAS and
the queue push/pop operations. Had it shown up, the outcome would have
been a submission never worked on because the timer was never started.
A small critical section fixes this and clarifies where the single
consumer part of the mpsc queue comes in, despite there being multiple
contexts which may enter that section.
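For illustration, the serialized consume at the heart of the fix looks
roughly like this (a sketch only, using the names from the test iodev
change below; prints and unrelated details omitted):

    k_spinlock_key_t key = k_spin_lock(&data->lock);

    /* Only the context that observes no sqe currently in flight may pop
     * the next node and arm the timer. This preserves the single consumer
     * contract of the mpsc queue even though both submit calls and timer
     * expirations reach this point.
     */
    if (data->iodev_sqe == NULL) {
        struct rtio_mpsc_node *next = rtio_mpsc_pop(&iodev->iodev_sq);

        if (next != NULL) {
            data->iodev_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q);
            k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT);
        }
    }

    k_spin_unlock(&data->lock, key);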
Signed-off-by: Tom Burdick --- .../rtio/rtio_api/src/rtio_iodev_test.h | 74 ++++++++++--------- 1 file changed, 40 insertions(+), 34 deletions(-) diff --git a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h index e0f01226cdd31..f89ada68e8433 100644 --- a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h +++ b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h @@ -13,38 +13,54 @@ #define RTIO_IODEV_TEST_H_ struct rtio_iodev_test_data { - /** - * k_timer for an asynchronous task - */ + /* k_timer for an asynchronous task */ struct k_timer timer; - /** - * Currently executing sqe - */ - atomic_ptr_t iodev_sqe; + /* Currently executing sqe */ + struct rtio_iodev_sqe *iodev_sqe; + + /* Count of submit calls */ + atomic_t submit_count; + + /* Lock around kicking off next timer */ + struct k_spinlock lock; }; -static void rtio_iodev_timer_fn(struct k_timer *tm) +static void rtio_iodev_test_next(struct rtio_iodev *iodev) { - struct rtio_iodev_test_data *data = CONTAINER_OF(tm, struct rtio_iodev_test_data, timer); - struct rtio_iodev_sqe *iodev_sqe = atomic_ptr_get(&data->iodev_sqe); - struct rtio_mpsc_node *next = - rtio_mpsc_pop((struct rtio_mpsc *)&iodev_sqe->sqe->iodev->iodev_sq); + struct rtio_iodev_test_data *data = iodev->data; + + /* The next section must be serialized to ensure single consumer semantics */ + k_spinlock_key_t key = k_spin_lock(&data->lock); + + if (data->iodev_sqe != NULL) { + goto out; + } + + struct rtio_mpsc_node *next = rtio_mpsc_pop(&iodev->iodev_sq); if (next != NULL) { struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q); - atomic_ptr_set(&data->iodev_sqe, next_sqe); - TC_PRINT("starting timer again from queued iodev_sqe %p!\n", next); + data->iodev_sqe = next_sqe; k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT); - } else { - atomic_ptr_set(&data->iodev_sqe, NULL); } - /* Complete the request with Ok and a result */ - TC_PRINT("sqe ok callback\n"); +out: + k_spin_unlock(&data->lock, key); +} +static void rtio_iodev_timer_fn(struct k_timer *tm) +{ + struct rtio_iodev_test_data *data = CONTAINER_OF(tm, struct rtio_iodev_test_data, timer); + struct rtio_iodev_sqe *iodev_sqe = data->iodev_sqe; + struct rtio_iodev *iodev = (struct rtio_iodev *)iodev_sqe->sqe->iodev; + + /* Complete the request with Ok and a result, clear the current task */ rtio_iodev_sqe_ok(iodev_sqe, 0); + data->iodev_sqe = NULL; + + rtio_iodev_test_next(iodev); } static void rtio_iodev_test_submit(struct rtio_iodev_sqe *iodev_sqe) @@ -52,22 +68,12 @@ static void rtio_iodev_test_submit(struct rtio_iodev_sqe *iodev_sqe) struct rtio_iodev *iodev = (struct rtio_iodev *)iodev_sqe->sqe->iodev; struct rtio_iodev_test_data *data = iodev->data; - /* - * If a task is already going queue up the next request in the mpsc. - */ - if (!atomic_ptr_cas(&data->iodev_sqe, NULL, iodev_sqe)) { - TC_PRINT("adding queued sqe\n"); - rtio_mpsc_push(&iodev->iodev_sq, &iodev_sqe->q); - } + atomic_inc(&data->submit_count); + + /* The only safe operation is enqueuing */ + rtio_mpsc_push(&iodev->iodev_sq, &iodev_sqe->q); - /* - * Simulate an async hardware request with a one shot timer - * - * In reality the time to complete might have some significant variance - * but this is proof enough of a working API flow. 
- */
-	TC_PRINT("starting one shot\n");
-	k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT);
+	rtio_iodev_test_next(iodev);
 }
 
 const struct rtio_iodev_api rtio_iodev_test_api = {
@@ -79,7 +85,7 @@ void rtio_iodev_test_init(struct rtio_iodev *test)
 {
 	struct rtio_iodev_test_data *data = test->data;
 
 	rtio_mpsc_init(&test->iodev_sq);
-	atomic_ptr_set(&data->iodev_sqe, NULL);
+	data->iodev_sqe = NULL;
 	k_timer_init(&data->timer, rtio_iodev_timer_fn, NULL);
 }

From ec3f6715c53b38fc6fc2bfb8abef5e8d16e101fb Mon Sep 17 00:00:00 2001
From: Tom Burdick
Date: Mon, 27 Feb 2023 12:40:16 -0600
Subject: [PATCH 3/8] tests: rtio: Split test suites up into files

The test suites have grown to really cover different units, and having
them all in one file was becoming a bit much to scroll around in.
Coincidentally found an accidental reuse of a define between the spsc
and mpsc tests.

Signed-off-by: Tom Burdick
---
 tests/subsys/rtio/rtio_api/CMakeLists.txt     |   2 +-
 tests/subsys/rtio/rtio_api/src/main.c         | 676 ------------------
 tests/subsys/rtio/rtio_api/src/rtio_api.h     |  19 +
 .../subsys/rtio/rtio_api/src/test_rtio_api.c  | 293 ++++
 .../subsys/rtio/rtio_api/src/test_rtio_mpsc.c | 179 +++++
 .../subsys/rtio/rtio_api/src/test_rtio_spsc.c | 218 ++++++
 6 files changed, 710 insertions(+), 677 deletions(-)
 delete mode 100644 tests/subsys/rtio/rtio_api/src/main.c
 create mode 100644 tests/subsys/rtio/rtio_api/src/rtio_api.h
 create mode 100644 tests/subsys/rtio/rtio_api/src/test_rtio_api.c
 create mode 100644 tests/subsys/rtio/rtio_api/src/test_rtio_mpsc.c
 create mode 100644 tests/subsys/rtio/rtio_api/src/test_rtio_spsc.c

diff --git a/tests/subsys/rtio/rtio_api/CMakeLists.txt b/tests/subsys/rtio/rtio_api/CMakeLists.txt
index 68fd00d94bfc8..3ece5f0c51e2d 100644
--- a/tests/subsys/rtio/rtio_api/CMakeLists.txt
+++ b/tests/subsys/rtio/rtio_api/CMakeLists.txt
@@ -5,7 +5,7 @@ cmake_minimum_required(VERSION 3.20.0)
 find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
 project(rtio_api_test)
 
-target_sources(app PRIVATE src/main.c)
+target_sources(app PRIVATE src/test_rtio_spsc.c src/test_rtio_mpsc.c src/test_rtio_api.c)
 
 target_include_directories(app PRIVATE
   ${ZEPHYR_BASE}/include
diff --git a/tests/subsys/rtio/rtio_api/src/main.c b/tests/subsys/rtio/rtio_api/src/main.c
deleted file mode 100644
index 035acf3b10734..0000000000000
--- a/tests/subsys/rtio/rtio_api/src/main.c
+++ /dev/null
@@ -1,676 +0,0 @@
-/*
- * Copyright (c) 2021 Intel Corporation.
- * - * SPDX-License-Identifier: Apache-2.0 - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "rtio_iodev_test.h" - -/* - * @brief Produce and Consume a single uint32_t in the same execution context - * - * @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release() - * - * @ingroup rtio_tests - */ -ZTEST(rtio_spsc, test_produce_consume_size1) -{ - RTIO_SPSC_DEFINE(ezspsc, uint32_t, 1); - - const uint32_t magic = 43219876; - - uint32_t *acq = rtio_spsc_acquire(&ezspsc); - - zassert_not_null(acq, "Acquire should succeed"); - - *acq = magic; - - uint32_t *acq2 = rtio_spsc_acquire(&ezspsc); - - zassert_is_null(acq2, "Acquire should fail"); - - uint32_t *cons = rtio_spsc_consume(&ezspsc); - - zassert_is_null(cons, "Consume should fail"); - - zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); - - rtio_spsc_produce(&ezspsc); - - zassert_equal(rtio_spsc_consumable(&ezspsc), 1, "Consumables should be 1"); - - uint32_t *cons2 = rtio_spsc_consume(&ezspsc); - - zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); - - zassert_not_null(cons2, "Consume should not fail"); - zassert_equal(*cons2, magic, "Consume value should equal magic"); - - uint32_t *cons3 = rtio_spsc_consume(&ezspsc); - - zassert_is_null(cons3, "Consume should fail"); - - - uint32_t *acq3 = rtio_spsc_acquire(&ezspsc); - - zassert_is_null(acq3, "Acquire should not succeed"); - - rtio_spsc_release(&ezspsc); - - uint32_t *acq4 = rtio_spsc_acquire(&ezspsc); - - zassert_not_null(acq4, "Acquire should succeed"); -} - -/*&* - * @brief Produce and Consume 3 items at a time in a spsc of size 4 to validate masking - * and wrap around reads/writes. - * - * @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release() - * - * @ingroup rtio_tests - */ -ZTEST(rtio_spsc, test_produce_consume_wrap_around) -{ - RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4); - - for (int i = 0; i < 10; i++) { - zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); - for (int j = 0; j < 3; j++) { - uint32_t *entry = rtio_spsc_acquire(&ezspsc); - - zassert_not_null(entry, "Acquire should succeed"); - *entry = i * 3 + j; - rtio_spsc_produce(&ezspsc); - } - zassert_equal(rtio_spsc_consumable(&ezspsc), 3, "Consumables should be 3"); - - for (int k = 0; k < 3; k++) { - uint32_t *entry = rtio_spsc_consume(&ezspsc); - - zassert_not_null(entry, "Consume should succeed"); - zassert_equal(*entry, i * 3 + k, "Consume value should equal i*3+k"); - rtio_spsc_release(&ezspsc); - } - - zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); - - } -} - -/** - * @brief Ensure that integer wraps continue to work. - * - * Done by setting all values to UINTPTR_MAX - 2 and writing and reading enough - * to ensure integer wraps occur. 
- */ -ZTEST(rtio_spsc, test_int_wrap_around) -{ - RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4); - ezspsc._spsc.in = ATOMIC_INIT(UINTPTR_MAX - 2); - ezspsc._spsc.out = ATOMIC_INIT(UINTPTR_MAX - 2); - - for (int j = 0; j < 3; j++) { - uint32_t *entry = rtio_spsc_acquire(&ezspsc); - - zassert_not_null(entry, "Acquire should succeed"); - *entry = j; - rtio_spsc_produce(&ezspsc); - } - - zassert_equal(atomic_get(&ezspsc._spsc.in), UINTPTR_MAX + 1, "Spsc in should wrap"); - - for (int k = 0; k < 3; k++) { - uint32_t *entry = rtio_spsc_consume(&ezspsc); - - zassert_not_null(entry, "Consume should succeed"); - zassert_equal(*entry, k, "Consume value should equal i*3+k"); - rtio_spsc_release(&ezspsc); - } - - zassert_equal(atomic_get(&ezspsc._spsc.out), UINTPTR_MAX + 1, "Spsc out should wrap"); -} - -#define MAX_RETRIES 5 -#define SMP_ITERATIONS 100 - -RTIO_SPSC_DEFINE(spsc, uint32_t, 4); - -static void t1_consume(void *p1, void *p2, void *p3) -{ - struct rtio_spsc_spsc *ezspsc = p1; - uint32_t retries = 0; - uint32_t *val = NULL; - - for (int i = 0; i < SMP_ITERATIONS; i++) { - val = NULL; - retries = 0; - while (val == NULL && retries < MAX_RETRIES) { - val = rtio_spsc_consume(ezspsc); - retries++; - } - if (val != NULL) { - rtio_spsc_release(ezspsc); - } else { - k_yield(); - } - } -} - -static void t2_produce(void *p1, void *p2, void *p3) -{ - struct rtio_spsc_spsc *ezspsc = p1; - uint32_t retries = 0; - uint32_t *val = NULL; - - for (int i = 0; i < SMP_ITERATIONS; i++) { - val = NULL; - retries = 0; - while (val == NULL && retries < MAX_RETRIES) { - val = rtio_spsc_acquire(ezspsc); - retries++; - } - if (val != NULL) { - *val = SMP_ITERATIONS; - rtio_spsc_produce(ezspsc); - } else { - k_yield(); - } - } -} - -#define STACK_SIZE (384 + CONFIG_TEST_EXTRA_STACK_SIZE) -#define THREADS_NUM 2 - -struct thread_info { - k_tid_t tid; - int executed; - int priority; - int cpu_id; -}; -static struct thread_info tinfo[THREADS_NUM]; -static struct k_thread tthread[THREADS_NUM]; -static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM, STACK_SIZE); - - -/** - * @brief Test that the producer and consumer are indeed thread safe - * - * This can and should be validated on SMP machines where incoherent - * memory could cause issues. 
- */ -ZTEST(rtio_spsc, test_spsc_threaded) -{ - - tinfo[0].tid = - k_thread_create(&tthread[0], tstack[0], STACK_SIZE, - (k_thread_entry_t)t1_consume, - &spsc, NULL, NULL, - K_PRIO_PREEMPT(5), - K_INHERIT_PERMS, K_NO_WAIT); - tinfo[1].tid = - k_thread_create(&tthread[1], tstack[1], STACK_SIZE, - (k_thread_entry_t)t2_produce, - &spsc, NULL, NULL, - K_PRIO_PREEMPT(5), - K_INHERIT_PERMS, K_NO_WAIT); - - k_thread_join(tinfo[1].tid, K_FOREVER); - k_thread_join(tinfo[0].tid, K_FOREVER); -} - -static struct rtio_mpsc push_pop_q; -static struct rtio_mpsc_node push_pop_nodes[2]; - -/* - * @brief Push and pop one element - * - * @see rtio_mpsc_push(), rtio_mpsc_pop() - * - * @ingroup rtio_tests - */ -ZTEST(rtio_mpsc, test_push_pop) -{ - - struct rtio_mpsc_node *node, *head, *stub, *next, *tail; - - rtio_mpsc_init(&push_pop_q); - - head = atomic_ptr_get(&push_pop_q.head); - tail = push_pop_q.tail; - stub = &push_pop_q.stub; - next = atomic_ptr_get(&stub->next); - - zassert_equal(head, stub, "Head should point at stub"); - zassert_equal(tail, stub, "Tail should point at stub"); - zassert_is_null(next, "Next should be null"); - - node = rtio_mpsc_pop(&push_pop_q); - zassert_is_null(node, "Pop on empty queue should return null"); - - rtio_mpsc_push(&push_pop_q, &push_pop_nodes[0]); - - head = atomic_ptr_get(&push_pop_q.head); - - zassert_equal(head, &push_pop_nodes[0], "Queue head should point at push_pop_node"); - next = atomic_ptr_get(&push_pop_nodes[0].next); - zassert_is_null(next, NULL, "push_pop_node next should point at null"); - next = atomic_ptr_get(&push_pop_q.stub.next); - zassert_equal(next, &push_pop_nodes[0], "Queue stub should point at push_pop_node"); - tail = push_pop_q.tail; - stub = &push_pop_q.stub; - zassert_equal(tail, stub, "Tail should point at stub"); - - node = rtio_mpsc_pop(&push_pop_q); - stub = &push_pop_q.stub; - - zassert_not_equal(node, stub, "Pop should not return stub"); - zassert_not_null(node, "Pop should not return null"); - zassert_equal(node, &push_pop_nodes[0], - "Pop should return push_pop_node %p, instead was %p", - &push_pop_nodes[0], node); - - node = rtio_mpsc_pop(&push_pop_q); - zassert_is_null(node, "Pop on empty queue should return null"); -} - -#define MPSC_FREEQ_SZ 8 -#define MPSC_ITERATIONS 100000 -#define MPSC_STACK_SIZE (512 + CONFIG_TEST_EXTRA_STACK_SIZE) -#define MPSC_THREADS_NUM 4 - -static struct thread_info mpsc_tinfo[MPSC_THREADS_NUM]; -static struct k_thread mpsc_thread[MPSC_THREADS_NUM]; -static K_THREAD_STACK_ARRAY_DEFINE(mpsc_stack, MPSC_THREADS_NUM, MPSC_STACK_SIZE); - -struct mpsc_node { - uint32_t id; - struct rtio_mpsc_node n; -}; - - -RTIO_SPSC_DECLARE(node_sq, struct mpsc_node, MPSC_FREEQ_SZ); - -#define SPSC_INIT(n, sz) RTIO_SPSC_INITIALIZER(sz) - -struct rtio_spsc_node_sq node_q[MPSC_THREADS_NUM] = { - LISTIFY(MPSC_THREADS_NUM, SPSC_INIT, (,), MPSC_FREEQ_SZ) -}; - -static struct rtio_mpsc mpsc_q; - -static void mpsc_consumer(void *p1, void *p2, void *p3) -{ - struct rtio_mpsc_node *n; - struct mpsc_node *nn; - - for (int i = 0; i < (MPSC_ITERATIONS)*(MPSC_THREADS_NUM - 1); i++) { - do { - n = rtio_mpsc_pop(&mpsc_q); - if (n == NULL) { - k_yield(); - } - } while (n == NULL); - - zassert_not_equal(n, &mpsc_q.stub, "mpsc should not produce stub"); - - nn = CONTAINER_OF(n, struct mpsc_node, n); - - rtio_spsc_acquire(&node_q[nn->id]); - rtio_spsc_produce(&node_q[nn->id]); - } -} - -static void mpsc_producer(void *p1, void *p2, void *p3) -{ - struct mpsc_node *n; - uint32_t id = (uint32_t)(uintptr_t)p1; - - for (int i = 0; i < 
MPSC_ITERATIONS; i++) { - do { - n = rtio_spsc_consume(&node_q[id]); - if (n == NULL) { - k_yield(); - } - } while (n == NULL); - - rtio_spsc_release(&node_q[id]); - n->id = id; - rtio_mpsc_push(&mpsc_q, &n->n); - } -} - -/** - * @brief Test that the producer and consumer are indeed thread safe - * - * This can and should be validated on SMP machines where incoherent - * memory could cause issues. - */ -ZTEST(rtio_mpsc, test_mpsc_threaded) -{ - rtio_mpsc_init(&mpsc_q); - - TC_PRINT("setting up mpsc producer free queues\n"); - /* Setup node free queues */ - for (int i = 0; i < MPSC_THREADS_NUM; i++) { - for (int j = 0; j < MPSC_FREEQ_SZ; j++) { - rtio_spsc_acquire(&node_q[i]); - } - rtio_spsc_produce_all(&node_q[i]); - } - - TC_PRINT("starting consumer\n"); - mpsc_tinfo[0].tid = - k_thread_create(&mpsc_thread[0], mpsc_stack[0], STACK_SIZE, - (k_thread_entry_t)mpsc_consumer, - NULL, NULL, NULL, - K_PRIO_PREEMPT(5), - K_INHERIT_PERMS, K_NO_WAIT); - - for (int i = 1; i < MPSC_THREADS_NUM; i++) { - TC_PRINT("starting producer %i\n", i); - mpsc_tinfo[i].tid = - k_thread_create(&mpsc_thread[i], mpsc_stack[i], STACK_SIZE, - (k_thread_entry_t)mpsc_producer, - (void *)(uintptr_t)i, NULL, NULL, - K_PRIO_PREEMPT(5), - K_INHERIT_PERMS, K_NO_WAIT); - } - - for (int i = 0; i < MPSC_THREADS_NUM; i++) { - TC_PRINT("joining mpsc thread %d\n", i); - k_thread_join(mpsc_tinfo[i].tid, K_FOREVER); - } -} - -RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec_simp); -RTIO_DEFINE(r_simple_simp, (struct rtio_executor *)&simple_exec_simp, 4, 4); - -RTIO_EXECUTOR_CONCURRENT_DEFINE(simple_exec_con, 1); -RTIO_DEFINE(r_simple_con, (struct rtio_executor *)&simple_exec_con, 4, 4); - -RTIO_IODEV_TEST_DEFINE(iodev_test_simple); - -/** - * @brief Test the basics of the RTIO API - * - * Ensures that we can setup an RTIO context, enqueue a request, and receive - * a completion event. - */ -void test_rtio_simple_(struct rtio *r) -{ - int res; - uintptr_t userdata[2] = {0, 1}; - struct rtio_sqe *sqe; - struct rtio_cqe *cqe; - - rtio_iodev_test_init(&iodev_test_simple); - - TC_PRINT("setting up single no-op\n"); - sqe = rtio_spsc_acquire(r->sq); - zassert_not_null(sqe, "Expected a valid sqe"); - rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_simple, &userdata[0]); - - TC_PRINT("submit with wait\n"); - res = rtio_submit(r, 1); - zassert_ok(res, "Should return ok from rtio_execute"); - - cqe = rtio_spsc_consume(r->cq); - zassert_not_null(cqe, "Expected a valid cqe"); - zassert_ok(cqe->result, "Result should be ok"); - zassert_equal_ptr(cqe->userdata, &userdata[0], "Expected userdata back"); - rtio_spsc_release(r->cq); -} - -ZTEST(rtio_api, test_rtio_simple) -{ - TC_PRINT("rtio simple simple\n"); - test_rtio_simple_(&r_simple_simp); - TC_PRINT("rtio simple concurrent\n"); - test_rtio_simple_(&r_simple_con); -} - -RTIO_EXECUTOR_SIMPLE_DEFINE(chain_exec_simp); -RTIO_DEFINE(r_chain_simp, (struct rtio_executor *)&chain_exec_simp, 4, 4); - -RTIO_EXECUTOR_CONCURRENT_DEFINE(chain_exec_con, 1); -RTIO_DEFINE(r_chain_con, (struct rtio_executor *)&chain_exec_con, 4, 4); - -RTIO_IODEV_TEST_DEFINE(iodev_test_chain0); -RTIO_IODEV_TEST_DEFINE(iodev_test_chain1); -struct rtio_iodev *iodev_test_chain[] = {&iodev_test_chain0, &iodev_test_chain1}; - -/** - * @brief Test chained requests - * - * Ensures that we can setup an RTIO context, enqueue a chained requests, - * and receive completion events in the correct order given the chained - * flag and multiple devices where serialization isn't guaranteed. 
- */ -void test_rtio_chain_(struct rtio *r) -{ - int res; - uintptr_t userdata[4] = {0, 1, 2, 3}; - struct rtio_sqe *sqe; - struct rtio_cqe *cqe; - - for (int i = 0; i < 4; i++) { - sqe = rtio_spsc_acquire(r->sq); - zassert_not_null(sqe, "Expected a valid sqe"); - rtio_sqe_prep_nop(sqe, iodev_test_chain[i % 2], - &userdata[i]); - sqe->flags |= RTIO_SQE_CHAINED; - } - - /* Clear the last one */ - sqe->flags = 0; - - TC_PRINT("submitting\n"); - res = rtio_submit(r, 4); - TC_PRINT("checking cq\n"); - zassert_ok(res, "Should return ok from rtio_execute"); - zassert_equal(rtio_spsc_consumable(r->cq), 4, "Should have 4 pending completions"); - - for (int i = 0; i < 4; i++) { - TC_PRINT("consume %d\n", i); - cqe = rtio_spsc_consume(r->cq); - zassert_not_null(cqe, "Expected a valid cqe"); - zassert_ok(cqe->result, "Result should be ok"); - zassert_equal_ptr(cqe->userdata, &userdata[i], "Expected in order completions"); - rtio_spsc_release(r->cq); - } -} - -ZTEST(rtio_api, test_rtio_chain) -{ - TC_PRINT("initializing iodev test devices\n"); - - for (int i = 0; i < 2; i++) { - rtio_iodev_test_init(iodev_test_chain[i]); - } - - TC_PRINT("rtio chain simple\n"); - test_rtio_chain_(&r_chain_simp); - TC_PRINT("rtio chain concurrent\n"); - test_rtio_chain_(&r_chain_con); -} - - -RTIO_EXECUTOR_SIMPLE_DEFINE(multi_exec_simp); -RTIO_DEFINE(r_multi_simp, (struct rtio_executor *)&multi_exec_simp, 4, 4); - -RTIO_EXECUTOR_CONCURRENT_DEFINE(multi_exec_con, 2); -RTIO_DEFINE(r_multi_con, (struct rtio_executor *)&multi_exec_con, 4, 4); - -RTIO_IODEV_TEST_DEFINE(iodev_test_multi0); -RTIO_IODEV_TEST_DEFINE(iodev_test_multi1); -struct rtio_iodev *iodev_test_multi[] = {&iodev_test_multi0, &iodev_test_multi1}; - -/** - * @brief Test multiple asynchronous chains against one iodev - */ -void test_rtio_multiple_chains_(struct rtio *r) -{ - int res; - uintptr_t userdata[4] = {0, 1, 2, 3}; - struct rtio_sqe *sqe; - struct rtio_cqe *cqe; - - for (int i = 0; i < 2; i++) { - for (int j = 0; j < 2; j++) { - sqe = rtio_spsc_acquire(r->sq); - zassert_not_null(sqe, "Expected a valid sqe"); - rtio_sqe_prep_nop(sqe, iodev_test_multi[i], - (void *)userdata[i*2 + j]); - if (j == 0) { - sqe->flags |= RTIO_SQE_CHAINED; - } else { - sqe->flags |= 0; - } - } - } - - TC_PRINT("calling submit from test case\n"); - res = rtio_submit(r, 0); - zassert_ok(res, "Should return ok from rtio_execute"); - - bool seen[4] = { 0 }; - - TC_PRINT("waiting for 4 completions\n"); - for (int i = 0; i < 4; i++) { - TC_PRINT("waiting on completion %d\n", i); - cqe = rtio_spsc_consume(r->cq); - - while (cqe == NULL) { - k_sleep(K_MSEC(1)); - cqe = rtio_spsc_consume(r->cq); - } - - zassert_not_null(cqe, "Expected a valid cqe"); - TC_PRINT("result %d, would block is %d, inval is %d\n", - cqe->result, -EWOULDBLOCK, -EINVAL); - zassert_ok(cqe->result, "Result should be ok"); - seen[(uintptr_t)cqe->userdata] = true; - if (seen[1]) { - zassert_true(seen[0], "Should see 0 before 1"); - } - if (seen[3]) { - zassert_true(seen[2], "Should see 2 before 3"); - } - rtio_spsc_release(r->cq); - } -} - -ZTEST(rtio_api, test_rtio_multiple_chains) -{ - for (int i = 0; i < 2; i++) { - rtio_iodev_test_init(iodev_test_multi[i]); - } - - TC_PRINT("rtio multiple simple\n"); - test_rtio_multiple_chains_(&r_multi_simp); - TC_PRINT("rtio_multiple concurrent\n"); - test_rtio_multiple_chains_(&r_multi_con); -} - - - -#ifdef CONFIG_USERSPACE -K_APPMEM_PARTITION_DEFINE(rtio_partition); -K_APP_BMEM(rtio_partition) uint8_t syscall_bufs[4]; -struct k_mem_domain rtio_domain; -#else -uint8_t 
syscall_bufs[4]; -#endif - -RTIO_EXECUTOR_SIMPLE_DEFINE(syscall_simple); -RTIO_DEFINE(r_syscall, (struct rtio_executor *)&syscall_simple, 4, 4); -RTIO_IODEV_TEST_DEFINE(iodev_test_syscall); - -void rtio_syscall_test(void *p1, void *p2, void *p3) -{ - int res; - struct rtio_sqe sqe; - struct rtio_cqe cqe; - - struct rtio *r = &r_syscall; - - for (int i = 0; i < 4; i++) { - TC_PRINT("copying sqe in from stack\n"); - /* Not really legal from userspace! Ugh */ - rtio_sqe_prep_nop(&sqe, &iodev_test_syscall, - &syscall_bufs[i]); - res = rtio_sqe_copy_in(r, &sqe, 1); - zassert_equal(res, 0, "Expected success copying sqe"); - } - - TC_PRINT("submitting\n"); - res = rtio_submit(r, 4); - - for (int i = 0; i < 4; i++) { - TC_PRINT("consume %d\n", i); - res = rtio_cqe_copy_out(r, &cqe, 1, K_FOREVER); - zassert_equal(res, 1, "Expected success copying cqe"); - zassert_ok(cqe.result, "Result should be ok"); - zassert_equal_ptr(cqe.userdata, &syscall_bufs[i], - "Expected in order completions"); - } - -} - -#ifdef CONFIG_USERSPACE -ZTEST(rtio_api, test_rtio_syscalls_usermode) -{ - struct k_mem_partition *parts[] = { -#if Z_LIBC_PARTITION_EXISTS - &z_libc_partition, -#endif - &rtio_partition - }; - - TC_PRINT("syscalls from user mode test\n"); - TC_PRINT("test iodev init\n"); - rtio_iodev_test_init(&iodev_test_syscall); - TC_PRINT("mem domain init\n"); - k_mem_domain_init(&rtio_domain, ARRAY_SIZE(parts), parts); - TC_PRINT("mem domain add current\n"); - k_mem_domain_add_thread(&rtio_domain, k_current_get()); - TC_PRINT("rtio access grant\n"); - rtio_access_grant(&r_syscall, k_current_get()); - TC_PRINT("rtio iodev access grant, ptr %p\n", &iodev_test_syscall); - k_object_access_grant(&iodev_test_syscall, k_current_get()); - TC_PRINT("user mode enter\n"); - k_thread_user_mode_enter(rtio_syscall_test, NULL, NULL, NULL); -} -#endif /* CONFIG_USERSPACE */ - - -ZTEST(rtio_api, test_rtio_syscalls) -{ - TC_PRINT("test iodev init\n"); - rtio_iodev_test_init(&iodev_test_syscall); - TC_PRINT("syscalls from kernel mode\n"); - rtio_syscall_test(NULL, NULL, NULL); -} - - - - -ZTEST_SUITE(rtio_spsc, NULL, NULL, NULL, NULL, NULL); -ZTEST_SUITE(rtio_mpsc, NULL, NULL, NULL, NULL, NULL); -ZTEST_SUITE(rtio_api, NULL, NULL, NULL, NULL, NULL); diff --git a/tests/subsys/rtio/rtio_api/src/rtio_api.h b/tests/subsys/rtio/rtio_api/src/rtio_api.h new file mode 100644 index 0000000000000..df8d46ae1f7d8 --- /dev/null +++ b/tests/subsys/rtio/rtio_api/src/rtio_api.h @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2023 Intel Corporation. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef ZEPHYR_TEST_RTIO_API_H_ +#define ZEPHYR_TEST_RTIO_API_H_ + +#include + +struct thread_info { + k_tid_t tid; + int executed; + int priority; + int cpu_id; +}; + +#endif /* ZEPHYR_TEST_RTIO_API_H_ */ diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c new file mode 100644 index 0000000000000..5d304e4bd7f5f --- /dev/null +++ b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c @@ -0,0 +1,293 @@ +/* + * Copyright (c) 2021 Intel Corporation. 
+ * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rtio_iodev_test.h" + +RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec_simp); +RTIO_DEFINE(r_simple_simp, (struct rtio_executor *)&simple_exec_simp, 4, 4); + +RTIO_EXECUTOR_CONCURRENT_DEFINE(simple_exec_con, 1); +RTIO_DEFINE(r_simple_con, (struct rtio_executor *)&simple_exec_con, 4, 4); + +RTIO_IODEV_TEST_DEFINE(iodev_test_simple); + +/** + * @brief Test the basics of the RTIO API + * + * Ensures that we can setup an RTIO context, enqueue a request, and receive + * a completion event. + */ +void test_rtio_simple_(struct rtio *r) +{ + int res; + uintptr_t userdata[2] = {0, 1}; + struct rtio_sqe *sqe; + struct rtio_cqe *cqe; + + rtio_iodev_test_init(&iodev_test_simple); + + TC_PRINT("setting up single no-op\n"); + sqe = rtio_spsc_acquire(r->sq); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_simple, &userdata[0]); + + TC_PRINT("submit with wait\n"); + res = rtio_submit(r, 1); + zassert_ok(res, "Should return ok from rtio_execute"); + + cqe = rtio_spsc_consume(r->cq); + zassert_not_null(cqe, "Expected a valid cqe"); + zassert_ok(cqe->result, "Result should be ok"); + zassert_equal_ptr(cqe->userdata, &userdata[0], "Expected userdata back"); + rtio_spsc_release(r->cq); +} + +ZTEST(rtio_api, test_rtio_simple) +{ + TC_PRINT("rtio simple simple\n"); + test_rtio_simple_(&r_simple_simp); + TC_PRINT("rtio simple concurrent\n"); + test_rtio_simple_(&r_simple_con); +} + +RTIO_EXECUTOR_SIMPLE_DEFINE(chain_exec_simp); +RTIO_DEFINE(r_chain_simp, (struct rtio_executor *)&chain_exec_simp, 4, 4); + +RTIO_EXECUTOR_CONCURRENT_DEFINE(chain_exec_con, 1); +RTIO_DEFINE(r_chain_con, (struct rtio_executor *)&chain_exec_con, 4, 4); + +RTIO_IODEV_TEST_DEFINE(iodev_test_chain0); +RTIO_IODEV_TEST_DEFINE(iodev_test_chain1); +struct rtio_iodev *iodev_test_chain[] = {&iodev_test_chain0, &iodev_test_chain1}; + +/** + * @brief Test chained requests + * + * Ensures that we can setup an RTIO context, enqueue a chained requests, + * and receive completion events in the correct order given the chained + * flag and multiple devices where serialization isn't guaranteed. 
+ */ +void test_rtio_chain_(struct rtio *r) +{ + int res; + uintptr_t userdata[4] = {0, 1, 2, 3}; + struct rtio_sqe *sqe; + struct rtio_cqe *cqe; + + for (int i = 0; i < 4; i++) { + sqe = rtio_spsc_acquire(r->sq); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_nop(sqe, iodev_test_chain[i % 2], + &userdata[i]); + sqe->flags |= RTIO_SQE_CHAINED; + } + + /* Clear the last one */ + sqe->flags = 0; + + TC_PRINT("submitting\n"); + res = rtio_submit(r, 4); + TC_PRINT("checking cq\n"); + zassert_ok(res, "Should return ok from rtio_execute"); + zassert_equal(rtio_spsc_consumable(r->cq), 4, "Should have 4 pending completions"); + + for (int i = 0; i < 4; i++) { + TC_PRINT("consume %d\n", i); + cqe = rtio_spsc_consume(r->cq); + zassert_not_null(cqe, "Expected a valid cqe"); + zassert_ok(cqe->result, "Result should be ok"); + zassert_equal_ptr(cqe->userdata, &userdata[i], "Expected in order completions"); + rtio_spsc_release(r->cq); + } +} + +ZTEST(rtio_api, test_rtio_chain) +{ + TC_PRINT("initializing iodev test devices\n"); + + for (int i = 0; i < 2; i++) { + rtio_iodev_test_init(iodev_test_chain[i]); + } + + TC_PRINT("rtio chain simple\n"); + test_rtio_chain_(&r_chain_simp); + TC_PRINT("rtio chain concurrent\n"); + test_rtio_chain_(&r_chain_con); +} + + +RTIO_EXECUTOR_SIMPLE_DEFINE(multi_exec_simp); +RTIO_DEFINE(r_multi_simp, (struct rtio_executor *)&multi_exec_simp, 4, 4); + +RTIO_EXECUTOR_CONCURRENT_DEFINE(multi_exec_con, 2); +RTIO_DEFINE(r_multi_con, (struct rtio_executor *)&multi_exec_con, 4, 4); + +RTIO_IODEV_TEST_DEFINE(iodev_test_multi0); +RTIO_IODEV_TEST_DEFINE(iodev_test_multi1); +struct rtio_iodev *iodev_test_multi[] = {&iodev_test_multi0, &iodev_test_multi1}; + +/** + * @brief Test multiple asynchronous chains against one iodev + */ +void test_rtio_multiple_chains_(struct rtio *r) +{ + int res; + uintptr_t userdata[4] = {0, 1, 2, 3}; + struct rtio_sqe *sqe; + struct rtio_cqe *cqe; + + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { + sqe = rtio_spsc_acquire(r->sq); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_nop(sqe, iodev_test_multi[i], + (void *)userdata[i*2 + j]); + if (j == 0) { + sqe->flags |= RTIO_SQE_CHAINED; + } else { + sqe->flags |= 0; + } + } + } + + TC_PRINT("calling submit from test case\n"); + res = rtio_submit(r, 0); + zassert_ok(res, "Should return ok from rtio_execute"); + + bool seen[4] = { 0 }; + + TC_PRINT("waiting for 4 completions\n"); + for (int i = 0; i < 4; i++) { + TC_PRINT("waiting on completion %d\n", i); + cqe = rtio_spsc_consume(r->cq); + + while (cqe == NULL) { + k_sleep(K_MSEC(1)); + cqe = rtio_spsc_consume(r->cq); + } + + zassert_not_null(cqe, "Expected a valid cqe"); + TC_PRINT("result %d, would block is %d, inval is %d\n", + cqe->result, -EWOULDBLOCK, -EINVAL); + zassert_ok(cqe->result, "Result should be ok"); + seen[(uintptr_t)cqe->userdata] = true; + if (seen[1]) { + zassert_true(seen[0], "Should see 0 before 1"); + } + if (seen[3]) { + zassert_true(seen[2], "Should see 2 before 3"); + } + rtio_spsc_release(r->cq); + } +} + +ZTEST(rtio_api, test_rtio_multiple_chains) +{ + for (int i = 0; i < 2; i++) { + rtio_iodev_test_init(iodev_test_multi[i]); + } + + TC_PRINT("rtio multiple simple\n"); + test_rtio_multiple_chains_(&r_multi_simp); + TC_PRINT("rtio_multiple concurrent\n"); + test_rtio_multiple_chains_(&r_multi_con); +} + + + +#ifdef CONFIG_USERSPACE +K_APPMEM_PARTITION_DEFINE(rtio_partition); +K_APP_BMEM(rtio_partition) uint8_t syscall_bufs[4]; +struct k_mem_domain rtio_domain; +#else +uint8_t 
syscall_bufs[4]; +#endif + +RTIO_EXECUTOR_SIMPLE_DEFINE(syscall_simple); +RTIO_DEFINE(r_syscall, (struct rtio_executor *)&syscall_simple, 4, 4); +RTIO_IODEV_TEST_DEFINE(iodev_test_syscall); + +void rtio_syscall_test(void *p1, void *p2, void *p3) +{ + int res; + struct rtio_sqe sqe; + struct rtio_cqe cqe; + + struct rtio *r = &r_syscall; + + for (int i = 0; i < 4; i++) { + TC_PRINT("copying sqe in from stack\n"); + /* Not really legal from userspace! Ugh */ + rtio_sqe_prep_nop(&sqe, &iodev_test_syscall, + &syscall_bufs[i]); + res = rtio_sqe_copy_in(r, &sqe, 1); + zassert_equal(res, 0, "Expected success copying sqe"); + } + + TC_PRINT("submitting\n"); + res = rtio_submit(r, 4); + + for (int i = 0; i < 4; i++) { + TC_PRINT("consume %d\n", i); + res = rtio_cqe_copy_out(r, &cqe, 1, K_FOREVER); + zassert_equal(res, 1, "Expected success copying cqe"); + zassert_ok(cqe.result, "Result should be ok"); + zassert_equal_ptr(cqe.userdata, &syscall_bufs[i], + "Expected in order completions"); + } +} + +#ifdef CONFIG_USERSPACE +ZTEST(rtio_api, test_rtio_syscalls_usermode) +{ + struct k_mem_partition *parts[] = { +#if Z_LIBC_PARTITION_EXISTS + &z_libc_partition, +#endif + &rtio_partition + }; + + TC_PRINT("syscalls from user mode test\n"); + TC_PRINT("test iodev init\n"); + rtio_iodev_test_init(&iodev_test_syscall); + TC_PRINT("mem domain init\n"); + k_mem_domain_init(&rtio_domain, ARRAY_SIZE(parts), parts); + TC_PRINT("mem domain add current\n"); + k_mem_domain_add_thread(&rtio_domain, k_current_get()); + TC_PRINT("rtio access grant\n"); + rtio_access_grant(&r_syscall, k_current_get()); + TC_PRINT("rtio iodev access grant, ptr %p\n", &iodev_test_syscall); + k_object_access_grant(&iodev_test_syscall, k_current_get()); + TC_PRINT("user mode enter\n"); + k_thread_user_mode_enter(rtio_syscall_test, NULL, NULL, NULL); +} +#endif /* CONFIG_USERSPACE */ + + +ZTEST(rtio_api, test_rtio_syscalls) +{ + TC_PRINT("test iodev init\n"); + rtio_iodev_test_init(&iodev_test_syscall); + TC_PRINT("syscalls from kernel mode\n"); + rtio_syscall_test(NULL, NULL, NULL); +} + +ZTEST_SUITE(rtio_api, NULL, NULL, NULL, NULL, NULL); diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_mpsc.c b/tests/subsys/rtio/rtio_api/src/test_rtio_mpsc.c new file mode 100644 index 0000000000000..561b1064f4933 --- /dev/null +++ b/tests/subsys/rtio/rtio_api/src/test_rtio_mpsc.c @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2023 Intel Corporation. 
+ * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include + +#include "rtio_api.h" + +static struct rtio_mpsc push_pop_q; +static struct rtio_mpsc_node push_pop_nodes[2]; + +/* + * @brief Push and pop one element + * + * @see rtio_mpsc_push(), rtio_mpsc_pop() + * + * @ingroup rtio_tests + */ +ZTEST(rtio_mpsc, test_push_pop) +{ + + struct rtio_mpsc_node *node, *head, *stub, *next, *tail; + + rtio_mpsc_init(&push_pop_q); + + head = atomic_ptr_get(&push_pop_q.head); + tail = push_pop_q.tail; + stub = &push_pop_q.stub; + next = atomic_ptr_get(&stub->next); + + zassert_equal(head, stub, "Head should point at stub"); + zassert_equal(tail, stub, "Tail should point at stub"); + zassert_is_null(next, "Next should be null"); + + node = rtio_mpsc_pop(&push_pop_q); + zassert_is_null(node, "Pop on empty queue should return null"); + + rtio_mpsc_push(&push_pop_q, &push_pop_nodes[0]); + + head = atomic_ptr_get(&push_pop_q.head); + + zassert_equal(head, &push_pop_nodes[0], "Queue head should point at push_pop_node"); + next = atomic_ptr_get(&push_pop_nodes[0].next); + zassert_is_null(next, NULL, "push_pop_node next should point at null"); + next = atomic_ptr_get(&push_pop_q.stub.next); + zassert_equal(next, &push_pop_nodes[0], "Queue stub should point at push_pop_node"); + tail = push_pop_q.tail; + stub = &push_pop_q.stub; + zassert_equal(tail, stub, "Tail should point at stub"); + + node = rtio_mpsc_pop(&push_pop_q); + stub = &push_pop_q.stub; + + zassert_not_equal(node, stub, "Pop should not return stub"); + zassert_not_null(node, "Pop should not return null"); + zassert_equal(node, &push_pop_nodes[0], + "Pop should return push_pop_node %p, instead was %p", + &push_pop_nodes[0], node); + + node = rtio_mpsc_pop(&push_pop_q); + zassert_is_null(node, "Pop on empty queue should return null"); +} + +#define MPSC_FREEQ_SZ 8 +#define MPSC_ITERATIONS 100000 +#define MPSC_STACK_SIZE (512 + CONFIG_TEST_EXTRA_STACK_SIZE) +#define MPSC_THREADS_NUM 4 + +static struct thread_info mpsc_tinfo[MPSC_THREADS_NUM]; +static struct k_thread mpsc_thread[MPSC_THREADS_NUM]; +static K_THREAD_STACK_ARRAY_DEFINE(mpsc_stack, MPSC_THREADS_NUM, MPSC_STACK_SIZE); + +struct mpsc_node { + uint32_t id; + struct rtio_mpsc_node n; +}; + + +RTIO_SPSC_DECLARE(node_sq, struct mpsc_node, MPSC_FREEQ_SZ); + +#define SPSC_INIT(n, sz) RTIO_SPSC_INITIALIZER(sz) + +struct rtio_spsc_node_sq node_q[MPSC_THREADS_NUM] = { + LISTIFY(MPSC_THREADS_NUM, SPSC_INIT, (,), MPSC_FREEQ_SZ) +}; + +static struct rtio_mpsc mpsc_q; + +static void mpsc_consumer(void *p1, void *p2, void *p3) +{ + struct rtio_mpsc_node *n; + struct mpsc_node *nn; + + for (int i = 0; i < (MPSC_ITERATIONS)*(MPSC_THREADS_NUM - 1); i++) { + do { + n = rtio_mpsc_pop(&mpsc_q); + if (n == NULL) { + k_yield(); + } + } while (n == NULL); + + zassert_not_equal(n, &mpsc_q.stub, "mpsc should not produce stub"); + + nn = CONTAINER_OF(n, struct mpsc_node, n); + + rtio_spsc_acquire(&node_q[nn->id]); + rtio_spsc_produce(&node_q[nn->id]); + } +} + +static void mpsc_producer(void *p1, void *p2, void *p3) +{ + struct mpsc_node *n; + uint32_t id = (uint32_t)(uintptr_t)p1; + + for (int i = 0; i < MPSC_ITERATIONS; i++) { + do { + n = rtio_spsc_consume(&node_q[id]); + if (n == NULL) { + k_yield(); + } + } while (n == NULL); + + rtio_spsc_release(&node_q[id]); + n->id = id; + rtio_mpsc_push(&mpsc_q, &n->n); + } +} + +/** + * @brief Test that the producer and consumer are indeed thread safe + * + * This can and should be validated on SMP machines where incoherent + * 
memory could cause issues. + */ +ZTEST(rtio_mpsc, test_mpsc_threaded) +{ + rtio_mpsc_init(&mpsc_q); + + TC_PRINT("setting up mpsc producer free queues\n"); + /* Setup node free queues */ + for (int i = 0; i < MPSC_THREADS_NUM; i++) { + for (int j = 0; j < MPSC_FREEQ_SZ; j++) { + rtio_spsc_acquire(&node_q[i]); + } + rtio_spsc_produce_all(&node_q[i]); + } + + TC_PRINT("starting consumer\n"); + mpsc_tinfo[0].tid = + k_thread_create(&mpsc_thread[0], mpsc_stack[0], MPSC_STACK_SIZE, + (k_thread_entry_t)mpsc_consumer, + NULL, NULL, NULL, + K_PRIO_PREEMPT(5), + K_INHERIT_PERMS, K_NO_WAIT); + + for (int i = 1; i < MPSC_THREADS_NUM; i++) { + TC_PRINT("starting producer %i\n", i); + mpsc_tinfo[i].tid = + k_thread_create(&mpsc_thread[i], mpsc_stack[i], MPSC_STACK_SIZE, + (k_thread_entry_t)mpsc_producer, + (void *)(uintptr_t)i, NULL, NULL, + K_PRIO_PREEMPT(5), + K_INHERIT_PERMS, K_NO_WAIT); + } + + for (int i = 0; i < MPSC_THREADS_NUM; i++) { + TC_PRINT("joining mpsc thread %d\n", i); + k_thread_join(mpsc_tinfo[i].tid, K_FOREVER); + } +} + +ZTEST_SUITE(rtio_mpsc, NULL, NULL, NULL, NULL, NULL); diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_spsc.c b/tests/subsys/rtio/rtio_api/src/test_rtio_spsc.c new file mode 100644 index 0000000000000..9dda0960dbcec --- /dev/null +++ b/tests/subsys/rtio/rtio_api/src/test_rtio_spsc.c @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2023 Intel Corporation. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include "rtio_api.h" + +/* + * @brief Produce and Consume a single uint32_t in the same execution context + * + * @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release() + * + * @ingroup rtio_tests + */ +ZTEST(rtio_spsc, test_produce_consume_size1) +{ + RTIO_SPSC_DEFINE(ezspsc, uint32_t, 1); + + const uint32_t magic = 43219876; + + uint32_t *acq = rtio_spsc_acquire(&ezspsc); + + zassert_not_null(acq, "Acquire should succeed"); + + *acq = magic; + + uint32_t *acq2 = rtio_spsc_acquire(&ezspsc); + + zassert_is_null(acq2, "Acquire should fail"); + + uint32_t *cons = rtio_spsc_consume(&ezspsc); + + zassert_is_null(cons, "Consume should fail"); + + zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); + + rtio_spsc_produce(&ezspsc); + + zassert_equal(rtio_spsc_consumable(&ezspsc), 1, "Consumables should be 1"); + + uint32_t *cons2 = rtio_spsc_consume(&ezspsc); + + zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); + + zassert_not_null(cons2, "Consume should not fail"); + zassert_equal(*cons2, magic, "Consume value should equal magic"); + + uint32_t *cons3 = rtio_spsc_consume(&ezspsc); + + zassert_is_null(cons3, "Consume should fail"); + + + uint32_t *acq3 = rtio_spsc_acquire(&ezspsc); + + zassert_is_null(acq3, "Acquire should not succeed"); + + rtio_spsc_release(&ezspsc); + + uint32_t *acq4 = rtio_spsc_acquire(&ezspsc); + + zassert_not_null(acq4, "Acquire should succeed"); +} + +/*&* + * @brief Produce and Consume 3 items at a time in a spsc of size 4 to validate masking + * and wrap around reads/writes. 
+ * + * @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release() + * + * @ingroup rtio_tests + */ +ZTEST(rtio_spsc, test_produce_consume_wrap_around) +{ + RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4); + + for (int i = 0; i < 10; i++) { + zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); + for (int j = 0; j < 3; j++) { + uint32_t *entry = rtio_spsc_acquire(&ezspsc); + + zassert_not_null(entry, "Acquire should succeed"); + *entry = i * 3 + j; + rtio_spsc_produce(&ezspsc); + } + zassert_equal(rtio_spsc_consumable(&ezspsc), 3, "Consumables should be 3"); + + for (int k = 0; k < 3; k++) { + uint32_t *entry = rtio_spsc_consume(&ezspsc); + + zassert_not_null(entry, "Consume should succeed"); + zassert_equal(*entry, i * 3 + k, "Consume value should equal i*3+k"); + rtio_spsc_release(&ezspsc); + } + + zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); + + } +} + +/** + * @brief Ensure that integer wraps continue to work. + * + * Done by setting all values to UINTPTR_MAX - 2 and writing and reading enough + * to ensure integer wraps occur. + */ +ZTEST(rtio_spsc, test_int_wrap_around) +{ + RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4); + ezspsc._spsc.in = ATOMIC_INIT(UINTPTR_MAX - 2); + ezspsc._spsc.out = ATOMIC_INIT(UINTPTR_MAX - 2); + + for (int j = 0; j < 3; j++) { + uint32_t *entry = rtio_spsc_acquire(&ezspsc); + + zassert_not_null(entry, "Acquire should succeed"); + *entry = j; + rtio_spsc_produce(&ezspsc); + } + + zassert_equal(atomic_get(&ezspsc._spsc.in), UINTPTR_MAX + 1, "Spsc in should wrap"); + + for (int k = 0; k < 3; k++) { + uint32_t *entry = rtio_spsc_consume(&ezspsc); + + zassert_not_null(entry, "Consume should succeed"); + zassert_equal(*entry, k, "Consume value should equal i*3+k"); + rtio_spsc_release(&ezspsc); + } + + zassert_equal(atomic_get(&ezspsc._spsc.out), UINTPTR_MAX + 1, "Spsc out should wrap"); +} + +#define MAX_RETRIES 5 +#define SMP_ITERATIONS 100 + +RTIO_SPSC_DEFINE(spsc, uint32_t, 4); + +static void t1_consume(void *p1, void *p2, void *p3) +{ + struct rtio_spsc_spsc *ezspsc = p1; + uint32_t retries = 0; + uint32_t *val = NULL; + + for (int i = 0; i < SMP_ITERATIONS; i++) { + val = NULL; + retries = 0; + while (val == NULL && retries < MAX_RETRIES) { + val = rtio_spsc_consume(ezspsc); + retries++; + } + if (val != NULL) { + rtio_spsc_release(ezspsc); + } else { + k_yield(); + } + } +} + +static void t2_produce(void *p1, void *p2, void *p3) +{ + struct rtio_spsc_spsc *ezspsc = p1; + uint32_t retries = 0; + uint32_t *val = NULL; + + for (int i = 0; i < SMP_ITERATIONS; i++) { + val = NULL; + retries = 0; + while (val == NULL && retries < MAX_RETRIES) { + val = rtio_spsc_acquire(ezspsc); + retries++; + } + if (val != NULL) { + *val = SMP_ITERATIONS; + rtio_spsc_produce(ezspsc); + } else { + k_yield(); + } + } +} + + +#define STACK_SIZE (384 + CONFIG_TEST_EXTRA_STACK_SIZE) +#define THREADS_NUM 2 + +static struct thread_info tinfo[THREADS_NUM]; +static struct k_thread tthread[THREADS_NUM]; +static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM, STACK_SIZE); + +/** + * @brief Test that the producer and consumer are indeed thread safe + * + * This can and should be validated on SMP machines where incoherent + * memory could cause issues. 
+ */ +ZTEST(rtio_spsc, test_spsc_threaded) +{ + + tinfo[0].tid = + k_thread_create(&tthread[0], tstack[0], STACK_SIZE, + (k_thread_entry_t)t1_consume, + &spsc, NULL, NULL, + K_PRIO_PREEMPT(5), + K_INHERIT_PERMS, K_NO_WAIT); + tinfo[1].tid = + k_thread_create(&tthread[1], tstack[1], STACK_SIZE, + (k_thread_entry_t)t2_produce, + &spsc, NULL, NULL, + K_PRIO_PREEMPT(5), + K_INHERIT_PERMS, K_NO_WAIT); + + k_thread_join(tinfo[1].tid, K_FOREVER); + k_thread_join(tinfo[0].tid, K_FOREVER); +} + +ZTEST_SUITE(rtio_spsc, NULL, NULL, NULL, NULL, NULL); From 918cac925bb381040be6c80b5c03c8f506e24981 Mon Sep 17 00:00:00 2001 From: Tom Burdick Date: Fri, 24 Feb 2023 17:43:59 -0600 Subject: [PATCH 4/8] rtio: Add transactional submissions Transactional submissions treat a sequence of sqes as a single atomic submission given to a single iodev and expect as a reply a single completion. This is useful for scatter gather like APIs that exist in Zephyr already for I2C and SPI. Signed-off-by: Tom Burdick --- include/zephyr/rtio/rtio.h | 16 ++ subsys/rtio/rtio_executor_concurrent.c | 184 +++++++++--------- subsys/rtio/rtio_executor_simple.c | 79 ++++++-- .../rtio/rtio_api/src/rtio_iodev_test.h | 13 +- .../subsys/rtio/rtio_api/src/test_rtio_api.c | 115 ++++++++++- 5 files changed, 286 insertions(+), 121 deletions(-) diff --git a/include/zephyr/rtio/rtio.h b/include/zephyr/rtio/rtio.h index 8a40b867eea18..df3dd72e42892 100644 --- a/include/zephyr/rtio/rtio.h +++ b/include/zephyr/rtio/rtio.h @@ -97,9 +97,25 @@ struct rtio_iodev; /** * @brief The next request in the queue should wait on this one. + * + * Chained SQEs are individual units of work describing patterns of + * ordering and failure cascading. A chained SQE must be started only + * after the one before it. They are given to the iodevs one after another. */ #define RTIO_SQE_CHAINED BIT(0) +/** + * @brief The next request in the queue is part of a transaction. + * + * Transactional SQEs are sequential parts of a unit of work. + * Only the first transactional SQE is submitted to an iodev, the + * remaining SQEs are never individually submitted but instead considered + * to be part of the transaction to the single iodev. The first sqe in the + * sequence holds the iodev that will be used and the last holds the userdata + * that will be returned in a single completion on failure/success. + */ +#define RTIO_SQE_TRANSACTION BIT(1) + /** * @} */ diff --git a/subsys/rtio/rtio_executor_concurrent.c b/subsys/rtio/rtio_executor_concurrent.c index ae6c361f0018e..e5618c830540a 100644 --- a/subsys/rtio/rtio_executor_concurrent.c +++ b/subsys/rtio/rtio_executor_concurrent.c @@ -49,7 +49,7 @@ static uint16_t conex_task_next(struct rtio_concurrent_executor *exc) uint16_t task_id = exc->task_in; exc->task_in++; - return task_id; + return task_id & exc->task_mask; } static inline uint16_t conex_task_id(struct rtio_concurrent_executor *exc, @@ -64,7 +64,7 @@ static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *ex { struct rtio_sqe *sqe = rtio_spsc_consume(r->sq); - while (sqe != NULL && sqe->flags & RTIO_SQE_CHAINED) { + while (sqe != NULL && (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION))) { rtio_spsc_release(r->sq); sqe = rtio_spsc_consume(r->sq); } @@ -72,6 +72,13 @@ static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *ex rtio_spsc_release(r->sq); } +/** + * @brief Sweep like a GC of sorts old tasks that are completed in order + * + * Will only sweep tasks in the order they arrived in the submission queue. 
+ * Meaning there might be completed tasks that could be freed but are not yet + * because something before it has not yet completed. + */ static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc) { /* In order sweep up */ @@ -86,21 +93,77 @@ static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc) } } +/** + * @brief Prepare tasks to run by iterating through the submission queue + * + * For each submission in the queue that begins a chain or transaction + * start a task if possible. Concurrency is limited by the allocated concurrency + * per executor instance. + */ +static void conex_prepare(struct rtio *r, struct rtio_concurrent_executor *exc) +{ + struct rtio_sqe *sqe; + + /* If never submitted before peek at the first item + * otherwise start back up where the last submit call + * left off + */ + if (exc->pending_sqe == NULL) { + sqe = rtio_spsc_peek(r->sq); + } else { + sqe = exc->pending_sqe; + } + + while (sqe != NULL && conex_task_free(exc)) { + /* Get the next free task id */ + uint16_t task_idx = conex_task_next(exc); + + /* Setup task */ + exc->task_cur[task_idx].sqe = sqe; + exc->task_cur[task_idx].r = r; + exc->task_status[task_idx] = CONEX_TASK_SUSPENDED; + + /* Go to the next sqe not in the current chain or transaction */ + while (sqe != NULL && (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION))) { + sqe = rtio_spsc_next(r->sq, sqe); + } + + /* SQE is the end of the previous chain or transaction so skip it */ + sqe = rtio_spsc_next(r->sq, sqe); + } + + /* Out of available tasks so remember where we left off to begin again once tasks free up */ + exc->pending_sqe = sqe; +} + + +/** + * @brief Resume tasks that are suspended + * + * All tasks begin as suspended tasks. This kicks them off to the submissions + * associated iodev. + */ static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc) { /* In order resume tasks */ for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) { if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_SUSPENDED) { LOG_INF("resuming suspended task %d", task_id); - exc->task_status[task_id] &= ~CONEX_TASK_SUSPENDED; - rtio_iodev_submit(&exc->task_cur[task_id]); + exc->task_status[task_id & exc->task_mask] &= ~CONEX_TASK_SUSPENDED; + rtio_iodev_submit(&exc->task_cur[task_id & exc->task_mask]); } } } +/** + * @brief Sweep, Prepare, and Resume in one go + * + * Called after a completion to continue doing more work if needed. 
+ */ static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor *exc) { conex_sweep(r, exc); + conex_prepare(r, exc); conex_resume(r, exc); } @@ -114,71 +177,14 @@ static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor * int rtio_concurrent_submit(struct rtio *r) { - LOG_INF("submit"); - struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor; - struct rtio_sqe *sqe; - struct rtio_sqe *last_sqe; k_spinlock_key_t key; key = k_spin_lock(&exc->lock); - /* If never submitted before peek at the first item - * otherwise start back up where the last submit call - * left off - */ - if (exc->last_sqe == NULL) { - sqe = rtio_spsc_peek(r->sq); - } else { - /* Pickup from last submit call */ - sqe = rtio_spsc_next(r->sq, exc->last_sqe); - } - - last_sqe = sqe; - while (sqe != NULL && conex_task_free(exc)) { - LOG_INF("head SQE in chain %p", sqe); - - /* Get the next task id if one exists */ - uint16_t task_idx = conex_task_next(exc); - - LOG_INF("setting up task %d", task_idx); - - /* Setup task (yes this is it) */ - exc->task_cur[task_idx].sqe = sqe; - exc->task_cur[task_idx].r = r; - exc->task_status[task_idx] = CONEX_TASK_SUSPENDED; - - LOG_INF("submitted sqe %p", sqe); - /* Go to the next sqe not in the current chain */ - while (sqe != NULL && (sqe->flags & RTIO_SQE_CHAINED)) { - sqe = rtio_spsc_next(r->sq, sqe); - } - - LOG_INF("tail SQE in chain %p", sqe); - - last_sqe = sqe; - - /* SQE is the end of the previous chain */ - sqe = rtio_spsc_next(r->sq, sqe); - } - - /* Out of available pointers, wait til others complete, note the - * first pending submission queue. May be NULL if nothing is pending. - */ - exc->pending_sqe = sqe; - - /** - * Run through the queue until the last item - * and take not of it - */ - while (sqe != NULL) { - last_sqe = sqe; - sqe = rtio_spsc_next(r->sq, sqe); - } - - /* Note the last sqe for the next submit call */ - exc->last_sqe = last_sqe; + /* Prepare tasks to run, they start in a suspended state */ + conex_prepare(r, exc); /* Resume all suspended tasks */ conex_resume(r, exc); @@ -202,14 +208,9 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result) /* Interrupt may occur in spsc_acquire, breaking the contract * so spin around it effectively preventing another interrupt on * this core, and another core trying to concurrently work in here. - * - * This can and should be broken up into a few sections with a try - * lock around the sweep and resume. 
*/ key = k_spin_lock(&exc->lock); - rtio_cqe_submit(r, result, sqe->userdata); - /* Determine the task id by memory offset O(1) */ uint16_t task_id = conex_task_id(exc, iodev_sqe); @@ -218,16 +219,19 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result) exc->task_cur[task_id].sqe = next_sqe; rtio_iodev_submit(&exc->task_cur[task_id]); - } else { exc->task_status[task_id] |= CONEX_TASK_COMPLETE; } + bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; + + while (transaction) { + sqe = rtio_spsc_next(r->sq, sqe); + transaction = sqe->flags & RTIO_SQE_TRANSACTION; + } + + rtio_cqe_submit(r, result, sqe->userdata); - /* Sweep up unused SQEs and tasks, retry suspended tasks */ - /* TODO Use a try lock here and don't bother doing it if we are already - * doing it elsewhere - */ conex_sweep_resume(r, exc); k_spin_unlock(&exc->lock, key); @@ -238,39 +242,41 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result) */ void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result) { - const struct rtio_sqe *nsqe; k_spinlock_key_t key; struct rtio *r = iodev_sqe->r; const struct rtio_sqe *sqe = iodev_sqe->sqe; struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor; + void *userdata = sqe->userdata; + bool chained = sqe->flags & RTIO_SQE_CHAINED; + bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; + uint16_t task_id = conex_task_id(exc, iodev_sqe); /* Another interrupt (and sqe complete) may occur in spsc_acquire, * breaking the contract so spin around it effectively preventing another * interrupt on this core, and another core trying to concurrently work * in here. - * - * This can and should be broken up into a few sections with a try - * lock around the sweep and resume. */ key = k_spin_lock(&exc->lock); - rtio_cqe_submit(r, result, sqe->userdata); - - /* Determine the task id : O(1) */ - uint16_t task_id = conex_task_id(exc, iodev_sqe); + if (!transaction) { + rtio_cqe_submit(r, result, userdata); + } - sqe = iodev_sqe->sqe; + /* While the last sqe was marked as chained or transactional, do more work */ + while (chained | transaction) { + sqe = rtio_spsc_next(r->sq, sqe); + chained = sqe->flags & RTIO_SQE_CHAINED; + transaction = sqe->flags & RTIO_SQE_TRANSACTION; + userdata = sqe->userdata; - /* Fail the remaining sqe's in the chain */ - if (sqe->flags & RTIO_SQE_CHAINED) { - nsqe = rtio_spsc_next(r->sq, sqe); - while (nsqe != NULL && nsqe->flags & RTIO_SQE_CHAINED) { - rtio_cqe_submit(r, -ECANCELED, nsqe->userdata); - nsqe = rtio_spsc_next(r->sq, nsqe); + if (!transaction) { + rtio_cqe_submit(r, result, userdata); + } else { + rtio_cqe_submit(r, -ECANCELED, userdata); } } - /* Task is complete (failed) */ + /* Determine the task id : O(1) */ exc->task_status[task_id] |= CONEX_TASK_COMPLETE; conex_sweep_resume(r, exc); diff --git a/subsys/rtio/rtio_executor_simple.c b/subsys/rtio/rtio_executor_simple.c index 47515f237ade3..26645e8511bf7 100644 --- a/subsys/rtio/rtio_executor_simple.c +++ b/subsys/rtio/rtio_executor_simple.c @@ -33,6 +33,31 @@ int rtio_simple_submit(struct rtio *r) struct rtio_sqe *sqe = rtio_spsc_consume(r->sq); + if (sqe == NULL) { + return 0; + } + + /* Some validation on the sqe to ensure no programming errors were + * made so assumptions in ok and err are valid. 
+ */ +#ifdef CONFIG_ASSERT + __ASSERT(sqe != NULL, "Expected a valid sqe on submit call"); + + bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; + bool chained = sqe->flags & RTIO_SQE_CHAINED; + + if (transaction || chained) { + struct rtio_sqe *next = rtio_spsc_next(r->sq, sqe); + + __ASSERT(next != NULL, + "sqe %p flagged as transaction (%d) or chained (%d) without subsequent sqe in queue", + sqe, transaction, chained); + } + __ASSERT(!(chained && transaction), + "sqe %p flagged as both being transaction and chained, only one is allowed", + sqe); +#endif + exc->task.sqe = sqe; exc->task.r = r; @@ -58,24 +83,38 @@ void rtio_simple_ok(struct rtio_iodev_sqe *iodev_sqe, int result) __ASSERT_NO_MSG(iodev_sqe == &exc->task); #endif + bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; + + while (transaction) { + rtio_spsc_release(r->sq); + sqe = rtio_spsc_consume(r->sq); + __ASSERT_NO_MSG(sqe != NULL); + transaction = sqe->flags & RTIO_SQE_TRANSACTION; + } + void *userdata = sqe->userdata; rtio_spsc_release(r->sq); iodev_sqe->sqe = NULL; + rtio_cqe_submit(r, result, userdata); rtio_simple_submit(r); } /** * @brief Callback from an iodev describing error + * + * Some assumptions are made and should have been validated on rtio_submit + * - a sqe marked as chained or transaction has a next sqe + * - a sqe is marked either chained or transaction but not both */ void rtio_simple_err(struct rtio_iodev_sqe *iodev_sqe, int result) { - const struct rtio_sqe *nsqe; + const struct rtio_sqe *sqe = iodev_sqe->sqe; struct rtio *r = iodev_sqe->r; - const struct rtio_sqe *sqe = iodev_sqe->sqe; void *userdata = sqe->userdata; bool chained = sqe->flags & RTIO_SQE_CHAINED; + bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; #ifdef CONFIG_ASSERT struct rtio_simple_executor *exc = @@ -84,29 +123,27 @@ void rtio_simple_err(struct rtio_iodev_sqe *iodev_sqe, int result) __ASSERT_NO_MSG(iodev_sqe == &exc->task); #endif - rtio_spsc_release(r->sq); iodev_sqe->sqe = NULL; - rtio_cqe_submit(r, result, sqe->userdata); - - if (chained) { - - nsqe = rtio_spsc_consume(r->sq); - while (nsqe != NULL && nsqe->flags & RTIO_SQE_CHAINED) { - userdata = nsqe->userdata; - rtio_spsc_release(r->sq); + if (!transaction) { + rtio_cqe_submit(r, result, userdata); + } + while (chained | transaction) { + sqe = rtio_spsc_consume(r->sq); + chained = sqe->flags & RTIO_SQE_CHAINED; + transaction = sqe->flags & RTIO_SQE_TRANSACTION; + userdata = sqe->userdata; + rtio_spsc_release(r->sq); + + if (!transaction) { + rtio_cqe_submit(r, result, userdata); + } else { rtio_cqe_submit(r, -ECANCELED, userdata); - nsqe = rtio_spsc_consume(r->sq); - } - - if (nsqe != NULL) { - - iodev_sqe->sqe = nsqe; - rtio_iodev_submit(iodev_sqe); } + } - } else { - /* Now we can submit the next in the queue if we aren't done */ - rtio_simple_submit(r); + iodev_sqe->sqe = rtio_spsc_consume(r->sq); + if (iodev_sqe->sqe != NULL) { + rtio_iodev_submit(iodev_sqe); } } diff --git a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h index f89ada68e8433..12435dceab7d9 100644 --- a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h +++ b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h @@ -18,6 +18,7 @@ struct rtio_iodev_test_data { /* Currently executing sqe */ struct rtio_iodev_sqe *iodev_sqe; + const struct rtio_sqe *sqe; /* Count of submit calls */ atomic_t submit_count; @@ -43,6 +44,7 @@ static void rtio_iodev_test_next(struct rtio_iodev *iodev) struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct 

 		data->iodev_sqe = next_sqe;
+		data->sqe = next_sqe->sqe;
 		k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT);
 	}

@@ -56,10 +58,15 @@ static void rtio_iodev_timer_fn(struct k_timer *tm)
 	struct rtio_iodev_sqe *iodev_sqe = data->iodev_sqe;
 	struct rtio_iodev *iodev = (struct rtio_iodev *)iodev_sqe->sqe->iodev;

-	/* Complete the request with Ok and a result, clear the current task */
-	rtio_iodev_sqe_ok(iodev_sqe, 0);
-	data->iodev_sqe = NULL;
+	if (data->sqe->flags & RTIO_SQE_TRANSACTION) {
+		data->sqe = rtio_spsc_next(data->iodev_sqe->r->sq, data->sqe);
+		k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT);
+		return;
+	}

+	data->iodev_sqe = NULL;
+	data->sqe = NULL;
+	rtio_iodev_sqe_ok(iodev_sqe, 0);
 	rtio_iodev_test_next(iodev);
 }

diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c
index 5d304e4bd7f5f..ed1652058db99 100644
--- a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c
+++ b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c
@@ -21,6 +21,9 @@

 #include "rtio_iodev_test.h"

+/* Repeat tests to ensure they are repeatable */
+#define TEST_REPEATS 4
+
 RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec_simp);
 RTIO_DEFINE(r_simple_simp, (struct rtio_executor *)&simple_exec_simp, 4, 4);

@@ -63,9 +66,13 @@ void test_rtio_simple_(struct rtio *r)
 ZTEST(rtio_api, test_rtio_simple)
 {
 	TC_PRINT("rtio simple simple\n");
-	test_rtio_simple_(&r_simple_simp);
+	for (int i = 0; i < TEST_REPEATS; i++) {
+		test_rtio_simple_(&r_simple_simp);
+	}
 	TC_PRINT("rtio simple concurrent\n");
-	test_rtio_simple_(&r_simple_con);
+	for (int i = 0; i < TEST_REPEATS; i++) {
+		test_rtio_simple_(&r_simple_con);
+	}
 }

 RTIO_EXECUTOR_SIMPLE_DEFINE(chain_exec_simp);
@@ -128,12 +135,15 @@ ZTEST(rtio_api, test_rtio_chain)
 	}

 	TC_PRINT("rtio chain simple\n");
-	test_rtio_chain_(&r_chain_simp);
+	for (int i = 0; i < TEST_REPEATS; i++) {
+		test_rtio_chain_(&r_chain_simp);
+	}
 	TC_PRINT("rtio chain concurrent\n");
-	test_rtio_chain_(&r_chain_con);
+	for (int i = 0; i < TEST_REPEATS; i++) {
+		test_rtio_chain_(&r_chain_con);
+	}
 }

-
 RTIO_EXECUTOR_SIMPLE_DEFINE(multi_exec_simp);
 RTIO_DEFINE(r_multi_simp, (struct rtio_executor *)&multi_exec_simp, 4, 4);

@@ -211,8 +221,6 @@ ZTEST(rtio_api, test_rtio_multiple_chains)
 	test_rtio_multiple_chains_(&r_multi_con);
 }

-
-
 #ifdef CONFIG_USERSPACE
 K_APPMEM_PARTITION_DEFINE(rtio_partition);
 K_APP_BMEM(rtio_partition) uint8_t syscall_bufs[4];
@@ -287,7 +295,98 @@ ZTEST(rtio_api, test_rtio_syscalls)
 	TC_PRINT("test iodev init\n");
 	rtio_iodev_test_init(&iodev_test_syscall);
 	TC_PRINT("syscalls from kernel mode\n");
-	rtio_syscall_test(NULL, NULL, NULL);
+	for (int i = 0; i < TEST_REPEATS; i++) {
+		rtio_syscall_test(NULL, NULL, NULL);
+	}
+}
+
+RTIO_EXECUTOR_SIMPLE_DEFINE(transaction_exec_simp);
+RTIO_DEFINE(r_transaction_simp, (struct rtio_executor *)&transaction_exec_simp, 4, 4);
+
+RTIO_EXECUTOR_CONCURRENT_DEFINE(transaction_exec_con, 2);
+RTIO_DEFINE(r_transaction_con, (struct rtio_executor *)&transaction_exec_con, 4, 4);
+
+RTIO_IODEV_TEST_DEFINE(iodev_test_transaction0);
+RTIO_IODEV_TEST_DEFINE(iodev_test_transaction1);
+struct rtio_iodev *iodev_test_transaction[] = {&iodev_test_transaction0, &iodev_test_transaction1};
+
+/**
+ * @brief Test transaction requests
+ *
+ * Ensures that we can set up an RTIO context, enqueue transaction requests,
+ * and receive completion events in the correct order given the transaction
+ * flag and multiple devices where serialization isn't guaranteed.
+ */
+void test_rtio_transaction_(struct rtio *r)
+{
+	int res;
+	uintptr_t userdata[2] = {0, 1};
+	struct rtio_sqe *sqe;
+	struct rtio_cqe *cqe;
+	bool seen[2] = { 0 };
+
+	sqe = rtio_spsc_acquire(r->sq);
+	zassert_not_null(sqe, "Expected a valid sqe");
+	rtio_sqe_prep_nop(sqe, &iodev_test_transaction0, NULL);
+	sqe->flags |= RTIO_SQE_TRANSACTION;
+
+	sqe = rtio_spsc_acquire(r->sq);
+	zassert_not_null(sqe, "Expected a valid sqe");
+	rtio_sqe_prep_nop(sqe, NULL,
+			  &userdata[0]);
+
+
+	sqe = rtio_spsc_acquire(r->sq);
+	zassert_not_null(sqe, "Expected a valid sqe");
+	rtio_sqe_prep_nop(sqe, &iodev_test_transaction1, NULL);
+	sqe->flags |= RTIO_SQE_TRANSACTION;
+
+	sqe = rtio_spsc_acquire(r->sq);
+	zassert_not_null(sqe, "Expected a valid sqe");
+	rtio_sqe_prep_nop(sqe, NULL,
+			  &userdata[1]);
+
+	TC_PRINT("submitting userdata 0 %p, userdata 1 %p\n", &userdata[0], &userdata[1]);
+	res = rtio_submit(r, 2);
+	TC_PRINT("checking cq, completions available %lu\n", rtio_spsc_consumable(r->cq));
+	zassert_ok(res, "Should return ok from rtio_submit");
+	zassert_equal(rtio_spsc_consumable(r->cq), 2, "Should have 2 pending completions");
+
+	for (int i = 0; i < 2; i++) {
+		TC_PRINT("consume %d\n", i);
+		cqe = rtio_spsc_consume(r->cq);
+		zassert_not_null(cqe, "Expected a valid cqe");
+		zassert_ok(cqe->result, "Result should be ok");
+		uintptr_t idx = *(uintptr_t *)cqe->userdata;
+
+		TC_PRINT("userdata is %p, value %lu\n", cqe->userdata, idx);
+		zassert(idx == 0 || idx == 1, "idx should be 0 or 1");
+		seen[idx] = true;
+		rtio_spsc_release(r->cq);
+	}
+
+	zassert_true(seen[0], "Should have seen transaction 0");
+	zassert_true(seen[1], "Should have seen transaction 1");
+}
+
+ZTEST(rtio_api, test_rtio_transaction)
+{
+	TC_PRINT("initializing iodev test devices\n");
+
+	for (int i = 0; i < 2; i++) {
+		rtio_iodev_test_init(iodev_test_transaction[i]);
+	}
+
+	TC_PRINT("rtio transaction simple\n");
+	for (int i = 0; i < TEST_REPEATS; i++) {
+		test_rtio_transaction_(&r_transaction_simp);
+	}
+	TC_PRINT("rtio transaction concurrent\n");
+	for (int i = 0; i < TEST_REPEATS; i++) {
+		test_rtio_transaction_(&r_transaction_con);
+	}
+}
+
+

 ZTEST_SUITE(rtio_api, NULL, NULL, NULL, NULL, NULL);

From 3ccbe51a374eaa6940ea6dfab6730601a70422a7 Mon Sep 17 00:00:00 2001
From: Tom Burdick
Date: Fri, 17 Mar 2023 10:59:52 -0500
Subject: [PATCH 5/8] rtio: Properly track last sqe in the queue

The pending_sqe logic to track where in the ring queue the concurrent
executor had left off was slightly flawed. It didn't account for having
started every sqe in the queue and ending back up at the beginning.

Instead, track the last SQE in the queue; the one following it is the
next one to start. If we happen to sweep the last known SQE in the
queue, reset the pointer to NULL so that the next time prepare is
called we start at the beginning of the queue again.
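As an illustrative sketch (not part of the change itself; the names
match the conex_prepare() diff below, with the surrounding loop
elided), the resume point is now derived from last_sqe rather than a
separately tracked pending_sqe:

	struct rtio_sqe *sqe, *last_sqe;

	if (exc->last_sqe == NULL) {
		/* Nothing started yet, or everything previously seen was
		 * swept: begin at the head of the submission queue.
		 */
		last_sqe = NULL;
		sqe = rtio_spsc_peek(r->sq);
	} else {
		/* Otherwise continue with the sqe after the last one started */
		last_sqe = exc->last_sqe;
		sqe = rtio_spsc_next(r->sq, last_sqe);
	}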
Signed-off-by: Tom Burdick
---
 .../zephyr/rtio/rtio_executor_concurrent.h    |  4 --
 subsys/rtio/rtio_executor_concurrent.c        | 48 ++++++++++---------
 .../subsys/rtio/rtio_api/src/test_rtio_api.c  |  6 ++-
 3 files changed, 30 insertions(+), 28 deletions(-)

diff --git a/include/zephyr/rtio/rtio_executor_concurrent.h b/include/zephyr/rtio/rtio_executor_concurrent.h
index 9ff5acdde074c..26bda6bfe0a9e 100644
--- a/include/zephyr/rtio/rtio_executor_concurrent.h
+++ b/include/zephyr/rtio/rtio_executor_concurrent.h
@@ -66,9 +66,6 @@ struct rtio_concurrent_executor {
 	uint16_t task_in, task_out, task_mask;

 	/* First pending sqe to start when a task becomes available */
-	struct rtio_sqe *pending_sqe;
-
-	/* Last sqe seen from the most recent submit */
 	struct rtio_sqe *last_sqe;

 	/* Array of task statuses */
@@ -106,7 +103,6 @@ static const struct rtio_executor_api z_rtio_concurrent_api = {
 		.task_in = 0,							\
 		.task_out = 0,							\
 		.task_mask = (concurrency)-1,					\
-		.pending_sqe = NULL,						\
 		.last_sqe = NULL,						\
 		.task_status = _task_status_##name,				\
 		.task_cur = _task_cur_##name,					\
diff --git a/subsys/rtio/rtio_executor_concurrent.c b/subsys/rtio/rtio_executor_concurrent.c
index e5618c830540a..c1140c4d52351 100644
--- a/subsys/rtio/rtio_executor_concurrent.c
+++ b/subsys/rtio/rtio_executor_concurrent.c
@@ -70,6 +70,10 @@ static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *ex
 	}

 	rtio_spsc_release(r->sq);
+
+	if (sqe == exc->last_sqe) {
+		exc->last_sqe = NULL;
+	}
 }

 /**
@@ -84,7 +88,7 @@ static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc)
 	/* In order sweep up */
 	for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) {
 		if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_COMPLETE) {
-			LOG_INF("sweeping oldest task %d", task_id);
+			LOG_DBG("sweeping oldest task %d", task_id);
 			conex_sweep_task(r, exc);
 			exc->task_out++;
 		} else {
@@ -102,38 +106,45 @@ static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc)
  */
 static void conex_prepare(struct rtio *r, struct rtio_concurrent_executor *exc)
 {
-	struct rtio_sqe *sqe;
+	struct rtio_sqe *sqe, *last_sqe;

 	/* If never submitted before peek at the first item
 	 * otherwise start back up where the last submit call
 	 * left off
 	 */
-	if (exc->pending_sqe == NULL) {
+	if (exc->last_sqe == NULL) {
+		last_sqe = NULL;
 		sqe = rtio_spsc_peek(r->sq);
 	} else {
-		sqe = exc->pending_sqe;
+		last_sqe = exc->last_sqe;
+		sqe = rtio_spsc_next(r->sq, last_sqe);
 	}

+	LOG_DBG("starting at sqe %p, last %p", sqe, exc->last_sqe);
+
 	while (sqe != NULL && conex_task_free(exc)) {
 		/* Get the next free task id */
 		uint16_t task_idx = conex_task_next(exc);

+		LOG_DBG("preparing task %d, sqe %p", task_idx, sqe);
+
 		/* Setup task */
 		exc->task_cur[task_idx].sqe = sqe;
 		exc->task_cur[task_idx].r = r;
 		exc->task_status[task_idx] = CONEX_TASK_SUSPENDED;

 		/* Go to the next sqe not in the current chain or transaction */
-		while (sqe != NULL && (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION))) {
+		while (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION)) {
 			sqe = rtio_spsc_next(r->sq, sqe);
 		}

 		/* SQE is the end of the previous chain or transaction so skip it */
+		last_sqe = sqe;
 		sqe = rtio_spsc_next(r->sq, sqe);
 	}

 	/* Out of available tasks so remember where we left off to begin again once tasks free up */
-	exc->pending_sqe = sqe;
+	exc->last_sqe = last_sqe;
 }

@@ -148,25 +159,13 @@ static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
 	/* In order resume tasks */
 	for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) {
 		if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_SUSPENDED) {
-			LOG_INF("resuming suspended task %d", task_id);
+			LOG_DBG("resuming suspended task %d", task_id);
 			exc->task_status[task_id & exc->task_mask] &= ~CONEX_TASK_SUSPENDED;
 			rtio_iodev_submit(&exc->task_cur[task_id & exc->task_mask]);
 		}
 	}
 }

-/**
- * @brief Sweep, Prepare, and Resume in one go
- *
- * Called after a completion to continue doing more work if needed.
- */
-static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
-{
-	conex_sweep(r, exc);
-	conex_prepare(r, exc);
-	conex_resume(r, exc);
-}
-
 /**
  * @brief Submit submissions to concurrent executor
  *
@@ -211,6 +210,8 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
 	 */
 	key = k_spin_lock(&exc->lock);

+	LOG_DBG("completed sqe %p", sqe);
+
 	/* Determine the task id by memory offset O(1) */
 	uint16_t task_id = conex_task_id(exc, iodev_sqe);

@@ -230,9 +231,10 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
 		transaction = sqe->flags & RTIO_SQE_TRANSACTION;
 	}

+	conex_sweep(r, exc);
 	rtio_cqe_submit(r, result, sqe->userdata);
-
-	conex_sweep_resume(r, exc);
+	conex_prepare(r, exc);
+	conex_resume(r, exc);

 	k_spin_unlock(&exc->lock, key);
 }
@@ -279,7 +281,9 @@ void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result)
 	/* The task is complete (failed) */
 	exc->task_status[task_id] |= CONEX_TASK_COMPLETE;

-	conex_sweep_resume(r, exc);
+	conex_sweep(r, exc);
+	conex_prepare(r, exc);
+	conex_resume(r, exc);

 	k_spin_unlock(&exc->lock, key);
 }
diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c
index ed1652058db99..21dce9b880651 100644
--- a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c
+++ b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c
@@ -95,7 +95,7 @@ struct rtio_iodev *iodev_test_chain[] = {&iodev_test_chain0, &iodev_test_chain1}
 void test_rtio_chain_(struct rtio *r)
 {
 	int res;
-	uintptr_t userdata[4] = {0, 1, 2, 3};
+	uint32_t userdata[4] = {0, 1, 2, 3};
 	struct rtio_sqe *sqe;
 	struct rtio_cqe *cqe;

@@ -105,6 +105,7 @@ void test_rtio_chain_(struct rtio *r)
 		rtio_sqe_prep_nop(sqe, iodev_test_chain[i % 2], &userdata[i]);
 		sqe->flags |= RTIO_SQE_CHAINED;
+		TC_PRINT("produce %d, sqe %p, userdata %d\n", i, sqe, userdata[i]);
 	}

 	/* Clear the last one */
@@ -117,10 +118,11 @@ void test_rtio_chain_(struct rtio *r)
 	zassert_equal(rtio_spsc_consumable(r->cq), 4, "Should have 4 pending completions");

 	for (int i = 0; i < 4; i++) {
-		TC_PRINT("consume %d\n", i);
 		cqe = rtio_spsc_consume(r->cq);
 		zassert_not_null(cqe, "Expected a valid cqe");
+		TC_PRINT("consume %d, cqe %p, userdata %d\n", i, cqe, *(uint32_t *)cqe->userdata);
 		zassert_ok(cqe->result, "Result should be ok");
+		zassert_equal_ptr(cqe->userdata, &userdata[i], "Expected in order completions");
 		rtio_spsc_release(r->cq);
 	}

From 8f99fbdee676ef22101f1a591c102c66c272f2d6 Mon Sep 17 00:00:00 2001
From: Tom Burdick
Date: Mon, 6 Mar 2023 11:21:43 -0600
Subject: [PATCH 6/8] rtio: Make rtio_mpsc.h C++ friendly

C++ does not allow the implicit conversion from void * that C does, so
the C++ sample app fails to build without duplicating the type
information in explicit casts. Add those casts, and wrap the header in
extern "C" so that C++ code can include rtio_mpsc.h directly or
indirectly.
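For reference, this is the usual guard pattern being applied here (a
sketch of the shape of the change, not the full diff below):

	#ifdef __cplusplus
	extern "C" {
	#endif

	/* Declarations usable from both C and C++. In C++ the void *
	 * returned by atomic_ptr_get()/atomic_ptr_set() must also be
	 * cast explicitly, e.g.:
	 *
	 * next = (struct rtio_mpsc_node *)atomic_ptr_get(&tail->next);
	 */

	#ifdef __cplusplus
	} /* extern "C" */
	#endif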
Signed-off-by: Tom Burdick
---
 include/zephyr/rtio/rtio_mpsc.h | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/include/zephyr/rtio/rtio_mpsc.h b/include/zephyr/rtio/rtio_mpsc.h
index 77f7fa7801692..f0bc78a1b41d3 100644
--- a/include/zephyr/rtio/rtio_mpsc.h
+++ b/include/zephyr/rtio/rtio_mpsc.h
@@ -14,6 +14,10 @@
 #include
 #include

+#ifdef __cplusplus
+extern "C" {
+#endif
+
 /**
  * @brief RTIO Multiple Producer Single Consumer (MPSC) Queue API
  * @defgroup rtio_mpsc RTIO MPSC API
@@ -94,7 +98,7 @@ static inline void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n)
 	atomic_ptr_set(&n->next, NULL);

 	key = arch_irq_lock();
-	prev = atomic_ptr_set(&q->head, n);
+	prev = (struct rtio_mpsc_node *)atomic_ptr_set(&q->head, n);
 	atomic_ptr_set(&prev->next, n);
 	arch_irq_unlock(key);
 }
@@ -109,7 +113,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
 {
 	struct rtio_mpsc_node *head;
 	struct rtio_mpsc_node *tail = q->tail;
-	struct rtio_mpsc_node *next = atomic_ptr_get(&tail->next);
+	struct rtio_mpsc_node *next = (struct rtio_mpsc_node *)atomic_ptr_get(&tail->next);

 	/* Skip over the stub/sentinel */
 	if (tail == &q->stub) {
@@ -119,7 +123,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)

 		q->tail = next;
 		tail = next;
-		next = atomic_ptr_get(&next->next);
+		next = (struct rtio_mpsc_node *)atomic_ptr_get(&next->next);
 	}

 	/* If next is non-NULL then a valid node is found, return it */
@@ -128,7 +132,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
 		return tail;
 	}

-	head = atomic_ptr_get(&q->head);
+	head = (struct rtio_mpsc_node *)atomic_ptr_get(&q->head);

 	/* If next is NULL, and the tail != HEAD then the queue has pending
 	 * updates that can't yet be accessed.
@@ -139,7 +143,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)

 	rtio_mpsc_push(q, &q->stub);

-	next = atomic_ptr_get(&tail->next);
+	next = (struct rtio_mpsc_node *)atomic_ptr_get(&tail->next);

 	if (next != NULL) {
 		q->tail = next;
@@ -153,4 +157,9 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
  * @}
  */

+#ifdef __cplusplus
+}
+#endif
+
+
 #endif /* ZEPHYR_RTIO_MPSC_H_ */

From 164757bc3bae36cd97de58a5c15c06412a13e01f Mon Sep 17 00:00:00 2001
From: Tom Burdick
Date: Mon, 6 Feb 2023 21:21:16 -0600
Subject: [PATCH 7/8] rtio: Adds rtio_iodev_cancel_all

The ability to cancel all pending requests for an iodev turns out to be
useful and worth sharing.
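A hedged usage sketch (the driver function name here is hypothetical
and not part of this patch; rtio_iodev_cancel_all is the helper added
below):

	/* On teardown, drop everything still queued for the iodev; each
	 * pending iodev_sqe completes with -ECANCELED via the executor's
	 * err callback.
	 */
	static void my_iodev_teardown(struct rtio_iodev *iodev)
	{
		rtio_iodev_cancel_all(iodev);
	}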
Signed-off-by: Tom Burdick
---
 include/zephyr/rtio/rtio.h | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/include/zephyr/rtio/rtio.h b/include/zephyr/rtio/rtio.h
index df3dd72e42892..a0a968b331028 100644
--- a/include/zephyr/rtio/rtio.h
+++ b/include/zephyr/rtio/rtio.h
@@ -588,6 +588,24 @@ static inline void rtio_iodev_sqe_err(struct rtio_iodev_sqe *iodev_sqe, int resu
 	iodev_sqe->r->executor->api->err(iodev_sqe, result);
 }

+/**
+ * @brief Cancel all requests that are pending for the iodev
+ *
+ * @param iodev IODev to cancel all requests for
+ */
+static inline void rtio_iodev_cancel_all(struct rtio_iodev *iodev)
+{
+	/* Complete all pending requests with -ECANCELED */
+	struct rtio_mpsc_node *node = rtio_mpsc_pop(&iodev->iodev_sq);
+
+	while (node != NULL) {
+		struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
+
+		rtio_iodev_sqe_err(iodev_sqe, -ECANCELED);
+		node = rtio_mpsc_pop(&iodev->iodev_sq);
+	}
+}
+
 /**
  * Submit a completion queue event with a given result and userdata
  *

From 5eafe25a4217fc15103eba3b28bb2575fb829a71 Mon Sep 17 00:00:00 2001
From: Tom Burdick
Date: Fri, 7 Oct 2022 13:17:13 -0500
Subject: [PATCH 8/8] rtio: Fix consume semaphore usage

When the consume semaphore is enabled always give and take from the
semaphore. It's expected that rtio_cqe_consume and
rtio_cqe_consume_block will be used exclusively rather than directly
poking at the SPSC queues.

Signed-off-by: Tom Burdick
---
 include/zephyr/rtio/rtio.h | 23 ++++++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/include/zephyr/rtio/rtio.h b/include/zephyr/rtio/rtio.h
index a0a968b331028..79984e555ed31 100644
--- a/include/zephyr/rtio/rtio.h
+++ b/include/zephyr/rtio/rtio.h
@@ -421,7 +421,7 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
 	IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM,						\
 		   (static K_SEM_DEFINE(_submit_sem_##name, 0, K_SEM_MAX_LIMIT)))	\
 	IF_ENABLED(CONFIG_RTIO_CONSUME_SEM,						\
-		   (static K_SEM_DEFINE(_consume_sem_##name, 0, 1)))			\
+		   (static K_SEM_DEFINE(_consume_sem_##name, 0, K_SEM_MAX_LIMIT)))	\
 	static RTIO_SQ_DEFINE(_sq_##name, sq_sz);					\
 	static RTIO_CQ_DEFINE(_cq_##name, cq_sz);					\
 	STRUCT_SECTION_ITERABLE(rtio, name) = {						\
@@ -515,7 +515,15 @@ static inline void rtio_sqe_drop_all(struct rtio *r)
  */
 static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r)
 {
+#ifdef CONFIG_RTIO_CONSUME_SEM
+	if (k_sem_take(r->consume_sem, K_NO_WAIT) == 0) {
+		return rtio_spsc_consume(r->cq);
+	} else {
+		return NULL;
+	}
+#else
 	return rtio_spsc_consume(r->cq);
+#endif
 }

 /**
@@ -532,21 +540,18 @@ static inline struct rtio_cqe *rtio_cqe_consume_block(struct rtio *r)
 {
 	struct rtio_cqe *cqe;

-	/* TODO is there a better way? reset this in submit? */
 #ifdef CONFIG_RTIO_CONSUME_SEM
-	k_sem_reset(r->consume_sem);
-#endif
+	k_sem_take(r->consume_sem, K_FOREVER);
+
+	cqe = rtio_spsc_consume(r->cq);
+#else
 	cqe = rtio_spsc_consume(r->cq);

 	while (cqe == NULL) {
 		cqe = rtio_spsc_consume(r->cq);
-#ifdef CONFIG_RTIO_CONSUME_SEM
-		k_sem_take(r->consume_sem, K_FOREVER);
-#else
 		k_yield();
-#endif
 	}
+#endif

 	return cqe;
 }
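Editor's note: a hedged sketch of the consumer flow this fix assumes
(illustrative only; r is an RTIO context defined elsewhere, and the
completion is released through the SPSC API as the tests in this series
do):

	/* Block until a completion arrives; with CONFIG_RTIO_CONSUME_SEM
	 * enabled this now waits on the semaphore rather than resetting it.
	 */
	struct rtio_cqe *cqe = rtio_cqe_consume_block(r);

	if (cqe->result < 0) {
		/* Handle the failed request using cqe->result and cqe->userdata */
	}

	rtio_spsc_release(r->cq);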