Skip to content

Commit 3fb72cd

Browse files
authored
Merge pull request #2089 from ahoppen/async-fixes
Be a little more pedantic about making sure that we cancel `bodyTask` and `timeoutTask` in `withTimeout` and `withTaskPriorityChangedHandler`
2 parents 93d59cb + a691ee3 commit 3fb72cd

File tree

2 files changed

+18
-7
lines changed

2 files changed

+18
-7
lines changed

Sources/SwiftExtensions/AsyncUtils.swift

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,16 +215,23 @@ package func withTimeout<T: Sendable>(
215215

216216
let tasks = mutableTasks
217217

218+
defer {
219+
// Be extra careful and ensure that we don't leave `bodyTask` or `timeoutTask` running when `withTimeout` finishes,
220+
// eg. if `withTaskPriorityChangedHandler` adds some behavior that never executes `body` if the task gets cancelled.
221+
for task in tasks {
222+
task.cancel()
223+
}
224+
}
225+
218226
return try await withTaskPriorityChangedHandler(initialPriority: priority) {
219227
for try await value in stream {
220228
return value
221229
}
222-
// The only reason for the loop above to terminate is if the Task got cancelled or if the continuation finishes
230+
// The only reason for the loop above to terminate is if the Task got cancelled or if the stream finishes
223231
// (which it never does).
224232
if Task.isCancelled {
225-
for task in tasks {
226-
task.cancel()
227-
}
233+
// Throwing a `CancellationError` will make us return from `withTimeout`. We will cancel the `bodyTask` from the
234+
// `defer` method above.
228235
throw CancellationError()
229236
} else {
230237
preconditionFailure("Continuation never finishes")

Sources/SwiftExtensions/Task+WithPriorityChangedHandler.swift

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ package func withTaskPriorityChangedHandler<T: Sendable>(
2424
) async throws -> T {
2525
let lastPriority = ThreadSafeBox(initialValue: initialPriority)
2626
let result: T? = try await withThrowingTaskGroup(of: Optional<T>.self) { taskGroup in
27+
defer {
28+
// We leave this closure when either we have received a result or we registered cancellation. In either case, we
29+
// want to make sure that we don't leave the body task or the priority watching task running.
30+
taskGroup.cancelAll()
31+
}
2732
// Run the task priority watcher with high priority instead of inheriting the initial priority. Otherwise a
2833
// `.background` task might not get its priority elevated because the priority watching task also runs at
2934
// `.background` priority and might not actually get executed in time.
@@ -54,11 +59,10 @@ package func withTaskPriorityChangedHandler<T: Sendable>(
5459
taskGroup.addTask {
5560
try await operation()
5661
}
57-
// The first task that watches the priority never finishes, so we are effectively await the `operation` task here
58-
// and cancelling the priority observation task once the operation task is done.
62+
// The first task that watches the priority never finishes unless it is cancelled, so we are effectively await the
63+
// `operation` task here.
5964
// We do need to await the observation task as well so that priority escalation also affects the observation task.
6065
for try await case let value? in taskGroup {
61-
taskGroup.cancelAll()
6266
return value
6367
}
6468
return nil

0 commit comments

Comments
 (0)