Skip to content

Commit 8694d75

Browse files
Fixed an issue where AsyncThrowingBackpressureStream would not deinitialize when a suspended write was never read
1 parent 3f4cf0d commit 8694d75

File tree

2 files changed

+54
-29
lines changed

2 files changed

+54
-29
lines changed

Sources/CodableDatastore/Persistence/Disk Persistence/AsyncThrowingBackpressureStream.swift

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,24 @@ struct AsyncThrowingBackpressureStream<Element: Sendable>: Sendable {
2121
var pendingReadContinuation: CheckedContinuation<Element?, Error>?
2222
var wasCancelled = false
2323

24-
func provide(_ result: Result<Element?, Error>) async throws {
24+
func provide(_ result: Result<Element?, Error>, in continuation: CheckedContinuation<Void, Error>) {
2525
/// If reads were cancelled, propagate the cancellation to the provider without saving the result.
26-
if wasCancelled { throw CancellationError() }
26+
guard !wasCancelled else {
27+
continuation.resume(throwing: CancellationError())
28+
return
29+
}
2730

28-
/// Enqueue the provided result and continue the task once it is ready to be consumed.
29-
try await withCheckedThrowingContinuation { continuation in
30-
/// Ideally, no more than one pending event should be queued up, as a second event means backpressure isn't working.
31-
precondition(pendingWriteEvents.isEmpty, "More than one event has been queued on the stream.")
32-
33-
/// If a read is currently pending, signal that a new result has been provided.
34-
if let pendingReadContinuation {
35-
self.pendingReadContinuation = nil
36-
pendingReadContinuation.resume(with: result)
37-
continuation.resume()
38-
} else {
39-
/// If we aren't ready for events, queue the event and suspend the task until events are ready. This will stop more values from being provided (ie. the backpressure at work).
40-
pendingWriteEvents.append((continuation, result))
41-
}
31+
/// Ideally, no more than one pending event should be queued up, as a second event means backpressure isn't working.
32+
precondition(pendingWriteEvents.isEmpty, "More than one event has been queued on the stream.")
33+
34+
/// If a read is currently pending, signal that a new result has been provided.
35+
if let pendingReadContinuation {
36+
self.pendingReadContinuation = nil
37+
pendingReadContinuation.resume(with: result)
38+
continuation.resume()
39+
} else {
40+
/// If we aren't ready for events, queue the event and suspend the task until events are ready. This will stop more values from being provided (ie. the backpressure at work).
41+
pendingWriteEvents.append((continuation, result))
4242
}
4343
}
4444

@@ -112,17 +112,34 @@ struct AsyncThrowingBackpressureStream<Element: Sendable>: Sendable {
112112
}
113113

114114
func yield(_ value: Element) async throws {
115-
guard let stateMachine else { throw CancellationError() }
116-
try await stateMachine.provide(.success(value))
115+
do {
116+
try await withCheckedThrowingContinuation { continuation in
117+
guard let stateMachine else {
118+
continuation.resume(throwing: CancellationError())
119+
return
120+
}
121+
Task {
122+
await stateMachine.provide(.success(value), in: continuation)
123+
}
124+
} as Void
125+
} catch {
126+
throw error
127+
}
117128
}
118129

119130
fileprivate func finish(throwing error: Error? = nil) async throws {
120-
guard let stateMachine else { throw CancellationError() }
121-
if let error {
122-
try await stateMachine.provide(.failure(error))
123-
} else {
124-
try await stateMachine.provide(.success(nil))
125-
}
131+
try await withCheckedThrowingContinuation { continuation in
132+
guard let stateMachine else { continuation.resume(throwing: CancellationError())
133+
return
134+
}
135+
Task {
136+
if let error {
137+
await stateMachine.provide(.failure(error), in: continuation)
138+
} else {
139+
await stateMachine.provide(.success(nil), in: continuation)
140+
}
141+
}
142+
} as Void
126143
}
127144
}
128145

Tests/CodableDatastoreTests/AsyncThrowingBackpressureStreamTests.swift

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ final class AsyncThrowingBackpressureStreamTests: XCTestCase {
123123
} catch {
124124
XCTAssertEqual(error is CancellationError, true)
125125
expectation.fulfill()
126+
throw error
126127
}
127128
}
128129

@@ -166,6 +167,7 @@ final class AsyncThrowingBackpressureStreamTests: XCTestCase {
166167
} catch {
167168
XCTAssertEqual(error is CancellationError, true)
168169
expectation.fulfill()
170+
throw error
169171
}
170172
}
171173

@@ -182,8 +184,9 @@ final class AsyncThrowingBackpressureStreamTests: XCTestCase {
182184
_ = try await iterator.next()
183185
XCTFail()
184186
} catch {
187+
/// Let the write happen strictly after the read, in its own task so signaling doesn't "see" the cancellation.
185188
XCTAssertEqual(error is CancellationError, true)
186-
await writeContinuations.first(where: { _ in true })?.resume()
189+
await Task { await writeContinuations.first(where: { _ in true })!.resume() }.value
187190
}
188191
}
189192

@@ -198,37 +201,42 @@ final class AsyncThrowingBackpressureStreamTests: XCTestCase {
198201
let expectation = expectation(description: "Writes were cancelled")
199202

200203
let task = Task {
201-
let stream = AsyncThrowingBackpressureStream<Int> { continuation in
204+
var stream: AsyncThrowingBackpressureStream<Int>? = AsyncThrowingBackpressureStream<Int> { continuation in
202205
try await continuation.yield(0)
203206
await withCheckedContinuation { continuation in
204207
readProvider.yield(continuation)
205208
}
206209
do {
207210
try await continuation.yield(1)
208211
XCTFail()
212+
expectation.fulfill()
209213
} catch {
210214
XCTAssertEqual(error is CancellationError, true)
211215
expectation.fulfill()
216+
throw error
212217
}
213218
}
214219

215-
let iterator = stream.makeAsyncIterator()
220+
let iterator = stream!.makeAsyncIterator()
216221
let result = try await iterator.next()
217222
XCTAssertEqual(result, 0)
218223

219224
withUnsafeCurrentTask { task in
220225
task?.cancel()
221226
}
222227

223-
/// Let the write happen stritly after cancellation
224-
await writeContinuations.first(where: { _ in true })?.resume()
228+
/// Let the write happen strictly after cancellation, in its own task so signaling doesn't "see" the cancellation.
229+
await Task { await writeContinuations.first(where: { _ in true })!.resume() }.value
225230

226231
/// The stream can't be marked as cancelled if another read never happens.
227232
let wasCancelled = await iterator.wasCancelled
228233
XCTAssertEqual(wasCancelled, false)
234+
235+
stream = nil
229236
}
230237

231238
try? await task.value
239+
readProvider.finish()
232240

233241
await fulfillment(of: [expectation], timeout: 10)
234242
}

0 commit comments

Comments
 (0)