Skip to content

Commit 610d307

Browse files
teburdMaureenHelm
authored andcommitted
rtio: Properly track last sqe in the queue
The pending_sqe logic to track where in the ring queue the concurrent executor had left off was slightly flawed. It didn't account for starting all sqes in the queue and ending back up at the beginning. Instead track the last SQE in the queue, from which the next one in the queue will the one to start next. If we happen to sweep the last known SQE in the queue, reset it to NULL so the next time prepare is called we start at the beginning of the queue again. Signed-off-by: Tom Burdick <[email protected]>
1 parent 3353342 commit 610d307

File tree

3 files changed

+30
-28
lines changed

3 files changed

+30
-28
lines changed

include/zephyr/rtio/rtio_executor_concurrent.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,6 @@ struct rtio_concurrent_executor {
6666
uint16_t task_in, task_out, task_mask;
6767

6868
/* First pending sqe to start when a task becomes available */
69-
struct rtio_sqe *pending_sqe;
70-
71-
/* Last sqe seen from the most recent submit */
7269
struct rtio_sqe *last_sqe;
7370

7471
/* Array of task statuses */
@@ -106,7 +103,6 @@ static const struct rtio_executor_api z_rtio_concurrent_api = {
106103
.task_in = 0, \
107104
.task_out = 0, \
108105
.task_mask = (concurrency)-1, \
109-
.pending_sqe = NULL, \
110106
.last_sqe = NULL, \
111107
.task_status = _task_status_##name, \
112108
.task_cur = _task_cur_##name, \

subsys/rtio/rtio_executor_concurrent.c

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *ex
7070
}
7171

7272
rtio_spsc_release(r->sq);
73+
74+
if (sqe == exc->last_sqe) {
75+
exc->last_sqe = NULL;
76+
}
7377
}
7478

7579
/**
@@ -84,7 +88,7 @@ static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc)
8488
/* In order sweep up */
8589
for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) {
8690
if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_COMPLETE) {
87-
LOG_INF("sweeping oldest task %d", task_id);
91+
LOG_DBG("sweeping oldest task %d", task_id);
8892
conex_sweep_task(r, exc);
8993
exc->task_out++;
9094
} else {
@@ -102,38 +106,45 @@ static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc)
102106
*/
103107
static void conex_prepare(struct rtio *r, struct rtio_concurrent_executor *exc)
104108
{
105-
struct rtio_sqe *sqe;
109+
struct rtio_sqe *sqe, *last_sqe;
106110

107111
/* If never submitted before peek at the first item
108112
* otherwise start back up where the last submit call
109113
* left off
110114
*/
111-
if (exc->pending_sqe == NULL) {
115+
if (exc->last_sqe == NULL) {
116+
last_sqe = NULL;
112117
sqe = rtio_spsc_peek(r->sq);
113118
} else {
114-
sqe = exc->pending_sqe;
119+
last_sqe = exc->last_sqe;
120+
sqe = rtio_spsc_next(r->sq, last_sqe);
115121
}
116122

123+
LOG_DBG("starting at sqe %p, last %p", sqe, exc->last_sqe);
124+
117125
while (sqe != NULL && conex_task_free(exc)) {
118126
/* Get the next free task id */
119127
uint16_t task_idx = conex_task_next(exc);
120128

129+
LOG_DBG("preparing task %d, sqe %p", task_idx, sqe);
130+
121131
/* Setup task */
122132
exc->task_cur[task_idx].sqe = sqe;
123133
exc->task_cur[task_idx].r = r;
124134
exc->task_status[task_idx] = CONEX_TASK_SUSPENDED;
125135

126136
/* Go to the next sqe not in the current chain or transaction */
127-
while (sqe != NULL && (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION))) {
137+
while (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION)) {
128138
sqe = rtio_spsc_next(r->sq, sqe);
129139
}
130140

131141
/* SQE is the end of the previous chain or transaction so skip it */
142+
last_sqe = sqe;
132143
sqe = rtio_spsc_next(r->sq, sqe);
133144
}
134145

135146
/* Out of available tasks so remember where we left off to begin again once tasks free up */
136-
exc->pending_sqe = sqe;
147+
exc->last_sqe = last_sqe;
137148
}
138149

