diff --git a/include/zephyr/rtio/rtio.h b/include/zephyr/rtio/rtio.h index abbd1f4794733..ea11d15072ee6 100644 --- a/include/zephyr/rtio/rtio.h +++ b/include/zephyr/rtio/rtio.h @@ -33,6 +33,7 @@ #define ZEPHYR_INCLUDE_RTIO_RTIO_H_ #include +#include #include #include #include @@ -164,6 +165,7 @@ struct rtio_cq { }; struct rtio; +struct rtio_iodev_sqe; struct rtio_executor_api { /** @@ -179,12 +181,12 @@ struct rtio_executor_api { /** * @brief SQE completes successfully */ - void (*ok)(struct rtio *r, const struct rtio_sqe *sqe, int result); + void (*ok)(struct rtio_iodev_sqe *iodev_sqe, int result); /** - * @brief SQE fails to complete + * @brief SQE fails to complete successfully */ - void (*err)(struct rtio *r, const struct rtio_sqe *sqe, int result); + void (*err)(struct rtio_iodev_sqe *iodev_sqe, int result); }; /** @@ -257,47 +259,32 @@ struct rtio { struct rtio_cq *cq; }; + /** - * @brief API that an RTIO IO device should implement + * @brief IO device submission queue entry */ -struct rtio_iodev_api { - /** - * @brief Submission function for a request to the iodev - * - * The iodev is responsible for doing the operation described - * as a submission queue entry and reporting results using using - * `rtio_sqe_ok` or `rtio_sqe_err` once done. - */ - void (*submit)(const struct rtio_sqe *sqe, - struct rtio *r); - - /** - * TODO some form of transactional piece is missing here - * where we wish to "transact" on an iodev with multiple requests - * over a chain. - * - * Only once explicitly released or the chain fails do we want - * to release. Once released any pending iodevs in the queue - * should be done. - * - * Something akin to a lock/unlock pair. 
- */ -}; - -/* IO device submission queue entry */ struct rtio_iodev_sqe { + struct rtio_mpsc_node q; const struct rtio_sqe *sqe; struct rtio *r; }; /** - * @brief IO device submission queue - * - * This is used for reifying the member of the rtio_iodev struct + * @brief API that an RTIO IO device should implement */ -struct rtio_iodev_sq { - struct rtio_spsc _spsc; - struct rtio_iodev_sqe buffer[]; +struct rtio_iodev_api { + /** + * @brief Submit to the iodev an entry to work on + * + * This call should be short in duration and most likely + * either enqueue or kick off an entry with the hardware. + * + * If polling is required the iodev should add itself to the execution + * context (@see rtio_add_pollable()) + * + * @param iodev_sqe Submission queue entry + */ + void (*submit)(struct rtio_iodev_sqe *iodev_sqe); }; /** @@ -308,7 +295,7 @@ struct rtio_iodev { const struct rtio_iodev_api *api; /* Queue of RTIO contexts with requests */ - struct rtio_iodev_sq *iodev_sq; + struct rtio_mpsc iodev_sq; /* Data associated with this iodev */ void *data; @@ -389,29 +376,17 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe, #define RTIO_CQ_DEFINE(name, len) \ RTIO_SPSC_DEFINE(name, struct rtio_cqe, len) - -/** - * @brief Statically define and initialize a fixed length iodev submission queue - * - * @param name Name of the queue. 
- * @param len Queue length, power of 2 required - */ -#define RTIO_IODEV_SQ_DEFINE(name, len) \ - RTIO_SPSC_DEFINE(name, struct rtio_iodev_sqe, len) - /** * @brief Statically define and initialize an RTIO IODev * * @param name Name of the iodev * @param iodev_api Pointer to struct rtio_iodev_api - * @param qsize Size of the submission queue, must be power of 2 * @param iodev_data Data pointer */ -#define RTIO_IODEV_DEFINE(name, iodev_api, qsize, iodev_data) \ - static RTIO_IODEV_SQ_DEFINE(_iodev_sq_##name, qsize); \ - const STRUCT_SECTION_ITERABLE(rtio_iodev, name) = { \ +#define RTIO_IODEV_DEFINE(name, iodev_api, iodev_data) \ + STRUCT_SECTION_ITERABLE(rtio_iodev, name) = { \ .api = (iodev_api), \ - .iodev_sq = (struct rtio_iodev_sq *const)&_iodev_sq_##name, \ + .iodev_sq = RTIO_MPSC_INIT((name.iodev_sq)), \ .data = (iodev_data), \ } @@ -449,14 +424,16 @@ static inline void rtio_set_executor(struct rtio *r, struct rtio_executor *exc) } /** - * @brief Perform a submitted operation with an iodev + * @brief Submit to an iodev a submission to work on * - * @param sqe Submission to work on - * @param r RTIO context + * Should be called by the executor when it wishes to submit work + * to an iodev. + * + * @param iodev_sqe Submission to work on */ -static inline void rtio_iodev_submit(const struct rtio_sqe *sqe, struct rtio *r) +static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe) { - sqe->iodev->api->submit(sqe, r); + iodev_sqe->sqe->iodev->api->submit(iodev_sqe); } /** @@ -571,13 +548,12 @@ static inline void rtio_cqe_release_all(struct rtio *r) * * This may start the next asynchronous request if one is available. 
* - * @param r RTIO context - * @param sqe Submission that has succeeded + * @param iodev_sqe IODev Submission that has succeeded * @param result Result of the request */ -static inline void rtio_sqe_ok(struct rtio *r, const struct rtio_sqe *sqe, int result) +static inline void rtio_iodev_sqe_ok(struct rtio_iodev_sqe *iodev_sqe, int result) { - r->executor->api->ok(r, sqe, result); + iodev_sqe->r->executor->api->ok(iodev_sqe, result); } /** @@ -585,13 +561,12 @@ static inline void rtio_sqe_ok(struct rtio *r, const struct rtio_sqe *sqe, int r * * This SHALL fail the remaining submissions in the chain. * - * @param r RTIO context - * @param sqe Submission that has failed + * @param iodev_sqe Submission that has failed * @param result Result of the request */ -static inline void rtio_sqe_err(struct rtio *r, const struct rtio_sqe *sqe, int result) +static inline void rtio_iodev_sqe_err(struct rtio_iodev_sqe *iodev_sqe, int result) { - r->executor->api->err(r, sqe, result); + iodev_sqe->r->executor->api->err(iodev_sqe, result); } /** diff --git a/include/zephyr/rtio/rtio_executor_concurrent.h b/include/zephyr/rtio/rtio_executor_concurrent.h index 1b9de4b4da084..9ff5acdde074c 100644 --- a/include/zephyr/rtio/rtio_executor_concurrent.h +++ b/include/zephyr/rtio/rtio_executor_concurrent.h @@ -37,20 +37,18 @@ int rtio_concurrent_submit(struct rtio *r); /** * @brief Report a SQE has completed successfully * - * @param r RTIO context to use - * @param sqe RTIO SQE to report success + * @param sqe RTIO IODev SQE to report success * @param result Result of the SQE */ -void rtio_concurrent_ok(struct rtio *r, const struct rtio_sqe *sqe, int result); +void rtio_concurrent_ok(struct rtio_iodev_sqe *sqe, int result); /** * @brief Report a SQE has completed with error * - * @param r RTIO context to use - * @param sqe RTIO SQE to report success + * @param sqe RTIO IODev SQE to report error * @param result Result of the SQE */ -void rtio_concurrent_err(struct rtio *r, const struct 
rtio_sqe *sqe, int result); +void rtio_concurrent_err(struct rtio_iodev_sqe *sqe, int result); /** * @brief Concurrent Executor @@ -76,8 +74,8 @@ struct rtio_concurrent_executor { /* Array of task statuses */ uint8_t *task_status; - /* Array of struct rtio_sqe *'s one per task' */ - struct rtio_sqe **task_cur; + /* Array of struct rtio_iodev_sqe, one per task */ + struct rtio_iodev_sqe *task_cur; }; /** @@ -101,7 +99,7 @@ static const struct rtio_executor_api z_rtio_concurrent_api = { * @param concurrency Allowed concurrency (number of concurrent tasks). */ #define RTIO_EXECUTOR_CONCURRENT_DEFINE(name, concurrency) \ - static struct rtio_sqe *_task_cur_##name[(concurrency)]; \ + static struct rtio_iodev_sqe _task_cur_##name[(concurrency)]; \ uint8_t _task_status_##name[(concurrency)]; \ static struct rtio_concurrent_executor name = { \ .ctx = { .api = &z_rtio_concurrent_api }, \ diff --git a/include/zephyr/rtio/rtio_executor_simple.h b/include/zephyr/rtio/rtio_executor_simple.h index d39a64f46aec2..3a394b019e7fd 100644 --- a/include/zephyr/rtio/rtio_executor_simple.h +++ b/include/zephyr/rtio/rtio_executor_simple.h @@ -36,26 +36,25 @@ int rtio_simple_submit(struct rtio *r); /** * @brief Report a SQE has completed successfully * - * @param r RTIO context to use - * @param sqe RTIO SQE to report success + * @param iodev_sqe RTIO IODEV SQE to report success * @param result Result of the SQE */ -void rtio_simple_ok(struct rtio *r, const struct rtio_sqe *sqe, int result); +void rtio_simple_ok(struct rtio_iodev_sqe *iodev_sqe, int result); /** * @brief Report a SQE has completed with error * - * @param r RTIO context to use - * @param sqe RTIO SQE to report success + * @param iodev_sqe RTIO IODEV SQE to report error * @param result Result of the SQE */ -void rtio_simple_err(struct rtio *r, const struct rtio_sqe *sqe, int result); +void rtio_simple_err(struct rtio_iodev_sqe *iodev_sqe, int result); /** * @brief Simple Executor */ struct rtio_simple_executor { struct 
rtio_executor ctx; + struct rtio_iodev_sqe task; }; /** diff --git a/include/zephyr/rtio/rtio_mpsc.h b/include/zephyr/rtio/rtio_mpsc.h new file mode 100644 index 0000000000000..77f7fa7801692 --- /dev/null +++ b/include/zephyr/rtio/rtio_mpsc.h @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2010-2011 Dmitry Vyukov + * Copyright (c) 2022 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + */ + + +#ifndef ZEPHYR_RTIO_MPSC_H_ +#define ZEPHYR_RTIO_MPSC_H_ + +#include +#include +#include +#include + +/** + * @brief RTIO Multiple Producer Single Consumer (MPSC) Queue API + * @defgroup rtio_mpsc RTIO MPSC API + * @ingroup rtio + * @{ + */ + +/** + * @file rtio_mpsc.h + * + * @brief A wait-free intrusive multi producer single consumer (MPSC) queue using + * a singly linked list. Ordering is First-In-First-Out. + * + * Based on the well known and widely used wait-free MPSC queue described by + * Dmitry Vyukov with some slight changes to account for needs of an + * RTOS on a variety of archs. Both consumer and producer are wait free. No CAS + * loop or lock is needed. + * + * An MPSC queue is safe to produce or consume in an ISR with O(1) push/pop. + * + * @warning MPSC is *not* safe to consume in multiple execution contexts. 
+ */ + +/** + * @brief Queue member + */ +struct rtio_mpsc_node { + atomic_ptr_t next; +}; + +/** + * @brief MPSC Queue + */ +struct rtio_mpsc { + atomic_ptr_t head; + struct rtio_mpsc_node *tail; + struct rtio_mpsc_node stub; +}; + + +/** + * @brief Static initializer for a mpsc queue + * + * Since the queue initially points at its own stub node, the queue's symbol name is required. + * + * @param symbol name of the queue + */ +#define RTIO_MPSC_INIT(symbol) \ + { \ + .head = (struct rtio_mpsc_node *)&symbol.stub, \ + .tail = (struct rtio_mpsc_node *)&symbol.stub, \ + .stub.next = NULL, \ + } + +/** + * @brief Initialize queue + * + * @param q Queue to initialize or reset + */ +static inline void rtio_mpsc_init(struct rtio_mpsc *q) +{ + atomic_ptr_set(&q->head, &q->stub); + q->tail = &q->stub; + atomic_ptr_set(&q->stub.next, NULL); +} + +/** + * @brief Push a node + * + * @param q Queue to push the node to + * @param n Node to push into the queue + */ +static inline void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n) +{ + struct rtio_mpsc_node *prev; + int key; + + atomic_ptr_set(&n->next, NULL); + + key = arch_irq_lock(); + prev = atomic_ptr_set(&q->head, n); + atomic_ptr_set(&prev->next, n); + arch_irq_unlock(key); +} + +/** + * @brief Pop a node off of the list + * + * @retval NULL When no node is available + * @retval node When node is available + */ +static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q) +{ + struct rtio_mpsc_node *head; + struct rtio_mpsc_node *tail = q->tail; + struct rtio_mpsc_node *next = atomic_ptr_get(&tail->next); + + /* Skip over the stub/sentinel */ + if (tail == &q->stub) { + if (next == NULL) { + return NULL; + } + + q->tail = next; + tail = next; + next = atomic_ptr_get(&next->next); + } + + /* If next is non-NULL then a valid node is found, return it */ + if (next != NULL) { + q->tail = next; + return tail; + } + + head = atomic_ptr_get(&q->head); + + /* If next is NULL, and the tail != HEAD then the queue has pending + * updates that can't yet be accessed. 
+ */ + if (tail != head) { + return NULL; + } + + rtio_mpsc_push(q, &q->stub); + + next = atomic_ptr_get(&tail->next); + + if (next != NULL) { + q->tail = next; + return tail; + } + + return NULL; +} + +/** + * @} + */ + +#endif /* ZEPHYR_RTIO_MPSC_H_ */ diff --git a/include/zephyr/rtio/rtio_spsc.h b/include/zephyr/rtio/rtio_spsc.h index 21d5f3c5744fc..dc2a165495261 100644 --- a/include/zephyr/rtio/rtio_spsc.h +++ b/include/zephyr/rtio/rtio_spsc.h @@ -75,11 +75,9 @@ struct rtio_spsc { /** * @brief Statically initialize an rtio_spsc * - * @param name Name of the spsc symbol to be provided - * @param type Type stored in the spsc * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8) */ -#define RTIO_SPSC_INITIALIZER(name, type, sz) \ +#define RTIO_SPSC_INITIALIZER(sz) \ { ._spsc = { \ .acquire = 0, \ .consume = 0, \ @@ -110,7 +108,7 @@ struct rtio_spsc { * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8) */ #define RTIO_SPSC_DEFINE(name, type, sz) \ - RTIO_SPSC_DECLARE(name, type, sz) name = RTIO_SPSC_INITIALIZER(name, type, sz); + RTIO_SPSC_DECLARE(name, type, sz) name = RTIO_SPSC_INITIALIZER(sz); /** * @brief Size of the SPSC queue diff --git a/samples/subsys/rtio/sensor_batch_processing/src/vnd_sensor.c b/samples/subsys/rtio/sensor_batch_processing/src/vnd_sensor.c index e11ff49cb414d..1f272382ef365 100644 --- a/samples/subsys/rtio/sensor_batch_processing/src/vnd_sensor.c +++ b/samples/subsys/rtio/sensor_batch_processing/src/vnd_sensor.c @@ -22,7 +22,6 @@ struct vnd_sensor_data { struct rtio_iodev iodev; struct k_timer timer; const struct device *dev; - struct k_msgq msgq; uint32_t sample_number; }; @@ -53,9 +52,10 @@ static int vnd_sensor_iodev_read(const struct device *dev, uint8_t *buf, } static void vnd_sensor_iodev_execute(const struct device *dev, - const struct rtio_sqe *sqe, struct rtio *r) + struct rtio_iodev_sqe *iodev_sqe) { int result; + const struct rtio_sqe *sqe = iodev_sqe->sqe; if (sqe->op == RTIO_OP_RX) { result = 
vnd_sensor_iodev_read(dev, sqe->buf, sqe->buf_len); @@ -65,36 +65,28 @@ static void vnd_sensor_iodev_execute(const struct device *dev, } if (result < 0) { - rtio_sqe_err(r, sqe, result); + rtio_iodev_sqe_err(iodev_sqe, result); } else { - rtio_sqe_ok(r, sqe, result); + rtio_iodev_sqe_ok(iodev_sqe, result); } } -static void vnd_sensor_iodev_submit(const struct rtio_sqe *sqe, struct rtio *r) +static void vnd_sensor_iodev_submit(struct rtio_iodev_sqe *iodev_sqe) { - struct vnd_sensor_data *data = (struct vnd_sensor_data *) sqe->iodev; - const struct device *dev = data->dev; - struct rtio_iodev_sqe *iodev_sqe = rtio_spsc_acquire(data->iodev.iodev_sq); - - if (iodev_sqe != NULL) { - iodev_sqe->sqe = sqe; - iodev_sqe->r = r; - rtio_spsc_produce(data->iodev.iodev_sq); - } else { - LOG_ERR("%s: Could not put a msg", dev->name); - rtio_sqe_err(r, sqe, -EWOULDBLOCK); - } + struct vnd_sensor_data *data = (struct vnd_sensor_data *) iodev_sqe->sqe->iodev; + + rtio_mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q); } static void vnd_sensor_handle_int(const struct device *dev) { struct vnd_sensor_data *data = dev->data; - struct rtio_iodev_sqe *iodev_sqe = rtio_spsc_consume(data->iodev.iodev_sq); + struct rtio_mpsc_node *node = rtio_mpsc_pop(&data->iodev.iodev_sq); - if (iodev_sqe != NULL) { - vnd_sensor_iodev_execute(dev, iodev_sqe->sqe, iodev_sqe->r); - rtio_spsc_release(data->iodev.iodev_sq); + if (node != NULL) { + struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q); + + vnd_sensor_iodev_execute(dev, iodev_sqe); } else { LOG_ERR("%s: Could not get a msg", dev->name); } @@ -116,6 +108,8 @@ static int vnd_sensor_init(const struct device *dev) data->dev = dev; + rtio_mpsc_init(&data->iodev.iodev_sq); + k_timer_init(&data->timer, vnd_sensor_timer_expiry, NULL); k_timer_start(&data->timer, K_MSEC(sample_period), @@ -135,13 +129,10 @@ static const struct rtio_iodev_api vnd_sensor_iodev_api = { .sample_size = DT_INST_PROP(n, sample_size), \ }; \ \ - 
RTIO_IODEV_SQ_DEFINE(vnd_sensor_iodev_sq_##n, DT_INST_PROP(n, max_msgs)); \ - \ static struct vnd_sensor_data vnd_sensor_data_##n = { \ .iodev = \ { \ .api = &vnd_sensor_iodev_api, \ - .iodev_sq = (struct rtio_iodev_sq *)&vnd_sensor_iodev_sq_##n, \ }, \ }; \ \ diff --git a/subsys/rtio/rtio_executor_concurrent.c b/subsys/rtio/rtio_executor_concurrent.c index 776784f6bfdbb..ae6c361f0018e 100644 --- a/subsys/rtio/rtio_executor_concurrent.c +++ b/subsys/rtio/rtio_executor_concurrent.c @@ -52,17 +52,12 @@ static uint16_t conex_task_next(struct rtio_concurrent_executor *exc) return task_id; } -static uint16_t conex_task_id(struct rtio_concurrent_executor *exc, - const struct rtio_sqe *sqe) +static inline uint16_t conex_task_id(struct rtio_concurrent_executor *exc, + const struct rtio_iodev_sqe *iodev_sqe) { - uint16_t task_id = exc->task_out; - - for (; task_id < exc->task_in; task_id++) { - if (exc->task_cur[task_id & exc->task_mask] == sqe) { - break; - } - } - return task_id; + __ASSERT_NO_MSG(iodev_sqe <= &exc->task_cur[exc->task_mask] && + iodev_sqe >= &exc->task_cur[0]); + return iodev_sqe - &exc->task_cur[0]; } static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *exc) @@ -98,7 +93,7 @@ static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc) if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_SUSPENDED) { LOG_INF("resuming suspended task %d", task_id); exc->task_status[task_id] &= ~CONEX_TASK_SUSPENDED; - rtio_iodev_submit(exc->task_cur[task_id], r); + rtio_iodev_submit(&exc->task_cur[task_id]); } } } @@ -150,7 +145,8 @@ int rtio_concurrent_submit(struct rtio *r) LOG_INF("setting up task %d", task_idx); /* Setup task (yes this is it) */ - exc->task_cur[task_idx] = sqe; + exc->task_cur[task_idx].sqe = sqe; + exc->task_cur[task_idx].r = r; exc->task_status[task_idx] = CONEX_TASK_SUSPENDED; LOG_INF("submitted sqe %p", sqe); @@ -195,11 +191,13 @@ int rtio_concurrent_submit(struct rtio *r) /** * @brief Callback 
from an iodev describing success */ -void rtio_concurrent_ok(struct rtio *r, const struct rtio_sqe *sqe, int result) +void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result) { - struct rtio_sqe *next_sqe; - k_spinlock_key_t key; + struct rtio *r = iodev_sqe->r; + const struct rtio_sqe *sqe = iodev_sqe->sqe; struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor; + const struct rtio_sqe *next_sqe; + k_spinlock_key_t key; /* Interrupt may occur in spsc_acquire, breaking the contract * so spin around it effectively preventing another interrupt on @@ -212,15 +210,15 @@ void rtio_concurrent_ok(struct rtio *r, const struct rtio_sqe *sqe, int result) rtio_cqe_submit(r, result, sqe->userdata); - /* Determine the task id : O(n) */ - uint16_t task_id = conex_task_id(exc, sqe); + /* Determine the task id by memory offset O(1) */ + uint16_t task_id = conex_task_id(exc, iodev_sqe); if (sqe->flags & RTIO_SQE_CHAINED) { next_sqe = rtio_spsc_next(r->sq, sqe); - rtio_iodev_submit(next_sqe, r); + exc->task_cur[task_id].sqe = next_sqe; + rtio_iodev_submit(&exc->task_cur[task_id]); - exc->task_cur[task_id] = next_sqe; } else { exc->task_status[task_id] |= CONEX_TASK_COMPLETE; } @@ -238,10 +236,12 @@ void rtio_concurrent_ok(struct rtio *r, const struct rtio_sqe *sqe, int result) /** * @brief Callback from an iodev describing error */ -void rtio_concurrent_err(struct rtio *r, const struct rtio_sqe *sqe, int result) +void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result) { - struct rtio_sqe *nsqe; + const struct rtio_sqe *nsqe; k_spinlock_key_t key; + struct rtio *r = iodev_sqe->r; + const struct rtio_sqe *sqe = iodev_sqe->sqe; struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor; /* Another interrupt (and sqe complete) may occur in spsc_acquire, @@ -256,9 +256,10 @@ void rtio_concurrent_err(struct rtio *r, const struct rtio_sqe *sqe, int result) rtio_cqe_submit(r, result, sqe->userdata); - /* 
Determine the task id : O(n) */ - uint16_t task_id = conex_task_id(exc, sqe); + /* Determine the task id : O(1) */ + uint16_t task_id = conex_task_id(exc, iodev_sqe); + sqe = iodev_sqe->sqe; /* Fail the remaining sqe's in the chain */ if (sqe->flags & RTIO_SQE_CHAINED) { diff --git a/subsys/rtio/rtio_executor_simple.c b/subsys/rtio/rtio_executor_simple.c index ace34db72b05c..47515f237ade3 100644 --- a/subsys/rtio/rtio_executor_simple.c +++ b/subsys/rtio/rtio_executor_simple.c @@ -24,13 +24,20 @@ LOG_MODULE_REGISTER(rtio_executor_simple, CONFIG_RTIO_LOG_LEVEL); */ int rtio_simple_submit(struct rtio *r) { - /* TODO For each submission queue entry chain, - * submit the chain to the first iodev - */ + struct rtio_simple_executor *exc = (struct rtio_simple_executor *)r->executor; + + /* Task is already running */ + if (exc->task.sqe != NULL) { + return 0; + } + struct rtio_sqe *sqe = rtio_spsc_consume(r->sq); + exc->task.sqe = sqe; + exc->task.r = r; + if (sqe != NULL) { - rtio_iodev_submit(sqe, r); + rtio_iodev_submit(&exc->task); } return 0; @@ -39,11 +46,22 @@ int rtio_simple_submit(struct rtio *r) /** * @brief Callback from an iodev describing success */ -void rtio_simple_ok(struct rtio *r, const struct rtio_sqe *sqe, int result) +void rtio_simple_ok(struct rtio_iodev_sqe *iodev_sqe, int result) { + struct rtio *r = iodev_sqe->r; + const struct rtio_sqe *sqe = iodev_sqe->sqe; + +#ifdef CONFIG_ASSERT + struct rtio_simple_executor *exc = + (struct rtio_simple_executor *)r->executor; + + __ASSERT_NO_MSG(iodev_sqe == &exc->task); +#endif + void *userdata = sqe->userdata; rtio_spsc_release(r->sq); + iodev_sqe->sqe = NULL; rtio_cqe_submit(r, result, userdata); rtio_simple_submit(r); } @@ -51,13 +69,24 @@ void rtio_simple_ok(struct rtio *r, const struct rtio_sqe *sqe, int result) /** * @brief Callback from an iodev describing error */ -void rtio_simple_err(struct rtio *r, const struct rtio_sqe *sqe, int result) +void rtio_simple_err(struct rtio_iodev_sqe *iodev_sqe, int 
result) { - struct rtio_sqe *nsqe; + const struct rtio_sqe *nsqe; + struct rtio *r = iodev_sqe->r; + const struct rtio_sqe *sqe = iodev_sqe->sqe; void *userdata = sqe->userdata; bool chained = sqe->flags & RTIO_SQE_CHAINED; +#ifdef CONFIG_ASSERT + struct rtio_simple_executor *exc = + (struct rtio_simple_executor *)r->executor; + + __ASSERT_NO_MSG(iodev_sqe == &exc->task); +#endif + + rtio_spsc_release(r->sq); + iodev_sqe->sqe = NULL; rtio_cqe_submit(r, result, sqe->userdata); if (chained) { @@ -71,7 +100,9 @@ void rtio_simple_err(struct rtio *r, const struct rtio_sqe *sqe, int result) } if (nsqe != NULL) { - rtio_iodev_submit(nsqe, r); + + iodev_sqe->sqe = nsqe; + rtio_iodev_submit(iodev_sqe); } } else { diff --git a/tests/subsys/rtio/rtio_api/src/main.c b/tests/subsys/rtio/rtio_api/src/main.c index 3462dddb516b3..035acf3b10734 100644 --- a/tests/subsys/rtio/rtio_api/src/main.c +++ b/tests/subsys/rtio/rtio_api/src/main.c @@ -11,7 +11,10 @@ #include #include #include +#include +#include #include +#include #include #include #include @@ -164,7 +167,6 @@ static void t1_consume(void *p1, void *p2, void *p3) if (val != NULL) { rtio_spsc_release(ezspsc); } else { - printk("consumer yield\n"); k_yield(); } } @@ -179,7 +181,6 @@ static void t2_produce(void *p1, void *p2, void *p3) for (int i = 0; i < SMP_ITERATIONS; i++) { val = NULL; retries = 0; - printk("producer acquiring\n"); while (val == NULL && retries < MAX_RETRIES) { val = rtio_spsc_acquire(ezspsc); retries++; @@ -188,7 +189,6 @@ static void t2_produce(void *p1, void *p2, void *p3) *val = SMP_ITERATIONS; rtio_spsc_produce(ezspsc); } else { - printk("producer yield\n"); k_yield(); } } @@ -234,6 +234,169 @@ ZTEST(rtio_spsc, test_spsc_threaded) k_thread_join(tinfo[0].tid, K_FOREVER); } +static struct rtio_mpsc push_pop_q; +static struct rtio_mpsc_node push_pop_nodes[2]; + +/* + * @brief Push and pop one element + * + * @see rtio_mpsc_push(), rtio_mpsc_pop() + * + * @ingroup rtio_tests + */ +ZTEST(rtio_mpsc, 
test_push_pop) +{ + + struct rtio_mpsc_node *node, *head, *stub, *next, *tail; + + rtio_mpsc_init(&push_pop_q); + + head = atomic_ptr_get(&push_pop_q.head); + tail = push_pop_q.tail; + stub = &push_pop_q.stub; + next = atomic_ptr_get(&stub->next); + + zassert_equal(head, stub, "Head should point at stub"); + zassert_equal(tail, stub, "Tail should point at stub"); + zassert_is_null(next, "Next should be null"); + + node = rtio_mpsc_pop(&push_pop_q); + zassert_is_null(node, "Pop on empty queue should return null"); + + rtio_mpsc_push(&push_pop_q, &push_pop_nodes[0]); + + head = atomic_ptr_get(&push_pop_q.head); + + zassert_equal(head, &push_pop_nodes[0], "Queue head should point at push_pop_node"); + next = atomic_ptr_get(&push_pop_nodes[0].next); + zassert_is_null(next, NULL, "push_pop_node next should point at null"); + next = atomic_ptr_get(&push_pop_q.stub.next); + zassert_equal(next, &push_pop_nodes[0], "Queue stub should point at push_pop_node"); + tail = push_pop_q.tail; + stub = &push_pop_q.stub; + zassert_equal(tail, stub, "Tail should point at stub"); + + node = rtio_mpsc_pop(&push_pop_q); + stub = &push_pop_q.stub; + + zassert_not_equal(node, stub, "Pop should not return stub"); + zassert_not_null(node, "Pop should not return null"); + zassert_equal(node, &push_pop_nodes[0], + "Pop should return push_pop_node %p, instead was %p", + &push_pop_nodes[0], node); + + node = rtio_mpsc_pop(&push_pop_q); + zassert_is_null(node, "Pop on empty queue should return null"); +} + +#define MPSC_FREEQ_SZ 8 +#define MPSC_ITERATIONS 100000 +#define MPSC_STACK_SIZE (512 + CONFIG_TEST_EXTRA_STACK_SIZE) +#define MPSC_THREADS_NUM 4 + +static struct thread_info mpsc_tinfo[MPSC_THREADS_NUM]; +static struct k_thread mpsc_thread[MPSC_THREADS_NUM]; +static K_THREAD_STACK_ARRAY_DEFINE(mpsc_stack, MPSC_THREADS_NUM, MPSC_STACK_SIZE); + +struct mpsc_node { + uint32_t id; + struct rtio_mpsc_node n; +}; + + +RTIO_SPSC_DECLARE(node_sq, struct mpsc_node, MPSC_FREEQ_SZ); + +#define 
SPSC_INIT(n, sz) RTIO_SPSC_INITIALIZER(sz) + +struct rtio_spsc_node_sq node_q[MPSC_THREADS_NUM] = { + LISTIFY(MPSC_THREADS_NUM, SPSC_INIT, (,), MPSC_FREEQ_SZ) +}; + +static struct rtio_mpsc mpsc_q; + +static void mpsc_consumer(void *p1, void *p2, void *p3) +{ + struct rtio_mpsc_node *n; + struct mpsc_node *nn; + + for (int i = 0; i < (MPSC_ITERATIONS)*(MPSC_THREADS_NUM - 1); i++) { + do { + n = rtio_mpsc_pop(&mpsc_q); + if (n == NULL) { + k_yield(); + } + } while (n == NULL); + + zassert_not_equal(n, &mpsc_q.stub, "mpsc should not produce stub"); + + nn = CONTAINER_OF(n, struct mpsc_node, n); + + rtio_spsc_acquire(&node_q[nn->id]); + rtio_spsc_produce(&node_q[nn->id]); + } +} + +static void mpsc_producer(void *p1, void *p2, void *p3) +{ + struct mpsc_node *n; + uint32_t id = (uint32_t)(uintptr_t)p1; + + for (int i = 0; i < MPSC_ITERATIONS; i++) { + do { + n = rtio_spsc_consume(&node_q[id]); + if (n == NULL) { + k_yield(); + } + } while (n == NULL); + + rtio_spsc_release(&node_q[id]); + n->id = id; + rtio_mpsc_push(&mpsc_q, &n->n); + } +} + +/** + * @brief Test that the producer and consumer are indeed thread safe + * + * This can and should be validated on SMP machines where incoherent + * memory could cause issues. 
+ */ +ZTEST(rtio_mpsc, test_mpsc_threaded) +{ + rtio_mpsc_init(&mpsc_q); + + TC_PRINT("setting up mpsc producer free queues\n"); + /* Setup node free queues */ + for (int i = 0; i < MPSC_THREADS_NUM; i++) { + for (int j = 0; j < MPSC_FREEQ_SZ; j++) { + rtio_spsc_acquire(&node_q[i]); + } + rtio_spsc_produce_all(&node_q[i]); + } + + TC_PRINT("starting consumer\n"); + mpsc_tinfo[0].tid = + k_thread_create(&mpsc_thread[0], mpsc_stack[0], STACK_SIZE, + (k_thread_entry_t)mpsc_consumer, + NULL, NULL, NULL, + K_PRIO_PREEMPT(5), + K_INHERIT_PERMS, K_NO_WAIT); + + for (int i = 1; i < MPSC_THREADS_NUM; i++) { + TC_PRINT("starting producer %i\n", i); + mpsc_tinfo[i].tid = + k_thread_create(&mpsc_thread[i], mpsc_stack[i], STACK_SIZE, + (k_thread_entry_t)mpsc_producer, + (void *)(uintptr_t)i, NULL, NULL, + K_PRIO_PREEMPT(5), + K_INHERIT_PERMS, K_NO_WAIT); + } + + for (int i = 0; i < MPSC_THREADS_NUM; i++) { + TC_PRINT("joining mpsc thread %d\n", i); + k_thread_join(mpsc_tinfo[i].tid, K_FOREVER); + } +} RTIO_EXECUTOR_SIMPLE_DEFINE(simple_exec_simp); RTIO_DEFINE(r_simple_simp, (struct rtio_executor *)&simple_exec_simp, 4, 4); @@ -241,7 +404,7 @@ RTIO_DEFINE(r_simple_simp, (struct rtio_executor *)&simple_exec_simp, 4, 4); RTIO_EXECUTOR_CONCURRENT_DEFINE(simple_exec_con, 1); RTIO_DEFINE(r_simple_con, (struct rtio_executor *)&simple_exec_con, 4, 4); -RTIO_IODEV_TEST_DEFINE(iodev_test_simple, 1); +RTIO_IODEV_TEST_DEFINE(iodev_test_simple); /** * @brief Test the basics of the RTIO API @@ -288,9 +451,9 @@ RTIO_DEFINE(r_chain_simp, (struct rtio_executor *)&chain_exec_simp, 4, 4); RTIO_EXECUTOR_CONCURRENT_DEFINE(chain_exec_con, 1); RTIO_DEFINE(r_chain_con, (struct rtio_executor *)&chain_exec_con, 4, 4); -RTIO_IODEV_TEST_DEFINE(iodev_test_chain0, 1); -RTIO_IODEV_TEST_DEFINE(iodev_test_chain1, 1); -const struct rtio_iodev *iodev_test_chain[] = {&iodev_test_chain0, &iodev_test_chain1}; +RTIO_IODEV_TEST_DEFINE(iodev_test_chain0); +RTIO_IODEV_TEST_DEFINE(iodev_test_chain1); +struct rtio_iodev 
*iodev_test_chain[] = {&iodev_test_chain0, &iodev_test_chain1}; /** * @brief Test chained requests @@ -317,7 +480,9 @@ void test_rtio_chain_(struct rtio *r) /* Clear the last one */ sqe->flags = 0; + TC_PRINT("submitting\n"); res = rtio_submit(r, 4); + TC_PRINT("checking cq\n"); zassert_ok(res, "Should return ok from rtio_execute"); zassert_equal(rtio_spsc_consumable(r->cq), 4, "Should have 4 pending completions"); @@ -333,6 +498,8 @@ void test_rtio_chain_(struct rtio *r) ZTEST(rtio_api, test_rtio_chain) { + TC_PRINT("initializing iodev test devices\n"); + for (int i = 0; i < 2; i++) { rtio_iodev_test_init(iodev_test_chain[i]); } @@ -350,9 +517,9 @@ RTIO_DEFINE(r_multi_simp, (struct rtio_executor *)&multi_exec_simp, 4, 4); RTIO_EXECUTOR_CONCURRENT_DEFINE(multi_exec_con, 2); RTIO_DEFINE(r_multi_con, (struct rtio_executor *)&multi_exec_con, 4, 4); -RTIO_IODEV_TEST_DEFINE(iodev_test_multi0, 1); -RTIO_IODEV_TEST_DEFINE(iodev_test_multi1, 1); -const struct rtio_iodev *iodev_test_multi[] = {&iodev_test_multi0, &iodev_test_multi1}; +RTIO_IODEV_TEST_DEFINE(iodev_test_multi0); +RTIO_IODEV_TEST_DEFINE(iodev_test_multi1); +struct rtio_iodev *iodev_test_multi[] = {&iodev_test_multi0, &iodev_test_multi1}; /** * @brief Test multiple asynchronous chains against one iodev @@ -433,7 +600,7 @@ uint8_t syscall_bufs[4]; RTIO_EXECUTOR_SIMPLE_DEFINE(syscall_simple); RTIO_DEFINE(r_syscall, (struct rtio_executor *)&syscall_simple, 4, 4); -RTIO_IODEV_TEST_DEFINE(iodev_test_syscall, 1); +RTIO_IODEV_TEST_DEFINE(iodev_test_syscall); void rtio_syscall_test(void *p1, void *p2, void *p3) { @@ -505,4 +672,5 @@ ZTEST(rtio_api, test_rtio_syscalls) ZTEST_SUITE(rtio_spsc, NULL, NULL, NULL, NULL, NULL); +ZTEST_SUITE(rtio_mpsc, NULL, NULL, NULL, NULL, NULL); ZTEST_SUITE(rtio_api, NULL, NULL, NULL, NULL, NULL); diff --git a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h index fabfc65cfcabb..e0f01226cdd31 100644 --- 
a/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h +++ b/tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h @@ -5,6 +5,7 @@ */ #include +#include #include #include @@ -12,7 +13,7 @@ #define RTIO_IODEV_TEST_H_ struct rtio_iodev_test_data { - /** + /** * k_timer for an asynchronous task */ struct k_timer timer; @@ -20,50 +21,46 @@ struct rtio_iodev_test_data { /** * Currently executing sqe */ - const struct rtio_sqe *sqe; - - /** - * Currently executing rtio context - */ - struct rtio *r; + atomic_ptr_t iodev_sqe; }; - static void rtio_iodev_timer_fn(struct k_timer *tm) { struct rtio_iodev_test_data *data = CONTAINER_OF(tm, struct rtio_iodev_test_data, timer); - - struct rtio *r = data->r; - const struct rtio_sqe *sqe = data->sqe; - - data->r = NULL; - data->sqe = NULL; + struct rtio_iodev_sqe *iodev_sqe = atomic_ptr_get(&data->iodev_sqe); + struct rtio_mpsc_node *next = + rtio_mpsc_pop((struct rtio_mpsc *)&iodev_sqe->sqe->iodev->iodev_sq); + + if (next != NULL) { + struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q); + + atomic_ptr_set(&data->iodev_sqe, next_sqe); + TC_PRINT("starting timer again from queued iodev_sqe %p!\n", next); + k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT); + } else { + atomic_ptr_set(&data->iodev_sqe, NULL); + } /* Complete the request with Ok and a result */ TC_PRINT("sqe ok callback\n"); - rtio_sqe_ok(r, sqe, 0); + + rtio_iodev_sqe_ok(iodev_sqe, 0); } -static void rtio_iodev_test_submit(const struct rtio_sqe *sqe, struct rtio *r) +static void rtio_iodev_test_submit(struct rtio_iodev_sqe *iodev_sqe) { - struct rtio_iodev_test_data *data = sqe->iodev->data; + struct rtio_iodev *iodev = (struct rtio_iodev *)iodev_sqe->sqe->iodev; + struct rtio_iodev_test_data *data = iodev->data; - /** - * This isn't quite right, probably should be equivalent to a - * pend instead of a fail here. 
In reality if the device is busy - * this should be enqueued to the iodev_sq and started as soon - * as the device is no longer busy (scheduled for the future). + /* + * If a task is already going queue up the next request in the mpsc. */ - if (k_timer_remaining_get(&data->timer) != 0) { - TC_PRINT("would block, timer not free!\n"); - rtio_sqe_err(r, sqe, -EWOULDBLOCK); - return; + if (!atomic_ptr_cas(&data->iodev_sqe, NULL, iodev_sqe)) { + TC_PRINT("adding queued sqe\n"); + rtio_mpsc_push(&iodev->iodev_sq, &iodev_sqe->q); } - data->sqe = sqe; - data->r = r; - - /** + /* * Simulate an async hardware request with a one shot timer * * In reality the time to complete might have some significant variance @@ -73,21 +70,23 @@ static void rtio_iodev_test_submit(const struct rtio_sqe *sqe, struct rtio *r) k_timer_start(&data->timer, K_MSEC(10), K_NO_WAIT); } -static const struct rtio_iodev_api rtio_iodev_test_api = { +const struct rtio_iodev_api rtio_iodev_test_api = { .submit = rtio_iodev_test_submit, }; -const struct rtio_iodev_api *the_api = &rtio_iodev_test_api; - -static inline void rtio_iodev_test_init(const struct rtio_iodev *test) +void rtio_iodev_test_init(struct rtio_iodev *test) { struct rtio_iodev_test_data *data = test->data; + rtio_mpsc_init(&test->iodev_sq); + atomic_ptr_set(&data->iodev_sqe, NULL); k_timer_init(&data->timer, rtio_iodev_timer_fn, NULL); } -#define RTIO_IODEV_TEST_DEFINE(name, qsize) \ +#define RTIO_IODEV_TEST_DEFINE(name) \ static struct rtio_iodev_test_data _iodev_data_##name; \ - RTIO_IODEV_DEFINE(name, &rtio_iodev_test_api, qsize, &_iodev_data_##name) + RTIO_IODEV_DEFINE(name, &rtio_iodev_test_api, &_iodev_data_##name) + + #endif /* RTIO_IODEV_TEST_H_ */ diff --git a/tests/subsys/rtio/rtio_api/testcase.yaml b/tests/subsys/rtio/rtio_api/testcase.yaml index 3be530364f59d..a07f0fb023e23 100644 --- a/tests/subsys/rtio/rtio_api/testcase.yaml +++ b/tests/subsys/rtio/rtio_api/testcase.yaml @@ -1,4 +1,5 @@ common: + platform_exclude: m2gl025_miv 
platform_key: - arch - simulation