|
| 1 | +diff --git a/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift b/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift |
| 2 | +index badc34f967..f232d518f9 100644 |
| 3 | +--- a/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift |
| 4 | ++++ b/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift |
| 5 | +@@ -1173,7 +1173,38 @@ extension NIOAsyncWriter { |
| 6 | + delegate: delegate |
| 7 | + ) |
| 8 | + |
| 9 | +- case .initial, .finished, .writerFinished: |
| 10 | ++ case .writerFinished( |
| 11 | ++ let isWritable, |
| 12 | ++ let inDelegateOutcall, |
| 13 | ++ var suspendedYields, |
| 14 | ++ let cancelledYields, |
| 15 | ++ let bufferedYieldIDs, |
| 16 | ++ let delegate, |
| 17 | ++ let error |
| 18 | ++ ): |
| 19 | ++ // We have a suspended yield at this point that hasn't been cancelled yet. |
| 20 | ++ // It was buffered before we became finished, so we still have to deliver it. |
| 21 | ++ // We need to store the yield now. |
| 22 | ++ |
| 23 | ++ self._state = .modifying |
| 24 | ++ |
| 25 | ++ let suspendedYield = SuspendedYield( |
| 26 | ++ yieldID: yieldID, |
| 27 | ++ continuation: continuation |
| 28 | ++ ) |
| 29 | ++ suspendedYields.append(suspendedYield) |
| 30 | ++ |
| 31 | ++ self._state = .writerFinished( |
| 32 | ++ isWritable: isWritable, |
| 33 | ++ inDelegateOutcall: inDelegateOutcall, |
| 34 | ++ suspendedYields: suspendedYields, |
| 35 | ++ cancelledYields: cancelledYields, |
| 36 | ++ bufferedYieldIDs: bufferedYieldIDs, |
| 37 | ++ delegate: delegate, |
| 38 | ++ error: error |
| 39 | ++ ) |
| 40 | ++ |
| 41 | ++ case .initial, .finished: |
| 42 | + preconditionFailure("This should have already been handled by `yield()`") |
| 43 | + |
| 44 | + case .modifying: |
| 45 | +@@ -1501,7 +1532,7 @@ extension NIOAsyncWriter { |
| 46 | + |
| 47 | + self._state = .writerFinished( |
| 48 | + isWritable: isWritable, |
| 49 | +- inDelegateOutcall: inDelegateOutcall, |
| 50 | ++ inDelegateOutcall: false, |
| 51 | + suspendedYields: .init(), |
| 52 | + cancelledYields: cancelledYields, |
| 53 | + bufferedYieldIDs: bufferedYieldIDs, |
| 54 | +diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift |
| 55 | +index 31c680b8bf..4f15ac9af9 100644 |
| 56 | +--- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift |
| 57 | ++++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift |
| 58 | +@@ -606,6 +606,50 @@ final class NIOAsyncWriterTests: XCTestCase { |
| 59 | + self.assert(suspendCallCount: 1, yieldCallCount: 1, terminateCallCount: 1) |
| 60 | + } |
| 61 | + |
| 62 | ++ func testSuspendingBufferedYield_whenWriterFinished() async throws { |
| 63 | ++ self.sink.setWritability(to: false) |
| 64 | ++ |
| 65 | ++ let bothSuspended = expectation(description: "suspended on both yields") |
| 66 | ++ let suspendedAgain = ConditionLock(value: false) |
| 67 | ++ self.delegate.didSuspendHandler = { |
| 68 | ++ if self.delegate.didSuspendCallCount == 2 { |
| 69 | ++ bothSuspended.fulfill() |
| 70 | ++ } else if self.delegate.didSuspendCallCount > 2 { |
| 71 | ++ suspendedAgain.lock() |
| 72 | ++ suspendedAgain.unlock(withValue: true) |
| 73 | ++ } |
| 74 | ++ } |
| 75 | ++ |
| 76 | ++ self.delegate.didYieldHandler = { _ in |
| 77 | ++ if self.delegate.didYieldCallCount == 1 { |
| 78 | ++ // Delay this yield until the other yield is suspended again. |
| 79 | ++ suspendedAgain.lock(whenValue: true) |
| 80 | ++ suspendedAgain.unlock() |
| 81 | ++ } |
| 82 | ++ } |
| 83 | ++ |
| 84 | ++ let task1 = Task { [writer] in |
| 85 | ++ try await writer!.yield("message1") |
| 86 | ++ } |
| 87 | ++ let task2 = Task { [writer] in |
| 88 | ++ try await writer!.yield("message2") |
| 89 | ++ } |
| 90 | ++ |
| 91 | ++ await fulfillment(of: [bothSuspended], timeout: 1) |
| 92 | ++ self.writer.finish() |
| 93 | ++ |
| 94 | ++ self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0) |
| 95 | ++ |
| 96 | ++ // We have to become writable again to unbuffer the yields |
| 97 | ++ // The first call to didYield will pause, so that the other yield will be suspended again. |
| 98 | ++ self.sink.setWritability(to: true) |
| 99 | ++ |
| 100 | ++ await XCTAssertNoThrow(try await task1.value) |
| 101 | ++ await XCTAssertNoThrow(try await task2.value) |
| 102 | ++ |
| 103 | ++ self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1) |
| 104 | ++ } |
| 105 | ++ |
| 106 | + func testWriterFinish_whenFinished() { |
| 107 | + // This tests just checks that finishing again is a no-op |
| 108 | + self.writer.finish() |
0 commit comments