Skip to content

Commit de5fdcd

Browse files
committed
[TaskGroup] fix missing retain in scheduling next() immediately on offer
1 parent 655d8f1 commit de5fdcd

File tree

10 files changed

+42
-108
lines changed

10 files changed

+42
-108
lines changed

include/swift/ABI/TaskGroup.h

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ namespace swift {
3737

3838
class TaskGroup {
3939
public:
40-
/// Describes the status of the channel.
40+
/// Describes the status of the group.
4141
enum class ReadyStatus : uintptr_t {
42-
/// The channel is empty, no tasks are pending.
42+
/// The task group is empty, no tasks are pending.
4343
/// Return immediately, there is no point in suspending.
4444
///
4545
/// The storage is not accessible.
@@ -54,10 +54,10 @@ namespace swift {
5454
};
5555

5656
enum class PollStatus : uintptr_t {
57-
/// The channel is known to be empty and we can immediately return nil.
57+
/// The group is known to be empty and we can immediately return nil.
5858
Empty = 0,
5959

60-
/// The task has been enqueued to the channels wait queue.
60+
/// The task has been enqueued to the groups wait queue.
6161
MustWait = 1,
6262

6363
/// The task has completed with result (of type \c resultType).
@@ -68,7 +68,7 @@ namespace swift {
6868
Error = 3,
6969
};
7070

71-
/// The result of waiting on a Channel (TaskGroup).
71+
/// The result of waiting on the TaskGroup.
7272
struct PollResult {
7373
PollStatus status; // TODO: pack it into storage pointer or not worth it?
7474

@@ -81,10 +81,10 @@ namespace swift {
8181
/// object itself.
8282
OpaqueValue *storage;
8383

84-
/// Optional, the completed task that was polled out of the ready queue.
84+
/// The completed task, if necessary to keep alive until consumed by next().
8585
///
8686
/// # Important: swift_release
87-
/// If if a task is returned here, the task MUST be swift_release'd
87+
/// If if a task is returned here, the task MUST be swift_released
8888
/// once we are done with it, to balance out the retain made before
8989
/// when the task was enqueued into the ready queue to keep it alive
9090
/// until a next() call eventually picks it up.
@@ -96,8 +96,7 @@ namespace swift {
9696
status == PollStatus::Empty;
9797
}
9898

99-
static PollResult get(AsyncTask *asyncTask, bool hadErrorResult,
100-
bool needsSwiftRelease) {
99+
static PollResult get(AsyncTask *asyncTask, bool hadErrorResult) {
101100
auto fragment = asyncTask->futureFragment();
102101
return PollResult{
103102
/*status*/ hadErrorResult ?
@@ -106,14 +105,12 @@ namespace swift {
106105
/*storage*/ hadErrorResult ?
107106
reinterpret_cast<OpaqueValue *>(fragment->getError()) :
108107
fragment->getStoragePtr(),
109-
/*task*/ needsSwiftRelease ?
110-
asyncTask :
111-
nullptr
108+
/*task*/ asyncTask
112109
};
113110
}
114111
};
115112

116-
/// An item within the message queue of a channel.
113+
/// An item within the message queue of a group.
117114
struct ReadyQueueItem {
118115
/// Mask used for the low status bits in a message queue item.
119116
static const uintptr_t statusMask = 0x03;
@@ -263,7 +260,7 @@ namespace swift {
263260
/// we have to keep this pointer here so we know which record to remove then.
264261
TaskGroupTaskStatusRecord* Record;
265262

266-
/// Queue containing completed tasks offered into this channel.
263+
/// Queue containing completed tasks offered into this group.
267264
///
268265
/// The low bits contain the status, the rest of the pointer is the
269266
/// AsyncTask.
@@ -284,7 +281,7 @@ namespace swift {
284281
// readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
285282
waitQueue(nullptr) {}
286283

287-
/// Destroy the storage associated with the channel.
284+
/// Destroy the storage associated with the group.
288285
void destroy(AsyncTask *task);
289286

290287
bool isEmpty() {
@@ -373,8 +370,11 @@ namespace swift {
373370
}
374371

375372

376-
/// Offer result of a task into this channel.
377-
/// The value is enqueued at the end of the channel.
373+
/// Offer result of a task into this task group.
374+
///
375+
/// If possible, and an existing task is already waiting on next(), this will
376+
/// schedule it immediately. If not, the result is enqueued and will be picked
377+
/// up whenever a task calls next() the next time.
378378
void offer(AsyncTask *completed, AsyncContext *context, ExecutorRef executor);
379379

380380
/// Attempt to dequeue ready tasks and complete the waitingTask.

include/swift/Runtime/Concurrency.h

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,8 @@ SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swiftasync)
161161
TaskFutureWaitThrowingSignature::FunctionType
162162
swift_task_future_wait_throwing;
163163

164-
//using TaskFutureWaitSignature =
165-
// AsyncSignature<void(AsyncTask *, OpaqueValue *), /*throws*/ false>;
166-
167-
using TaskGroupFutureWaitSignature =
168-
AsyncSignature<void(AsyncTask *, TaskGroup *, OpaqueValue *), /*throws*/ true>; // TODO: is this correct?
164+
using TaskGroupFutureWaitThrowingSignature =
165+
AsyncSignature<void(AsyncTask *, TaskGroup *, Metadata *), /*throws*/ true>;
169166

170167
/// Wait for a readyQueue of a Channel to become non empty.
171168
///
@@ -178,7 +175,7 @@ using TaskGroupFutureWaitSignature =
178175
/// ) async -> T
179176
/// \endcode
180177
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swiftasync)
181-
TaskGroupFutureWaitSignature::FunctionType
178+
TaskGroupFutureWaitThrowingSignature::FunctionType
182179
swift_task_group_wait_next_throwing;
183180

184181
/// Create a new `TaskGroup` using the task's allocator.
@@ -194,23 +191,6 @@ swift_task_group_wait_next_throwing;
194191
SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
195192
swift::TaskGroup* swift_task_group_create(AsyncTask *task);
196193

197-
///// Attach the task group to the passed in (current) task that runs it.
198-
/////
199-
///// Returns the allocated and attached record that must be removed from the task,
200-
///// when the group exits.
201-
/////
202-
///// Its Swift signature is
203-
/////
204-
///// \code
205-
///// func swift_task_group_attach(
206-
///// group: Builtin.NativeObject
207-
///// to task: Builtin.NativeObject
208-
///// ) -> UnsafeRawPointer
209-
///// \endcode
210-
//SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
211-
//TaskGroupTaskStatusRecord*
212-
//swift_task_group_attach(TaskGroup *group, AsyncTask *task);
213-
214194
/// Attach a child task to the parent task's task group record.
215195
///
216196
/// Its Swift signature is
@@ -226,20 +206,6 @@ SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
226206
void swift_task_group_attachChild(TaskGroup *group,
227207
AsyncTask *parent, AsyncTask *child);
228208

229-
///// Detach the group from its parent and remove the passed in record.
230-
/////
231-
///// Its Swift signature is
232-
/////
233-
///// \code
234-
///// func swift_task_group_detach(
235-
///// group: UnsafeRawPointer,
236-
///// from task: Builtin.NativeObject
237-
///// ) -> UnsafeRawPointer
238-
///// \endcode
239-
//SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift)
240-
//void
241-
//swift_task_group_detach(TaskGroupTaskStatusRecord *record, AsyncTask *task);
242-
243209
/// Its Swift signature is
244210
///
245211
/// \code

stdlib/public/Concurrency/Task.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ void AsyncTask::completeFuture(AsyncContext *context, ExecutorRef executor) {
115115
// then we must offer into the parent group that we completed,
116116
// so it may `next()` poll completed child tasks in completion order.
117117
auto group = groupChildFragment()->getGroup();
118-
assert(group);
119118
group->offer(this, context, executor);
120119
}
121120

@@ -147,7 +146,6 @@ void AsyncTask::completeFuture(AsyncContext *context, ExecutorRef executor) {
147146
SWIFT_CC(swift)
148147
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
149148
auto task = static_cast<AsyncTask*>(obj);
150-
151149
// For a future, destroy the result.
152150
if (task->isFuture()) {
153151
task->futureFragment()->destroy();

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,16 @@ void TaskGroup::offer(AsyncTask *completedTask, AsyncContext *context,
146146
assert(completedTask->hasGroupChildFragment());
147147
assert(completedTask->groupChildFragment()->getGroup() == this);
148148

149+
// We retain the completed task, because we will either:
150+
// - (a) schedule the waiter to resume on the next() that it is waiting on, or
151+
// - (b) will need to store this task until the group task enters next() and
152+
// picks up this task.
153+
// either way, there is some time between us returning here, and the `completeTask`
154+
// issuing a swift_release on this very task. We need to keep it alive until
155+
// we have the chance to poll it from the queue (via the waiter task entering
156+
// calling next()).
157+
swift_retain(completedTask);
158+
149159
mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
150160

151161
// Immediately increment ready count and acquire the status
@@ -179,11 +189,10 @@ void TaskGroup::offer(AsyncTask *completedTask, AsyncContext *context,
179189
/*failure*/ std::memory_order_acquire) &&
180190
statusCompletePendingReadyWaiting(assumed)) {
181191
// Run the task.
182-
auto result = PollResult::get(
183-
completedTask, hadErrorResult, /*needsRelease*/ false);
192+
auto result = PollResult::get(completedTask, hadErrorResult);
184193

185194
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
186-
// swift::runTaskWithPollResult(waitingTask, completingExecutor, result);
195+
187196
auto waitingContext =
188197
static_cast<TaskFutureWaitAsyncContext *>(
189198
waitingTask->ResumeContext);
@@ -193,8 +202,6 @@ void TaskGroup::offer(AsyncTask *completedTask, AsyncContext *context,
193202
swift_task_enqueueGlobal(waitingTask);
194203
return;
195204
} // else, try again
196-
197-
assert(false && "why should this have to try again ever?"); // FIXME
198205
}
199206
}
200207

@@ -234,9 +241,8 @@ void swift::swift_task_group_wait_next_throwing(
234241
auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
235242
auto task = context->task;
236243
auto group = context->group;
237-
fprintf(stderr, "[%s:%d](%s) group: %d\n", __FILE_NAME__, __LINE__, __FUNCTION__, group);
238-
239-
assert(group && "swift_task_group_wait_next_throwing was passed context without group!");
244+
assert(waitingTask == task && "attempted to wait on group.next() from other task, which is illegal!");
245+
assert(group && "swift_task_group_wait_next_throwing was passed context without group!");
240246

241247
TaskGroup::PollResult polled = group->poll(waitingTask);
242248
switch (polled.status) {

stdlib/public/Concurrency/TaskGroup.swift

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -220,29 +220,6 @@ extension Task {
220220
#endif
221221

222222
return try await _taskGroupWaitNext(waitingTask: _task, group: _group)
223-
// let rawResult = await _taskGroupWaitNext(waitingTask: _task, group: _group)
224-
//
225-
// // FIXME: we are now in the parent task again, and can safely modify the
226-
// // records again; we should remove this specific child from the
227-
// // group->record
228-
//
229-
// if rawResult.hadErrorResult {
230-
// // Throw the result on error.
231-
// let error = unsafeBitCast(rawResult.storage, to: Error.self)
232-
// throw error
233-
// }
234-
//
235-
// guard let storage = rawResult.storage else {
236-
// // The group was empty, return nil
237-
// return nil
238-
// }
239-
//
240-
// // Take the value on success.
241-
// let storagePtr =
242-
// storage.bindMemory(to: TaskResult.self, capacity: 1)
243-
// let value = UnsafeMutablePointer<TaskResult>(mutating: storagePtr).pointee
244-
//
245-
// return value
246223
}
247224

248225
/// Query whether the group has any remaining tasks.

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,9 @@ class TaskFutureWaitAsyncContext : public AsyncContext {
8181
// Arguments.
8282
AsyncTask *task;
8383

84-
// Only in swift_task_group_next
84+
// Only in swift_task_group_wait_next_throwing.
8585
TaskGroup *group;
86+
// Only in swift_task_group_wait_next_throwing.
8687
const Metadata *successType;
8788

8889
using AsyncContext::AsyncContext;

test/Concurrency/Runtime/async_task_equals_hashCode.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
// REQUIRES: executable_test
44
// REQUIRES: concurrency
55

6-
import Dispatch
76
#if canImport(Darwin)
87
import Darwin
98
#elseif canImport(Glibc)

test/Concurrency/Runtime/async_taskgroup_is_asyncsequence.swift

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,7 @@
1-
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency %import-libdispatch -parse-as-library) | %FileCheck %s --dump-input=always
1+
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency -parse-as-library) | %FileCheck %s --dump-input=always
22

33
// REQUIRES: executable_test
44
// REQUIRES: concurrency
5-
// REQUIRES: libdispatch
6-
// FIXME: unlock on other OSes
7-
// REQUIRES: OS=macosx
8-
// REQUIRES: CPU=x86_64
9-
10-
import Dispatch
115

126
#if canImport(Darwin)
137
import Darwin

test/Concurrency/Runtime/async_taskgroup_throw_recover.swift

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1-
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency %import-libdispatch -parse-as-library) | %FileCheck %s --dump-input=always
1+
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency -parse-as-library) | %FileCheck %s --dump-input=always
22

33
// REQUIRES: executable_test
44
// REQUIRES: concurrency
5-
// REQUIRES: libdispatch
6-
// FIXME: unlock on other OSes
7-
// REQUIRES: OS=macosx
8-
// REQUIRES: CPU=x86_64
95

106
struct Boom: Error {}
117
struct IgnoredBoom: Error {}

test/Concurrency/Runtime/async_taskgroup_throw_rethrow.swift

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
1-
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency %import-libdispatch -parse-as-library) | %FileCheck %s
1+
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency -parse-as-library) | %FileCheck %s
22

33
// REQUIRES: executable_test
44
// REQUIRES: concurrency
5-
// REQUIRES: libdispatch
6-
7-
import Dispatch
85

96
struct Boom: Error {}
107
struct IgnoredBoom: Error {}
@@ -29,11 +26,11 @@ func test_taskGroup_throws_rethrows() async {
2926
throw error
3027
}
3128

32-
fatalError("should have thrown")
29+
print("should have thrown")
30+
return 0
3331
}
3432

35-
print("got: \(got)")
36-
fatalError("Expected error to be thrown, but got: \(got)")
33+
print("Expected error to be thrown, but got: \(got)")
3734
} catch {
3835
// CHECK: rethrown: Boom()
3936
print("rethrown: \(error)")

0 commit comments

Comments
 (0)