From b1f6d55f66c0f6a5a60b7a6eee8141e91b4aebe1 Mon Sep 17 00:00:00 2001 From: Brandon Schoenfeld Date: Wed, 10 Sep 2025 16:38:42 -0500 Subject: [PATCH 1/8] refactor: enable injecting a custom State type into Channel This will enable testing to expose a leaked CheckedContinuation. --- Sources/AsyncDataLoader/Channel/Channel.swift | 19 +++++++++++++++++-- Sources/AsyncDataLoader/Channel/State.swift | 16 +++++++++++++++- Sources/AsyncDataLoader/DataLoader.swift | 8 ++++---- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/Sources/AsyncDataLoader/Channel/Channel.swift b/Sources/AsyncDataLoader/Channel/Channel.swift index 0950bf9..fc4be0e 100644 --- a/Sources/AsyncDataLoader/Channel/Channel.swift +++ b/Sources/AsyncDataLoader/Channel/Channel.swift @@ -1,5 +1,20 @@ -actor Channel: Sendable { - private var state = State() +actor Channel< + Success: Sendable, + Failure: Error, + ChannelState: Stateful +>: Sendable where ChannelState.Success == Success, ChannelState.Failure == Failure { + private var state: ChannelState + + init(_ state: ChannelState) { + self.state = state + } +} + +extension Channel where ChannelState == State { + /// Default initializer: uses DefaultState + init() { + self.init(State()) + } } extension Channel { diff --git a/Sources/AsyncDataLoader/Channel/State.swift b/Sources/AsyncDataLoader/Channel/State.swift index ee5e784..8922a59 100644 --- a/Sources/AsyncDataLoader/Channel/State.swift +++ b/Sources/AsyncDataLoader/Channel/State.swift @@ -1,6 +1,20 @@ typealias Waiter = CheckedContinuation -actor State { +protocol Stateful: Actor { + associatedtype Success: Sendable + associatedtype Failure: Sendable + + var waiters: [Waiter] { get set } + var result: Success? { get set } + var failure: Failure? { get set } + + func setResult(result: Success) async + func setFailure(failure: Failure) async + func appendWaiters(waiters: Waiter...) async + func removeAllWaiters() async +} + +actor State: Stateful { var waiters = [Waiter]() var result: Success? var failure: Failure? diff --git a/Sources/AsyncDataLoader/DataLoader.swift b/Sources/AsyncDataLoader/DataLoader.swift index d60b829..298c3d5 100644 --- a/Sources/AsyncDataLoader/DataLoader.swift +++ b/Sources/AsyncDataLoader/DataLoader.swift @@ -10,7 +10,7 @@ public typealias BatchLoadFunction = @Sendable (_ keys: [Key]) async throws -> [DataLoaderValue] private typealias LoaderQueue = [( key: Key, - channel: Channel + channel: Channel> )] /// DataLoader creates a public API for loading data from a particular @@ -25,7 +25,7 @@ public actor DataLoader { private let batchLoadFunction: BatchLoadFunction private let options: DataLoaderOptions - private var cache = [Key: Channel]() + private var cache = [Key: Channel>]() private var queue = LoaderQueue() private var dispatchScheduled = false @@ -46,7 +46,7 @@ public actor DataLoader { return try await cached.value } - let channel = Channel() + let channel = Channel>() if options.batchingEnabled { queue.append((key: key, channel: channel)) @@ -148,7 +148,7 @@ public actor DataLoader { let cacheKey = options.cacheKeyFunction?(key) ?? key if cache[cacheKey] == nil { - let channel = Channel() + let channel = Channel>() Task.detached { await channel.fulfill(value) From edd14fe2a6983336e9d32bd107f70651525e6fea Mon Sep 17 00:00:00 2001 From: Brandon Schoenfeld Date: Wed, 10 Sep 2025 18:28:34 -0500 Subject: [PATCH 2/8] test: reproduce leaked continuation --- Tests/AsyncDataLoaderTests/ChannelTests.swift | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 Tests/AsyncDataLoaderTests/ChannelTests.swift diff --git a/Tests/AsyncDataLoaderTests/ChannelTests.swift b/Tests/AsyncDataLoaderTests/ChannelTests.swift new file mode 100644 index 0000000..5e62b39 --- /dev/null +++ b/Tests/AsyncDataLoaderTests/ChannelTests.swift @@ -0,0 +1,107 @@ +@testable import class AsyncDataLoader.Channel +@testable import protocol AsyncDataLoader.Stateful +@testable import typealias AsyncDataLoader.Waiter +import Testing + +/// When Channel.fulfill or .fail is called, an arbitrarily long suspension of +/// await state.removeAllWaiters() +/// would allow actor reentrancy, where another continuation could be added, removed, but not resumed. +/// This reproducer injects a mock state object that artificially pauses the removal of all waiters until a new one is added. +/// At that point, Channel will no longer resume the water and just remove it. +/// This test passes, but with a logged message +/// SWIFT TASK CONTINUATION MISUSE: reproduceLeakingCheckedContinuation() leaked its continuation without resuming it. This may cause tasks waiting on it to remain suspended forever. +/// +/// Note that it is not leaked in the computed property `value`. This reproducer only shows that a continuation could be leaked through the state and actor reentrancy. +/// Calling Channel.value instead causes this test to hang indefinitely because it is waiting for an unresumed continuation. +@Test func reproduceLeakedCheckedContinuation() async throws { + // setup global variables + // + // fulfill task + // - create state and channel + // - pass out to main thread + // - call fulfill so that removeAllWaiters is called + // - signal removeAllWaiters was called + // - suspend Task until append is called + // + // wait for removeAllWaiters signal + // + // leak continuation task + // - call append + // - appends the waiter + // - resumes the continuation created in removeAllWaiters, allowing the fulfill task to resume + // + // wait for fulfill task to complete, which removesAllWaiters + // test ends with SWIFT TASK CONTINUATION MISUSE + + var state: MockState? + var channel: Channel>? + + var fulfillTask: Task? = nil + + // continuation gets resumed when state.removeAllWaiters is called + await withCheckedContinuation { continuation in + let localState = MockState(continuation) + state = localState + channel = Channel(localState) + fulfillTask = Task { + _ = try await #require(channel).fulfill(42) + } + } + + // leaking continuation task + Task { + // calling channel.value here would be ideal, to make that append a continuation to the state + // but this causes the process to hand forever + // + // try await #require(channel).value + + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) -> Void in + Task { // wrap in task to allow throwing + try await #require(state).appendWaiters(waiters: continuation) + } + } + } + + try await #require(fulfillTask).value +} + +actor MockState: Stateful { + var waiters = [Waiter]() + var result: Success? + var failure: Failure? + + /// suspend until + let removeAllCalledContinuation: CheckedContinuation + var appendCalledContinuation: CheckedContinuation? + + init(_ continuation: CheckedContinuation) { + removeAllCalledContinuation = continuation + } +} + +extension MockState { + func setResult(result: Success) { + self.result = result + } + + func setFailure(failure: Failure) { + self.failure = failure + } + + func appendWaiters(waiters: Waiter...) { + self.waiters.append(contentsOf: waiters) + guard let continuation = appendCalledContinuation else { + Issue.record("removeAllWaiters was not called before appendWaiters") + return + } + continuation.resume() + } + + func removeAllWaiters() async { + removeAllCalledContinuation.resume() + await withCheckedContinuation { continuation in + appendCalledContinuation = continuation + } + waiters.removeAll() + } +} From b587a1a26e347c3150378677b9e2366971442124 Mon Sep 17 00:00:00 2001 From: Brandon Schoenfeld Date: Wed, 10 Sep 2025 19:07:47 -0500 Subject: [PATCH 3/8] Revert "test: reproduce leaked continuation" This reverts commit edd14fe2a6983336e9d32bd107f70651525e6fea. --- Tests/AsyncDataLoaderTests/ChannelTests.swift | 107 ------------------ 1 file changed, 107 deletions(-) delete mode 100644 Tests/AsyncDataLoaderTests/ChannelTests.swift diff --git a/Tests/AsyncDataLoaderTests/ChannelTests.swift b/Tests/AsyncDataLoaderTests/ChannelTests.swift deleted file mode 100644 index 5e62b39..0000000 --- a/Tests/AsyncDataLoaderTests/ChannelTests.swift +++ /dev/null @@ -1,107 +0,0 @@ -@testable import class AsyncDataLoader.Channel -@testable import protocol AsyncDataLoader.Stateful -@testable import typealias AsyncDataLoader.Waiter -import Testing - -/// When Channel.fulfill or .fail is called, an arbitrarily long suspension of -/// await state.removeAllWaiters() -/// would allow actor reentrancy, where another continuation could be added, removed, but not resumed. -/// This reproducer injects a mock state object that artificially pauses the removal of all waiters until a new one is added. -/// At that point, Channel will no longer resume the water and just remove it. -/// This test passes, but with a logged message -/// SWIFT TASK CONTINUATION MISUSE: reproduceLeakingCheckedContinuation() leaked its continuation without resuming it. This may cause tasks waiting on it to remain suspended forever. -/// -/// Note that it is not leaked in the computed property `value`. This reproducer only shows that a continuation could be leaked through the state and actor reentrancy. -/// Calling Channel.value instead causes this test to hang indefinitely because it is waiting for an unresumed continuation. -@Test func reproduceLeakedCheckedContinuation() async throws { - // setup global variables - // - // fulfill task - // - create state and channel - // - pass out to main thread - // - call fulfill so that removeAllWaiters is called - // - signal removeAllWaiters was called - // - suspend Task until append is called - // - // wait for removeAllWaiters signal - // - // leak continuation task - // - call append - // - appends the waiter - // - resumes the continuation created in removeAllWaiters, allowing the fulfill task to resume - // - // wait for fulfill task to complete, which removesAllWaiters - // test ends with SWIFT TASK CONTINUATION MISUSE - - var state: MockState? - var channel: Channel>? - - var fulfillTask: Task? = nil - - // continuation gets resumed when state.removeAllWaiters is called - await withCheckedContinuation { continuation in - let localState = MockState(continuation) - state = localState - channel = Channel(localState) - fulfillTask = Task { - _ = try await #require(channel).fulfill(42) - } - } - - // leaking continuation task - Task { - // calling channel.value here would be ideal, to make that append a continuation to the state - // but this causes the process to hand forever - // - // try await #require(channel).value - - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) -> Void in - Task { // wrap in task to allow throwing - try await #require(state).appendWaiters(waiters: continuation) - } - } - } - - try await #require(fulfillTask).value -} - -actor MockState: Stateful { - var waiters = [Waiter]() - var result: Success? - var failure: Failure? - - /// suspend until - let removeAllCalledContinuation: CheckedContinuation - var appendCalledContinuation: CheckedContinuation? - - init(_ continuation: CheckedContinuation) { - removeAllCalledContinuation = continuation - } -} - -extension MockState { - func setResult(result: Success) { - self.result = result - } - - func setFailure(failure: Failure) { - self.failure = failure - } - - func appendWaiters(waiters: Waiter...) { - self.waiters.append(contentsOf: waiters) - guard let continuation = appendCalledContinuation else { - Issue.record("removeAllWaiters was not called before appendWaiters") - return - } - continuation.resume() - } - - func removeAllWaiters() async { - removeAllCalledContinuation.resume() - await withCheckedContinuation { continuation in - appendCalledContinuation = continuation - } - waiters.removeAll() - } -} From df48b99eaa49dcbe289968ddf3728deb6e455b46 Mon Sep 17 00:00:00 2001 From: Brandon Schoenfeld Date: Wed, 10 Sep 2025 19:08:21 -0500 Subject: [PATCH 4/8] Revert "refactor: enable injecting a custom State type into Channel" This reverts commit b1f6d55f66c0f6a5a60b7a6eee8141e91b4aebe1. --- Sources/AsyncDataLoader/Channel/Channel.swift | 19 ++----------------- Sources/AsyncDataLoader/Channel/State.swift | 16 +--------------- Sources/AsyncDataLoader/DataLoader.swift | 8 ++++---- 3 files changed, 7 insertions(+), 36 deletions(-) diff --git a/Sources/AsyncDataLoader/Channel/Channel.swift b/Sources/AsyncDataLoader/Channel/Channel.swift index fc4be0e..0950bf9 100644 --- a/Sources/AsyncDataLoader/Channel/Channel.swift +++ b/Sources/AsyncDataLoader/Channel/Channel.swift @@ -1,20 +1,5 @@ -actor Channel< - Success: Sendable, - Failure: Error, - ChannelState: Stateful ->: Sendable where ChannelState.Success == Success, ChannelState.Failure == Failure { - private var state: ChannelState - - init(_ state: ChannelState) { - self.state = state - } -} - -extension Channel where ChannelState == State { - /// Default initializer: uses DefaultState - init() { - self.init(State()) - } +actor Channel: Sendable { + private var state = State() } extension Channel { diff --git a/Sources/AsyncDataLoader/Channel/State.swift b/Sources/AsyncDataLoader/Channel/State.swift index 8922a59..ee5e784 100644 --- a/Sources/AsyncDataLoader/Channel/State.swift +++ b/Sources/AsyncDataLoader/Channel/State.swift @@ -1,20 +1,6 @@ typealias Waiter = CheckedContinuation -protocol Stateful: Actor { - associatedtype Success: Sendable - associatedtype Failure: Sendable - - var waiters: [Waiter] { get set } - var result: Success? { get set } - var failure: Failure? { get set } - - func setResult(result: Success) async - func setFailure(failure: Failure) async - func appendWaiters(waiters: Waiter...) async - func removeAllWaiters() async -} - -actor State: Stateful { +actor State { var waiters = [Waiter]() var result: Success? var failure: Failure? diff --git a/Sources/AsyncDataLoader/DataLoader.swift b/Sources/AsyncDataLoader/DataLoader.swift index 298c3d5..d60b829 100644 --- a/Sources/AsyncDataLoader/DataLoader.swift +++ b/Sources/AsyncDataLoader/DataLoader.swift @@ -10,7 +10,7 @@ public typealias BatchLoadFunction = @Sendable (_ keys: [Key]) async throws -> [DataLoaderValue] private typealias LoaderQueue = [( key: Key, - channel: Channel> + channel: Channel )] /// DataLoader creates a public API for loading data from a particular @@ -25,7 +25,7 @@ public actor DataLoader { private let batchLoadFunction: BatchLoadFunction private let options: DataLoaderOptions - private var cache = [Key: Channel>]() + private var cache = [Key: Channel]() private var queue = LoaderQueue() private var dispatchScheduled = false @@ -46,7 +46,7 @@ public actor DataLoader { return try await cached.value } - let channel = Channel>() + let channel = Channel() if options.batchingEnabled { queue.append((key: key, channel: channel)) @@ -148,7 +148,7 @@ public actor DataLoader { let cacheKey = options.cacheKeyFunction?(key) ?? key if cache[cacheKey] == nil { - let channel = Channel>() + let channel = Channel() Task.detached { await channel.fulfill(value) From 81de3461be3bc96a14c38f9412cb8f160de14bcb Mon Sep 17 00:00:00 2001 From: Brandon Schoenfeld Date: Wed, 10 Sep 2025 13:44:19 -0500 Subject: [PATCH 5/8] fix: avoid leaking CheckedContinuations by removing possible Channel actor reentrancy In the Channel.fulfill and Channel.fail methods, the line `await state.removeAllWaiters()` allows for actor reentrance. This means that after resuming all the CheckedContinuations (waiters), but before removing them, another waiter could be added to the array. Then once the `state.removeAllWaiters` resumes execution, any CheckedContinuations would be removed without having been resumed. This leads to leaking CheckedContinuations. This was observed in my application, where I would get a logger message SWIFT TASK CONTINUATION MISUSE: value leaked its continuation without resuming it. This may cause tasks waiting on it to remain suspended forever. which originates from Swift here: https://github.com/swiftlang/swift/blob/b06eed151c8aa2bc2f4e081b0bb7b5e5c65f3bba/stdlib/public/Concurrency/CheckedContinuation.swift#L82 I verified that Channel.value was the culprit by renaming it and observing the changed name in the CONTINUATION MISUSE message. This change set removes the possibility of actor reentrancy and leaking CheckedContinuations. By inlining the State data members in Channel and deleting the State actor completely, we remove all await calls inside Channel.fulfill and Channel.fail. --- Sources/AsyncDataLoader/Channel.swift | 59 +++++++++++++++++++ Sources/AsyncDataLoader/Channel/Channel.swift | 55 ----------------- Sources/AsyncDataLoader/Channel/State.swift | 25 -------- 3 files changed, 59 insertions(+), 80 deletions(-) create mode 100644 Sources/AsyncDataLoader/Channel.swift delete mode 100644 Sources/AsyncDataLoader/Channel/Channel.swift delete mode 100644 Sources/AsyncDataLoader/Channel/State.swift diff --git a/Sources/AsyncDataLoader/Channel.swift b/Sources/AsyncDataLoader/Channel.swift new file mode 100644 index 0000000..9ede829 --- /dev/null +++ b/Sources/AsyncDataLoader/Channel.swift @@ -0,0 +1,59 @@ +actor Channel: Sendable { + private var waiters = [Waiter]() + private var result: Success? + private var failure: Failure? +} + +typealias Waiter = CheckedContinuation + +extension Channel { + @discardableResult + func fulfill(_ value: Success) -> Bool { + if result == nil { + result = value + + for waiter in waiters { + waiter.resume(returning: value) + } + + waiters.removeAll() + + return false + } + + return true + } + + @discardableResult + func fail(_ failure: Failure) -> Bool { + if self.failure == nil { + self.failure = failure + + for waiter in waiters { + waiter.resume(throwing: failure) + } + + waiters.removeAll() + + return false + } + + return true + } + + var value: Success { + get async throws { + try await withCheckedThrowingContinuation { continuation in + Task { + if let result = self.result { + continuation.resume(returning: result) + } else if let failure = self.failure { + continuation.resume(throwing: failure) + } else { + waiters.append(continuation) + } + } + } + } + } +} diff --git a/Sources/AsyncDataLoader/Channel/Channel.swift b/Sources/AsyncDataLoader/Channel/Channel.swift deleted file mode 100644 index 0950bf9..0000000 --- a/Sources/AsyncDataLoader/Channel/Channel.swift +++ /dev/null @@ -1,55 +0,0 @@ -actor Channel: Sendable { - private var state = State() -} - -extension Channel { - @discardableResult - func fulfill(_ value: Success) async -> Bool { - if await state.result == nil { - await state.setResult(result: value) - - for waiters in await state.waiters { - waiters.resume(returning: value) - } - - await state.removeAllWaiters() - - return false - } - - return true - } - - @discardableResult - func fail(_ failure: Failure) async -> Bool { - if await state.failure == nil { - await state.setFailure(failure: failure) - - for waiters in await state.waiters { - waiters.resume(throwing: failure) - } - - await state.removeAllWaiters() - - return false - } - - return true - } - - var value: Success { - get async throws { - try await withCheckedThrowingContinuation { continuation in - Task { - if let result = await state.result { - continuation.resume(returning: result) - } else if let failure = await self.state.failure { - continuation.resume(throwing: failure) - } else { - await state.appendWaiters(waiters: continuation) - } - } - } - } - } -} diff --git a/Sources/AsyncDataLoader/Channel/State.swift b/Sources/AsyncDataLoader/Channel/State.swift deleted file mode 100644 index ee5e784..0000000 --- a/Sources/AsyncDataLoader/Channel/State.swift +++ /dev/null @@ -1,25 +0,0 @@ -typealias Waiter = CheckedContinuation - -actor State { - var waiters = [Waiter]() - var result: Success? - var failure: Failure? -} - -extension State { - func setResult(result: Success) { - self.result = result - } - - func setFailure(failure: Failure) { - self.failure = failure - } - - func appendWaiters(waiters: Waiter...) { - self.waiters.append(contentsOf: waiters) - } - - func removeAllWaiters() { - waiters.removeAll() - } -} From fd5c11d23a4b2f4cb7277b45de8a74afc0e7f842 Mon Sep 17 00:00:00 2001 From: Brandon Schoenfeld Date: Thu, 11 Sep 2025 00:14:39 -0500 Subject: [PATCH 6/8] refactor: simplify Channel waiter resuming and removal --- Sources/AsyncDataLoader/Channel.swift | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/Sources/AsyncDataLoader/Channel.swift b/Sources/AsyncDataLoader/Channel.swift index 9ede829..76953c2 100644 --- a/Sources/AsyncDataLoader/Channel.swift +++ b/Sources/AsyncDataLoader/Channel.swift @@ -12,12 +12,10 @@ extension Channel { if result == nil { result = value - for waiter in waiters { - waiter.resume(returning: value) + while let waiter = waiters.popLast() { + waiter.resume(returning: success) } - waiters.removeAll() - return false } @@ -29,12 +27,10 @@ extension Channel { if self.failure == nil { self.failure = failure - for waiter in waiters { + while let waiter = waiters.popLast() { waiter.resume(throwing: failure) } - waiters.removeAll() - return false } From 53d7c9fc42882de872c0ac3b091e3300a4f6860f Mon Sep 17 00:00:00 2001 From: Brandon Schoenfeld Date: Thu, 11 Sep 2025 00:16:03 -0500 Subject: [PATCH 7/8] refactor: combine Channel Success and Failure into a single Result object --- Sources/AsyncDataLoader/Channel.swift | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/Sources/AsyncDataLoader/Channel.swift b/Sources/AsyncDataLoader/Channel.swift index 76953c2..f6e261a 100644 --- a/Sources/AsyncDataLoader/Channel.swift +++ b/Sources/AsyncDataLoader/Channel.swift @@ -1,16 +1,15 @@ actor Channel: Sendable { private var waiters = [Waiter]() - private var result: Success? - private var failure: Failure? + private var result: Result? } typealias Waiter = CheckedContinuation extension Channel { @discardableResult - func fulfill(_ value: Success) -> Bool { + func fulfill(_ success: Success) -> Bool { if result == nil { - result = value + result = .success(success) while let waiter = waiters.popLast() { waiter.resume(returning: success) @@ -24,8 +23,8 @@ extension Channel { @discardableResult func fail(_ failure: Failure) -> Bool { - if self.failure == nil { - self.failure = failure + if result == nil { + result = .failure(failure) while let waiter = waiters.popLast() { waiter.resume(throwing: failure) @@ -41,11 +40,12 @@ extension Channel { get async throws { try await withCheckedThrowingContinuation { continuation in Task { - if let result = self.result { - continuation.resume(returning: result) - } else if let failure = self.failure { + switch result { + case let .success(success): + continuation.resume(returning: success) + case let .failure(failure): continuation.resume(throwing: failure) - } else { + case nil: waiters.append(continuation) } } From cf261bac4d3e8ab2f30060d9bf3cc7c3c4b568c6 Mon Sep 17 00:00:00 2001 From: Brandon Schoenfeld Date: Thu, 11 Sep 2025 00:22:03 -0500 Subject: [PATCH 8/8] refactor: prefer guard style in Channel.fulfill and .fail --- Sources/AsyncDataLoader/Channel.swift | 28 +++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/Sources/AsyncDataLoader/Channel.swift b/Sources/AsyncDataLoader/Channel.swift index f6e261a..163220f 100644 --- a/Sources/AsyncDataLoader/Channel.swift +++ b/Sources/AsyncDataLoader/Channel.swift @@ -8,32 +8,32 @@ typealias Waiter = CheckedContinuation extension Channel { @discardableResult func fulfill(_ success: Success) -> Bool { - if result == nil { - result = .success(success) + guard result == nil else { + return true + } - while let waiter = waiters.popLast() { - waiter.resume(returning: success) - } + result = .success(success) - return false + while let waiter = waiters.popLast() { + waiter.resume(returning: success) } - return true + return false } @discardableResult func fail(_ failure: Failure) -> Bool { - if result == nil { - result = .failure(failure) + guard result == nil else { + return true + } - while let waiter = waiters.popLast() { - waiter.resume(throwing: failure) - } + result = .failure(failure) - return false + while let waiter = waiters.popLast() { + waiter.resume(throwing: failure) } - return true + return false } var value: Success {