Skip to content

Commit 33881c7

Browse files
committed
rtio: Shareable lock-free iodevs
By using an mpsc queue for each iodev, the iodev itself is shareable across contexts. Since its lock free, submits may occur even from an ISR context. Rather than a fixed size queue, and with it the possibility of running out of pre-allocated spots, each iodev now holds a wait-free mpsc queue head. This changes the parameter of iodev submit to be a struct containing 4 pointers for the rtio context, the submission queue entry, and the mpsc node for the iodevs submission queue. This solves the problem involving busy iodevs working with real devices. For example a busy SPI bus driver could enqueue, without locking, a request to start once the current request is done. The queue entries are expected to be owned and allocated by the executor rather than the iodev. This helps simplify potential tuning knobs to one place, the RTIO context and its executor an application directly uses. As the test case shows iodevs can operate effectively lock free with the mpsc queue and a single atomic denoting the current task. Signed-off-by: Tom Burdick <[email protected]>
1 parent 66746e6 commit 33881c7

File tree

9 files changed

+216
-202
lines changed

9 files changed

+216
-202
lines changed

include/zephyr/rtio/rtio.h

Lines changed: 54 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#define ZEPHYR_INCLUDE_RTIO_RTIO_H_
3434

3535
#include <zephyr/rtio/rtio_spsc.h>
36+
#include <zephyr/rtio/rtio_mpsc.h>
3637
#include <zephyr/sys/__assert.h>
3738
#include <zephyr/sys/atomic.h>
3839
#include <zephyr/device.h>
@@ -164,6 +165,7 @@ struct rtio_cq {
164165
};
165166

166167
struct rtio;
168+
struct rtio_iodev_sqe;
167169

168170
struct rtio_executor_api {
169171
/**
@@ -179,12 +181,12 @@ struct rtio_executor_api {
179181
/**
180182
* @brief SQE completes successfully
181183
*/
182-
void (*ok)(struct rtio *r, const struct rtio_sqe *sqe, int result);
184+
void (*ok)(struct rtio_iodev_sqe *iodev_sqe, int result);
183185

184186
/**
185-
* @brief SQE fails to complete
187+
* @brief SQE fails to complete successfully
186188
*/
187-
void (*err)(struct rtio *r, const struct rtio_sqe *sqe, int result);
189+
void (*err)(struct rtio_iodev_sqe *iodev_sqe, int result);
188190
};
189191

190192
/**
@@ -257,61 +259,32 @@ struct rtio {
257259
struct rtio_cq *cq;
258260
};
259261

262+
260263
/**
261-
* @brief API that an RTIO IO device should implement
264+
* @brief IO device submission queue entry
262265
*/
263-
struct rtio_iodev_api {
264-
/**
265-
* @brief Submission function for a request to the iodev
266-
*
267-
* The iodev is responsible for doing the operation described
268-
* as a submission queue entry and reporting results using using
269-
* `rtio_sqe_ok` or `rtio_sqe_err` once done.
270-
*/
271-
void (*submit)(const struct rtio_sqe *sqe,
272-
struct rtio *r);
273-
274-
/**
275-
* TODO some form of transactional piece is missing here
276-
* where we wish to "transact" on an iodev with multiple requests
277-
* over a chain.
278-
*
279-
* Only once explicitly released or the chain fails do we want
280-
* to release. Once released any pending iodevs in the queue
281-
* should be done.
282-
*
283-
* Something akin to a lock/unlock pair.
284-
*/
285-
};
286-
287-
/* IO device submission queue entry */
288266
struct rtio_iodev_sqe {
267+
struct rtio_mpsc_node q;
289268
const struct rtio_sqe *sqe;
290269
struct rtio *r;
291270
};
292271

