Skip to content

Commit b1e14ef

Browse files
authored
fix: Stream error handling (#743)
1 parent 999add0 commit b1e14ef

File tree

2 files changed

+68
-2
lines changed

2 files changed

+68
-2
lines changed

Sources/ClientRuntime/Networking/Streaming/BufferedStream.swift

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import class Foundation.NSRecursiveLock
1313
/// Note: This class is thread-safe and async-safe.
1414
/// Note: if data is not read from the stream, the buffer will grow indefinitely until the stream is closed.
1515
/// or reach the maximum size of a `Data` object.
16-
public class BufferedStream: Stream {
16+
public class BufferedStream: Stream, @unchecked Sendable {
1717

1818
/// Returns the cumulative length of all data so far written to the stream, if known.
1919
/// For a buffered stream, the length will only be known if the stream has closed.
@@ -130,6 +130,13 @@ public class BufferedStream: Stream {
130130

131131
/// Call this function only while `lock` is locked, to prevent simultaneous access.
132132
private func _read(upToCount count: Int) throws -> Data? {
133+
134+
// throw any previously stored error, if there was one
135+
// dispose of the error when throwing so it is only thrown once
136+
if let error = _error {
137+
_error = nil
138+
throw error
139+
}
133140
let toRead = min(count, _buffer.count)
134141
let endPosition = position.advanced(by: toRead)
135142
let chunk = _buffer[position..<endPosition]
@@ -143,7 +150,6 @@ public class BufferedStream: Stream {
143150
// if we're closed and there's no data left, return nil
144151
// this will signal the end of the stream
145152
if _isClosed && chunk.isEmpty == true {
146-
if let error = _error { throw error }
147153
return nil
148154
}
149155

Tests/ClientRuntimeTests/NetworkingTests/Streaming/BufferedStreamTests.swift

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,20 @@ final class BufferedStreamTests: XCTestCase {
6969
XCTAssertNil(readData2)
7070
}
7171

72+
func test_read_throwsErrorWhenStreamIsClosedWithError() throws {
73+
let subject = BufferedStream()
74+
try subject.write(contentsOf: testData)
75+
subject.closeWithError(TestError.error)
76+
do {
77+
let readData1 = try subject.read(upToCount: Int.max)
78+
XCTFail("Error was expected to be thrown")
79+
} catch TestError.error {
80+
// Test passes
81+
} catch {
82+
XCTFail("Unexpected error thrown: \(error.localizedDescription)")
83+
}
84+
}
85+
7286
// MARK: - readToEnd()
7387

7488
func test_readToEnd_readsToEnd() throws {
@@ -83,6 +97,20 @@ final class BufferedStreamTests: XCTestCase {
8397
XCTAssertNil(readData2)
8498
}
8599

100+
func test_readToEnd_throwsErrorWhenStreamIsClosedWithError() throws {
101+
let subject = BufferedStream()
102+
try subject.write(contentsOf: testData)
103+
subject.closeWithError(TestError.error)
104+
do {
105+
let readData1 = try subject.readToEnd()
106+
XCTFail("Error was expected to be thrown")
107+
} catch TestError.error {
108+
// Test passes
109+
} catch {
110+
XCTFail("Unexpected error thrown: \(error.localizedDescription)")
111+
}
112+
}
113+
86114
// MARK: - readToEndAsync()
87115

88116
func test_readToEndAsync_readsToEnd() async throws {
@@ -97,6 +125,20 @@ final class BufferedStreamTests: XCTestCase {
97125
XCTAssertNil(readData2)
98126
}
99127

128+
func test_readToEndAsync_throwsErrorWhenStreamIsClosedWithError() async throws {
129+
let subject = BufferedStream()
130+
try subject.write(contentsOf: testData)
131+
subject.closeWithError(TestError.error)
132+
do {
133+
let readData1 = try await subject.readToEndAsync()
134+
XCTFail("Error was expected to be thrown")
135+
} catch TestError.error {
136+
// Test passes
137+
} catch {
138+
XCTFail("Unexpected error thrown: \(error.localizedDescription)")
139+
}
140+
}
141+
100142
// MARK: - readAsync(upToCount:)
101143

102144
func test_readAsync_readsAsynchronously() async throws {
@@ -142,6 +184,20 @@ final class BufferedStreamTests: XCTestCase {
142184
XCTAssertNil(readData3)
143185
}
144186

187+
func test_readAsync_throwsErrorWhenStreamIsClosedWithError() async throws {
188+
let subject = BufferedStream()
189+
try subject.write(contentsOf: testData)
190+
subject.closeWithError(TestError.error)
191+
do {
192+
let readData1 = try await subject.readAsync(upToCount: Int.max)
193+
XCTFail("Error was expected to be thrown")
194+
} catch TestError.error {
195+
// Test passes
196+
} catch {
197+
XCTFail("Unexpected error thrown: \(error.localizedDescription)")
198+
}
199+
}
200+
145201
// MARK: - write(contentsOf:)
146202

147203
func test_write_appendsWrittenDataToBuffer() throws {
@@ -189,3 +245,7 @@ final class BufferedStreamTests: XCTestCase {
189245
XCTAssertEqual(sut.length, testData.count)
190246
}
191247
}
248+
249+
private enum TestError: Error, Equatable {
250+
case error
251+
}

0 commit comments

Comments
 (0)