Skip to content

Commit 63b4cf0

Browse files
authored
Merge pull request swiftlang#36689 from apple/tsan-task-groups
2 parents 2e8a19c + 3d93ec3 commit 63b4cf0

File tree

2 files changed

+66
-0
lines changed

2 files changed

+66
-0
lines changed

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
611611

612612
fillGroupNextResult(waitingContext, result);
613613

614+
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
615+
614616
// TODO: allow the caller to suggest an executor
615617
swift_task_enqueueGlobal(waitingTask);
616618
return;
@@ -740,6 +742,7 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
740742
result.status = PollStatus::Success;
741743
result.storage = futureFragment->getStoragePtr();
742744
assert(result.retainedTask && "polled a task, it must be not null");
745+
_swift_tsan_acquire(static_cast<Job *>(result.retainedTask));
743746
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
744747
return result;
745748

@@ -749,6 +752,7 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
749752
result.storage =
750753
reinterpret_cast<OpaqueValue *>(futureFragment->getError());
751754
assert(result.retainedTask && "polled a task, it must be not null");
755+
_swift_tsan_acquire(static_cast<Job *>(result.retainedTask));
752756
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
753757
return result;
754758

@@ -765,6 +769,7 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
765769

766770
// ==== 3) Add to wait queue -------------------------------------------------
767771
assert(assumed.readyTasks() == 0);
772+
_swift_tsan_release(static_cast<Job *>(waitingTask));
768773
while (true) {
769774
// Put the waiting task at the beginning of the wait queue.
770775
if (waitQueue.compare_exchange_weak(
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// RUN: %target-run-simple-swift(-Xfrontend -enable-experimental-concurrency %import-libdispatch -parse-as-library -sanitize=thread)
2+
3+
// REQUIRES: executable_test
4+
// REQUIRES: concurrency
5+
// REQUIRES: libdispatch
6+
// REQUIRES: tsan_runtime
7+
8+
var scratchBuffer: UnsafeMutableBufferPointer<Int> = .allocate(capacity: 1000)
9+
10+
@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
11+
func completeFastOrSlow(n: Int) async -> Int {
12+
if n % 2 == 0 {
13+
await Task.sleep(2_000_000_000)
14+
}
15+
assert(scratchBuffer[n] == 6)
16+
scratchBuffer[n] = 7
17+
return n
18+
}
19+
20+
@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
21+
func test_sum_nextOnCompletedOrPending() async {
22+
scratchBuffer.initialize(repeating: 0)
23+
24+
let numbers = 0..<1000
25+
let expected = 499_500
26+
27+
let sum = await withTaskGroup(of: Int.self) { (group) async -> Int in
28+
for n in numbers {
29+
scratchBuffer[n] = 6
30+
await group.spawn {
31+
let res = await completeFastOrSlow(n: n)
32+
return res
33+
}
34+
}
35+
36+
// We want to await on completed and pending child tasks. This gives the
37+
// fast tasks some time to complete before we call group.next().
38+
await Task.sleep(1_000_000_000)
39+
40+
var sum = 0
41+
while let r = try! await group.next() {
42+
assert(scratchBuffer[r] == 7)
43+
sum += r
44+
}
45+
46+
assert(group.isEmpty, "Group must be empty after we consumed all tasks")
47+
48+
print("task group returning: \(sum)")
49+
return sum
50+
}
51+
52+
print("result: \(sum)")
53+
assert(sum == expected, "Expected: \(expected), got: \(sum)")
54+
}
55+
56+
@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
57+
@main struct Main {
58+
static func main() async {
59+
await test_sum_nextOnCompletedOrPending()
60+
}
61+
}

0 commit comments

Comments
 (0)