Skip to content

Commit 2eaaf35

Browse files
committed
complete impl, except body throws
1 parent 60ee652 commit 2eaaf35

File tree

2 files changed

+21
-51
lines changed

2 files changed

+21
-51
lines changed

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,6 @@ TaskGroupTaskStatusRecord * TaskGroup::getTaskRecord() {
577577
// Initializes into the preallocated _group an actual TaskGroupImpl.
578578
SWIFT_CC(swift)
579579
static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T) {
580-
fprintf(stderr, "[%s:%d](%s) INITIALIZE...\n", __FILE_NAME__, __LINE__, __FUNCTION__);
581580
swift_taskGroup_initializeWithFlags(0, group, T);
582581
}
583582

@@ -588,9 +587,6 @@ static void swift_taskGroup_initializeWithFlagsImpl(size_t rawGroupFlags, TaskGr
588587

589588
TaskGroupFlags groupFlags(rawGroupFlags);
590589

591-
fprintf(stderr, "[%s:%d](%s) INITIALIZE FLAGS: flags.discardResults:%d\n", __FILE_NAME__, __LINE__, __FUNCTION__,
592-
groupFlags.isDiscardResults());
593-
594590
TaskGroupImpl *impl = ::new(group)
595591
TaskGroupImpl(T, groupFlags.isDiscardResults());
596592
auto record = impl->getTaskRecord();
@@ -809,23 +805,28 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
809805
// and we immediately discard the result.
810806
SWIFT_TASK_GROUP_DEBUG_LOG(this, "discard result, hadError:%d, was pending:%llu",
811807
hadErrorResult, assumed.pendingTasks(this));
812-
if (!lastPendingTaskAndWaitingTask) {
813-
// we're not able to immediately complete a waitingTask with this task, so we may have to store it...
814-
if (hadErrorResult && readyQueue.isEmpty()) {
815-
// a discardResults throwing task group must retain the FIRST error it encounters.
816-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer error, completedTask:%p", completedTask);
817-
enqueueCompletedTask(completedTask, /*hadErrorResult=*/hadErrorResult);
818-
}
819-
} // else, no need to store the task, as we'll immediately complete the waitingTask using it.
820-
821808
// If this was the last pending task, and there is a waiting task (from waitAll),
822809
// we must resume the task; but not otherwise. There cannot be any waiters on next()
823810
// while we're discarding results.
824811
if (lastPendingTaskAndWaitingTask) {
812+
ReadyQueueItem item;
813+
bool dequeuedErrorItem = readyQueue.dequeue(item);
825814
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, offered last pending task, resume waiting task:%p",
826815
waitQueue.load(std::memory_order_relaxed));
827-
resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/hadErrorResult);
816+
if (dequeuedErrorItem) {
817+
assert(item.getStatus() == ReadyStatus::Error && "only errors can be stored by a discarding task group, yet it wasn't an error!");
818+
resumeWaitingTask(item.getTask(), assumed, /*hadErrorResult=*/true);
819+
} else {
820+
resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/hadErrorResult);
821+
}
828822
} else {
823+
assert(!lastPendingTaskAndWaitingTask);
824+
if (hadErrorResult && readyQueue.isEmpty()) {
825+
// a discardResults throwing task group must retain the FIRST error it encounters.
826+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer error, completedTask:%p", completedTask);
827+
enqueueCompletedTask(completedTask, /*hadErrorResult=*/hadErrorResult);
828+
} // else, we just are going to discard it.
829+
829830
auto afterComplete = statusCompletePendingAssumeRelease();
830831
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, either more pending tasks, or no waiting task, status:%s",
831832
afterComplete.to_string(this).c_str());

test/Concurrency/Runtime/async_taskgroup_throw_rethrow.swift

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,12 @@ func test_discardingTaskGroup_automaticallyRethrows() async {
8787
}
8888

8989
func test_discardingTaskGroup_automaticallyRethrowsOnlyFirst() async {
90-
print("==== \(#function) ------") // CHECK_LABEL: test_discardingTaskGroup_automaticallyRethrowsOnlyFirst
90+
print("==== \(#function) ------") // CHECK-LABEL: test_discardingTaskGroup_automaticallyRethrowsOnlyFirst
9191
do {
9292
let got = try await withThrowingDiscardingTaskGroup(returning: Int.self) { group in
93-
group.addTask { await echo(1) }
93+
group.addTask {
94+
await echo(1)
95+
}
9496
group.addTask {
9597
let error = Boom(id: "first, isCancelled:\(Task.isCancelled)")
9698
print("Throwing: \(error)")
@@ -102,7 +104,7 @@ func test_discardingTaskGroup_automaticallyRethrowsOnlyFirst() async {
102104
do {
103105
try await Task.sleep(until: .now + .seconds(120), clock: .continuous)
104106
} catch {
105-
print("Throwing: \(error)")
107+
print("Awoken, throwing: \(error)")
106108
throw error
107109
}
108110
}
@@ -112,46 +114,13 @@ func test_discardingTaskGroup_automaticallyRethrowsOnlyFirst() async {
112114
print("Expected error to be thrown, but got: \(got)")
113115
} catch {
114116
// CHECK: Throwing: Boom(id: "first, isCancelled:false
115-
// CHECK: Throwing: CancellationError()
117+
// CHECK: Awoken, throwing: CancellationError()
116118
// and only then the re-throw happens:
117119
// CHECK: rethrown: Boom(id: "first
118120
print("rethrown: \(error)")
119121
}
120122
}
121123

122-
func test_discardingTaskGroup_automaticallyRethrowsOnlyFirstIncludingGroupBody() async {
123-
print("==== \(#function) ------") // CHECK_LABEL: test_discardingTaskGroup_automaticallyRethrowsOnlyFirstIncludingGroupBody
124-
do {
125-
try await withThrowingDiscardingTaskGroup(returning: Int.self) { group in
126-
group.addTask { await echo(1) }
127-
group.addTask {
128-
try await Task.sleep(until: .now + .seconds(1), clock: .continuous)
129-
let error = Boom(id: "first, isCancelled:\(Task.isCancelled)")
130-
print("Throwing: \(error)")
131-
throw error
132-
}
133-
group.addTask {
134-
// we wait "forever" but since the group will get cancelled after
135-
// the first error, this will be woken up and throw a cancellation
136-
try await Task.sleep(until: .now + .seconds(20), clock: .continuous)
137-
}
138-
139-
let bodyError = Boom(id: "body, isCancelled:\(group.isCancelled)")
140-
print("Throwing: \(bodyError)")
141-
throw bodyError
142-
}
143-
144-
print("Expected error to be thrown")
145-
} catch {
146-
// CHECK: Throwing: Boom(id: "body, isCancelled:false
147-
// CHECK: Throwing: Boom(id: "first, isCancelled:true
148-
// CHECK: Throwing: Boom(id: "second, isCancelled:true
149-
// and only then the re-throw happens:
150-
// CHECK: rethrown: Boom(id: "body
151-
print("rethrown: \(error)")
152-
}
153-
}
154-
155124
@available(SwiftStdlib 5.1, *)
156125
@main struct Main {
157126
static func main() async {

0 commit comments

Comments
 (0)