Commit 886dbd0

rtio: Add transactional submissions
Transactional submissions treat a sequence of SQEs as a single atomic submission given to a single iodev, and expect a single completion in reply. This is useful for the scatter-gather style APIs that already exist in Zephyr for I2C and SPI.

Signed-off-by: Tom Burdick <[email protected]>
1 parent 00a317b commit 886dbd0
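
To make "expect a single completion" concrete: an N-SQE transaction produces exactly one CQE. Below is a minimal consumer sketch, assuming a completion queue named r->cq with the same rtio_spsc_consume/rtio_spsc_release calls the diff uses for r->sq, and assuming the CQE carries the result and userdata passed to rtio_cqe_submit(); process_completion() is a hypothetical handler.

/* Hypothetical sketch: one CQE for a whole transaction.
 * r->cq, the rtio_cqe field names, and process_completion()
 * are assumptions, not the verbatim API.
 */
struct rtio_cqe *cqe = rtio_spsc_consume(r->cq);

if (cqe != NULL) {
	/* result covers the entire transaction; userdata comes from
	 * the last SQE in the transactional sequence.
	 */
	process_completion(cqe->result, cqe->userdata);
	rtio_spsc_release(r->cq);
}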

5 files changed: 288 additions & 121 deletions


include/zephyr/rtio/rtio.h

Lines changed: 16 additions & 0 deletions
@@ -97,9 +97,25 @@ struct rtio_iodev;
 
 /**
  * @brief The next request in the queue should wait on this one.
+ *
+ * Chained SQEs are individual units of work describing patterns of
+ * ordering and failure cascading. A chained SQE must be started only
+ * after the one before it. They are given to the iodevs one after another.
  */
 #define RTIO_SQE_CHAINED BIT(0)
 
+/**
+ * @brief The next request in the queue is part of a transaction.
+ *
+ * Transactional SQEs are sequential parts of a single unit of work.
+ * Only the first transactional SQE is submitted to an iodev; the
+ * remaining SQEs are never individually submitted but are instead
+ * considered part of the transaction to that single iodev. The first
+ * SQE in the sequence holds the iodev that will be used and the last
+ * holds the userdata that will be returned in a single completion on
+ * failure/success.
+ */
+#define RTIO_SQE_TRANSACTION BIT(1)
+
 /**
  * @}
  */
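
A hedged usage sketch of the new flag: an I2C-style register read expressed as an address write followed by a data read, submitted as one transaction and completed as one unit. The rtio_spsc_acquire()/rtio_spsc_produce() calls and the flags, iodev, and userdata fields appear in this commit's surrounding code; the op, buf, and buf_len fields, the RTIO_OP_* constants, and i2c_iodev are illustrative assumptions.

/* Illustrative only: queue a write+read pair as a single transaction. */
struct rtio_sqe *wr = rtio_spsc_acquire(r->sq);
struct rtio_sqe *rd = rtio_spsc_acquire(r->sq);

wr->op = RTIO_OP_TX;              /* assumed op encoding */
wr->flags = RTIO_SQE_TRANSACTION; /* link wr and rd into one unit */
wr->iodev = i2c_iodev;            /* the first SQE carries the iodev */
wr->buf = &reg_addr;
wr->buf_len = sizeof(reg_addr);

rd->op = RTIO_OP_RX;
rd->flags = 0;                    /* the last SQE ends the transaction */
rd->buf = rx_buf;
rd->buf_len = sizeof(rx_buf);
rd->userdata = rx_buf;            /* reported in the single CQE */

rtio_spsc_produce(r->sq);
rtio_spsc_produce(r->sq);
rtio_concurrent_submit(r);        /* the executor treats wr+rd as one task */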

subsys/rtio/rtio_executor_concurrent.c

Lines changed: 97 additions & 89 deletions
@@ -49,7 +49,7 @@ static uint16_t conex_task_next(struct rtio_concurrent_executor *exc)
 	uint16_t task_id = exc->task_in;
 
 	exc->task_in++;
-	return task_id;
+	return task_id & exc->task_mask;
 }
 
 static inline uint16_t conex_task_id(struct rtio_concurrent_executor *exc,
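
The changed return value masks the free-running task counter into the fixed-size task table. A self-contained sketch of the idiom, assuming a power-of-two table where the mask is size - 1 (TASK_COUNT and TASK_MASK are local to the sketch; the real fields live on struct rtio_concurrent_executor):

#include <stdint.h>
#include <stdio.h>

#define TASK_COUNT 4U                 /* must be a power of two */
#define TASK_MASK  (TASK_COUNT - 1U)

int main(void)
{
	/* A 16-bit counter increments forever; masking maps it into a
	 * table slot, i.e. slot = id % TASK_COUNT without a divide.
	 */
	for (uint16_t id = 6; id < 12; id++) {
		printf("task counter %u -> slot %u\n", id, id & TASK_MASK);
	}
	return 0;
}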
@@ -62,16 +62,25 @@ static inline uint16_t conex_task_id(struct rtio_concurrent_executor *exc,
 
 static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *exc)
 {
+	uint32_t swept = 1;
 	struct rtio_sqe *sqe = rtio_spsc_consume(r->sq);
 
-	while (sqe != NULL && sqe->flags & RTIO_SQE_CHAINED) {
+	while (sqe != NULL && (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION))) {
 		rtio_spsc_release(r->sq);
 		sqe = rtio_spsc_consume(r->sq);
+		swept++;
 	}
 
 	rtio_spsc_release(r->sq);
 }
 
+/**
+ * @brief Sweep up old completed tasks, GC-style, in order
+ *
+ * Only sweeps tasks in the order they arrived in the submission queue,
+ * meaning there may be completed tasks that could be freed but are not
+ * yet, because something before them has not yet completed.
+ */
 static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc)
 {
 	/* In order sweep up */
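
The in-order constraint described above mirrors the SPSC queue underneath: entries can only be released from the head, so a task that finished early must wait behind an older, unfinished one. A standalone sketch of that property (the array model is illustrative, not the rtio_spsc API):

#include <stdbool.h>
#include <stdio.h>

int main(void)
{
	/* done[i] marks task i complete; tasks 1 and 2 finished early,
	 * but task 0 has not, so nothing can be swept yet.
	 */
	bool done[3] = { false, true, true };
	int head = 0;

	/* Sweep only from the head, in arrival order */
	while (head < 3 && done[head]) {
		printf("sweeping task %d\n", head);
		head++;
	}
	printf("tasks still held: %d\n", 3 - head); /* prints 3 */
	return 0;
}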
@@ -86,21 +95,77 @@ static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc)
 	}
 }
 
+/**
+ * @brief Prepare tasks to run by iterating through the submission queue
+ *
+ * For each submission in the queue that begins a chain or transaction,
+ * start a task if possible. Concurrency is limited by the allocated
+ * concurrency per executor instance.
+ */
+static void conex_prepare(struct rtio *r, struct rtio_concurrent_executor *exc)
+{
+	struct rtio_sqe *sqe;
+
+	/* If never submitted before, peek at the first item;
+	 * otherwise start back up where the last submit call
+	 * left off.
+	 */
+	if (exc->pending_sqe == NULL) {
+		sqe = rtio_spsc_peek(r->sq);
+	} else {
+		sqe = exc->pending_sqe;
+	}
+
+	while (sqe != NULL && conex_task_free(exc)) {
+		/* Get the next free task id */
+		uint16_t task_idx = conex_task_next(exc);
+
+		/* Set up the task */
+		exc->task_cur[task_idx].sqe = sqe;
+		exc->task_cur[task_idx].r = r;
+		exc->task_status[task_idx] = CONEX_TASK_SUSPENDED;
+
+		/* Go to the next sqe not in the current chain or transaction */
+		while (sqe != NULL && (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION))) {
+			sqe = rtio_spsc_next(r->sq, sqe);
+		}
+
+		/* SQE is the end of the previous chain or transaction, so skip it */
+		sqe = rtio_spsc_next(r->sq, sqe);
+	}
+
+	/* Out of available tasks; remember where we left off so we can
+	 * begin again once tasks free up.
+	 */
+	exc->pending_sqe = sqe;
+}
+
+/**
+ * @brief Resume tasks that are suspended
+ *
+ * All tasks begin as suspended tasks. This kicks them off to the
+ * submission's associated iodev.
+ */
 static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
 {
 	/* In order resume tasks */
 	for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) {
 		if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_SUSPENDED) {
 			LOG_INF("resuming suspended task %d", task_id);
-			exc->task_status[task_id] &= ~CONEX_TASK_SUSPENDED;
-			rtio_iodev_submit(&exc->task_cur[task_id]);
+			exc->task_status[task_id & exc->task_mask] &= ~CONEX_TASK_SUSPENDED;
+			rtio_iodev_submit(&exc->task_cur[task_id & exc->task_mask]);
 		}
 	}
 }
 
+/**
+ * @brief Sweep, prepare, and resume in one go
+ *
+ * Called after a completion to continue doing more work if needed.
+ */
 static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
 {
 	conex_sweep(r, exc);
+	conex_prepare(r, exc);
 	conex_resume(r, exc);
 }
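
The scan in conex_prepare() effectively partitions the submission queue at every SQE that carries neither RTIO_SQE_CHAINED nor RTIO_SQE_TRANSACTION: such an SQE ends the current unit, and the SQE after it starts a new task. A standalone sketch of that partitioning over a plain flag array (the flag values mirror rtio.h; everything else is local to the sketch):

#include <stdint.h>
#include <stdio.h>

#define BIT(n) (1UL << (n))
#define RTIO_SQE_CHAINED     BIT(0)
#define RTIO_SQE_TRANSACTION BIT(1)

int main(void)
{
	/* Six queued SQEs: a 2-SQE chain, a 3-SQE transaction, then a
	 * lone SQE. A new task starts after each unlinked SQE.
	 */
	uint8_t flags[6] = {
		RTIO_SQE_CHAINED, 0,
		RTIO_SQE_TRANSACTION, RTIO_SQE_TRANSACTION, 0,
		0,
	};
	int task = 0;

	for (int i = 0; i < 6; i++) {
		printf("sqe %d -> task %d\n", i, task);
		if (!(flags[i] & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION))) {
			task++; /* this SQE ends its chain/transaction */
		}
	}
	return 0;
}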

@@ -114,71 +179,14 @@ static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor *
 int rtio_concurrent_submit(struct rtio *r)
 {
 
-	LOG_INF("submit");
-
 	struct rtio_concurrent_executor *exc =
 		(struct rtio_concurrent_executor *)r->executor;
-	struct rtio_sqe *sqe;
-	struct rtio_sqe *last_sqe;
 	k_spinlock_key_t key;
 
 	key = k_spin_lock(&exc->lock);
 
-	/* If never submitted before, peek at the first item;
-	 * otherwise start back up where the last submit call
-	 * left off.
-	 */
-	if (exc->last_sqe == NULL) {
-		sqe = rtio_spsc_peek(r->sq);
-	} else {
-		/* Pick up from the last submit call */
-		sqe = rtio_spsc_next(r->sq, exc->last_sqe);
-	}
-
-	last_sqe = sqe;
-	while (sqe != NULL && conex_task_free(exc)) {
-		LOG_INF("head SQE in chain %p", sqe);
-
-		/* Get the next task id if one exists */
-		uint16_t task_idx = conex_task_next(exc);
-
-		LOG_INF("setting up task %d", task_idx);
-
-		/* Set up the task */
-		exc->task_cur[task_idx].sqe = sqe;
-		exc->task_cur[task_idx].r = r;
-		exc->task_status[task_idx] = CONEX_TASK_SUSPENDED;
-
-		LOG_INF("submitted sqe %p", sqe);
-		/* Go to the next sqe not in the current chain */
-		while (sqe != NULL && (sqe->flags & RTIO_SQE_CHAINED)) {
-			sqe = rtio_spsc_next(r->sq, sqe);
-		}
-
-		LOG_INF("tail SQE in chain %p", sqe);
-
-		last_sqe = sqe;
-
-		/* SQE is the end of the previous chain */
-		sqe = rtio_spsc_next(r->sq, sqe);
-	}
-
-	/* Out of available pointers; wait until others complete, noting the
-	 * first pending submission. May be NULL if nothing is pending.
-	 */
-	exc->pending_sqe = sqe;
-
-	/*
-	 * Run through the queue until the last item
-	 * and take note of it
-	 */
-	while (sqe != NULL) {
-		last_sqe = sqe;
-		sqe = rtio_spsc_next(r->sq, sqe);
-	}
-
-	/* Note the last sqe for the next submit call */
-	exc->last_sqe = last_sqe;
+	/* Prepare tasks to run; they start in a suspended state */
+	conex_prepare(r, exc);
 
 	/* Resume all suspended tasks */
 	conex_resume(r, exc);
@@ -202,14 +210,9 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
 	/* Interrupt may occur in spsc_acquire, breaking the contract
 	 * so spin around it effectively preventing another interrupt on
 	 * this core, and another core trying to concurrently work in here.
-	 *
-	 * This can and should be broken up into a few sections with a try
-	 * lock around the sweep and resume.
 	 */
 	key = k_spin_lock(&exc->lock);
 
-	rtio_cqe_submit(r, result, sqe->userdata);
-
 	/* Determine the task id by memory offset O(1) */
 	uint16_t task_id = conex_task_id(exc, iodev_sqe);
 
@@ -218,16 +221,19 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
 
 		exc->task_cur[task_id].sqe = next_sqe;
 		rtio_iodev_submit(&exc->task_cur[task_id]);
-
 	} else {
 		exc->task_status[task_id] |= CONEX_TASK_COMPLETE;
 	}
 
+	bool transaction = sqe->flags & RTIO_SQE_TRANSACTION;
+
+	while (transaction) {
+		sqe = rtio_spsc_next(r->sq, sqe);
+		transaction = sqe->flags & RTIO_SQE_TRANSACTION;
+	}
+
+	rtio_cqe_submit(r, result, sqe->userdata);
 
-	/* Sweep up unused SQEs and tasks, retry suspended tasks */
-	/* TODO Use a try lock here and don't bother doing it if we are already
-	 * doing it elsewhere
-	 */
 	conex_sweep_resume(r, exc);
 
 	k_spin_unlock(&exc->lock, key);
@@ -238,39 +244,41 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
  */
 void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result)
 {
-	const struct rtio_sqe *nsqe;
 	k_spinlock_key_t key;
 	struct rtio *r = iodev_sqe->r;
 	const struct rtio_sqe *sqe = iodev_sqe->sqe;
 	struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;
+	void *userdata = sqe->userdata;
+	bool chained = sqe->flags & RTIO_SQE_CHAINED;
+	bool transaction = sqe->flags & RTIO_SQE_TRANSACTION;
+	uint16_t task_id = conex_task_id(exc, iodev_sqe);
 
 	/* Another interrupt (and sqe complete) may occur in spsc_acquire,
 	 * breaking the contract so spin around it effectively preventing another
 	 * interrupt on this core, and another core trying to concurrently work
 	 * in here.
-	 *
-	 * This can and should be broken up into a few sections with a try
-	 * lock around the sweep and resume.
 	 */
 	key = k_spin_lock(&exc->lock);
 
-	rtio_cqe_submit(r, result, sqe->userdata);
-
-	/* Determine the task id : O(1) */
-	uint16_t task_id = conex_task_id(exc, iodev_sqe);
+	if (!transaction) {
+		rtio_cqe_submit(r, result, userdata);
+	}
 
-	sqe = iodev_sqe->sqe;
+	/* While the last sqe was marked as chained or transactional, do more work */
+	while (chained | transaction) {
+		sqe = rtio_spsc_next(r->sq, sqe);
+		chained = sqe->flags & RTIO_SQE_CHAINED;
+		transaction = sqe->flags & RTIO_SQE_TRANSACTION;
+		userdata = sqe->userdata;
 
-	/* Fail the remaining sqe's in the chain */
-	if (sqe->flags & RTIO_SQE_CHAINED) {
-		nsqe = rtio_spsc_next(r->sq, sqe);
-		while (nsqe != NULL && nsqe->flags & RTIO_SQE_CHAINED) {
-			rtio_cqe_submit(r, -ECANCELED, nsqe->userdata);
-			nsqe = rtio_spsc_next(r->sq, nsqe);
+		if (!transaction) {
+			rtio_cqe_submit(r, result, userdata);
+		} else {
+			rtio_cqe_submit(r, -ECANCELED, userdata);
 		}
 	}
 
-	/* Task is complete (failed) */
+	/* Task has completed (failed) */
 	exc->task_status[task_id] |= CONEX_TASK_COMPLETE;
 
 	conex_sweep_resume(r, exc);
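
To see what the rewritten error path emits, here is a standalone model of just its completion-emission loop, fed two failing two-SQE units. It is inferred from the code above, not from the commit message; all names are local to the sketch, and each flag array ends in an unlinked SQE just as a well-formed queue would.

#include <errno.h>
#include <stdint.h>
#include <stdio.h>

#define BIT(n) (1UL << (n))
#define RTIO_SQE_CHAINED     BIT(0)
#define RTIO_SQE_TRANSACTION BIT(1)

/* Model of rtio_concurrent_err's emission: the head SQE completes with
 * the failure result unless it is mid-transaction; followers complete
 * with the result when they end a unit and with -ECANCELED while still
 * inside a transaction.
 */
static void fail_unit(const uint8_t *flags, int result)
{
	int i = 0;
	int chained = flags[0] & RTIO_SQE_CHAINED;
	int transaction = flags[0] & RTIO_SQE_TRANSACTION;

	if (!transaction) {
		printf("cqe for sqe %d: result %d\n", i, result);
	}

	while (chained | transaction) {
		i++;
		chained = flags[i] & RTIO_SQE_CHAINED;
		transaction = flags[i] & RTIO_SQE_TRANSACTION;
		printf("cqe for sqe %d: result %d\n", i,
		       transaction ? -ECANCELED : result);
	}
}

int main(void)
{
	uint8_t chain[2] = { RTIO_SQE_CHAINED, 0 };
	uint8_t txn[2] = { RTIO_SQE_TRANSACTION, 0 };

	puts("chain of 2 fails with -EIO:");
	fail_unit(chain, -EIO);   /* two CQEs, both carry -EIO */
	puts("transaction of 2 fails with -EIO:");
	fail_unit(txn, -EIO);     /* one CQE, carrying -EIO */
	return 0;
}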
