@@ -93,7 +93,7 @@ void TaskGroup::destroy(AsyncTask *task) {
93
93
bool taskDequeued = readyQueue.dequeue (item);
94
94
while (taskDequeued) {
95
95
swift_release (item.getTask ());
96
- bool taskDequeued = readyQueue.dequeue (item);
96
+ taskDequeued = readyQueue.dequeue (item);
97
97
}
98
98
mutex.unlock (); // TODO: remove fragment lock, and use status for synchronization
99
99
@@ -105,17 +105,18 @@ void TaskGroup::destroy(AsyncTask *task) {
105
105
// ==== offer ------------------------------------------------------------------
106
106
107
107
static void fillGroupNextResult (TaskFutureWaitAsyncContext *context,
108
- AsyncTask::GroupFragment::GroupPollResult result) {
108
+ TaskGroup::PollResult result) {
109
+ // / Fill in the result value
109
110
switch (result.status ) {
110
- case GroupFragment::GroupPollStatus::Waiting :
111
+ case TaskGroup::PollStatus::MustWait :
111
112
assert (false && " filling a waiting status?" );
112
113
return ;
113
114
114
- case GroupFragment::GroupPollStatus ::Error:
115
+ case TaskGroup::PollStatus ::Error:
115
116
context->fillWithError (reinterpret_cast <SwiftError*>(result.storage ));
116
117
return ;
117
118
118
- case GroupFragment::GroupPollStatus ::Success: {
119
+ case TaskGroup::PollStatus ::Success: {
119
120
// Initialize the result as an Optional<Success>.
120
121
const Metadata *successType = context->successType ;
121
122
OpaqueValue *destPtr = context->successResultPointer ;
@@ -127,7 +128,7 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
127
128
return ;
128
129
}
129
130
130
- case GroupFragment::GroupPollStatus ::Empty: {
131
+ case TaskGroup::PollStatus ::Empty: {
131
132
// Initialize the result as a nil Optional<Success>.
132
133
const Metadata *successType = context->successType ;
133
134
OpaqueValue *destPtr = context->successResultPointer ;
@@ -182,7 +183,14 @@ void TaskGroup::offer(AsyncTask *completedTask, AsyncContext *context,
182
183
completedTask, hadErrorResult, /* needsRelease*/ false );
183
184
184
185
mutex.unlock (); // TODO: remove fragment lock, and use status for synchronization
185
- swift::runTaskWithPollResult (waitingTask, completingExecutor, result);
186
+ // swift::runTaskWithPollResult(waitingTask, completingExecutor, result);
187
+ auto waitingContext =
188
+ static_cast <TaskFutureWaitAsyncContext *>(
189
+ waitingTask->ResumeContext );
190
+ fillGroupNextResult (waitingContext, result);
191
+
192
+ // TODO: allow the caller to suggest an executor
193
+ swift_task_enqueueGlobal (waitingTask);
186
194
return ;
187
195
} // else, try again
188
196
@@ -216,31 +224,33 @@ void TaskGroup::offer(AsyncTask *completedTask, AsyncContext *context,
216
224
// ==== group.next() implementation (wait_next and groupPoll) ------------------
217
225
218
226
SWIFT_CC (swiftasync)
219
- void swift::swift_task_group_wait_next (
227
+ void swift::swift_task_group_wait_next_throwing (
220
228
AsyncTask *waitingTask,
221
229
ExecutorRef executor,
222
230
SWIFT_ASYNC_CONTEXT AsyncContext *rawContext) {
223
231
waitingTask->ResumeTask = rawContext->ResumeParent ;
224
232
waitingTask->ResumeContext = rawContext;
225
233
226
- auto context = static_cast <TaskGroupNextWaitAsyncContext *>(rawContext);
234
+ auto context = static_cast <TaskFutureWaitAsyncContext *>(rawContext);
227
235
auto task = context->task ;
228
236
auto group = context->group ;
229
- TaskGroup::PollResult polled = group->poll (waitingTask);
237
+ fprintf (stderr, " [%s:%d](%s) group: %d\n " , __FILE_NAME__, __LINE__, __FUNCTION__, group);
238
+
239
+ assert (group && " swift_task_group_wait_next_throwing was passed context without group!" );
230
240
231
- if (polled.status == TaskGroup::GroupPollStatus::MustWait) {
241
+ TaskGroup::PollResult polled = group->poll (waitingTask);
242
+ switch (polled.status ) {
243
+ case TaskGroup::PollStatus::MustWait:
232
244
// The waiting task has been queued on the channel,
233
245
// there were pending tasks so it will be woken up eventually.
234
246
return ;
235
247
236
- case GroupFragment::GroupPollStatus ::Empty:
237
- case GroupFragment::GroupPollStatus ::Error:
238
- case GroupFragment::GroupPollStatus ::Success:
248
+ case TaskGroup::PollStatus ::Empty:
249
+ case TaskGroup::PollStatus ::Error:
250
+ case TaskGroup::PollStatus ::Success:
239
251
fillGroupNextResult (context, polled);
240
252
return waitingTask->runInFullyEstablishedContext (executor);
241
253
}
242
-
243
- runTaskWithPollResult (waitingTask, executor, polled);
244
254
}
245
255
246
256
TaskGroup::PollResult TaskGroup::poll (AsyncTask *waitingTask) {
@@ -257,7 +267,7 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
257
267
// was issued, and if we parked here we'd potentially never be woken up.
258
268
// Bail out and return `nil` from `group.next()`.
259
269
statusRemoveWaiting ();
260
- result.status = TaskGroup::GroupPollStatus ::Empty;
270
+ result.status = TaskGroup::PollStatus ::Empty;
261
271
mutex.unlock (); // TODO: remove group lock, and use status for synchronization
262
272
return result;
263
273
}
@@ -277,7 +287,7 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
277
287
ReadyQueueItem item;
278
288
bool taskDequeued = readyQueue.dequeue (item);
279
289
if (!taskDequeued) {
280
- result.status = TaskGroup::GroupPollStatus ::MustWait;
290
+ result.status = TaskGroup::PollStatus ::MustWait;
281
291
result.storage = nullptr ;
282
292
result.retainedTask = nullptr ;
283
293
mutex.unlock (); // TODO: remove group lock, and use status for synchronization
@@ -294,23 +304,23 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
294
304
switch (item.getStatus ()) {
295
305
case ReadyStatus::Success:
296
306
// Immediately return the polled value
297
- result.status = TaskGroup::GroupPollStatus ::Success;
307
+ result.status = TaskGroup::PollStatus ::Success;
298
308
result.storage = futureFragment->getStoragePtr ();
299
309
assert (result.retainedTask && " polled a task, it must be not null" );
300
310
mutex.unlock (); // TODO: remove fragment lock, and use status for synchronization
301
311
return result;
302
312
303
313
case ReadyStatus::Error:
304
314
// Immediately return the polled value
305
- result.status = TaskGroup::GroupPollStatus ::Error;
315
+ result.status = TaskGroup::PollStatus ::Error;
306
316
result.storage =
307
317
reinterpret_cast <OpaqueValue *>(futureFragment->getError ());
308
318
assert (result.retainedTask && " polled a task, it must be not null" );
309
319
mutex.unlock (); // TODO: remove fragment lock, and use status for synchronization
310
320
return result;
311
321
312
322
case ReadyStatus::Empty:
313
- result.status = TaskGroup::GroupPollStatus ::Empty;
323
+ result.status = TaskGroup::PollStatus ::Empty;
314
324
result.storage = nullptr ;
315
325
result.retainedTask = nullptr ;
316
326
mutex.unlock (); // TODO: remove fragment lock, and use status for synchronization
@@ -330,7 +340,7 @@ TaskGroup::PollResult TaskGroup::poll(AsyncTask *waitingTask) {
330
340
/* failure*/ std::memory_order_acquire)) {
331
341
mutex.unlock (); // TODO: remove fragment lock, and use status for synchronization
332
342
// no ready tasks, so we must wait.
333
- result.status = TaskGroup::GroupPollStatus ::MustWait;
343
+ result.status = TaskGroup::PollStatus ::MustWait;
334
344
return result;
335
345
} // else, try again
336
346
}
0 commit comments