293272
/**
294-
* @brief IO device submission queue
295-
*
296-
* This is used for reifying the member of the rtio_iodev struct
297-
*/
298-
struct rtio_iodev_sq {
299-
struct rtio_spsc _spsc;
300-
struct rtio_iodev_sqe buffer[];
301-
};
302-
303-
/**
304-
* @brief An IO device with a function table for submitting requests
273+
* @brief API that an RTIO IO device should implement
305274
*/
306-
struct rtio_iodev {
307-
/* Function pointer table */
308-
const struct rtio_iodev_api *api;
309-
310-
/* Queue of RTIO contexts with requests */
311-
struct rtio_iodev_sq *iodev_sq;
312-
313-
/* Data associated with this iodev */
314-
void *data;
275+
struct rtio_iodev_api {
276+
/**
277+
* @brief Submit to the iodev an entry to work on
278+
*
279+
* This call should be short in duration and most likely
280+
* either enqueue or kick off an entry with the hardware.
281+
*
282+
* If polling is required the iodev should add itself to the execution
283+
* context (@see rtio_add_pollable())
284+
*
285+
* @param iodev_sqe Submission queue entry
286+
*/
287+
void (*submit)(struct rtio_iodev_sqe *iodev_sqe);
315288
};
316289

317290
/** An operation that does nothing and will complete immediately */
@@ -389,29 +362,17 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
389362
#define RTIO_CQ_DEFINE(name, len) \
390363
RTIO_SPSC_DEFINE(name, struct rtio_cqe, len)
391364

392-
393-
/**
394-
* @brief Statically define and initialize a fixed length iodev submission queue
395-
*
396-
* @param name Name of the queue.
397-
* @param len Queue length, power of 2 required
398-
*/
399-
#define RTIO_IODEV_SQ_DEFINE(name, len) \
400-
RTIO_SPSC_DEFINE(name, struct rtio_iodev_sqe, len)
401-
402365
/**
403366
* @brief Statically define and initialize an RTIO IODev
404367
*
405368
* @param name Name of the iodev
406369
* @param iodev_api Pointer to struct rtio_iodev_api
407-
* @param qsize Size of the submission queue, must be power of 2
408370
* @param iodev_data Data pointer
409371
*/
410-
#define RTIO_IODEV_DEFINE(name, iodev_api, qsize, iodev_data) \
411-
static RTIO_IODEV_SQ_DEFINE(_iodev_sq_##name, qsize); \
412-
const STRUCT_SECTION_ITERABLE(rtio_iodev, name) = { \
372+
#define RTIO_IODEV_DEFINE(name, iodev_api, iodev_data) \
373+
STRUCT_SECTION_ITERABLE(rtio_iodev, name) = { \
413374
.api = (iodev_api), \
414-
.iodev_sq = (struct rtio_iodev_sq *const)&_iodev_sq_##name, \
375+
.iodev_q = RTIO_MPSC_INIT((name.iodev_q)), \
415376
.data = (iodev_data), \
416377
}
417378

@@ -449,14 +410,32 @@ static inline void rtio_set_executor(struct rtio *r, struct rtio_executor *exc)
449410
}
450411

451412
/**
452-
* @brief Perform a submitted operation with an iodev
413+
* @brief Submit to an iodev a submission to work on
453414
*
454-
* @param sqe Submission to work on
455-
* @param r RTIO context
415+
* Should be called by the executor when it wishes to submit work
416+
* to an iodev.
417+
*
418+
* @param iodev_sqe Submission to work on
456419
*/
457-
static inline void rtio_iodev_submit(const struct rtio_sqe *sqe, struct rtio *r)
420+
static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
458421
{
459-
sqe->iodev->api->submit(sqe, r);
422+
return iodev_sqe->sqe->iodev->api->submit(iodev_sqe);
423+
}
424+
425+
/**
426+
* @brief Poll an iodev for completion of a submission
427+
*
428+
* Should be called by the executor when rtio_submit or rtio_cqe is called
429+
* and polling the device is required.
430+
*
431+
* @param iodev IODev to poll
432+
*
433+
* @retval RTIO_POLL_COMPLETED Polling is no longer required
434+
* @retval RTIO_POLL_PENDING Polling is still required (task is pending)
435+
*/
436+
static inline enum rtio_poll_status rtio_iodev_poll(struct rtio_iodev *iodev)
437+
{
438+
return iodev->api->poll(iodev);
460439
}
461440

462441
/**
@@ -571,27 +550,25 @@ static inline void rtio_cqe_release_all(struct rtio *r)
571550
*
572551
* This may start the next asynchronous request if one is available.
573552
*
574-
* @param r RTIO context
575-
* @param sqe Submission that has succeeded
553+
* @param iodev_sqe IODev Submission that has succeeded
576554
* @param result Result of the request
577555
*/
578-
static inline void rtio_sqe_ok(struct rtio *r, const struct rtio_sqe *sqe, int result)
556+
static inline void rtio_iodev_sqe_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
579557
{
580-
r->executor->api->ok(r, sqe, result);
558+
iodev_sqe->r->executor->api->ok(iodev_sqe, result);
581559
}
582560

583561
/**
584562
* @brief Inform the executor of a submissions completion with error
585563
*
586564
* This SHALL fail the remaining submissions in the chain.
587565
*
588-
* @param r RTIO context
589-
* @param sqe Submission that has failed
566+
* @param iodev_sqe Submission that has failed
590567
* @param result Result of the request
591568
*/
592-
static inline void rtio_sqe_err(struct rtio *r, const struct rtio_sqe *sqe, int result)
569+
static inline void rtio_iodev_sqe_err(struct rtio_iodev_sqe *iodev_sqe, int result)
593570
{
594-
r->executor->api->err(r, sqe, result);
571+
iodev_sqe->r->executor->api->err(iodev_sqe, result);
595572
}
596573

597574
/**

include/zephyr/rtio/rtio_executor_concurrent.h

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,20 +37,18 @@ int rtio_concurrent_submit(struct rtio *r);
3737
/**
3838
* @brief Report a SQE has completed successfully
3939
*
40-
* @param r RTIO context to use
41-
* @param sqe RTIO SQE to report success
40+
* @param sqe RTIO IODev SQE to report success
4241
* @param result Result of the SQE
4342
*/
44-
void rtio_concurrent_ok(struct rtio *r, const struct rtio_sqe *sqe, int result);
43+
void rtio_concurrent_ok(struct rtio_iodev_sqe *sqe, int result);
4544

4645
/**
4746
* @brief Report a SQE has completed with error
4847
*
49-
* @param r RTIO context to use
50-
* @param sqe RTIO SQE to report success
48+
* @param sqe RTIO IODev SQE to report success
5149
* @param result Result of the SQE
5250
*/
53-
void rtio_concurrent_err(struct rtio *r, const struct rtio_sqe *sqe, int result);
51+
void rtio_concurrent_err(struct rtio_iodev_sqe *sqe, int result);
5452

5553
/**
5654
* @brief Concurrent Executor
@@ -76,8 +74,8 @@ struct rtio_concurrent_executor {
7674
/* Array of task statuses */
7775
uint8_t *task_status;
7876

79-
/* Array of struct rtio_sqe *'s one per task' */
80-
struct rtio_sqe **task_cur;
77+
/* Array of struct rtio_iodev_sqe *'s one per task' */
78+
struct rtio_iodev_sqe *task_cur;
8179
};
8280

8381
/**
@@ -101,7 +99,7 @@ static const struct rtio_executor_api z_rtio_concurrent_api = {
10199
* @param concurrency Allowed concurrency (number of concurrent tasks).
102100
*/
103101
#define RTIO_EXECUTOR_CONCURRENT_DEFINE(name, concurrency) \
104-
static struct rtio_sqe *_task_cur_##name[(concurrency)]; \
102+
static struct rtio_iodev_sqe _task_cur_##name[(concurrency)]; \
105103
uint8_t _task_status_##name[(concurrency)]; \
106104
static struct rtio_concurrent_executor name = { \
107105
.ctx = { .api = &z_rtio_concurrent_api }, \

include/zephyr/rtio/rtio_executor_simple.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,26 +36,25 @@ int rtio_simple_submit(struct rtio *r);
3636
/**
3737
* @brief Report a SQE has completed successfully
3838
*
39-
* @param r RTIO context to use
40-
* @param sqe RTIO SQE to report success
39+
* @param iodev_sqe RTIO IODEV SQE to report success
4140
* @param result Result of the SQE
4241
*/
43-
void rtio_simple_ok(struct rtio *r, const struct rtio_sqe *sqe, int result);
42+
void rtio_simple_ok(struct rtio_iodev_sqe *iodev_sqe, int result);
4443

4544
/**
4645
* @brief Report a SQE has completed with error
4746
*
48-
* @param r RTIO context to use
49-
* @param sqe RTIO SQE to report success
47+
* @param iodev_sqe RTIO IODEV SQE to report success
5048
* @param result Result of the SQE
5149
*/
52-
void rtio_simple_err(struct rtio *r, const struct rtio_sqe *sqe, int result);
50+
void rtio_simple_err(struct rtio_iodev_sqe *iodev_sqe, int result);
5351

5452
/**
5553
* @brief Simple Executor
5654
*/
5755
struct rtio_simple_executor {
5856
struct rtio_executor ctx;
57+
struct rtio_iodev_sqe task;
5958
};
6059

6160
/**

include/zephyr/rtio/rtio_mpsc.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <stdint.h>
1313
#include <stdbool.h>
1414
#include <zephyr/sys/atomic.h>
15+
#include <zephyr/kernel.h>
1516

1617
/**
1718
* @brief RTIO Multiple Producer Single Consumer (MPSC) Queue API
@@ -52,6 +53,21 @@ struct rtio_mpsc {
5253
struct rtio_mpsc_node stub;
5354
};
5455

56+
57+
/**
58+
* @brief Static initializer for a mpsc queue
59+
*
60+
* Since the queue is
61+
*
62+
* @param symbol name of the queue
63+
*/
64+
#define RTIO_MPSC_INIT(symbol) \
65+
{ \
66+
.head = (struct rtio_mpsc_node *)&symbol.stub, \
67+
.tail = (struct rtio_mpsc_node *)&symbol.stub, \
68+
.stub.next = NULL, \
69+
}
70+
5571
/**
5672
* @brief Initialize queue
5773
*

0 commit comments

Comments
 (0)