Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/kernel/data_structures/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ needed will be provided by the user.
spsc_pbuf.rst
rbtree.rst
ring_buffers.rst
mpsc_lockfree.rst
spsc_lockfree.rst
14 changes: 14 additions & 0 deletions doc/kernel/data_structures/mpsc_lockfree.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
.. _mpsc_lockfree:

Multi Producer Single Consumer Lock Free Queue
==============================================

A :dfn:`Multi Producer Single Consumer Lock Free Queue (MPSC)` is an lockfree
intrusive queue based on atomic pointer swaps as described by Dmitry Vyukov
at `1024cores <https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue>`_.


API Reference
*************

.. doxygengroup:: mpsc_lockfree
12 changes: 12 additions & 0 deletions doc/kernel/data_structures/spsc_lockfree.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
.. _spsc_lockfree:

Single Producer Single Consumer Lock Free Queue
===============================================

A :dfn:`Single Producer Single Consumer Lock Free Queue (MPSC)` is a lock free
atomic ring buffer based queue.

API Reference
*************

.. doxygengroup:: spsc_lockfree
10 changes: 0 additions & 10 deletions doc/services/rtio/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,3 @@ API Reference
*************

.. doxygengroup:: rtio

MPSC Lock-free Queue API
========================

.. doxygengroup:: rtio_mpsc

SPSC Lock-free Queue API
========================

.. doxygengroup:: rtio_spsc
Comment on lines -219 to -228
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing these categories from RTIO pages help with understanding RTIO: MPSC and SPSC were looking like part of the API, but are only used internally rather than two variants of RTIO.

10 changes: 5 additions & 5 deletions drivers/i2c/i2c_rtio.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <zephyr/drivers/i2c.h>
#include <zephyr/drivers/i2c/rtio.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/rtio/rtio_mpsc.h>
#include <zephyr/sys/mpsc_lockfree.h>
#include <zephyr/sys/__assert.h>

#define LOG_LEVEL CONFIG_I2C_LOG_LEVEL
Expand Down Expand Up @@ -55,14 +55,14 @@ struct rtio_sqe *i2c_rtio_copy(struct rtio *r, struct rtio_iodev *iodev, const s
void i2c_rtio_init(struct i2c_rtio *ctx, const struct device *dev)
{
k_sem_init(&ctx->lock, 1, 1);
rtio_mpsc_init(&ctx->io_q);
mpsc_init(&ctx->io_q);
ctx->txn_curr = NULL;
ctx->txn_head = NULL;
ctx->dt_spec.bus = dev;
ctx->iodev.data = &ctx->dt_spec;
ctx->iodev.api = &i2c_iodev_api;
/* TODO drop the builtin submission queue? */
rtio_mpsc_init(&ctx->iodev.iodev_sq);
mpsc_init(&ctx->iodev.iodev_sq);
}

/**
Expand All @@ -82,7 +82,7 @@ static bool i2c_rtio_next(struct i2c_rtio *ctx, bool completion)
return false;
}

struct rtio_mpsc_node *next = rtio_mpsc_pop(&ctx->io_q);
struct mpsc_node *next = mpsc_pop(&ctx->io_q);

/* Nothing left to do */
if (next == NULL) {
Expand Down Expand Up @@ -119,7 +119,7 @@ bool i2c_rtio_complete(struct i2c_rtio *ctx, int status)
}
bool i2c_rtio_submit(struct i2c_rtio *ctx, struct rtio_iodev_sqe *iodev_sqe)
{
rtio_mpsc_push(&ctx->io_q, &iodev_sqe->q);
mpsc_push(&ctx->io_q, &iodev_sqe->q);
return i2c_rtio_next(ctx, false);
}

Expand Down
6 changes: 3 additions & 3 deletions drivers/spi/spi_mcux_lpspi.c
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ static int spi_mcux_init(const struct device *dev)
data->dt_spec.bus = dev;
data->iodev.api = &spi_iodev_api;
data->iodev.data = &data->dt_spec;
rtio_mpsc_init(&data->iodev.iodev_sq);
mpsc_init(&data->iodev.iodev_sq);
#endif

err = pinctrl_apply_state(config->pincfg, PINCTRL_STATE_DEFAULT);
Expand Down Expand Up @@ -803,7 +803,7 @@ static void spi_mcux_iodev_next(const struct device *dev, bool completion)
return;
}