139150

@@ -148,25 +159,13 @@ static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
148159
/* In order resume tasks */
149160
for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) {
150161
if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_SUSPENDED) {
151-
LOG_INF("resuming suspended task %d", task_id);
162+
LOG_DBG("resuming suspended task %d", task_id);
152163
exc->task_status[task_id & exc->task_mask] &= ~CONEX_TASK_SUSPENDED;
153164
rtio_iodev_submit(&exc->task_cur[task_id & exc->task_mask]);
154165
}
155166
}
156167
}
157168

158-
/**
159-
* @brief Sweep, Prepare, and Resume in one go
160-
*
161-
* Called after a completion to continue doing more work if needed.
162-
*/
163-
static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
164-
{
165-
conex_sweep(r, exc);
166-
conex_prepare(r, exc);
167-
conex_resume(r, exc);
168-
}
169-
170169
/**
171170
* @brief Submit submissions to concurrent executor
172171
*
@@ -211,6 +210,8 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
211210
*/
212211
key = k_spin_lock(&exc->lock);
213212

213+
LOG_DBG("completed sqe %p", sqe);
214+
214215
/* Determine the task id by memory offset O(1) */
215216
uint16_t task_id = conex_task_id(exc, iodev_sqe);
216217

@@ -230,9 +231,10 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
230231
transaction = sqe->flags & RTIO_SQE_TRANSACTION;
231232
}
232233

234+
conex_sweep(r, exc);
233235
rtio_cqe_submit(r, result, sqe->userdata);
234-
235-
conex_sweep_resume(r, exc);
236+
conex_prepare(r, exc);
237+
conex_resume(r, exc);
236238

237239
k_spin_unlock(&exc->lock, key);
238240
}
@@ -279,7 +281,9 @@ void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result)
279281
/* Determine the task id : O(1) */
280282
exc->task_status[task_id] |= CONEX_TASK_COMPLETE;
281283

282-
conex_sweep_resume(r, exc);
284+
conex_sweep(r, exc);
285+
conex_prepare(r, exc);
286+
conex_resume(r, exc);
283287

284288
k_spin_unlock(&exc->lock, key);
285289
}

tests/subsys/rtio/rtio_api/src/test_rtio_api.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ struct rtio_iodev *iodev_test_chain[] = {&iodev_test_chain0, &iodev_test_chain1}
9595
void test_rtio_chain_(struct rtio *r)
9696
{
9797
int res;
98-
uintptr_t userdata[4] = {0, 1, 2, 3};
98+
uint32_t userdata[4] = {0, 1, 2, 3};
9999
struct rtio_sqe *sqe;
100100
struct rtio_cqe *cqe;
101101

@@ -105,6 +105,7 @@ void test_rtio_chain_(struct rtio *r)
105105
rtio_sqe_prep_nop(sqe, iodev_test_chain[i % 2],
106106
&userdata[i]);
107107
sqe->flags |= RTIO_SQE_CHAINED;
108+
TC_PRINT("produce %d, sqe %p, userdata %d\n", i, sqe, userdata[i]);
108109
}
109110

110111
/* Clear the last one */
@@ -117,10 +118,11 @@ void test_rtio_chain_(struct rtio *r)
117118
zassert_equal(rtio_spsc_consumable(r->cq), 4, "Should have 4 pending completions");
118119

119120
for (int i = 0; i < 4; i++) {
120-
TC_PRINT("consume %d\n", i);
121121
cqe = rtio_spsc_consume(r->cq);
122122
zassert_not_null(cqe, "Expected a valid cqe");
123+
TC_PRINT("consume %d, cqe %p, userdata %d\n", i, cqe, *(uint32_t *)cqe->userdata);
123124
zassert_ok(cqe->result, "Result should be ok");
125+
124126
zassert_equal_ptr(cqe->userdata, &userdata[i], "Expected in order completions");
125127
rtio_spsc_release(r->cq);
126128
}

0 commit comments

Comments
 (0)