Skip to content

Commit e294c7c

Browse files
committed
[Concurrency] Implement TaskGroup.isEmpty via readyQueue
before reversing order of fragments; future must be last since dynamic size offer fixed before implementing poll
1 parent 520b513 commit e294c7c

File tree

10 files changed

+644
-449
lines changed

10 files changed

+644
-449
lines changed

include/swift/ABI/Task.h

Lines changed: 250 additions & 190 deletions
Large diffs are not rendered by default.

include/swift/Runtime/Concurrency.h

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -133,29 +133,6 @@ struct TaskFutureWaitResult {
133133
OpaqueValue *storage;
134134
};
135135

136-
/// The result of waiting on a Channel (TaskGroup).
137-
struct TaskChannelPollResult {
138-
/// Whether the storage represents an existing value, or "known no value,"
139-
/// instructing the `group.next()` call to return `nil` immediately.
140-
bool hadAnyResult;
141-
142-
/// Whether the storage represents the error result vs. the successful
143-
/// result.
144-
bool hadErrorResult;
145-
146-
/// Storage for the result of the future.
147-
///
148-
/// When the future completed normally, this is a pointer to the storage
149-
/// of the result value, which lives inside the future task itself.
150-
///
151-
/// When the future completed by throwing an error, this is the error
152-
/// object itself.
153-
OpaqueValue *storage;
154-
};
155-
156-
using TaskFutureWaitSignature =
157-
AsyncSignature<TaskFutureWaitResult(AsyncTask *), /*throws*/ false>;
158-
159136
/// Wait for a future task to complete.
160137
///
161138
/// This can be called from any thread. Its Swift signature is
@@ -174,12 +151,23 @@ swift_task_future_wait;
174151
///
175152
/// \code
176153
/// func swift_task_channel_poll(on channelTask: Builtin.NativeObject) async
177-
/// -> RawTaskChannelPollResult?
154+
/// -> RawChannelPollResult?
178155
/// \endcode
179156
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
180-
AsyncFunctionType<TaskChannelPollResult(AsyncTask *task)>
157+
AsyncFunctionType<AsyncTask::ChannelFragment::ChannelPollResult(AsyncTask *task)>
181158
swift_task_channel_poll;
182159

160+
/// Check the readyQueue of a Channel, return true if it has no pending tasks.
161+
///
162+
/// This can be called from any thread. Its Swift signature is
163+
///
164+
/// \code
165+
/// func swift_task_channel_is_empty(on channelTask: Builtin.NativeObject) -> Bool
166+
/// \endcode
167+
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
168+
bool
169+
swift_task_channel_is_empty(AsyncTask *task);
170+
183171
/// Add a status record to a task. The record should not be
184172
/// modified while it is registered with a task.
185173
///