struct rtio_mpsc_node *next = rtio_mpsc_pop(&data->iodev.iodev_sq);
struct mpsc_node *next = mpsc_pop(&data->iodev.iodev_sq);

if (next != NULL) {
struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q);
Expand Down Expand Up @@ -832,7 +832,7 @@ static void spi_mcux_iodev_submit(const struct device *dev,
{
struct spi_mcux_data *data = dev->data;

rtio_mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q);
mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q);
spi_mcux_iodev_next(dev, false);
}

Expand Down
6 changes: 3 additions & 3 deletions drivers/spi/spi_sam.c
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ static void spi_sam_iodev_next(const struct device *dev, bool completion)
return;
}

struct rtio_mpsc_node *next = rtio_mpsc_pop(&data->iodev.iodev_sq);
struct mpsc_node *next = mpsc_pop(&data->iodev.iodev_sq);

if (next != NULL) {
struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q);
Expand Down Expand Up @@ -736,7 +736,7 @@ static void spi_sam_iodev_submit(const struct device *dev,
{
struct spi_sam_data *data = dev->data;

rtio_mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q);
mpsc_push(&data->iodev.iodev_sq, &iodev_sqe->q);
spi_sam_iodev_next(dev, false);
}
#endif
Expand Down Expand Up @@ -866,7 +866,7 @@ static int spi_sam_init(const struct device *dev)
data->dt_spec.bus = dev;
data->iodev.api = &spi_iodev_api;
data->iodev.data = &data->dt_spec;
rtio_mpsc_init(&data->iodev.iodev_sq);
mpsc_init(&data->iodev.iodev_sq);
#endif

