@@ -49,7 +49,7 @@ static uint16_t conex_task_next(struct rtio_concurrent_executor *exc)
 	uint16_t task_id = exc->task_in;

 	exc->task_in++;
-	return task_id;
+	return task_id & exc->task_mask;
 }

 static inline uint16_t conex_task_id(struct rtio_concurrent_executor *exc,
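
The masking fix above only holds when the per-executor task pool size is a power of two, so that `task_mask` equals `pool_size - 1`. A standalone sketch (pool size and names are hypothetical, not the executor's definitions) of why the free-running 16-bit counter keeps landing on valid slots even across wraparound:

```c
#include <stdint.h>
#include <stdio.h>

#define TASK_POOL_SZ 4u                  /* assumed: must be a power of two */
#define TASK_MASK    (TASK_POOL_SZ - 1u) /* 0x3 */

int main(void)
{
	uint16_t task_in = 65534; /* about to wrap around */

	for (int i = 0; i < 4; i++) {
		printf("task_in=%u -> slot %u\n", task_in, task_in & TASK_MASK);
		task_in++;
	}
	/* Prints slots 2, 3, 0, 1: indices stay in range through the wrap */
	return 0;
}
```
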
@@ -64,14 +64,21 @@ static void conex_sweep_task(struct rtio *r, struct rtio_concurrent_executor *ex
6464{
6565 struct rtio_sqe * sqe = rtio_spsc_consume (r -> sq );
6666
67- while (sqe != NULL && sqe -> flags & RTIO_SQE_CHAINED ) {
67+ while (sqe != NULL && ( sqe -> flags & ( RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION )) ) {
6868 rtio_spsc_release (r -> sq );
6969 sqe = rtio_spsc_consume (r -> sq );
7070 }
7171
7272 rtio_spsc_release (r -> sq );
7373}
7474
+/**
+ * @brief Sweep up old completed tasks, in order, like a garbage collector
+ *
+ * Tasks are only swept in the order they arrived in the submission queue,
+ * so a completed task may remain unfreed while an earlier task has not
+ * yet completed.
+ */
 static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc)
 {
 	/* In order sweep up */
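
The sweep fix in this hunk matters because transactions, like chains, span multiple SQEs. A hypothetical helper (not part of this patch; flag bit values assumed for illustration) expressing the corrected condition:

```c
#include <stdbool.h>
#include <stdint.h>

#define RTIO_SQE_CHAINED     (1u << 0) /* assumed bit values for illustration */
#define RTIO_SQE_TRANSACTION (1u << 1)

/* True while more sqes belong to the same submission. Testing only
 * RTIO_SQE_CHAINED, as the old code did, would stop the sweep partway
 * through a transaction and strand its remaining sqes in the queue.
 */
static inline bool sqe_has_continuation(uint16_t flags)
{
	return (flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION)) != 0;
}
```
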
@@ -86,21 +93,77 @@ static void conex_sweep(struct rtio *r, struct rtio_concurrent_executor *exc)
 	}
 }

+/**
+ * @brief Prepare tasks to run by iterating through the submission queue
+ *
+ * For each submission in the queue that begins a chain or transaction,
+ * start a task if possible. Concurrency is limited by the amount of
+ * concurrency allocated to each executor instance.
+ */
+static void conex_prepare(struct rtio *r, struct rtio_concurrent_executor *exc)
+{
+	struct rtio_sqe *sqe;
+
+	/* If nothing has been submitted before, peek at the first
+	 * item; otherwise start back up where the last submit call
+	 * left off.
+	 */
+	if (exc->pending_sqe == NULL) {
+		sqe = rtio_spsc_peek(r->sq);
+	} else {
+		sqe = exc->pending_sqe;
+	}
+
+	while (sqe != NULL && conex_task_free(exc)) {
+		/* Get the next free task id */
+		uint16_t task_idx = conex_task_next(exc);
+
+		/* Set up the task */
+		exc->task_cur[task_idx].sqe = sqe;
+		exc->task_cur[task_idx].r = r;
+		exc->task_status[task_idx] = CONEX_TASK_SUSPENDED;
+
+		/* Go to the next sqe not in the current chain or transaction */
+		while (sqe != NULL && (sqe->flags & (RTIO_SQE_CHAINED | RTIO_SQE_TRANSACTION))) {
+			sqe = rtio_spsc_next(r->sq, sqe);
+		}
+
+		/* This sqe is the end of the previous chain or transaction, so skip past it */
+		sqe = rtio_spsc_next(r->sq, sqe);
+	}
+
+	/* Out of available tasks, so remember where we left off to begin again once tasks free up */
+	exc->pending_sqe = sqe;
+}
+
+
+/**
+ * @brief Resume tasks that are suspended
+ *
+ * All tasks begin as suspended tasks. This kicks them off to the
+ * submission's associated iodev.
+ */
 static void conex_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
 {
 	/* In order resume tasks */
 	for (uint16_t task_id = exc->task_out; task_id < exc->task_in; task_id++) {
 		if (exc->task_status[task_id & exc->task_mask] & CONEX_TASK_SUSPENDED) {
 			LOG_INF("resuming suspended task %d", task_id);
-			exc->task_status[task_id] &= ~CONEX_TASK_SUSPENDED;
-			rtio_iodev_submit(&exc->task_cur[task_id]);
+			exc->task_status[task_id & exc->task_mask] &= ~CONEX_TASK_SUSPENDED;
+			rtio_iodev_submit(&exc->task_cur[task_id & exc->task_mask]);
 		}
 	}
 }

+/**
+ * @brief Sweep, prepare, and resume in one go
+ *
+ * Called after a completion to continue doing more work if needed.
+ */
 static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor *exc)
 {
 	conex_sweep(r, exc);
+	conex_prepare(r, exc);
 	conex_resume(r, exc);
 }

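
To see how `conex_prepare()` splits the queue into tasks, consider a two-SQE chain followed by a two-SQE transaction. The sketch below (standalone, with plain flag bytes standing in for the real SQE structure) mirrors the walk: start a task at each group head, then skip every entry that still carries a continuation flag:

```c
#include <stdint.h>
#include <stdio.h>

#define SQE_CHAINED     (1u << 0)
#define SQE_TRANSACTION (1u << 1)

int main(void)
{
	/* A two-sqe chain followed by a two-sqe transaction */
	uint8_t flags[] = { SQE_CHAINED, 0, SQE_TRANSACTION, 0 };
	int n = 4;
	int i = 0;

	while (i < n) {
		printf("task starts at sqe %d\n", i);

		/* Skip entries still inside the current chain or transaction */
		while (i < n && (flags[i] & (SQE_CHAINED | SQE_TRANSACTION))) {
			i++;
		}
		i++; /* step past the chain/transaction tail */
	}
	/* Prints: task starts at sqe 0, task starts at sqe 2 */
	return 0;
}
```
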
@@ -114,71 +177,14 @@ static void conex_sweep_resume(struct rtio *r, struct rtio_concurrent_executor *
 int rtio_concurrent_submit(struct rtio *r)
 {

-	LOG_INF("submit");
-
 	struct rtio_concurrent_executor *exc =
 		(struct rtio_concurrent_executor *)r->executor;
-	struct rtio_sqe *sqe;
-	struct rtio_sqe *last_sqe;
 	k_spinlock_key_t key;

 	key = k_spin_lock(&exc->lock);

-	/* If never submitted before peek at the first item
-	 * otherwise start back up where the last submit call
-	 * left off
-	 */
-	if (exc->last_sqe == NULL) {
-		sqe = rtio_spsc_peek(r->sq);
-	} else {
-		/* Pickup from last submit call */
-		sqe = rtio_spsc_next(r->sq, exc->last_sqe);
-	}
-
-	last_sqe = sqe;
-	while (sqe != NULL && conex_task_free(exc)) {
-		LOG_INF("head SQE in chain %p", sqe);
-
-		/* Get the next task id if one exists */
-		uint16_t task_idx = conex_task_next(exc);
-
-		LOG_INF("setting up task %d", task_idx);
-
-		/* Setup task (yes this is it) */
-		exc->task_cur[task_idx].sqe = sqe;
-		exc->task_cur[task_idx].r = r;
-		exc->task_status[task_idx] = CONEX_TASK_SUSPENDED;
-
-		LOG_INF("submitted sqe %p", sqe);
-		/* Go to the next sqe not in the current chain */
-		while (sqe != NULL && (sqe->flags & RTIO_SQE_CHAINED)) {
-			sqe = rtio_spsc_next(r->sq, sqe);
-		}
-
-		LOG_INF("tail SQE in chain %p", sqe);
-
-		last_sqe = sqe;
-
-		/* SQE is the end of the previous chain */
-		sqe = rtio_spsc_next(r->sq, sqe);
-	}
-
-	/* Out of available pointers, wait til others complete, note the
-	 * first pending submission queue. May be NULL if nothing is pending.
-	 */
-	exc->pending_sqe = sqe;
-
-	/**
-	 * Run through the queue until the last item
-	 * and take not of it
-	 */
-	while (sqe != NULL) {
-		last_sqe = sqe;
-		sqe = rtio_spsc_next(r->sq, sqe);
-	}
-
-	/* Note the last sqe for the next submit call */
-	exc->last_sqe = last_sqe;
+	/* Prepare tasks to run; they start in a suspended state */
+	conex_prepare(r, exc);

 	/* Resume all suspended tasks */
 	conex_resume(r, exc);
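
With the shared `conex_prepare()` path, submit and the completion callbacks below all drive the same state machine under the executor's spinlock. A minimal sketch of that locking discipline (names simplified; the include path varies across Zephyr versions):

```c
#include <zephyr/spinlock.h>

static struct k_spinlock lock;

void guarded_executor_work(void)
{
	/* k_spin_lock() masks interrupts on the local core, which is what
	 * lets this pattern be shared with completion (ISR) context.
	 */
	k_spinlock_key_t key = k_spin_lock(&lock);

	/* ... sweep/prepare/resume would run here ... */

	k_spin_unlock(&lock, key);
}
```
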
@@ -202,14 +208,9 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
 	/* Interrupt may occur in spsc_acquire, breaking the contract
 	 * so spin around it effectively preventing another interrupt on
 	 * this core, and another core trying to concurrently work in here.
-	 *
-	 * This can and should be broken up into a few sections with a try
-	 * lock around the sweep and resume.
 	 */
 	key = k_spin_lock(&exc->lock);

-	rtio_cqe_submit(r, result, sqe->userdata);
-
 	/* Determine the task id by memory offset O(1) */
 	uint16_t task_id = conex_task_id(exc, iodev_sqe);

@@ -218,16 +219,20 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)

 		exc->task_cur[task_id].sqe = next_sqe;
 		rtio_iodev_submit(&exc->task_cur[task_id]);
-
 	} else {
 		exc->task_status[task_id] |= CONEX_TASK_COMPLETE;
 	}

+	bool transaction = sqe->flags & RTIO_SQE_TRANSACTION;
+
+	/* Walk to the transaction tail, which carries the userdata for the completion */
+	while (transaction) {
+		sqe = rtio_spsc_next(r->sq, sqe);
+		transaction = sqe->flags & RTIO_SQE_TRANSACTION;
+	}
+
+	rtio_cqe_submit(r, result, sqe->userdata);

-	/* Sweep up unused SQEs and tasks, retry suspended tasks */
-	/* TODO Use a try lock here and don't bother doing it if we are already
-	 * doing it elsewhere
-	 */
 	conex_sweep_resume(r, exc);

 	k_spin_unlock(&exc->lock, key);
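
The effect of the relocated `rtio_cqe_submit()` is that a successful transaction completes as a single CQE carrying the tail SQE's userdata. A standalone sketch (hypothetical data, simplified types) of that walk:

```c
#include <stdint.h>
#include <stdio.h>

#define SQE_TRANSACTION (1u << 1)

struct sqe {
	uint8_t flags;
	const char *userdata;
};

int main(void)
{
	struct sqe sq[] = {
		{ SQE_TRANSACTION, "part-0" },
		{ SQE_TRANSACTION, "part-1" },
		{ 0,               "tail"   },
	};
	int i = 0;

	/* Mirrors the while (transaction) walk in the patch */
	while (sq[i].flags & SQE_TRANSACTION) {
		i++;
	}
	printf("cqe userdata: %s\n", sq[i].userdata); /* prints "tail" */
	return 0;
}
```
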
@@ -238,39 +242,41 @@ void rtio_concurrent_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
  */
 void rtio_concurrent_err(struct rtio_iodev_sqe *iodev_sqe, int result)
 {
-	const struct rtio_sqe *nsqe;
 	k_spinlock_key_t key;
 	struct rtio *r = iodev_sqe->r;
 	const struct rtio_sqe *sqe = iodev_sqe->sqe;
 	struct rtio_concurrent_executor *exc = (struct rtio_concurrent_executor *)r->executor;
+	void *userdata = sqe->userdata;
+	bool chained = sqe->flags & RTIO_SQE_CHAINED;
+	bool transaction = sqe->flags & RTIO_SQE_TRANSACTION;
+	uint16_t task_id = conex_task_id(exc, iodev_sqe);

 	/* Another interrupt (and sqe complete) may occur in spsc_acquire,
 	 * breaking the contract so spin around it effectively preventing another
 	 * interrupt on this core, and another core trying to concurrently work
 	 * in here.
-	 *
-	 * This can and should be broken up into a few sections with a try
-	 * lock around the sweep and resume.
 	 */
 	key = k_spin_lock(&exc->lock);

-	rtio_cqe_submit(r, result, sqe->userdata);
-
-	/* Determine the task id: O(1) */
-	uint16_t task_id = conex_task_id(exc, iodev_sqe);
+	if (!transaction) {
+		rtio_cqe_submit(r, result, userdata);
+	}

-	sqe = iodev_sqe->sqe;
+	/* While the last sqe was marked chained or transactional, keep consuming */
+	while (chained || transaction) {
+		sqe = rtio_spsc_next(r->sq, sqe);
+		chained = sqe->flags & RTIO_SQE_CHAINED;
+		transaction = sqe->flags & RTIO_SQE_TRANSACTION;
+		userdata = sqe->userdata;

-	/* Fail the remaining sqe's in the chain */
-	if (sqe->flags & RTIO_SQE_CHAINED) {
-		nsqe = rtio_spsc_next(r->sq, sqe);
-		while (nsqe != NULL && nsqe->flags & RTIO_SQE_CHAINED) {
-			rtio_cqe_submit(r, -ECANCELED, nsqe->userdata);
-			nsqe = rtio_spsc_next(r->sq, nsqe);
+		if (!transaction) {
+			rtio_cqe_submit(r, result, userdata);
+		} else {
+			rtio_cqe_submit(r, -ECANCELED, userdata);
 		}
 	}

-	/* Task is complete (failed) */
+	/* Mark the task complete (failed) */
 	exc->task_status[task_id] |= CONEX_TASK_COMPLETE;

 	conex_sweep_resume(r, exc);
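
On the error path the completion fan-out differs by flag: every chained follower of the failed SQE receives a CQE with the original error result, while entries inside a transaction are reported as `-ECANCELED` and only the transaction tail carries the error result. A standalone sketch (hypothetical flags and userdata, simplified types) of the loop above:

```c
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define SQE_CHAINED     (1u << 0)
#define SQE_TRANSACTION (1u << 1)

struct sqe {
	uint8_t flags;
	const char *userdata;
};

/* Mirrors the error-path loop: print one "cqe" per affected entry */
static void fail_submission(const struct sqe *sq, int result)
{
	int i = 0;
	bool chained = sq[i].flags & SQE_CHAINED;
	bool transaction = sq[i].flags & SQE_TRANSACTION;

	if (!transaction) {
		printf("cqe %d for %s\n", result, sq[i].userdata);
	}
	while (chained || transaction) {
		i++;
		chained = sq[i].flags & SQE_CHAINED;
		transaction = sq[i].flags & SQE_TRANSACTION;
		printf("cqe %d for %s\n", transaction ? -ECANCELED : result,
		       sq[i].userdata);
	}
}

int main(void)
{
	struct sqe chain[] = { { SQE_CHAINED, "c0" }, { 0, "c1" } };
	struct sqe txn[] = { { SQE_TRANSACTION, "t0" }, { 0, "t1" } };

	fail_submission(chain, -EIO); /* cqe -EIO for c0, then for c1 */
	fail_submission(txn, -EIO);   /* single cqe -EIO for t1 */
	return 0;
}
```
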