diff --git a/include/zephyr/rtio/rtio.h b/include/zephyr/rtio/rtio.h index 516072a9e204f..79984e555ed31 100644 --- a/include/zephyr/rtio/rtio.h +++ b/include/zephyr/rtio/rtio.h @@ -97,9 +97,25 @@ struct rtio_iodev; /** * @brief The next request in the queue should wait on this one. + * + * Chained SQEs are individual units of work describing patterns of + * ordering and failure cascading. A chained SQE must be started only + * after the one before it. They are given to the iodevs one after another. */ #define RTIO_SQE_CHAINED BIT(0) +/** + * @brief The next request in the queue is part of a transaction. + * + * Transactional SQEs are sequential parts of a unit of work. + * Only the first transactional SQE is submitted to an iodev, the + * remaining SQEs are never individually submitted but instead considered + * to be part of the transaction to the single iodev. The first sqe in the + * sequence holds the iodev that will be used and the last holds the userdata + * that will be returned in a single completion on failure/success. + */ +#define RTIO_SQE_TRANSACTION BIT(1) + /** * @} */ @@ -318,6 +334,7 @@ static inline void rtio_sqe_prep_nop(struct rtio_sqe *sqe, void *userdata) { sqe->op = RTIO_OP_NOP; + sqe->flags = 0; sqe->iodev = iodev; sqe->userdata = userdata; } @@ -334,6 +351,7 @@ static inline void rtio_sqe_prep_read(struct rtio_sqe *sqe, { sqe->op = RTIO_OP_RX; sqe->prio = prio; + sqe->flags = 0; sqe->iodev = iodev; sqe->buf_len = len; sqe->buf = buf; @@ -352,6 +370,7 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe, { sqe->op = RTIO_OP_TX; sqe->prio = prio; + sqe->flags = 0; sqe->iodev = iodev; sqe->buf_len = len; sqe->buf = buf; @@ -402,7 +421,7 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe, IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, \ (static K_SEM_DEFINE(_submit_sem_##name, 0, K_SEM_MAX_LIMIT))) \ IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, \ - (static K_SEM_DEFINE(_consume_sem_##name, 0, 1))) \ + (static K_SEM_DEFINE(_consume_sem_##name, 0, K_SEM_MAX_LIMIT))) \ static RTIO_SQ_DEFINE(_sq_##name, sq_sz); \ static RTIO_CQ_DEFINE(_cq_##name, cq_sz); \ STRUCT_SECTION_ITERABLE(rtio, name) = { \ @@ -496,7 +515,15 @@ static inline void rtio_sqe_drop_all(struct rtio *r) */ static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r) { +#ifdef CONFIG_RTIO_CONSUME_SEM + if (k_sem_take(r->consume_sem, K_NO_WAIT) == 0) { + return rtio_spsc_consume(r->cq); + } else { + return NULL; + } +#else return rtio_spsc_consume(r->cq); +#endif } /** @@ -513,21 +540,18 @@ static inline struct rtio_cqe *rtio_cqe_consume_block(struct rtio *r) { struct rtio_cqe *cqe; - /* TODO is there a better way? reset this in submit? */ #ifdef CONFIG_RTIO_CONSUME_SEM - k_sem_reset(r->consume_sem); -#endif + k_sem_take(r->consume_sem, K_FOREVER); + + cqe = rtio_spsc_consume(r->cq); +#else cqe = rtio_spsc_consume(r->cq); while (cqe == NULL) { cqe = rtio_spsc_consume(r->cq); -#ifdef CONFIG_RTIO_CONSUME_SEM - k_sem_take(r->consume_sem, K_FOREVER); -#else - k_yield(); -#endif } +#endif return cqe; } @@ -569,6 +593,24 @@ static inline void rtio_iodev_sqe_err(struct rtio_iodev_sqe *iodev_sqe, int resu iodev_sqe->r->executor->api->err(iodev_sqe, result); } +/** + * @brief Cancel all requests that are pending for the iodev + * + * @param iodev IODev to cancel all requests for + */ +static inline void rtio_iodev_cancel_all(struct rtio_iodev *iodev) +{ + /* Clear pending requests as -ENODATA */ + struct rtio_mpsc_node *node = rtio_mpsc_pop(&iodev->iodev_sq); + + while (node != NULL) { + struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q); + + rtio_iodev_sqe_err(iodev_sqe, -ECANCELED); + node = rtio_mpsc_pop(&iodev->iodev_sq); + } +} + /** * Submit a completion queue event with a given result and userdata * diff --git a/include/zephyr/rtio/rtio_executor_concurrent.h b/include/zephyr/rtio/rtio_executor_concurrent.h index 9ff5acdde074c..26bda6bfe0a9e 100644 --- a/include/zephyr/rtio/rtio_executor_concurrent.h +++ b/include/zephyr/rtio/rtio_executor_concurrent.h @@ -66,9 +66,6 @@ struct rtio_concurrent_executor { uint16_t task_in, task_out, task_mask; /* First pending sqe to start when a task becomes available */ - struct rtio_sqe *pending_sqe; - - /* Last sqe seen from the most recent submit */ struct rtio_sqe *last_sqe; /* Array of task statuses */ @@ -106,7 +103,6 @@ static const struct rtio_executor_api z_rtio_concurrent_api = { .task_in = 0, \ .task_out = 0, \ .task_mask = (concurrency)-1, \ - .pending_sqe = NULL, \ .last_sqe = NULL, \ .task_status = _task_status_##name, \ .task_cur = _task_cur_##name, \ diff --git a/include/zephyr/rtio/rtio_mpsc.h b/include/zephyr/rtio/rtio_mpsc.h index 77f7fa7801692..f0bc78a1b41d3 100644 --- a/include/zephyr/rtio/rtio_mpsc.h +++ b/include/zephyr/rtio/rtio_mpsc.h @@ -14,6 +14,10 @@ #include #include +#ifdef __cplusplus +extern "C" { +#endif + /** * @brief RTIO Multiple Producer Single Consumer (MPSC) Queue API * @defgroup rtio_mpsc RTIO MPSC API @@ -94,7 +98,7 @@ static inline void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n) atomic_ptr_set(&n->next, NULL); key = arch_irq_lock(); - prev = atomic_ptr_set(&q->head, n); + prev = (struct rtio_mpsc_node *)atomic_ptr_set(&q->head, n); atomic_ptr_set(&prev->next, n); arch_irq_unlock(key); } @@ -109,7 +113,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q) { struct rtio_mpsc_node *head; struct rtio_mpsc_node *tail = q->tail; - struct rtio_mpsc_node *next = atomic_ptr_get(&tail->next); + struct rtio_mpsc_node *next = (struct rtio_mpsc_node *)atomic_ptr_get(&tail->next); /* Skip over the stub/sentinel */ if (tail == &q->stub) { @@ -119,7 +123,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q) q->tail = next; tail = next; - next = atomic_ptr_get(&next->next); + next = (struct rtio_mpsc_node *)atomic_ptr_get(&next->next); } /* If next is non-NULL then a valid node is found, return it */ @@ -128,7 +132,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q) return tail; } - head = atomic_ptr_get(&q->head); + head = (struct rtio_mpsc_node *)atomic_ptr_get(&q->head); /* If next is NULL, and the tail != HEAD then the queue has pending * updates that can't yet be accessed. @@ -139,7 +143,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q) rtio_mpsc_push(q, &q->stub); - next = atomic_ptr_get(&tail->next); + next = (struct rtio_mpsc_node *)atomic_ptr_get(&tail->next); if (next != NULL) { q->tail = next; @@ -153,4 +157,9 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q) * @} */ +#ifdef __cplusplus +} +#endif + + #endif /* ZEPHYR_RTIO_MPSC_H_ */ diff --git a/subsys/rtio/rtio_executor_concurrent.c b/subsys/rtio/rtio_executor_concurrent.c index ae6c361f0018e..c1140c4d52351 100644 --- a/subsys/rtio/rtio_executor_concurrent.c +++ b/subsys/rtio/rtio_executor_concurrent.c @@ -49,7 +49,7 @@ static uint16_t conex_task_next(struct rtio_concurrent_executor *exc) uint16_t task_id = exc->task_in; exc->task_in++; - return task_id; + return task_id & exc->task_mask; } static inline uint16_t conex_task_id(struct rtio_concurrent_executor *exc, @@ -64,20 +64,31 @@ static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *ex { struct rtio_sqe *sqe = rtio_spsc_consume(r->sq); - while (sqe != NULL && sqe->flags & RTIO_SQE_CHAINED) { + while (sqe != NULL && (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION))) { rtio_spsc_release(r->sq); sqe = rtio_spsc_consume(r->sq); } rtio_spsc_release(r->sq); + + if (sqe == exc->last_sqe) { + exc->last_sqe = NULL; + } } +/** + * @brief Sweep like a GC of sorts old tasks that are completed in order + * + * Will only sweep tasks in the order they arrived in the submission queue. + * Meaning there might be completed tasks that could be freed but are not yet + * because something before it has not yet completed. + */ static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc) { /* In order sweep up */ for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) { if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_COMPLETE) { - LOG_INF("sweeping oldest task %d", task_id); + LOG_DBG("sweeping oldest task %d", task_id); conex_sweep_task(r, exc); exc->task_out++; } else { @@ -86,99 +97,93 @@ static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc) } } -static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc) -{ - /* In order resume tasks */ - for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) { - if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_SUSPENDED) { - LOG_INF("resuming suspended task %d", task_id); - exc->task_status[task_id] &= ~CONEX_TASK_SUSPENDED; - rtio_iodev_submit(&exc->task_cur[task_id]); - } - } -} - -static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor *exc) -{ - conex_sweep(r, exc); - conex_resume(r, exc); -} - /** - * @brief Submit submissions to concurrent executor + * @brief Prepare tasks to run by iterating through the submission queue * - * @param r RTIO context - * - * @retval 0 Always succeeds + * For each submission in the queue that begins a chain or transaction + * start a task if possible. Concurrency is limited by the allocated concurrency + * per executor instance. */ -int rtio_concurrent_submit(struct rtio *r) +static void conex_prepare(struct rtio *r, struct rtio_concurrent_executor *exc) { - - LOG_INF("submit"); - - struct rtio_concurrent_executor *exc = - (struct rtio_concurrent_executor *)r->executor; - struct rtio_sqe *sqe; - struct rtio_sqe *last_sqe; - k_spinlock_key_t key; - - key = k_spin_lock(&exc->lock); + struct rtio_sqe *sqe, *last_sqe; /* If never submitted before peek at the first item * otherwise start back up where the last submit call * left off */ if (exc->last_sqe == NULL) { + last_sqe = NULL; sqe = rtio_spsc_peek(r->sq); } else { - /* Pickup from last submit call */ - sqe = rtio_spsc_next(r->sq, exc->last_sqe); + last_sqe = exc->last_sqe; + sqe = rtio_spsc_next(r->sq, last_sqe); } - last_sqe = sqe; - while (sqe != NULL && conex_task_free(exc)) { - LOG_INF("head SQE in chain %p", sqe); + LOG_DBG("starting at sqe %p, last %p", sqe, exc->last_sqe); - /* Get the next task id if one exists */ + while (sqe != NULL && conex_task_free(exc)) { + /* Get the next free task id */ uint16_t task_idx = conex_task_next(exc); - LOG_INF("setting up task %d", task_idx); + LOG_DBG("preparing task %d, sqe %p", task_idx, sqe); - /* Setup task (yes this is it) */ + /* Setup task */ exc->task_cur[task_idx].sqe = sqe; exc->task_cur[task_idx].r = r; exc->task_status[task_idx] = CONEX_TASK_SUSPENDED; - LOG_INF("submitted sqe %p", sqe); - /* Go to the next sqe not in the current chain */ - while (sqe != NULL && (sqe->flags & RTIO_SQE_CHAINED)) { + /* Go to the next sqe not in the current chain or transaction */ + while (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION)) { sqe = rtio_spsc_next(r->sq, sqe); } - LOG_INF("tail SQE in chain %p", sqe); - + /* SQE is the end of the previous chain or transaction so skip it */ last_sqe = sqe; - - /* SQE is the end of the previous chain */ sqe = rtio_spsc_next(r->sq, sqe); } - /* Out of available pointers, wait til others complete, note the - * first pending submission queue. May be NULL if nothing is pending. - */ - exc->pending_sqe = sqe; + /* Out of available tasks so remember where we left off to begin again once tasks free up */ + exc->last_sqe = last_sqe; +} - /** - * Run through the queue until the last item - * and take not of it - */ - while (sqe != NULL) { - last_sqe = sqe; - sqe = rtio_spsc_next(r->sq, sqe); + +/** + * @brief Resume tasks that are suspended + * + * All tasks begin as suspended tasks. This kicks them off to the submissions + * associated iodev. + */ +static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc) +{ + /* In order resume tasks */ + for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) { + if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_SUSPENDED) { + LOG_DBG("resuming suspended task %d", task_id); + exc->task_status[task_id & exc->task_mask] &= ~CONEX_TASK_SUSPENDED; + rtio_iodev_submit(&exc->task_cur[task_id & exc->task_mask]); + } } +} - /* Note the last sqe for the next submit call */ - exc->last_sqe = last_sqe; +/** + * @brief Submit submissions to concurrent executor + * + * @param r RTIO context + * + * @retval 0 Always succeeds + */ +int rtio_concurrent_submit(struct rtio *r) +{ + + struct rtio_concurrent_executor *exc = + (struct rtio_concurrent_executor *)r->executor; + k_spinlock_key_t key; + + key = k_spin_lock(&exc->lock); + + /* Prepare tasks to run, they start in a suspended state */ + conex_prepare(r, exc); /* Resume all suspended tasks */ conex_resume(r, exc); @@ -202,13 +207,10 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result) /* Interrupt may occur in spsc_acquire, breaking the contract * so spin around it effectively preventing another interrupt on * this core, and another core trying to concurrently work in here. - * - * This can and should be broken up into a few sections with a try - * lock around the sweep and resume. */ key = k_spin_lock(&exc->lock); - rtio_cqe_submit(r, result, sqe->userdata); + LOG_DBG("completed sqe %p", sqe); /* Determine the task id by memory offset O(1) */ uint16_t task_id = conex_task_id(exc, iodev_sqe); @@ -218,17 +220,21 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result) exc->task_cur[task_id].sqe = next_sqe; rtio_iodev_submit(&exc->task_cur[task_id]); - } else { exc->task_status[task_id] |= CONEX_TASK_COMPLETE; } + bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; - /* Sweep up unused SQEs and tasks, retry suspended tasks */ - /* TODO Use a try lock here and don't bother doing it if we are already - * doing it elsewhere - */ - conex_sweep_resume(r, exc); + while (transaction) { + sqe = rtio_spsc_next(r->sq, sqe); + transaction = sqe->flags & RTIO_SQE_TRANSACTION; + } + + conex_sweep(r, exc); + rtio_cqe_submit(r, result, sqe->userdata); + conex_prepare(r, exc); + conex_resume(r, exc); k_spin_unlock(&exc->lock, key); } @@ -238,42 +244,46 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result) */ void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result) { - const struct rtio_sqe *nsqe; k_spinlock_key_t key; struct rtio *r = iodev_sqe->r; const struct rtio_sqe *sqe = iodev_sqe->sqe; struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor; + void *userdata = sqe->userdata; + bool chained = sqe->flags & RTIO_SQE_CHAINED; + bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; + uint16_t task_id = conex_task_id(exc, iodev_sqe); /* Another interrupt (and sqe complete) may occur in spsc_acquire, * breaking the contract so spin around it effectively preventing another * interrupt on this core, and another core trying to concurrently work * in here. - * - * This can and should be broken up into a few sections with a try - * lock around the sweep and resume. */ key = k_spin_lock(&exc->lock); - rtio_cqe_submit(r, result, sqe->userdata); - - /* Determine the task id : O(1) */ - uint16_t task_id = conex_task_id(exc, iodev_sqe); + if (!transaction) { + rtio_cqe_submit(r, result, userdata); + } - sqe = iodev_sqe->sqe; + /* While the last sqe was marked as chained or transactional, do more work */ + while (chained | transaction) { + sqe = rtio_spsc_next(r->sq, sqe); + chained = sqe->flags & RTIO_SQE_CHAINED; + transaction = sqe->flags & RTIO_SQE_TRANSACTION; + userdata = sqe->userdata; - /* Fail the remaining sqe's in the chain */ - if (sqe->flags & RTIO_SQE_CHAINED) { - nsqe = rtio_spsc_next(r->sq, sqe); - while (nsqe != NULL && nsqe->flags & RTIO_SQE_CHAINED) { - rtio_cqe_submit(r, -ECANCELED, nsqe->userdata); - nsqe = rtio_spsc_next(r->sq, nsqe); + if (!transaction) { + rtio_cqe_submit(r, result, userdata); + } else { + rtio_cqe_submit(r, -ECANCELED, userdata); } } - /* Task is complete (failed) */ + /* Determine the task id : O(1) */ exc->task_status[task_id] |= CONEX_TASK_COMPLETE; - conex_sweep_resume(r, exc); + conex_sweep(r, exc); + conex_prepare(r, exc); + conex_resume(r, exc); k_spin_unlock(&exc->lock, key); } diff --git a/subsys/rtio/rtio_executor_simple.c b/subsys/rtio/rtio_executor_simple.c index 47515f237ade3..26645e8511bf7 100644 --- a/subsys/rtio/rtio_executor_simple.c +++ b/subsys/rtio/rtio_executor_simple.c @@ -33,6 +33,31 @@ int rtio_simple_submit(struct rtio *r) struct rtio_sqe *sqe = rtio_spsc_consume(r->sq); + if (sqe == NULL) { + return 0; + } + + /* Some validation on the sqe to ensure no programming errors were + * made so assumptions in ok and err are valid. + */ +#ifdef CONFIG_ASSERT + __ASSERT(sqe != NULL, "Expected a valid sqe on submit call"); + + bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; + bool chained = sqe->flags & RTIO_SQE_CHAINED; + + if (transaction || chained) { + struct rtio_sqe *next = rtio_spsc_next(r->sq, sqe); + + __ASSERT(next != NULL, + "sqe %p flagged as transaction (%d) or chained (%d) without subsequent sqe in queue", + sqe, transaction, chained); + } + __ASSERT(!(chained && transaction), + "sqe %p flagged as both being transaction and chained, only one is allowed", + sqe); +#endif + exc->task.sqe = sqe; exc->task.r = r; @@ -58,24 +83,38 @@ void rtio_simple_ok(struct rtio_iodev_sqe *iodev_sqe, int result) __ASSERT_NO_MSG(iodev_sqe == &exc->task); #endif + bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; + + while (transaction) { + rtio_spsc_release(r->sq); + sqe = rtio_spsc_consume(r->sq); + __ASSERT_NO_MSG(sqe != NULL); + transaction = sqe->flags & RTIO_SQE_TRANSACTION; + } + void *userdata = sqe->userdata; rtio_spsc_release(r->sq); iodev_sqe->sqe = NULL; + rtio_cqe_submit(r, result, userdata); rtio_simple_submit(r); } /** * @brief Callback from an iodev describing error + * + * Some assumptions are made and should have been validated on rtio_submit + * - a sqe marked as chained or transaction has a next sqe + * - a sqe is marked either chained or transaction but not both */ void rtio_simple_err(struct rtio_iodev_sqe *iodev_sqe, int result) { - const struct rtio_sqe *nsqe; + const struct rtio_sqe *sqe = iodev_sqe->sqe; struct rtio *r = iodev_sqe->r; - const struct rtio_sqe *sqe = iodev_sqe->sqe; void *userdata = sqe->userdata; bool chained = sqe->flags & RTIO_SQE_CHAINED; + bool transaction = sqe->flags & RTIO_SQE_TRANSACTION; #ifdef CONFIG_ASSERT struct rtio_simple_executor *exc = @@ -84,29 +123,27 @@ void rtio_simple_err(struct rtio_iodev_sqe *iodev_sqe, int result) __ASSERT_NO_MSG(iodev_sqe == &exc->task); #endif - rtio_spsc_release(r->sq); iodev_sqe->sqe = NULL; - rtio_cqe_submit(r, result, sqe->userdata); - - if (chained) { - - nsqe = rtio_spsc_consume(r->sq); - while (nsqe != NULL && nsqe->flags & RTIO_SQE_CHAINED) { - userdata = nsqe->userdata; - rtio_spsc_release(r->sq); + if (!transaction) { + rtio_cqe_submit(r, result, userdata); + } + while (chained | transaction) { + sqe = rtio_spsc_consume(r->sq); + chained = sqe->flags & RTIO_SQE_CHAINED; + transaction = sqe->flags & RTIO_SQE_TRANSACTION; + userdata = sqe->userdata; + rtio_spsc_release(r->sq); + + if (!transaction) { + rtio_cqe_submit(r, result, userdata); + } else { rtio_cqe_submit(r, -ECANCELED, userdata); - nsqe = rtio_spsc_consume(r->sq); - } - - if (nsqe != NULL) { - - iodev_sqe->sqe = nsqe; - rtio_iodev_submit(iodev_sqe); } + } - } else { - /* Now we can submit the next in the queue if we aren't done */ - rtio_simple_submit(r); + iodev_sqe->sqe = rtio_spsc_consume(r->sq); + if (iodev_sqe->sqe != NULL) { + rtio_iodev_submit(iodev_sqe); } } diff --git a/tests/subsys/rtio/rtio_api/CMakeLists.txt b/tests/subsys/rtio/rtio_api/CMakeLists.txt index 68fd00d94bfc8..3ece5f0c51e2d 100644 --- a/tests/subsys/rtio/rtio_api/CMakeLists.txt +++ b/tests/subsys/rtio/rtio_api/CMakeLists.txt @@ -5,7 +5,7 @@ cmake_minimum_required(VERSION 3.20.0) find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) project(rtio_api_test) -target_sources(app PRIVATE src/main.c) +target_sources(app PRIVATE src/test_rtio_spsc.c src/test_rtio_mpsc.c src/test_rtio_api.c) target_include_directories(app PRIVATE ${ZEPHYR_BASE}/include diff --git a/tests/subsys/rtio/rtio_api/src/main.c b/tests/subsys/rtio/rtio_api/src/main.c deleted file mode 100644 index 035acf3b10734..0000000000000 --- a/tests/subsys/rtio/rtio_api/src/main.c +++ /dev/null @@ -1,676 +0,0 @@ -/* - * Copyright (c) 2021 Intel Corporation. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "rtio_iodev_test.h" - -/* - * @brief Produce and Consume a single uint32_t in the same execution context - * - * @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release() - * - * @ingroup rtio_tests - */ -ZTEST(rtio_spsc, test_produce_consume_size1) -{ - RTIO_SPSC_DEFINE(ezspsc, uint32_t, 1); - - const uint32_t magic = 43219876; - - uint32_t *acq = rtio_spsc_acquire(&ezspsc); - - zassert_not_null(acq, "Acquire should succeed"); - - *acq = magic; - - uint32_t *acq2 = rtio_spsc_acquire(&ezspsc); - - zassert_is_null(acq2, "Acquire should fail"); - - uint32_t *cons = rtio_spsc_consume(&ezspsc); - - zassert_is_null(cons, "Consume should fail"); - - zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); - - rtio_spsc_produce(&ezspsc); - - zassert_equal(rtio_spsc_consumable(&ezspsc), 1, "Consumables should be 1"); - - uint32_t *cons2 = rtio_spsc_consume(&ezspsc); - - zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); - - zassert_not_null(cons2, "Consume should not fail"); - zassert_equal(*cons2, magic, "Consume value should equal magic"); - - uint32_t *cons3 = rtio_spsc_consume(&ezspsc); - - zassert_is_null(cons3, "Consume should fail"); - - - uint32_t *acq3 = rtio_spsc_acquire(&ezspsc); - - zassert_is_null(acq3, "Acquire should not succeed"); - - rtio_spsc_release(&ezspsc); - - uint32_t *acq4 = rtio_spsc_acquire(&ezspsc); - - zassert_not_null(acq4, "Acquire should succeed"); -} - -/*&* - * @brief Produce and Consume 3 items at a time in a spsc of size 4 to validate masking - * and wrap around reads/writes. - * - * @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release() - * - * @ingroup rtio_tests - */ -ZTEST(rtio_spsc, test_produce_consume_wrap_around) -{ - RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4); - - for (int i = 0; i < 10; i++) { - zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); - for (int j = 0; j < 3; j++) { - uint32_t *entry = rtio_spsc_acquire(&ezspsc); - - zassert_not_null(entry, "Acquire should succeed"); - *entry = i * 3 + j; - rtio_spsc_produce(&ezspsc); - } - zassert_equal(rtio_spsc_consumable(&ezspsc), 3, "Consumables should be 3"); - - for (int k = 0; k < 3; k++) { - uint32_t *entry = rtio_spsc_consume(&ezspsc); - - zassert_not_null(entry, "Consume should succeed"); - zassert_equal(*entry, i * 3 + k, "Consume value should equal i*3+k"); - rtio_spsc_release(&ezspsc); - } - - zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); - - } -} - -/** - * @brief Ensure that integer wraps continue to work. - * - * Done by setting all values to UINTPTR_MAX - 2 and writing and reading enough - * to ensure integer wraps occur. - */ -ZTEST(rtio_spsc, test_int_wrap_around) -{ - RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4); - ezspsc._spsc.in = ATOMIC_INIT(UINTPTR_MAX - 2); - ezspsc._spsc.out = ATOMIC_INIT(UINTPTR_MAX - 2); - - for (int j = 0; j < 3; j++) { - uint32_t *entry = rtio_spsc_acquire(&ezspsc); - - zassert_not_null(entry, "Acquire should succeed"); - *entry = j; - rtio_spsc_produce(&ezspsc); - } - - zassert_equal(atomic_get(&ezspsc._spsc.in), UINTPTR_MAX + 1, "Spsc in should wrap"); - - for (int k = 0; k < 3; k++) { - uint32_t *entry = rtio_spsc_consume(&ezspsc); - - zassert_not_null(entry, "Consume should succeed"); - zassert_equal(*entry, k, "Consume value should equal i*3+k"); - rtio_spsc_release(&ezspsc); - } - - zassert_equal(atomic_get(&ezspsc._spsc.out), UINTPTR_MAX + 1, "Spsc out should wrap"); -} - -#define MAX_RETRIES 5 -#define SMP_ITERATIONS 100 - -RTIO_SPSC_DEFINE(spsc, uint32_t, 4); - -static void t1_consume(void *p1, void *p2, void *p3) -{ - struct rtio_spsc_spsc *ezspsc = p1; - uint32_t retries = 0; - uint32_t *val = NULL; - - for (int i = 0; i < SMP_ITERATIONS; i++) { - val = NULL; - retries = 0; - while (val == NULL && retries < MAX_RETRIES) { - val = rtio_spsc_consume(ezspsc); - retries++; - } - if (val != NULL) { - rtio_spsc_release(ezspsc); - } else { - k_yield(); - } - } -} - -static void t2_produce(void *p1, void *p2, void *p3) -{ - struct rtio_spsc_spsc *ezspsc = p1; - uint32_t retries = 0; - uint32_t *val = NULL; - - for (int i = 0; i < SMP_ITERATIONS; i++) { - val = NULL; - retries = 0; - while (val == NULL && retries < MAX_RETRIES) { - val = rtio_spsc_acquire(ezspsc); - retries++; - } - if (val != NULL) { - *val = SMP_ITERATIONS; - rtio_spsc_produce(ezspsc); - } else { - k_yield(); - } - } -} - -#define STACK_SIZE (384 + CONFIG_TEST_EXTRA_STACK_SIZE) -#define THREADS_NUM 2 - -struct thread_info { - k_tid_t tid; - int executed; - int priority; - int cpu_id; -}; -static struct thread_info tinfo[THREADS_NUM]; -static struct k_thread tthread[THREADS_NUM]; -static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM, STACK_SIZE); - - -/** - * @brief Test that the producer and consumer are indeed thread safe - * - * This can and should be validated on SMP machines where incoherent - * memory could cause issues. - */ -ZTEST(rtio_spsc, test_spsc_threaded) -{ - - tinfo[0].tid = - k_thread_create(&tthread[0], tstack[0], STACK_SIZE, - (k_thread_entry_t)t1_consume, - &spsc, NULL, NULL, - K_PRIO_PREEMPT(5), - K_INHERIT_PERMS, K_NO_WAIT); - tinfo[1].tid = - k_thread_create(&tthread[1], tstack[1], STACK_SIZE, - (k_thread_entry_t)t2_produce, - &spsc, NULL, NULL, - K_PRIO_PREEMPT(5), - K_INHERIT_PERMS, K_NO_WAIT); - - k_thread_join(tinfo[1].tid, K_FOREVER); - k_thread_join(tinfo[0].tid, K_FOREVER); -} - -static struct rtio_mpsc push_pop_q; -static struct rtio_mpsc_node push_pop_nodes[2]; - -/* - * @brief Push and pop one element - * - * @see rtio_mpsc_push(), rtio_mpsc_pop() - * - * @ingroup rtio_tests - */ -ZTEST(rtio_mpsc, test_push_pop) -{ - - struct rtio_mpsc_node *node, *head, *stub, *next, *tail; - - rtio_mpsc_init(&push_pop_q); - - head = atomic_ptr_get(&push_pop_q.head); - tail = push_pop_q.tail; - stub = &push_pop_q.stub; - next = atomic_ptr_get(&stub->next); - - zassert_equal(head, stub, "Head should point at stub"); - zassert_equal(tail, stub, "Tail should point at stub"); - zassert_is_null(next, "Next should be null"); - - node = rtio_mpsc_pop(&push_pop_q); - zassert_is_null(node, "Pop on empty queue should return null"); - - rtio_mpsc_push(&push_pop_q, &push_pop_nodes[0]); - - head = atomic_ptr_get(&push_pop_q.head); - - zassert_equal(head, &push_pop_nodes[0], "Queue head should point at push_pop_node"); - next = atomic_ptr_get(&push_pop_nodes[0].next); - zassert_is_null(next, NULL, "push_pop_node next should point at null"); - next = atomic_ptr_get(&push_pop_q.stub.next); - zassert_equal(next, &push_pop_nodes[0], "Queue stub should point at push_pop_node"); - tail = push_pop_q.tail; - stub = &push_pop_q.stub; - zassert_equal(tail, stub, "Tail should point at stub"); - - node = rtio_mpsc_pop(&push_pop_q); - stub = &push_pop_q.stub; - - zassert_not_equal(node, stub, "Pop should not return stub"); - zassert_not_null(node, "Pop should not return null"); - zassert_equal(node, &push_pop_nodes[0], - "Pop should return push_pop_node %p, instead was %p", - &push_pop_nodes[0], node); - - node = rtio_mpsc_pop(&push_pop_q); - zassert_is_null(node, "Pop on empty queue should return null"); -} - -#define MPSC_FREEQ_SZ 8 -#define MPSC_ITERATIONS 100000 -#define MPSC_STACK_SIZE (512 + CONFIG_TEST_EXTRA_STACK_SIZE) -#define MPSC_THREADS_NUM 4 - -static struct thread_info mpsc_tinfo[MPSC_THREADS_NUM]; -static struct k_thread mpsc_thread[MPSC_THREADS_NUM]; -static K_THREAD_STACK_ARRAY_DEFINE(mpsc_stack, MPSC_THREADS_NUM, MPSC_STACK_SIZE); - -struct mpsc_node { - uint32_t id; - struct rtio_mpsc_node n; -}; - - -RTIO_SPSC_DECLARE(node_sq, struct mpsc_node, MPSC_FREEQ_SZ); - -#define SPSC_INIT(n, sz) RTIO_SPSC_INITIALIZER(sz) - -struct rtio_spsc_node_sq node_q[MPSC_THREADS_NUM] = { - LISTIFY(MPSC_THREADS_NUM, SPSC_INIT, (,), MPSC_FREEQ_SZ) -}; - -static struct rtio_mpsc mpsc_q; - -static void mpsc_consumer(void *p1, void *p2, void *p3) -{ - struct rtio_mpsc_node *n; - struct mpsc_node *nn; - - for (int i = 0; i < (MPSC_ITERATIONS)*(MPSC_THREADS_NUM - 1); i++) { - do { - n = rtio_mpsc_pop(&mpsc_q); - if (n == NULL) { - k_yield(); - } - } while (n == NULL); - - zassert_not_equal(n, &mpsc_q.stub, "mpsc should not produce stub"); - - nn = CONTAINER_OF(n, struct mpsc_node, n); - - rtio_spsc_acquire(&node_q[nn->id]); - rtio_spsc_produce(&node_q[nn->id]); - } -} - -static void mpsc_producer(void *p1, void *p2, void *p3) -{ - struct mpsc_node *n; - uint32_t id = (uint32_t)(uintptr_t)p1; - - for (int i = 0; i < MPSC_ITERATIONS; i++) { - do { - n = rtio_spsc_consume(&node_q[id]); - if (n == NULL) { - k_yield(); - } - } while (n == NULL); - - rtio_spsc_release(&node_q[id]); - n->id = id; - rtio_mpsc_push(&mpsc_q, &n->n); - } -} - -/** - * @brief Test that the producer and consumer are indeed thread safe - * - * This can and should be validated on SMP machines where incoherent - * memory could cause issues. - */ -ZTEST(rtio_mpsc, test_mpsc_threaded) -{ - rtio_mpsc_init(&mpsc_q); - - TC_PRINT("setting up mpsc producer free queues\n"); - /* Setup node free queues */ - for (int i = 0; i < MPSC_THREADS_NUM; i++) { - for (int j = 0; j < MPSC_FREEQ_SZ; j++) { - rtio_spsc_acquire(&node_q[i]); - } - rtio_spsc_produce_all(&node_q[i]); - } - - TC_PRINT("starting consumer\n"); - mpsc_tinfo[0].tid = - k_thread_create(&mpsc_thread[0], mpsc_stack[0], STACK_SIZE, - (k_thread_entry_t)mpsc_consumer, - NULL, NULL, NULL, - K_PRIO_PREEMPT(5), - K_INHERIT_PERMS, K_NO_WAIT); - - for (int i = 1; i < MPSC_THREADS_NUM; i++) { - TC_PRINT("starting producer %i\n", i); - mpsc_tinfo[i].tid = - k_thread_create(&mpsc_thread[i], mpsc_stack[i], STACK_SIZE, - (k_thread_entry_t)mpsc_producer, - (void *)(uintptr_t)i, NULL, NULL, - K_PRIO_PREEMPT(5), - K_INHERIT_PERMS, K_NO_WAIT); - } - - for (int i = 0; i < MPSC_THREADS_NUM; i++) { - TC_PRINT("joining mpsc thread %d\n", i); - k_thread_join(mpsc_tinfo[i].tid, K_FOREVER); - } -} - -RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec_simp); -RTIO_DEFINE(r_simple_simp, (struct rtio_executor *)&simple_exec_simp, 4, 4); - -RTIO_EXECUTOR_CONCURRENT_DEFINE(simple_exec_con, 1); -RTIO_DEFINE(r_simple_con, (struct rtio_executor *)&simple_exec_con, 4, 4); - -RTIO_IODEV_TEST_DEFINE(iodev_test_simple); - -/** - * @brief Test the basics of the RTIO API - * - * Ensures that we can setup an RTIO context, enqueue a request, and receive - * a completion event. - */ -void test_rtio_simple_(struct rtio *r) -{ - int res; - uintptr_t userdata[2] = {0, 1}; - struct rtio_sqe *sqe; - struct rtio_cqe *cqe; - - rtio_iodev_test_init(&iodev_test_simple); - - TC_PRINT("setting up single no-op\n"); - sqe = rtio_spsc_acquire(r->sq); - zassert_not_null(sqe, "Expected a valid sqe"); - rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_simple, &userdata[0]); - - TC_PRINT("submit with wait\n"); - res = rtio_submit(r, 1); - zassert_ok(res, "Should return ok from rtio_execute"); - - cqe = rtio_spsc_consume(r->cq); - zassert_not_null(cqe, "Expected a valid cqe"); - zassert_ok(cqe->result, "Result should be ok"); - zassert_equal_ptr(cqe->userdata, &userdata[0], "Expected userdata back"); - rtio_spsc_release(r->cq); -} - -ZTEST(rtio_api, test_rtio_simple) -{ - TC_PRINT("rtio simple simple\n"); - test_rtio_simple_(&r_simple_simp); - TC_PRINT("rtio simple concurrent\n"); - test_rtio_simple_(&r_simple_con); -} - -RTIO_EXECUTOR_SIMPLE_DEFINE(chain_exec_simp); -RTIO_DEFINE(r_chain_simp, (struct rtio_executor *)&chain_exec_simp, 4, 4); - -RTIO_EXECUTOR_CONCURRENT_DEFINE(chain_exec_con, 1); -RTIO_DEFINE(r_chain_con, (struct rtio_executor *)&chain_exec_con, 4, 4); - -RTIO_IODEV_TEST_DEFINE(iodev_test_chain0); -RTIO_IODEV_TEST_DEFINE(iodev_test_chain1); -struct rtio_iodev *iodev_test_chain[] = {&iodev_test_chain0, &iodev_test_chain1}; - -/** - * @brief Test chained requests - * - * Ensures that we can setup an RTIO context, enqueue a chained requests, - * and receive completion events in the correct order given the chained - * flag and multiple devices where serialization isn't guaranteed. - */ -void test_rtio_chain_(struct rtio *r) -{ - int res; - uintptr_t userdata[4] = {0, 1, 2, 3}; - struct rtio_sqe *sqe; - struct rtio_cqe *cqe; - - for (int i = 0; i < 4; i++) { - sqe = rtio_spsc_acquire(r->sq); - zassert_not_null(sqe, "Expected a valid sqe"); - rtio_sqe_prep_nop(sqe, iodev_test_chain[i % 2], - &userdata[i]); - sqe->flags |= RTIO_SQE_CHAINED; - } - - /* Clear the last one */ - sqe->flags = 0; - - TC_PRINT("submitting\n"); - res = rtio_submit(r, 4); - TC_PRINT("checking cq\n"); - zassert_ok(res, "Should return ok from rtio_execute"); - zassert_equal(rtio_spsc_consumable(r->cq), 4, "Should have 4 pending completions"); - - for (int i = 0; i < 4; i++) { - TC_PRINT("consume %d\n", i); - cqe = rtio_spsc_consume(r->cq); - zassert_not_null(cqe, "Expected a valid cqe"); - zassert_ok(cqe->result, "Result should be ok"); - zassert_equal_ptr(cqe->userdata, &userdata[i], "Expected in order completions"); - rtio_spsc_release(r->cq); - } -} - -ZTEST(rtio_api, test_rtio_chain) -{ - TC_PRINT("initializing iodev test devices\n"); - - for (int i = 0; i < 2; i++) { - rtio_iodev_test_init(iodev_test_chain[i]); - } - - TC_PRINT("rtio chain simple\n"); - test_rtio_chain_(&r_chain_simp); - TC_PRINT("rtio chain concurrent\n"); - test_rtio_chain_(&r_chain_con); -} - - -RTIO_EXECUTOR_SIMPLE_DEFINE(multi_exec_simp); -RTIO_DEFINE(r_multi_simp, (struct rtio_executor *)&multi_exec_simp, 4, 4); - -RTIO_EXECUTOR_CONCURRENT_DEFINE(multi_exec_con, 2); -RTIO_DEFINE(r_multi_con, (struct rtio_executor *)&multi_exec_con, 4, 4); - -RTIO_IODEV_TEST_DEFINE(iodev_test_multi0); -RTIO_IODEV_TEST_DEFINE(iodev_test_multi1); -struct rtio_iodev *iodev_test_multi[] = {&iodev_test_multi0, &iodev_test_multi1}; - -/** - * @brief Test multiple asynchronous chains against one iodev - */ -void test_rtio_multiple_chains_(struct rtio *r) -{ - int res; - uintptr_t userdata[4] = {0, 1, 2, 3}; - struct rtio_sqe *sqe; - struct rtio_cqe *cqe; - - for (int i = 0; i < 2; i++) { - for (int j = 0; j < 2; j++) { - sqe = rtio_spsc_acquire(r->sq); - zassert_not_null(sqe, "Expected a valid sqe"); - rtio_sqe_prep_nop(sqe, iodev_test_multi[i], - (void *)userdata[i*2 + j]); - if (j == 0) { - sqe->flags |= RTIO_SQE_CHAINED; - } else { - sqe->flags |= 0; - } - } - } - - TC_PRINT("calling submit from test case\n"); - res = rtio_submit(r, 0); - zassert_ok(res, "Should return ok from rtio_execute"); - - bool seen[4] = { 0 }; - - TC_PRINT("waiting for 4 completions\n"); - for (int i = 0; i < 4; i++) { - TC_PRINT("waiting on completion %d\n", i); - cqe = rtio_spsc_consume(r->cq); - - while (cqe == NULL) { - k_sleep(K_MSEC(1)); - cqe = rtio_spsc_consume(r->cq); - } - - zassert_not_null(cqe, "Expected a valid cqe"); - TC_PRINT("result %d, would block is %d, inval is %d\n", - cqe->result, -EWOULDBLOCK, -EINVAL); - zassert_ok(cqe->result, "Result should be ok"); - seen[(uintptr_t)cqe->userdata] = true; - if (seen[1]) { - zassert_true(seen[0], "Should see 0 before 1"); - } - if (seen[3]) { - zassert_true(seen[2], "Should see 2 before 3"); - } - rtio_spsc_release(r->cq); - } -} - -ZTEST(rtio_api, test_rtio_multiple_chains) -{ - for (int i = 0; i < 2; i++) { - rtio_iodev_test_init(iodev_test_multi[i]); - } - - TC_PRINT("rtio multiple simple\n"); - test_rtio_multiple_chains_(&r_multi_simp); - TC_PRINT("rtio_multiple concurrent\n"); - test_rtio_multiple_chains_(&r_multi_con); -} - - - -#ifdef CONFIG_USERSPACE -K_APPMEM_PARTITION_DEFINE(rtio_partition); -K_APP_BMEM(rtio_partition) uint8_t syscall_bufs[4]; -struct k_mem_domain rtio_domain; -#else -uint8_t syscall_bufs[4]; -#endif - -RTIO_EXECUTOR_SIMPLE_DEFINE(syscall_simple); -RTIO_DEFINE(r_syscall, (struct rtio_executor *)&syscall_simple, 4, 4); -RTIO_IODEV_TEST_DEFINE(iodev_test_syscall); - -void rtio_syscall_test(void *p1, void *p2, void *p3) -{ - int res; - struct rtio_sqe sqe; - struct rtio_cqe cqe; - - struct rtio *r = &r_syscall; - - for (int i = 0; i < 4; i++) { - TC_PRINT("copying sqe in from stack\n"); - /* Not really legal from userspace! Ugh */ - rtio_sqe_prep_nop(&sqe, &iodev_test_syscall, - &syscall_bufs[i]); - res = rtio_sqe_copy_in(r, &sqe, 1); - zassert_equal(res, 0, "Expected success copying sqe"); - } - - TC_PRINT("submitting\n"); - res = rtio_submit(r, 4); - - for (int i = 0; i < 4; i++) { - TC_PRINT("consume %d\n", i); - res = rtio_cqe_copy_out(r, &cqe, 1, K_FOREVER); - zassert_equal(res, 1, "Expected success copying cqe"); - zassert_ok(cqe.result, "Result should be ok"); - zassert_equal_ptr(cqe.userdata, &syscall_bufs[i], - "Expected in order completions"); - } - -} - -#ifdef CONFIG_USERSPACE -ZTEST(rtio_api, test_rtio_syscalls_usermode) -{ - struct k_mem_partition *parts[] = { -#if Z_LIBC_PARTITION_EXISTS - &z_libc_partition, -#endif - &rtio_partition - }; - - TC_PRINT("syscalls from user mode test\n"); - TC_PRINT("test iodev init\n"); - rtio_iodev_test_init(&iodev_test_syscall); - TC_PRINT("mem domain init\n"); - k_mem_domain_init(&rtio_domain, ARRAY_SIZE(parts), parts); - TC_PRINT("mem domain add current\n"); - k_mem_domain_add_thread(&rtio_domain, k_current_get()); - TC_PRINT("rtio access grant\n"); - rtio_access_grant(&r_syscall, k_current_get()); - TC_PRINT("rtio iodev access grant, ptr %p\n", &iodev_test_syscall); - k_object_access_grant(&iodev_test_syscall, k_current_get()); - TC_PRINT("user mode enter\n"); - k_thread_user_mode_enter(rtio_syscall_test, NULL, NULL, NULL); -} -#endif /* CONFIG_USERSPACE */ - - -ZTEST(rtio_api, test_rtio_syscalls) -{ - TC_PRINT("test iodev init\n"); - rtio_iodev_test_init(&iodev_test_syscall); - TC_PRINT("syscalls from kernel mode\n"); - rtio_syscall_test(NULL, NULL, NULL); -} - - - - -ZTEST_SUITE(rtio_spsc, NULL, NULL, NULL, NULL, NULL); -ZTEST_SUITE(rtio_mpsc, NULL, NULL, NULL, NULL, NULL); -ZTEST_SUITE(rtio_api, NULL, NULL, NULL, NULL, NULL); diff --git a/tests/subsys/rtio/rtio_api/src/rtio_api.h b/tests/subsys/rtio/rtio_api/src/rtio_api.h new file mode 100644 index 0000000000000..df8d46ae1f7d8 --- /dev/null +++ b/tests/subsys/rtio/rtio_api/src/rtio_api.h @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2023 Intel Corporation. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef ZEPHYR_TEST_RTIO_API_H_ +#define ZEPHYR_TEST_RTIO_API_H_ + +#include + +struct thread_info { + k_tid_t tid; + int executed; + int priority; + int cpu_id; +}; + +#endif /* ZEPHYR_TEST_RTIO_API_H_ */ diff --git a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h index e0f01226cdd31..12435dceab7d9 100644 --- a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h +++ b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h @@ -13,38 +13,61 @@ #define RTIO_IODEV_TEST_H_ struct rtio_iodev_test_data { - /** - * k_timer for an asynchronous task - */ + /* k_timer for an asynchronous task */ struct k_timer timer; - /** - * Currently executing sqe - */ - atomic_ptr_t iodev_sqe; + /* Currently executing sqe */ + struct rtio_iodev_sqe *iodev_sqe; + const struct rtio_sqe *sqe; + + /* Count of submit calls */ + atomic_t submit_count; + + /* Lock around kicking off next timer */ + struct k_spinlock lock; }; -static void rtio_iodev_timer_fn(struct k_timer *tm) +static void rtio_iodev_test_next(struct rtio_iodev *iodev) { - struct rtio_iodev_test_data *data = CONTAINER_OF(tm, struct rtio_iodev_test_data, timer); - struct rtio_iodev_sqe *iodev_sqe = atomic_ptr_get(&data->iodev_sqe); - struct rtio_mpsc_node *next = - rtio_mpsc_pop((struct rtio_mpsc *)&iodev_sqe->sqe->iodev->iodev_sq); + struct rtio_iodev_test_data *data = iodev->data; + + /* The next section must be serialized to ensure single consumer semantics */ + k_spinlock_key_t key = k_spin_lock(&data->lock); + + if (data->iodev_sqe != NULL) { + goto out; + } + + struct rtio_mpsc_node *next = rtio_mpsc_pop(&iodev->iodev_sq); if (next != NULL) { struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q); - atomic_ptr_set(&data->iodev_sqe, next_sqe); - TC_PRINT("starting timer again from queued iodev_sqe %p!\n", next); + data->iodev_sqe = next_sqe; + data->sqe = next_sqe->sqe; k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT); - } else { - atomic_ptr_set(&data->iodev_sqe, NULL); } - /* Complete the request with Ok and a result */ - TC_PRINT("sqe ok callback\n"); +out: + k_spin_unlock(&data->lock, key); +} + +static void rtio_iodev_timer_fn(struct k_timer *tm) +{ + struct rtio_iodev_test_data *data = CONTAINER_OF(tm, struct rtio_iodev_test_data, timer); + struct rtio_iodev_sqe *iodev_sqe = data->iodev_sqe; + struct rtio_iodev *iodev = (struct rtio_iodev *)iodev_sqe->sqe->iodev; + + if (data->sqe->flags & RTIO_SQE_TRANSACTION) { + data->sqe = rtio_spsc_next(data->iodev_sqe->r->sq, data->sqe); + k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT); + return; + } + data->iodev_sqe = NULL; + data->sqe = NULL; rtio_iodev_sqe_ok(iodev_sqe, 0); + rtio_iodev_test_next(iodev); } static void rtio_iodev_test_submit(struct rtio_iodev_sqe *iodev_sqe) @@ -52,22 +75,12 @@ static void rtio_iodev_test_submit(struct rtio_iodev_sqe *iodev_sqe) struct rtio_iodev *iodev = (struct rtio_iodev *)iodev_sqe->sqe->iodev; struct rtio_iodev_test_data *data = iodev->data; - /* - * If a task is already going queue up the next request in the mpsc. - */ - if (!atomic_ptr_cas(&data->iodev_sqe, NULL, iodev_sqe)) { - TC_PRINT("adding queued sqe\n"); - rtio_mpsc_push(&iodev->iodev_sq, &iodev_sqe->q); - } + atomic_inc(&data->submit_count); + + /* The only safe operation is enqueuing */ + rtio_mpsc_push(&iodev->iodev_sq, &iodev_sqe->q); - /* - * Simulate an async hardware request with a one shot timer - * - * In reality the time to complete might have some significant variance - * but this is proof enough of a working API flow. - */ - TC_PRINT("starting one shot\n"); - k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT); + rtio_iodev_test_next(iodev); } const struct rtio_iodev_api rtio_iodev_test_api = { @@ -79,7 +92,7 @@ void rtio_iodev_test_init(struct rtio_iodev *test) struct rtio_iodev_test_data *data = test->data; rtio_mpsc_init(&test->iodev_sq); - atomic_ptr_set(&data->iodev_sqe, NULL); + data->iodev_sqe = NULL; k_timer_init(&data->timer, rtio_iodev_timer_fn, NULL); } diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_api.c b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c new file mode 100644 index 0000000000000..21dce9b880651 --- /dev/null +++ b/tests/subsys/rtio/rtio_api/src/test_rtio_api.c @@ -0,0 +1,394 @@ +/* + * Copyright (c) 2021 Intel Corporation. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rtio_iodev_test.h" + +/* Repeat tests to ensure they are repeatable */ +#define TEST_REPEATS 4 + +RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec_simp); +RTIO_DEFINE(r_simple_simp, (struct rtio_executor *)&simple_exec_simp, 4, 4); + +RTIO_EXECUTOR_CONCURRENT_DEFINE(simple_exec_con, 1); +RTIO_DEFINE(r_simple_con, (struct rtio_executor *)&simple_exec_con, 4, 4); + +RTIO_IODEV_TEST_DEFINE(iodev_test_simple); + +/** + * @brief Test the basics of the RTIO API + * + * Ensures that we can setup an RTIO context, enqueue a request, and receive + * a completion event. + */ +void test_rtio_simple_(struct rtio *r) +{ + int res; + uintptr_t userdata[2] = {0, 1}; + struct rtio_sqe *sqe; + struct rtio_cqe *cqe; + + rtio_iodev_test_init(&iodev_test_simple); + + TC_PRINT("setting up single no-op\n"); + sqe = rtio_spsc_acquire(r->sq); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_nop(sqe, (struct rtio_iodev *)&iodev_test_simple, &userdata[0]); + + TC_PRINT("submit with wait\n"); + res = rtio_submit(r, 1); + zassert_ok(res, "Should return ok from rtio_execute"); + + cqe = rtio_spsc_consume(r->cq); + zassert_not_null(cqe, "Expected a valid cqe"); + zassert_ok(cqe->result, "Result should be ok"); + zassert_equal_ptr(cqe->userdata, &userdata[0], "Expected userdata back"); + rtio_spsc_release(r->cq); +} + +ZTEST(rtio_api, test_rtio_simple) +{ + TC_PRINT("rtio simple simple\n"); + for (int i = 0; i < TEST_REPEATS; i++) { + test_rtio_simple_(&r_simple_simp); + } + TC_PRINT("rtio simple concurrent\n"); + for (int i = 0; i < TEST_REPEATS; i++) { + test_rtio_simple_(&r_simple_con); + } +} + +RTIO_EXECUTOR_SIMPLE_DEFINE(chain_exec_simp); +RTIO_DEFINE(r_chain_simp, (struct rtio_executor *)&chain_exec_simp, 4, 4); + +RTIO_EXECUTOR_CONCURRENT_DEFINE(chain_exec_con, 1); +RTIO_DEFINE(r_chain_con, (struct rtio_executor *)&chain_exec_con, 4, 4); + +RTIO_IODEV_TEST_DEFINE(iodev_test_chain0); +RTIO_IODEV_TEST_DEFINE(iodev_test_chain1); +struct rtio_iodev *iodev_test_chain[] = {&iodev_test_chain0, &iodev_test_chain1}; + +/** + * @brief Test chained requests + * + * Ensures that we can setup an RTIO context, enqueue a chained requests, + * and receive completion events in the correct order given the chained + * flag and multiple devices where serialization isn't guaranteed. + */ +void test_rtio_chain_(struct rtio *r) +{ + int res; + uint32_t userdata[4] = {0, 1, 2, 3}; + struct rtio_sqe *sqe; + struct rtio_cqe *cqe; + + for (int i = 0; i < 4; i++) { + sqe = rtio_spsc_acquire(r->sq); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_nop(sqe, iodev_test_chain[i % 2], + &userdata[i]); + sqe->flags |= RTIO_SQE_CHAINED; + TC_PRINT("produce %d, sqe %p, userdata %d\n", i, sqe, userdata[i]); + } + + /* Clear the last one */ + sqe->flags = 0; + + TC_PRINT("submitting\n"); + res = rtio_submit(r, 4); + TC_PRINT("checking cq\n"); + zassert_ok(res, "Should return ok from rtio_execute"); + zassert_equal(rtio_spsc_consumable(r->cq), 4, "Should have 4 pending completions"); + + for (int i = 0; i < 4; i++) { + cqe = rtio_spsc_consume(r->cq); + zassert_not_null(cqe, "Expected a valid cqe"); + TC_PRINT("consume %d, cqe %p, userdata %d\n", i, cqe, *(uint32_t *)cqe->userdata); + zassert_ok(cqe->result, "Result should be ok"); + + zassert_equal_ptr(cqe->userdata, &userdata[i], "Expected in order completions"); + rtio_spsc_release(r->cq); + } +} + +ZTEST(rtio_api, test_rtio_chain) +{ + TC_PRINT("initializing iodev test devices\n"); + + for (int i = 0; i < 2; i++) { + rtio_iodev_test_init(iodev_test_chain[i]); + } + + TC_PRINT("rtio chain simple\n"); + for (int i = 0; i < TEST_REPEATS; i++) { + test_rtio_chain_(&r_chain_simp); + } + TC_PRINT("rtio chain concurrent\n"); + for (int i = 0; i < TEST_REPEATS; i++) { + test_rtio_chain_(&r_chain_con); + } +} + +RTIO_EXECUTOR_SIMPLE_DEFINE(multi_exec_simp); +RTIO_DEFINE(r_multi_simp, (struct rtio_executor *)&multi_exec_simp, 4, 4); + +RTIO_EXECUTOR_CONCURRENT_DEFINE(multi_exec_con, 2); +RTIO_DEFINE(r_multi_con, (struct rtio_executor *)&multi_exec_con, 4, 4); + +RTIO_IODEV_TEST_DEFINE(iodev_test_multi0); +RTIO_IODEV_TEST_DEFINE(iodev_test_multi1); +struct rtio_iodev *iodev_test_multi[] = {&iodev_test_multi0, &iodev_test_multi1}; + +/** + * @brief Test multiple asynchronous chains against one iodev + */ +void test_rtio_multiple_chains_(struct rtio *r) +{ + int res; + uintptr_t userdata[4] = {0, 1, 2, 3}; + struct rtio_sqe *sqe; + struct rtio_cqe *cqe; + + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { + sqe = rtio_spsc_acquire(r->sq); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_nop(sqe, iodev_test_multi[i], + (void *)userdata[i*2 + j]); + if (j == 0) { + sqe->flags |= RTIO_SQE_CHAINED; + } else { + sqe->flags |= 0; + } + } + } + + TC_PRINT("calling submit from test case\n"); + res = rtio_submit(r, 0); + zassert_ok(res, "Should return ok from rtio_execute"); + + bool seen[4] = { 0 }; + + TC_PRINT("waiting for 4 completions\n"); + for (int i = 0; i < 4; i++) { + TC_PRINT("waiting on completion %d\n", i); + cqe = rtio_spsc_consume(r->cq); + + while (cqe == NULL) { + k_sleep(K_MSEC(1)); + cqe = rtio_spsc_consume(r->cq); + } + + zassert_not_null(cqe, "Expected a valid cqe"); + TC_PRINT("result %d, would block is %d, inval is %d\n", + cqe->result, -EWOULDBLOCK, -EINVAL); + zassert_ok(cqe->result, "Result should be ok"); + seen[(uintptr_t)cqe->userdata] = true; + if (seen[1]) { + zassert_true(seen[0], "Should see 0 before 1"); + } + if (seen[3]) { + zassert_true(seen[2], "Should see 2 before 3"); + } + rtio_spsc_release(r->cq); + } +} + +ZTEST(rtio_api, test_rtio_multiple_chains) +{ + for (int i = 0; i < 2; i++) { + rtio_iodev_test_init(iodev_test_multi[i]); + } + + TC_PRINT("rtio multiple simple\n"); + test_rtio_multiple_chains_(&r_multi_simp); + TC_PRINT("rtio_multiple concurrent\n"); + test_rtio_multiple_chains_(&r_multi_con); +} + +#ifdef CONFIG_USERSPACE +K_APPMEM_PARTITION_DEFINE(rtio_partition); +K_APP_BMEM(rtio_partition) uint8_t syscall_bufs[4]; +struct k_mem_domain rtio_domain; +#else +uint8_t syscall_bufs[4]; +#endif + +RTIO_EXECUTOR_SIMPLE_DEFINE(syscall_simple); +RTIO_DEFINE(r_syscall, (struct rtio_executor *)&syscall_simple, 4, 4); +RTIO_IODEV_TEST_DEFINE(iodev_test_syscall); + +void rtio_syscall_test(void *p1, void *p2, void *p3) +{ + int res; + struct rtio_sqe sqe; + struct rtio_cqe cqe; + + struct rtio *r = &r_syscall; + + for (int i = 0; i < 4; i++) { + TC_PRINT("copying sqe in from stack\n"); + /* Not really legal from userspace! Ugh */ + rtio_sqe_prep_nop(&sqe, &iodev_test_syscall, + &syscall_bufs[i]); + res = rtio_sqe_copy_in(r, &sqe, 1); + zassert_equal(res, 0, "Expected success copying sqe"); + } + + TC_PRINT("submitting\n"); + res = rtio_submit(r, 4); + + for (int i = 0; i < 4; i++) { + TC_PRINT("consume %d\n", i); + res = rtio_cqe_copy_out(r, &cqe, 1, K_FOREVER); + zassert_equal(res, 1, "Expected success copying cqe"); + zassert_ok(cqe.result, "Result should be ok"); + zassert_equal_ptr(cqe.userdata, &syscall_bufs[i], + "Expected in order completions"); + } +} + +#ifdef CONFIG_USERSPACE +ZTEST(rtio_api, test_rtio_syscalls_usermode) +{ + struct k_mem_partition *parts[] = { +#if Z_LIBC_PARTITION_EXISTS + &z_libc_partition, +#endif + &rtio_partition + }; + + TC_PRINT("syscalls from user mode test\n"); + TC_PRINT("test iodev init\n"); + rtio_iodev_test_init(&iodev_test_syscall); + TC_PRINT("mem domain init\n"); + k_mem_domain_init(&rtio_domain, ARRAY_SIZE(parts), parts); + TC_PRINT("mem domain add current\n"); + k_mem_domain_add_thread(&rtio_domain, k_current_get()); + TC_PRINT("rtio access grant\n"); + rtio_access_grant(&r_syscall, k_current_get()); + TC_PRINT("rtio iodev access grant, ptr %p\n", &iodev_test_syscall); + k_object_access_grant(&iodev_test_syscall, k_current_get()); + TC_PRINT("user mode enter\n"); + k_thread_user_mode_enter(rtio_syscall_test, NULL, NULL, NULL); +} +#endif /* CONFIG_USERSPACE */ + + +ZTEST(rtio_api, test_rtio_syscalls) +{ + TC_PRINT("test iodev init\n"); + rtio_iodev_test_init(&iodev_test_syscall); + TC_PRINT("syscalls from kernel mode\n"); + for (int i = 0; i < TEST_REPEATS; i++) { + rtio_syscall_test(NULL, NULL, NULL); + } +} + +RTIO_EXECUTOR_SIMPLE_DEFINE(transaction_exec_simp); +RTIO_DEFINE(r_transaction_simp, (struct rtio_executor *)&transaction_exec_simp, 4, 4); + +RTIO_EXECUTOR_CONCURRENT_DEFINE(transaction_exec_con, 2); +RTIO_DEFINE(r_transaction_con, (struct rtio_executor *)&transaction_exec_con, 4, 4); + +RTIO_IODEV_TEST_DEFINE(iodev_test_transaction0); +RTIO_IODEV_TEST_DEFINE(iodev_test_transaction1); +struct rtio_iodev *iodev_test_transaction[] = {&iodev_test_transaction0, &iodev_test_transaction1}; + +/** + * @brief Test transaction requests + * + * Ensures that we can setup an RTIO context, enqueue a transaction requests, + * and receive completion events in the correct order given the transaction + * flag and multiple devices where serialization isn't guaranteed. + */ +void test_rtio_transaction_(struct rtio *r) +{ + int res; + uintptr_t userdata[2] = {0, 1}; + struct rtio_sqe *sqe; + struct rtio_cqe *cqe; + bool seen[2] = { 0 }; + + sqe = rtio_spsc_acquire(r->sq); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_nop(sqe, &iodev_test_transaction0, NULL); + sqe->flags |= RTIO_SQE_TRANSACTION; + + sqe = rtio_spsc_acquire(r->sq); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_nop(sqe, NULL, + &userdata[0]); + + + sqe = rtio_spsc_acquire(r->sq); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_nop(sqe, &iodev_test_transaction1, NULL); + sqe->flags |= RTIO_SQE_TRANSACTION; + + sqe = rtio_spsc_acquire(r->sq); + zassert_not_null(sqe, "Expected a valid sqe"); + rtio_sqe_prep_nop(sqe, NULL, + &userdata[1]); + + TC_PRINT("submitting userdata 0 %p, userdata 1 %p\n", &userdata[0], &userdata[1]); + res = rtio_submit(r, 2); + TC_PRINT("checking cq, completions available %lu\n", rtio_spsc_consumable(r->cq)); + zassert_ok(res, "Should return ok from rtio_execute"); + zassert_equal(rtio_spsc_consumable(r->cq), 2, "Should have 2 pending completions"); + + for (int i = 0; i < 2; i++) { + TC_PRINT("consume %d\n", i); + cqe = rtio_spsc_consume(r->cq); + zassert_not_null(cqe, "Expected a valid cqe"); + zassert_ok(cqe->result, "Result should be ok"); + uintptr_t idx = *(uintptr_t *)cqe->userdata; + + TC_PRINT("userdata is %p, value %lu\n", cqe->userdata, idx); + zassert(idx == 0 || idx == 1, "idx should be 0 or 1"); + seen[idx] = true; + rtio_spsc_release(r->cq); + } + + zassert_true(seen[0], "Should have seen transaction 0"); + zassert_true(seen[1], "Should have seen transaction 1"); +} + +ZTEST(rtio_api, test_rtio_transaction) +{ + TC_PRINT("initializing iodev test devices\n"); + + for (int i = 0; i < 2; i++) { + rtio_iodev_test_init(iodev_test_transaction[i]); + } + + TC_PRINT("rtio transaction simple\n"); + for (int i = 0; i < TEST_REPEATS; i++) { + test_rtio_transaction_(&r_transaction_simp); + } + TC_PRINT("rtio transaction concurrent\n"); + for (int i = 0; i < TEST_REPEATS; i++) { + test_rtio_transaction_(&r_transaction_con); + } +} + + + +ZTEST_SUITE(rtio_api, NULL, NULL, NULL, NULL, NULL); diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_mpsc.c b/tests/subsys/rtio/rtio_api/src/test_rtio_mpsc.c new file mode 100644 index 0000000000000..561b1064f4933 --- /dev/null +++ b/tests/subsys/rtio/rtio_api/src/test_rtio_mpsc.c @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2023 Intel Corporation. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include + +#include "rtio_api.h" + +static struct rtio_mpsc push_pop_q; +static struct rtio_mpsc_node push_pop_nodes[2]; + +/* + * @brief Push and pop one element + * + * @see rtio_mpsc_push(), rtio_mpsc_pop() + * + * @ingroup rtio_tests + */ +ZTEST(rtio_mpsc, test_push_pop) +{ + + struct rtio_mpsc_node *node, *head, *stub, *next, *tail; + + rtio_mpsc_init(&push_pop_q); + + head = atomic_ptr_get(&push_pop_q.head); + tail = push_pop_q.tail; + stub = &push_pop_q.stub; + next = atomic_ptr_get(&stub->next); + + zassert_equal(head, stub, "Head should point at stub"); + zassert_equal(tail, stub, "Tail should point at stub"); + zassert_is_null(next, "Next should be null"); + + node = rtio_mpsc_pop(&push_pop_q); + zassert_is_null(node, "Pop on empty queue should return null"); + + rtio_mpsc_push(&push_pop_q, &push_pop_nodes[0]); + + head = atomic_ptr_get(&push_pop_q.head); + + zassert_equal(head, &push_pop_nodes[0], "Queue head should point at push_pop_node"); + next = atomic_ptr_get(&push_pop_nodes[0].next); + zassert_is_null(next, NULL, "push_pop_node next should point at null"); + next = atomic_ptr_get(&push_pop_q.stub.next); + zassert_equal(next, &push_pop_nodes[0], "Queue stub should point at push_pop_node"); + tail = push_pop_q.tail; + stub = &push_pop_q.stub; + zassert_equal(tail, stub, "Tail should point at stub"); + + node = rtio_mpsc_pop(&push_pop_q); + stub = &push_pop_q.stub; + + zassert_not_equal(node, stub, "Pop should not return stub"); + zassert_not_null(node, "Pop should not return null"); + zassert_equal(node, &push_pop_nodes[0], + "Pop should return push_pop_node %p, instead was %p", + &push_pop_nodes[0], node); + + node = rtio_mpsc_pop(&push_pop_q); + zassert_is_null(node, "Pop on empty queue should return null"); +} + +#define MPSC_FREEQ_SZ 8 +#define MPSC_ITERATIONS 100000 +#define MPSC_STACK_SIZE (512 + CONFIG_TEST_EXTRA_STACK_SIZE) +#define MPSC_THREADS_NUM 4 + +static struct thread_info mpsc_tinfo[MPSC_THREADS_NUM]; +static struct k_thread mpsc_thread[MPSC_THREADS_NUM]; +static K_THREAD_STACK_ARRAY_DEFINE(mpsc_stack, MPSC_THREADS_NUM, MPSC_STACK_SIZE); + +struct mpsc_node { + uint32_t id; + struct rtio_mpsc_node n; +}; + + +RTIO_SPSC_DECLARE(node_sq, struct mpsc_node, MPSC_FREEQ_SZ); + +#define SPSC_INIT(n, sz) RTIO_SPSC_INITIALIZER(sz) + +struct rtio_spsc_node_sq node_q[MPSC_THREADS_NUM] = { + LISTIFY(MPSC_THREADS_NUM, SPSC_INIT, (,), MPSC_FREEQ_SZ) +}; + +static struct rtio_mpsc mpsc_q; + +static void mpsc_consumer(void *p1, void *p2, void *p3) +{ + struct rtio_mpsc_node *n; + struct mpsc_node *nn; + + for (int i = 0; i < (MPSC_ITERATIONS)*(MPSC_THREADS_NUM - 1); i++) { + do { + n = rtio_mpsc_pop(&mpsc_q); + if (n == NULL) { + k_yield(); + } + } while (n == NULL); + + zassert_not_equal(n, &mpsc_q.stub, "mpsc should not produce stub"); + + nn = CONTAINER_OF(n, struct mpsc_node, n); + + rtio_spsc_acquire(&node_q[nn->id]); + rtio_spsc_produce(&node_q[nn->id]); + } +} + +static void mpsc_producer(void *p1, void *p2, void *p3) +{ + struct mpsc_node *n; + uint32_t id = (uint32_t)(uintptr_t)p1; + + for (int i = 0; i < MPSC_ITERATIONS; i++) { + do { + n = rtio_spsc_consume(&node_q[id]); + if (n == NULL) { + k_yield(); + } + } while (n == NULL); + + rtio_spsc_release(&node_q[id]); + n->id = id; + rtio_mpsc_push(&mpsc_q, &n->n); + } +} + +/** + * @brief Test that the producer and consumer are indeed thread safe + * + * This can and should be validated on SMP machines where incoherent + * memory could cause issues. + */ +ZTEST(rtio_mpsc, test_mpsc_threaded) +{ + rtio_mpsc_init(&mpsc_q); + + TC_PRINT("setting up mpsc producer free queues\n"); + /* Setup node free queues */ + for (int i = 0; i < MPSC_THREADS_NUM; i++) { + for (int j = 0; j < MPSC_FREEQ_SZ; j++) { + rtio_spsc_acquire(&node_q[i]); + } + rtio_spsc_produce_all(&node_q[i]); + } + + TC_PRINT("starting consumer\n"); + mpsc_tinfo[0].tid = + k_thread_create(&mpsc_thread[0], mpsc_stack[0], MPSC_STACK_SIZE, + (k_thread_entry_t)mpsc_consumer, + NULL, NULL, NULL, + K_PRIO_PREEMPT(5), + K_INHERIT_PERMS, K_NO_WAIT); + + for (int i = 1; i < MPSC_THREADS_NUM; i++) { + TC_PRINT("starting producer %i\n", i); + mpsc_tinfo[i].tid = + k_thread_create(&mpsc_thread[i], mpsc_stack[i], MPSC_STACK_SIZE, + (k_thread_entry_t)mpsc_producer, + (void *)(uintptr_t)i, NULL, NULL, + K_PRIO_PREEMPT(5), + K_INHERIT_PERMS, K_NO_WAIT); + } + + for (int i = 0; i < MPSC_THREADS_NUM; i++) { + TC_PRINT("joining mpsc thread %d\n", i); + k_thread_join(mpsc_tinfo[i].tid, K_FOREVER); + } +} + +ZTEST_SUITE(rtio_mpsc, NULL, NULL, NULL, NULL, NULL); diff --git a/tests/subsys/rtio/rtio_api/src/test_rtio_spsc.c b/tests/subsys/rtio/rtio_api/src/test_rtio_spsc.c new file mode 100644 index 0000000000000..9dda0960dbcec --- /dev/null +++ b/tests/subsys/rtio/rtio_api/src/test_rtio_spsc.c @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2023 Intel Corporation. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include "rtio_api.h" + +/* + * @brief Produce and Consume a single uint32_t in the same execution context + * + * @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release() + * + * @ingroup rtio_tests + */ +ZTEST(rtio_spsc, test_produce_consume_size1) +{ + RTIO_SPSC_DEFINE(ezspsc, uint32_t, 1); + + const uint32_t magic = 43219876; + + uint32_t *acq = rtio_spsc_acquire(&ezspsc); + + zassert_not_null(acq, "Acquire should succeed"); + + *acq = magic; + + uint32_t *acq2 = rtio_spsc_acquire(&ezspsc); + + zassert_is_null(acq2, "Acquire should fail"); + + uint32_t *cons = rtio_spsc_consume(&ezspsc); + + zassert_is_null(cons, "Consume should fail"); + + zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); + + rtio_spsc_produce(&ezspsc); + + zassert_equal(rtio_spsc_consumable(&ezspsc), 1, "Consumables should be 1"); + + uint32_t *cons2 = rtio_spsc_consume(&ezspsc); + + zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); + + zassert_not_null(cons2, "Consume should not fail"); + zassert_equal(*cons2, magic, "Consume value should equal magic"); + + uint32_t *cons3 = rtio_spsc_consume(&ezspsc); + + zassert_is_null(cons3, "Consume should fail"); + + + uint32_t *acq3 = rtio_spsc_acquire(&ezspsc); + + zassert_is_null(acq3, "Acquire should not succeed"); + + rtio_spsc_release(&ezspsc); + + uint32_t *acq4 = rtio_spsc_acquire(&ezspsc); + + zassert_not_null(acq4, "Acquire should succeed"); +} + +/*&* + * @brief Produce and Consume 3 items at a time in a spsc of size 4 to validate masking + * and wrap around reads/writes. + * + * @see rtio_spsc_acquire(), rtio_spsc_produce(), rtio_spsc_consume(), rtio_spsc_release() + * + * @ingroup rtio_tests + */ +ZTEST(rtio_spsc, test_produce_consume_wrap_around) +{ + RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4); + + for (int i = 0; i < 10; i++) { + zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); + for (int j = 0; j < 3; j++) { + uint32_t *entry = rtio_spsc_acquire(&ezspsc); + + zassert_not_null(entry, "Acquire should succeed"); + *entry = i * 3 + j; + rtio_spsc_produce(&ezspsc); + } + zassert_equal(rtio_spsc_consumable(&ezspsc), 3, "Consumables should be 3"); + + for (int k = 0; k < 3; k++) { + uint32_t *entry = rtio_spsc_consume(&ezspsc); + + zassert_not_null(entry, "Consume should succeed"); + zassert_equal(*entry, i * 3 + k, "Consume value should equal i*3+k"); + rtio_spsc_release(&ezspsc); + } + + zassert_equal(rtio_spsc_consumable(&ezspsc), 0, "Consumables should be 0"); + + } +} + +/** + * @brief Ensure that integer wraps continue to work. + * + * Done by setting all values to UINTPTR_MAX - 2 and writing and reading enough + * to ensure integer wraps occur. + */ +ZTEST(rtio_spsc, test_int_wrap_around) +{ + RTIO_SPSC_DEFINE(ezspsc, uint32_t, 4); + ezspsc._spsc.in = ATOMIC_INIT(UINTPTR_MAX - 2); + ezspsc._spsc.out = ATOMIC_INIT(UINTPTR_MAX - 2); + + for (int j = 0; j < 3; j++) { + uint32_t *entry = rtio_spsc_acquire(&ezspsc); + + zassert_not_null(entry, "Acquire should succeed"); + *entry = j; + rtio_spsc_produce(&ezspsc); + } + + zassert_equal(atomic_get(&ezspsc._spsc.in), UINTPTR_MAX + 1, "Spsc in should wrap"); + + for (int k = 0; k < 3; k++) { + uint32_t *entry = rtio_spsc_consume(&ezspsc); + + zassert_not_null(entry, "Consume should succeed"); + zassert_equal(*entry, k, "Consume value should equal i*3+k"); + rtio_spsc_release(&ezspsc); + } + + zassert_equal(atomic_get(&ezspsc._spsc.out), UINTPTR_MAX + 1, "Spsc out should wrap"); +} + +#define MAX_RETRIES 5 +#define SMP_ITERATIONS 100 + +RTIO_SPSC_DEFINE(spsc, uint32_t, 4); + +static void t1_consume(void *p1, void *p2, void *p3) +{ + struct rtio_spsc_spsc *ezspsc = p1; + uint32_t retries = 0; + uint32_t *val = NULL; + + for (int i = 0; i < SMP_ITERATIONS; i++) { + val = NULL; + retries = 0; + while (val == NULL && retries < MAX_RETRIES) { + val = rtio_spsc_consume(ezspsc); + retries++; + } + if (val != NULL) { + rtio_spsc_release(ezspsc); + } else { + k_yield(); + } + } +} + +static void t2_produce(void *p1, void *p2, void *p3) +{ + struct rtio_spsc_spsc *ezspsc = p1; + uint32_t retries = 0; + uint32_t *val = NULL; + + for (int i = 0; i < SMP_ITERATIONS; i++) { + val = NULL; + retries = 0; + while (val == NULL && retries < MAX_RETRIES) { + val = rtio_spsc_acquire(ezspsc); + retries++; + } + if (val != NULL) { + *val = SMP_ITERATIONS; + rtio_spsc_produce(ezspsc); + } else { + k_yield(); + } + } +} + + +#define STACK_SIZE (384 + CONFIG_TEST_EXTRA_STACK_SIZE) +#define THREADS_NUM 2 + +static struct thread_info tinfo[THREADS_NUM]; +static struct k_thread tthread[THREADS_NUM]; +static K_THREAD_STACK_ARRAY_DEFINE(tstack, THREADS_NUM, STACK_SIZE); + +/** + * @brief Test that the producer and consumer are indeed thread safe + * + * This can and should be validated on SMP machines where incoherent + * memory could cause issues. + */ +ZTEST(rtio_spsc, test_spsc_threaded) +{ + + tinfo[0].tid = + k_thread_create(&tthread[0], tstack[0], STACK_SIZE, + (k_thread_entry_t)t1_consume, + &spsc, NULL, NULL, + K_PRIO_PREEMPT(5), + K_INHERIT_PERMS, K_NO_WAIT); + tinfo[1].tid = + k_thread_create(&tthread[1], tstack[1], STACK_SIZE, + (k_thread_entry_t)t2_produce, + &spsc, NULL, NULL, + K_PRIO_PREEMPT(5), + K_INHERIT_PERMS, K_NO_WAIT); + + k_thread_join(tinfo[1].tid, K_FOREVER); + k_thread_join(tinfo[0].tid, K_FOREVER); +} + +ZTEST_SUITE(rtio_spsc, NULL, NULL, NULL, NULL, NULL);