spi_context_unlock_unconditionally(&data->ctx);
Expand Down
2 changes: 1 addition & 1 deletion include/zephyr/drivers/i2c/rtio.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct i2c_rtio {
struct k_sem lock;
struct k_spinlock slock;
struct rtio *r;
struct rtio_mpsc io_q;
struct mpsc io_q;
struct rtio_iodev iodev;
struct rtio_iodev_sqe *txn_head;
struct rtio_iodev_sqe *txn_curr;
Expand Down
56 changes: 28 additions & 28 deletions include/zephyr/rtio/rtio.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
#include <zephyr/app_memory/app_memdomain.h>
#include <zephyr/device.h>
#include <zephyr/kernel.h>
#include <zephyr/rtio/rtio_mpsc.h>
#include <zephyr/sys/__assert.h>
#include <zephyr/sys/atomic.h>
#include <zephyr/sys/mem_blocks.h>
#include <zephyr/sys/util.h>
#include <zephyr/sys/iterable_sections.h>
#include <zephyr/sys/mpsc_lockfree.h>

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -292,22 +292,22 @@ BUILD_ASSERT(sizeof(struct rtio_sqe) <= 64);
* @brief A completion queue event
*/
struct rtio_cqe {
struct rtio_mpsc_node q;
struct mpsc_node q;

int32_t result; /**< Result from operation */
void *userdata; /**< Associated userdata with operation */
uint32_t flags; /**< Flags associated with the operation */
};

struct rtio_sqe_pool {
struct rtio_mpsc free_q;
struct mpsc free_q;
const uint16_t pool_size;
uint16_t pool_free;
struct rtio_iodev_sqe *pool;
};

struct rtio_cqe_pool {
struct rtio_mpsc free_q;
struct mpsc free_q;
const uint16_t pool_size;
uint16_t pool_free;
struct rtio_cqe *pool;
Expand Down Expand Up @@ -362,10 +362,10 @@ struct rtio {
#endif

/* Submission queue */
struct rtio_mpsc sq;
struct mpsc sq;

/* Completion queue */
struct rtio_mpsc cq;
struct mpsc cq;
};

/** The memory partition associated with all RTIO context information */
Expand Down Expand Up @@ -422,7 +422,7 @@ static inline uint16_t __rtio_compute_mempool_block_index(const struct rtio *r,
*/
struct rtio_iodev_sqe {
struct rtio_sqe sqe;
struct rtio_mpsc_node q;
struct mpsc_node q;
struct rtio_iodev_sqe *next;
struct rtio *r;
};
Expand Down Expand Up @@ -450,7 +450,7 @@ struct rtio_iodev {
const struct rtio_iodev_api *api;

/* Queue of RTIO contexts with requests */
struct rtio_mpsc iodev_sq;
struct mpsc iodev_sq;

/* Data associated with this iodev */
void *data;
Expand Down Expand Up @@ -625,7 +625,7 @@ static inline void rtio_sqe_prep_transceive(struct rtio_sqe *sqe,

static inline struct rtio_iodev_sqe *rtio_sqe_pool_alloc(struct rtio_sqe_pool *pool)
{
struct rtio_mpsc_node *node = rtio_mpsc_pop(&pool->free_q);
struct mpsc_node *node = mpsc_pop(&pool->free_q);

if (node == NULL) {
return NULL;
Expand All @@ -640,14 +640,14 @@ static inline struct rtio_iodev_sqe *rtio_sqe_pool_alloc(struct rtio_sqe_pool *p

static inline void rtio_sqe_pool_free(struct rtio_sqe_pool *pool, struct rtio_iodev_sqe *iodev_sqe)
{
rtio_mpsc_push(&pool->free_q, &iodev_sqe->q);
mpsc_push(&pool->free_q, &iodev_sqe->q);

pool->pool_free++;
}

static inline struct rtio_cqe *rtio_cqe_pool_alloc(struct rtio_cqe_pool *pool)
{
struct rtio_mpsc_node *node = rtio_mpsc_pop(&pool->free_q);
struct mpsc_node *node = mpsc_pop(&pool->free_q);

if (node == NULL) {
return NULL;
Expand All @@ -664,7 +664,7 @@ static inline struct rtio_cqe *rtio_cqe_pool_alloc(struct rtio_cqe_pool *pool)

static inline void rtio_cqe_pool_free(struct rtio_cqe_pool *pool, struct rtio_cqe *cqe)
{
rtio_mpsc_push(&pool->free_q, &cqe->q);
mpsc_push(&pool->free_q, &cqe->q);

pool->pool_free++;
}
Expand Down Expand Up @@ -732,14 +732,14 @@ static inline void rtio_block_pool_free(struct rtio *r, void *buf, uint32_t buf_
#define RTIO_IODEV_DEFINE(name, iodev_api, iodev_data) \
STRUCT_SECTION_ITERABLE(rtio_iodev, name) = { \
.api = (iodev_api), \
.iodev_sq = RTIO_MPSC_INIT((name.iodev_sq)), \
.iodev_sq = MPSC_INIT((name.iodev_sq)), \
.data = (iodev_data), \
}

#define Z_RTIO_SQE_POOL_DEFINE(name, sz) \
static struct rtio_iodev_sqe CONCAT(_sqe_pool_, name)[sz]; \
STRUCT_SECTION_ITERABLE(rtio_sqe_pool, name) = { \
.free_q = RTIO_MPSC_INIT((name.free_q)), \
.free_q = MPSC_INIT((name.free_q)), \
.pool_size = sz, \
.pool_free = sz, \
.pool = CONCAT(_sqe_pool_, name), \
Expand All @@ -749,7 +749,7 @@ static inline void rtio_block_pool_free(struct rtio *r, void *buf, uint32_t buf_
#define Z_RTIO_CQE_POOL_DEFINE(name, sz) \
static struct rtio_cqe CONCAT(_cqe_pool_, name)[sz]; \
STRUCT_SECTION_ITERABLE(rtio_cqe_pool, name) = { \
.free_q = RTIO_MPSC_INIT((name.free_q)), \
.free_q = MPSC_INIT((name.free_q)), \
.pool_size = sz, \
.pool_free = sz, \
.pool = CONCAT(_cqe_pool_, name), \
Expand Down Expand Up @@ -797,8 +797,8 @@ static inline void rtio_block_pool_free(struct rtio *r, void *buf, uint32_t buf_
.sqe_pool = _sqe_pool, \
.cqe_pool = _cqe_pool, \
IF_ENABLED(CONFIG_RTIO_SYS_MEM_BLOCKS, (.block_pool = _block_pool,)) \
.sq = RTIO_MPSC_INIT((name.sq)), \
.cq = RTIO_MPSC_INIT((name.cq)), \
.sq = MPSC_INIT((name.sq)), \
.cq = MPSC_INIT((name.cq)), \
}

/**
Expand Down Expand Up @@ -910,7 +910,7 @@ static inline struct rtio_sqe *rtio_sqe_acquire(struct rtio *r)
return NULL;
}

rtio_mpsc_push(&r->sq, &iodev_sqe->q);
mpsc_push(&r->sq, &iodev_sqe->q);

return &iodev_sqe->sqe;
}
Expand All @@ -923,12 +923,12 @@ static inline struct rtio_sqe *rtio_sqe_acquire(struct rtio *r)
static inline void rtio_sqe_drop_all(struct rtio *r)
{
struct rtio_iodev_sqe *iodev_sqe;
struct rtio_mpsc_node *node = rtio_mpsc_pop(&r->sq);
struct mpsc_node *node = mpsc_pop(&r->sq);

while (node != NULL) {
iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
rtio_sqe_pool_free(r->sqe_pool, iodev_sqe);
node = rtio_mpsc_pop(&r->sq);
node = mpsc_pop(&r->sq);
}
}

Expand All @@ -953,7 +953,7 @@ static inline struct rtio_cqe *rtio_cqe_acquire(struct rtio *r)
*/
static inline void rtio_cqe_produce(struct rtio *r, struct rtio_cqe *cqe)
{
rtio_mpsc_push(&r->cq, &cqe->q);
mpsc_push(&r->cq, &cqe->q);
}

/**
Expand All @@ -969,7 +969,7 @@ static inline void rtio_cqe_produce(struct rtio *r, struct rtio_cqe *cqe)
*/
static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r)
{
struct rtio_mpsc_node *node;
struct mpsc_node *node;
struct rtio_cqe *cqe = NULL;

#ifdef CONFIG_RTIO_CONSUME_SEM
Expand All @@ -978,7 +978,7 @@ static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r)
}
#endif

node = rtio_mpsc_pop(&r->cq);
node = mpsc_pop(&r->cq);
if (node == NULL) {
return NULL;
}
Expand All @@ -999,16 +999,16 @@ static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r)
*/
static inline struct rtio_cqe *rtio_cqe_consume_block(struct rtio *r)
{
struct rtio_mpsc_node *node;
struct mpsc_node *node;
struct rtio_cqe *cqe;

#ifdef CONFIG_RTIO_CONSUME_SEM
k_sem_take(r->consume_sem, K_FOREVER);
#endif
node = rtio_mpsc_pop(&r->cq);
node = mpsc_pop(&r->cq);
while (node == NULL) {
Z_SPIN_DELAY(1);
node = rtio_mpsc_pop(&r->cq);
node = mpsc_pop(&r->cq);
}
cqe = CONTAINER_OF(node, struct rtio_cqe, q);

Expand Down Expand Up @@ -1136,13 +1136,13 @@ static inline void rtio_iodev_sqe_err(struct rtio_iodev_sqe *iodev_sqe, int resu
static inline void rtio_iodev_cancel_all(struct rtio_iodev *iodev)
{
/* Clear pending requests as -ENODATA */
struct rtio_mpsc_node *node = rtio_mpsc_pop(&iodev->iodev_sq);
struct mpsc_node *node = mpsc_pop(&iodev->iodev_sq);

while (node != NULL) {
struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);

rtio_iodev_sqe_err(iodev_sqe, -ECANCELED);
node = rtio_mpsc_pop(&iodev->iodev_sq);
node = mpsc_pop(&iodev->iodev_sq);
}
}

Expand Down
Loading