lib/IRGen/IRGenSIL.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2278,8 +2278,13 @@ void IRGenSILFunction::visitFunctionRefBaseInst(FunctionRefBaseInst *i) {
22782278
fn, NotForDefinition, false /*isDynamicallyReplaceableImplementation*/,
22792279
isa<PreviousDynamicFunctionRefInst>(i));
22802280
llvm::Value *value;
2281+
// auto isSpecialAsyncWithoutCtxtSize =
2282+
// fn->isAsync() && fn->getName().equals("swift_task_future_wait");
22812283
auto isSpecialAsyncWithoutCtxtSize =
2282-
fn->isAsync() && fn->getName().equals("swift_task_future_wait");
2284+
fn->isAsync() && (
2285+
fn->getName().equals("swift_task_future_wait") ||
2286+
fn->getName().equals("swift_task_channel_poll") // TODO: do we need this?
2287+
);
22832288
if (fn->isAsync() && !isSpecialAsyncWithoutCtxtSize) {
22842289
value = IGM.getAddrOfAsyncFunctionPointer(fn);
22852290
value = Builder.CreateBitCast(value, fnPtr->getType());

stdlib/public/Concurrency/Task.cpp

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
using namespace swift;
2626
using FutureFragment = AsyncTask::FutureFragment;
27+
using ChannelFragment = AsyncTask::ChannelFragment;
2728

2829
void FutureFragment::destroy() {
2930
auto queueHead = waitQueue.load(std::memory_order_acquire);
@@ -41,10 +42,15 @@ void FutureFragment::destroy() {
4142
}
4243
}
4344

44-
FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask) {
45+
FutureFragment::Status
46+
AsyncTask::waitFuture(AsyncTask *waitingTask) {
4547
using Status = FutureFragment::Status;
4648
using WaitQueueItem = FutureFragment::WaitQueueItem;
4749

50+
// if (isChannel()) {
51+
// assert(false && "waiting on channel!");
52+
// }
53+
4854
assert(isFuture());
4955
auto fragment = futureFragment();
5056

@@ -263,15 +269,20 @@ AsyncTaskAndContext swift::swift_task_create_future_f(
263269
initialContextSize >= sizeof(FutureAsyncContext));
264270
assert((parent != nullptr) == flags.task_isChildTask());
265271

266-
// TODO: cleanup
267-
if (flags.task_isGroupChild()) {
268-
assert(flags.task_isGroupChild() && flags.task_isChildTask());
269-
assert(flags.task_isGroupChild() && parent->Flags.task_isChannel());
272+
if (flags.task_isGroupChild()) { // TODO: express without the `if`?
273+
assert(flags.task_isChildTask());
274+
assert(parent->Flags.task_isChannel());
270275
}
271276

272277
// Figure out the size of the header.
273278
size_t headerSize = sizeof(AsyncTask);
274-
if (parent) headerSize += sizeof(AsyncTask::ChildFragment);
279+
if (parent) {
280+
headerSize += sizeof(AsyncTask::ChildFragment);
281+
}
282+
283+
if (flags.task_isChannel()) {
284+
headerSize += ChannelFragment::fragmentSize();
285+
}
275286

276287
if (futureResultType) {
277288
headerSize += FutureFragment::fragmentSize(futureResultType);
@@ -304,6 +315,12 @@ AsyncTaskAndContext swift::swift_task_create_future_f(
304315
new (childFragment) AsyncTask::ChildFragment(parent);
305316
}
306317

318+
// Initialize the channel fragment if applicable.
319+
if (flags.task_isChannel()) {
320+
auto channelFragment = task->channelFragment();
321+
new (channelFragment) ChannelFragment(futureResultType);
322+
}
323+
307324
// Initialize the future fragment if applicable.
308325
if (futureResultType) {
309326
auto futureFragment = task->futureFragment();
@@ -314,18 +331,6 @@ AsyncTaskAndContext swift::swift_task_create_future_f(
314331
auto futureContext = static_cast<FutureAsyncContext *>(initialContext);
315332
futureContext->errorResult = nullptr;
316333
futureContext->indirectResult = futureFragment->getStoragePtr();
317-
318-
if (flags.task_isChannel()) {
319-
auto parentChannelFragment = parent->channelFragment();
320-
// new(channelFragment) ChannelFragment(futureResultType); // TODO make sure parent does init
321-
assert(parentChannelFragment && "group child task could not find parent channel fragment!");
322-
323-
//
324-
// // Set up the context for the future such that results ar *offered*
325-
// // into the channel fragment's message queue.
326-
// futureContext->indirectResult = parentChannelFragment;
327-
}
328-
329334
}
330335

331336
// Configure the initial context.
@@ -355,9 +360,16 @@ void swift::swift_task_future_wait(
355360
waitingTask->ResumeTask = rawContext->ResumeParent;
356361
waitingTask->ResumeContext = rawContext;
357362

358-
// Wait on the future.
359363
auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
360364
auto task = context->task;
365+
366+
if (task->isChannel()) {
367+
// assert(false && "WAITING ON GROUP CHILD TASK");
368+
swift::swift_task_channel_poll(waitingTask, executor, rawContext);
369+
return;
370+
}
371+
372+
// Wait on the future.
361373
assert(task->isFuture());
362374
switch (task->waitFuture(waitingTask)) {
363375
case FutureFragment::Status::Executing:

0 commit comments

Comments
 (0)