60 changes: 51 additions & 9 deletions include/zephyr/rtio/rtio.h
@@ -97,9 +97,25 @@ struct rtio_iodev;

 /**
  * @brief The next request in the queue should wait on this one.
+ *
+ * Chained SQEs are individual units of work describing patterns of
+ * ordering and failure cascading. A chained SQE must be started only
+ * after the one before it; they are given to the iodevs one after another.
  */
 #define RTIO_SQE_CHAINED BIT(0)
 
+/**
+ * @brief The next request in the queue is part of a transaction.
+ *
+ * Transactional SQEs are sequential parts of a single unit of work.
+ * Only the first transactional SQE is submitted to an iodev; the
+ * remaining SQEs are never individually submitted and are instead
+ * treated as part of the transaction to that single iodev. The first
+ * SQE in the sequence holds the iodev to be used, and the last holds
+ * the userdata returned in a single completion on success or failure.
+ */
+#define RTIO_SQE_TRANSACTION BIT(1)
+
 /**
  * @}
  */
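To make the two flags concrete, here is a hedged usage sketch: a write chained to a read on the same iodev. The rtio context `r`, the iodev `spi_iodev`, the buffers, and the SQE acquire/produce helpers from the companion SPSC queue API are assumptions for illustration, not part of this diff. Note the flag is set after the prep call, since the prep helpers now zero `sqe->flags`.

```c
/* Sketch only: the read starts only after the write completes, and is
 * skipped (error cascaded) if the write fails. Swapping the flag for
 * RTIO_SQE_TRANSACTION would instead hand both SQEs to the iodev as a
 * single unit with one completion carrying the read's userdata.
 */
uint8_t cmd[2] = { 0x0F, 0x00 };	/* hypothetical command bytes */
uint8_t rsp[4];

struct rtio_sqe *wr = rtio_spsc_acquire(r->sq);

rtio_sqe_prep_write(wr, &spi_iodev, 0, cmd, sizeof(cmd), NULL);
wr->flags |= RTIO_SQE_CHAINED;		/* next SQE waits on this one */
rtio_spsc_produce(r->sq);

struct rtio_sqe *rd = rtio_spsc_acquire(r->sq);

rtio_sqe_prep_read(rd, &spi_iodev, 0, rsp, sizeof(rsp), rsp);
rtio_spsc_produce(r->sq);

rtio_submit(r, 2);			/* wait for both completions */
```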
@@ -318,6 +334,7 @@ static inline void rtio_sqe_prep_nop(struct rtio_sqe *sqe,
 				     void *userdata)
 {
 	sqe->op = RTIO_OP_NOP;
+	sqe->flags = 0;
 	sqe->iodev = iodev;
 	sqe->userdata = userdata;
 }
@@ -334,6 +351,7 @@ static inline void rtio_sqe_prep_read(struct rtio_sqe *sqe,
 {
 	sqe->op = RTIO_OP_RX;
 	sqe->prio = prio;
+	sqe->flags = 0;
 	sqe->iodev = iodev;
 	sqe->buf_len = len;
 	sqe->buf = buf;
@@ -352,6 +370,7 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
 {
 	sqe->op = RTIO_OP_TX;
 	sqe->prio = prio;
+	sqe->flags = 0;
 	sqe->iodev = iodev;
 	sqe->buf_len = len;
 	sqe->buf = buf;
@@ -402,7 +421,7 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
 	IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, \
 		(static K_SEM_DEFINE(_submit_sem_##name, 0, K_SEM_MAX_LIMIT))) \
 	IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, \
-		(static K_SEM_DEFINE(_consume_sem_##name, 0, 1))) \
+		(static K_SEM_DEFINE(_consume_sem_##name, 0, K_SEM_MAX_LIMIT))) \
 	static RTIO_SQ_DEFINE(_sq_##name, sq_sz); \
 	static RTIO_CQ_DEFINE(_cq_##name, cq_sz); \
 	STRUCT_SECTION_ITERABLE(rtio, name) = { \
@@ -496,7 +515,15 @@ static inline void rtio_sqe_drop_all(struct rtio *r)
  */
 static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r)
 {
+#ifdef CONFIG_RTIO_CONSUME_SEM
+	if (k_sem_take(r->consume_sem, K_NO_WAIT) == 0) {
+		return rtio_spsc_consume(r->cq);
+	} else {
+		return NULL;
+	}
+#else
 	return rtio_spsc_consume(r->cq);
+#endif
 }
 
 /**
@@ -513,21 +540,18 @@ static inline struct rtio_cqe *rtio_cqe_consume_block(struct rtio *r)
 {
 	struct rtio_cqe *cqe;
 
-	/* TODO is there a better way? reset this in submit? */
 #ifdef CONFIG_RTIO_CONSUME_SEM
-	k_sem_reset(r->consume_sem);
-#endif
+	k_sem_take(r->consume_sem, K_FOREVER);
 
 	cqe = rtio_spsc_consume(r->cq);
+#else
+	cqe = rtio_spsc_consume(r->cq);
+
 	while (cqe == NULL) {
 		cqe = rtio_spsc_consume(r->cq);
-
-#ifdef CONFIG_RTIO_CONSUME_SEM
-		k_sem_take(r->consume_sem, K_FOREVER);
-#else
 		k_yield();
-#endif
 	}
+#endif
 
 	return cqe;
 }
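Taken together, the two consumers support a drain-then-wait pattern. A hedged sketch follows; `r` and `handle_cqe()` are hypothetical, and the CQE is handed back to the completion queue with `rtio_spsc_release()`, following the SPSC API already used above.

```c
/* Sketch only: drain ready completions without blocking, then block
 * for the next one (on the consume semaphore when
 * CONFIG_RTIO_CONSUME_SEM is set, otherwise by yielding).
 */
struct rtio_cqe *cqe;

while ((cqe = rtio_cqe_consume(r)) != NULL) {
	handle_cqe(cqe->result, cqe->userdata);	/* hypothetical handler */
	rtio_spsc_release(r->cq);		/* return CQE to the queue */
}

cqe = rtio_cqe_consume_block(r);
handle_cqe(cqe->result, cqe->userdata);
rtio_spsc_release(r->cq);
```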
@@ -569,6 +593,24 @@ static inline void rtio_iodev_sqe_err(struct rtio_iodev_sqe *iodev_sqe, int result)
 	iodev_sqe->r->executor->api->err(iodev_sqe, result);
 }
 
+/**
+ * @brief Cancel all requests that are pending for the iodev
+ *
+ * @param iodev IODev to cancel all requests for
+ */
+static inline void rtio_iodev_cancel_all(struct rtio_iodev *iodev)
+{
+	/* Complete all pending requests with -ECANCELED */
+	struct rtio_mpsc_node *node = rtio_mpsc_pop(&iodev->iodev_sq);
+
+	while (node != NULL) {
+		struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
+
+		rtio_iodev_sqe_err(iodev_sqe, -ECANCELED);
+		node = rtio_mpsc_pop(&iodev->iodev_sq);
+	}
+}
+
 /**
  * Submit a completion queue event with a given result and userdata
  *
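As a usage sketch for the new helper (everything here beyond `rtio_iodev_cancel_all()` itself is hypothetical), an iodev implementation might flush its queue on an unrecoverable device fault so submitters see completions rather than hanging:

```c
/* Sketch only: fail every request still queued on the iodev; each
 * pending SQE completes with -ECANCELED via rtio_iodev_sqe_err().
 */
static void my_iodev_on_fault(struct rtio_iodev *iodev)
{
	rtio_iodev_cancel_all(iodev);
	/* ...reset the hardware before accepting new submissions... */
}
```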
4 changes: 0 additions & 4 deletions include/zephyr/rtio/rtio_executor_concurrent.h
@@ -66,9 +66,6 @@ struct rtio_concurrent_executor
 	uint16_t task_in, task_out, task_mask;
 
-	/* First pending sqe to start when a task becomes available */
-	struct rtio_sqe *pending_sqe;
-
 	/* Last sqe seen from the most recent submit */
 	struct rtio_sqe *last_sqe;
 
 	/* Array of task statuses */
@@ -106,7 +103,6 @@ static const struct rtio_executor_api z_rtio_concurrent_api = {
 	.task_in = 0, \
 	.task_out = 0, \
 	.task_mask = (concurrency)-1, \
-	.pending_sqe = NULL, \
 	.last_sqe = NULL, \
 	.task_status = _task_status_##name, \
 	.task_cur = _task_cur_##name, \
19 changes: 14 additions & 5 deletions include/zephyr/rtio/rtio_mpsc.h
@@ -14,6 +14,10 @@
 #include <zephyr/sys/atomic.h>
 #include <zephyr/kernel.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 /**
  * @brief RTIO Multiple Producer Single Consumer (MPSC) Queue API
  * @defgroup rtio_mpsc RTIO MPSC API
@@ -94,7 +98,7 @@ static inline void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n)
 	atomic_ptr_set(&n->next, NULL);
 
 	key = arch_irq_lock();
-	prev = atomic_ptr_set(&q->head, n);
+	prev = (struct rtio_mpsc_node *)atomic_ptr_set(&q->head, n);
 	atomic_ptr_set(&prev->next, n);
 	arch_irq_unlock(key);
 }
@@ -109,7 +113,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
 {
 	struct rtio_mpsc_node *head;
 	struct rtio_mpsc_node *tail = q->tail;
-	struct rtio_mpsc_node *next = atomic_ptr_get(&tail->next);
+	struct rtio_mpsc_node *next = (struct rtio_mpsc_node *)atomic_ptr_get(&tail->next);
 
 	/* Skip over the stub/sentinel */
 	if (tail == &q->stub) {
@@ -119,7 +123,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)

 		q->tail = next;
 		tail = next;
-		next = atomic_ptr_get(&next->next);
+		next = (struct rtio_mpsc_node *)atomic_ptr_get(&next->next);
 	}
 
 	/* If next is non-NULL then a valid node is found, return it */
@@ -128,7 +132,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
 		return tail;
 	}
 
-	head = atomic_ptr_get(&q->head);
+	head = (struct rtio_mpsc_node *)atomic_ptr_get(&q->head);
 
 	/* If next is NULL, and the tail != HEAD then the queue has pending
 	 * updates that can't yet be accessed.
@@ -139,7 +143,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)

 	rtio_mpsc_push(q, &q->stub);
 
-	next = atomic_ptr_get(&tail->next);
+	next = (struct rtio_mpsc_node *)atomic_ptr_get(&tail->next);
 
 	if (next != NULL) {
 		q->tail = next;
@@ -153,4 +157,9 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
  * @}
  */
 
+#ifdef __cplusplus
+}
+#endif
+
+
 #endif /* ZEPHYR_RTIO_MPSC_H_ */
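To close, a hedged sketch of the intrusive-node pattern this queue expects, mirroring how `struct rtio_iodev_sqe` embeds its `q` node for `iodev->iodev_sq`; the `work_item` type is hypothetical and `rtio_mpsc_init()` is assumed to be the initializer this header provides.

```c
#include <zephyr/rtio/rtio_mpsc.h>
#include <zephyr/sys/util.h>	/* CONTAINER_OF */

/* Hypothetical item embedding the intrusive MPSC node. */
struct work_item {
	struct rtio_mpsc_node node;
	int payload;
};

static struct rtio_mpsc work_q;
static struct work_item item = { .payload = 42 };

void mpsc_example(void)
{
	rtio_mpsc_init(&work_q);	/* head and tail start at the stub */

	/* Producers (threads or ISRs) may push concurrently. */
	rtio_mpsc_push(&work_q, &item.node);

	/* The single consumer pops; NULL means empty or mid-update. */
	struct rtio_mpsc_node *n = rtio_mpsc_pop(&work_q);

	if (n != NULL) {
		struct work_item *w = CONTAINER_OF(n, struct work_item, node);

		/* w->payload == 42 */
	}
}
```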