Skip to content

Commit 5325fcb

Browse files
authored
fix: Ensure all data is written before closing FoundationStreamBridge (#713)
1 parent cea1793 commit 5325fcb

File tree

3 files changed

+149
-73
lines changed

3 files changed

+149
-73
lines changed

Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift

Lines changed: 130 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,26 @@ import class Foundation.Timer
1919
import struct Foundation.TimeInterval
2020
import protocol Foundation.StreamDelegate
2121

22-
/// Reads data from a smithy-swift native `ReadableStream` and streams the data to a Foundation `InputStream`.
22+
/// Reads data from a smithy-swift native `ReadableStream` and streams the data through to a Foundation `InputStream`.
23+
///
24+
/// A pair of Foundation "bound streams" is created. Data from the `ReadableStream` is transferred into the Foundation
25+
/// `OutputStream` until the `ReadableStream` is closed and all data has been read from it. The Foundation
26+
/// `InputStream` is exposed as a property, and may be used to stream the data to other components.
2327
///
2428
/// Used to permit SDK streaming request bodies to be used with `URLSession`-based HTTP requests.
2529
class FoundationStreamBridge: NSObject, StreamDelegate {
2630

27-
/// The max number of bytes to buffer internally (and transfer) at any given time.
28-
let bufferSize: Int
31+
/// The max number of bytes to buffer between the `ReadableStream` and the Foundation `OutputStream`
32+
/// at any given time.
33+
let bridgeBufferSize: Int
2934

30-
/// A buffer to hold data that has been read from the ReadableStream but not yet written to the OutputStream.
35+
/// A buffer to hold data that has been read from the `ReadableStream` but not yet written to the
36+
/// Foundation `OutputStream`. At most, it will contain `bridgeBufferSize` bytes.
3137
private var buffer: Data
3238

3339
/// The `ReadableStream` that will serve as the input to this bridge.
3440
/// The bridge will read bytes from this stream and dump them to the Foundation stream
35-
/// pair as they become available.
41+
/// pair as they become available.
3642
let readableStream: ReadableStream
3743

3844
/// A Foundation stream that will carry the bytes read from the readableStream as they become available.
@@ -44,80 +50,100 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
4450
/// A Logger for logging events.
4551
private let logger: LogAgent
4652

47-
/// Actor used to ensure writes are performed in series.
48-
actor WriteCoordinator {
53+
/// Actor used to ensure writes are performed in series, one at a time.
54+
private actor WriteCoordinator {
4955
var task: Task<Void, Error>?
5056

51-
/// `true` if the readable stream has been found to be empty, `false` otherwise. Will flip to `true` if the readable stream is read,
52-
/// and `nil` is returned.
53-
var readableStreamIsEmpty = false
54-
55-
/// Sets stream status to indicate the stream is empty.
56-
func setReadableStreamIsEmpty() async {
57-
readableStreamIsEmpty = true
58-
}
59-
6057
/// Creates a new concurrent Task that executes the passed block, ensuring that the previous Task
6158
/// finishes before this task starts.
6259
///
6360
/// Acts as a sort of "serial queue" of Swift concurrency tasks.
6461
/// - Parameter block: The code to be performed in this task.
65-
func perform(_ block: @escaping @Sendable (WriteCoordinator) async throws -> Void) {
66-
self.task = Task { [task] in
62+
func perform(_ block: @escaping @Sendable () async throws -> Void) async throws {
63+
let task = Task { [task] in
6764
_ = await task?.result
68-
try await block(self)
65+
try await block()
6966
}
67+
self.task = task
68+
_ = try await task.value
7069
}
7170
}
7271

7372
/// Actor used to enforce the order of multiple concurrent stream writes.
7473
private let writeCoordinator = WriteCoordinator()
7574

76-
/// A shared serial DispatchQueue to run the stream operations.
77-
/// Performing operations on an async queue allows Swift concurrency tasks to not block.
75+
/// A serial `DispatchQueue` to run the stream operations for the Foundation `OutputStream`.
76+
///
77+
/// Operations performed on the queue include:
78+
/// - Opening the stream
79+
/// - Closing the stream
80+
/// - Writing to the stream
81+
/// - Receiving `StreamDelegate` callbacks
82+
///
83+
/// Queue operations are run in the order they are placed on the queue, and only one operation
84+
/// runs at a time (i.e. this is a "serial queue".)
7885
private let queue = DispatchQueue(label: "AWSFoundationStreamBridge")
7986

87+
/// `true` if the readable stream has been closed, `false` otherwise. Will be flipped to `true` once the readable stream is read,
88+
/// and `nil` is returned.
89+
///
90+
/// Access this variable only during a write operation to ensure exclusive access.
91+
private var readableStreamIsClosed = false
92+
8093
// MARK: - init & deinit
8194

82-
/// Creates a stream bridge taking the passed `ReadableStream` as its input
95+
/// Creates a stream bridge taking the passed `ReadableStream` as its input., and exposing a Foundation `InputStream`
96+
/// that may be used for streaming data on to Foundation components.
8397
///
8498
/// Data will be buffered in an internal, in-memory buffer. The Foundation `InputStream` that exposes `readableStream`
8599
/// is exposed by the `inputStream` property after creation.
86100
/// - Parameters:
87101
/// - readableStream: The `ReadableStream` that serves as the input to the bridge.
88-
/// - bufferSize: The number of bytes in the in-memory buffer. The buffer is allocated for this size no matter if in use or not.
89-
/// Defaults to 65536 bytes.
90-
init(readableStream: ReadableStream, bufferSize: Int = 65_536, logger: LogAgent) {
102+
/// - bridgeBufferSize: The number of bytes in the in-memory buffer. The buffer is allocated for this size no matter if in use or not.
103+
/// Defaults to 65536 bytes (64 kb).
104+
/// - boundStreamBufferSize: The number of bytes in the buffer between the bound Foundation streams. If `nil`, uses the
105+
/// same size as `bridgeBufferSize`. Defaults to `nil`. Primary use of this parameter is for testing.
106+
/// - logger: A logger that can be used to log stream events.
107+
init(
108+
readableStream: ReadableStream,
109+
bridgeBufferSize: Int = 65_536,
110+
boundStreamBufferSize: Int? = nil,
111+
logger: LogAgent
112+
) {
91113
var inputStream: InputStream?
92114
var outputStream: OutputStream?
93115

94116
// Create a "bound stream pair" of Foundation streams.
95117
// Data written into the output stream will automatically flow to the inputStream for reading.
96118
// The bound streams have a buffer between them of size equal to the buffer held by this bridge.
97119
Foundation.Stream.getBoundStreams(
98-
withBufferSize: bufferSize, inputStream: &inputStream, outputStream: &outputStream
120+
withBufferSize: boundStreamBufferSize ?? bridgeBufferSize,
121+
inputStream: &inputStream,
122+
outputStream: &outputStream
99123
)
100124
guard let inputStream, let outputStream else {
101125
// Fail with fatalError since this is not a failure that would happen in normal operation.
102126
fatalError("Get pair of bound streams failed. Please file a bug with AWS SDK for Swift.")
103127
}
104-
self.bufferSize = bufferSize
105-
self.buffer = Data(capacity: bufferSize)
128+
self.bridgeBufferSize = bridgeBufferSize
129+
self.buffer = Data(capacity: bridgeBufferSize)
106130
self.readableStream = readableStream
107131
self.inputStream = inputStream
108132
self.outputStream = outputStream
109133
self.logger = logger
110134

111-
// The output stream is configured to deliver its callbacks on the dispatch queue.
135+
// The Foundation `OutputStream` is configured to deliver its callbacks on the dispatch queue.
112136
// This precludes the need for a Thread with RunLoop.
113137
// For safety, all interactions with the output stream will be performed on this queue.
114138
CFWriteStreamSetDispatchQueue(outputStream, queue)
115139
}
116140

117141
// MARK: - Opening & closing
118142

119-
/// Schedule the output stream on the queue for stream callbacks.
120-
/// Do not wait to complete opening before returning.
143+
/// Open the output stream and schedule this bridge to receive stream delegate callbacks.
144+
///
145+
/// Stream operations are performed on the stream's queue.
146+
/// Stream opening is completed before asynchronous return to the caller.
121147
func open() async {
122148
await withCheckedContinuation { continuation in
123149
queue.async {
@@ -128,8 +154,10 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
128154
}
129155
}
130156

131-
/// Unschedule the output stream on the special stream callback thread.
132-
/// Do not wait to complete closing before returning.
157+
/// Close the output stream and unschedule this bridge from receiving stream delegate callbacks.
158+
///
159+
/// Stream operations are performed on the stream's queue.
160+
/// Stream closing is completed before asynchronous return to the caller.
133161
func close() async {
134162
await withCheckedContinuation { continuation in
135163
queue.async {
@@ -142,50 +170,97 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
142170

143171
// MARK: - Writing to bridge
144172

145-
/// Tries to read from the readable stream if possible, then transfer the data to the output stream.
173+
/// Writes buffered data to the output stream.
174+
/// If the buffer is empty, the `ReadableStream` will be read first to replenish the buffer.
175+
///
176+
/// If the buffer is empty and the readable stream is closed, there is no more data to bridge, and the output stream is closed.
146177
private func writeToOutput() async throws {
147-
await writeCoordinator.perform { [self] writeCoordinator in
148-
var data = Data()
149-
if await !writeCoordinator.readableStreamIsEmpty {
150-
if let newData = try await readableStream.readAsync(upToCount: bufferSize) {
151-
data = newData
178+
179+
// Perform the write on the `WriteCoordinator` to ensure that writes happen in-order
180+
// and one at a time.
181+
//
182+
// Note that it is safe to access `buffer` and `readableStreamIsClosed` instance vars
183+
// from inside the block passed to `perform()` because this is the only place
184+
// these instance vars are accessed, and the code in the `perform()` block runs
185+
// in series with any other calls to `perform()`.
186+
try await writeCoordinator.perform { [self] in
187+
188+
// If there is no data in the buffer and the `ReadableStream` is still open,
189+
// attempt to read the stream. Otherwise, skip reading the `ReadableStream` and
190+
// write what's in the buffer immediately.
191+
if !readableStreamIsClosed && buffer.isEmpty {
192+
if let newData = try await readableStream.readAsync(upToCount: bridgeBufferSize - buffer.count) {
193+
buffer.append(newData)
152194
} else {
153-
await writeCoordinator.setReadableStreamIsEmpty()
154-
await close()
195+
readableStreamIsClosed = true
155196
}
156197
}
157-
try await writeToOutputStream(data: data)
198+
199+
// Write the previously buffered data and/or newly read data, if any, to the Foundation `OutputStream`.
200+
// Capture the error from the stream write, if any.
201+
var streamError: Error?
202+
if !buffer.isEmpty {
203+
streamError = await writeToOutputStream()
204+
}
205+
206+
// If the readable stream has closed and there is no data in the buffer,
207+
// there is nothing left to forward to the output stream, so close it.
208+
if readableStreamIsClosed && buffer.isEmpty {
209+
await close()
210+
}
211+
212+
// If the output stream write produced an error, throw it now, else just return.
213+
if let streamError { throw streamError }
158214
}
159215
}
160216

161-
/// Write the passed data to the output stream, using the reserved thread.
162-
private func writeToOutputStream(data: Data) async throws {
163-
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
217+
/// Using the output stream's callback queue, write the buffered data to the Foundation `OutputStream`.
218+
///
219+
/// After writing, remove the written data from the buffer.
220+
/// - Returns: The error resulting from the write to the Foundation `OutputStream`, or `nil` if no error occurred.
221+
private func writeToOutputStream() async -> Error? {
222+
223+
// Suspend the caller while the write is performed on the Foundation `OutputStream`'s queue.
224+
await withCheckedContinuation { continuation in
225+
226+
// Perform the write to the Foundation `OutputStream` on its queue.
164227
queue.async { [self] in
165-
guard !buffer.isEmpty || !data.isEmpty else { continuation.resume(); return }
166-
buffer.append(data)
228+
229+
// Write to the output stream. It may not accept all data, so get the number of bytes
230+
// it accepted in `writeCount`.
167231
var writeCount = 0
168232
buffer.withUnsafeBytes { bufferPtr in
169233
guard let bytePtr = bufferPtr.bindMemory(to: UInt8.self).baseAddress else { return }
170234
writeCount = outputStream.write(bytePtr, maxLength: buffer.count)
171235
}
236+
237+
// `writeCount` will be a positive number if bytes were written.
238+
// Remove the written bytes from the front of the buffer.
172239
if writeCount > 0 {
173240
logger.info("FoundationStreamBridge: wrote \(writeCount) bytes to request body")
174241
buffer.removeFirst(writeCount)
175242
}
176-
if let error = outputStream.streamError {
177-
continuation.resume(throwing: error)
178-
} else {
179-
continuation.resume()
180-
}
243+
244+
// Resume the caller now that the write is complete, returning the stream error, if any.
245+
continuation.resume(returning: outputStream.streamError)
181246
}
182247
}
183248
}
184249

185250
// MARK: - StreamDelegate protocol
186251

187-
/// The stream places this callback when appropriate. Call will be delivered on the GCD queue for stream callbacks.
188-
/// `.hasSpaceAvailable` prompts this type to query the readable stream for more data.
252+
/// The stream places this callback when an event happens.
253+
///
254+
/// The `FoundationStreamBridge` sets itself as the delegate of the Foundation `OutputStream` whenever the
255+
/// `OutputStream` is open. Stream callbacks will be delivered on the GCD serial queue.
256+
///
257+
/// `.hasSpaceAvailable` is the only event where the `FoundationStreamBridge` takes action; in response to
258+
/// this event, the `FoundationStreamBridge` will write data to the `OutputStream`.
259+
///
260+
/// This method is implemented for the Foundation `StreamDelegate` protocol.
261+
/// - Parameters:
262+
/// - aStream: The stream which experienced the event.
263+
/// - eventCode: A code describing the type of event that happened.
189264
@objc func stream(_ aStream: Foundation.Stream, handle eventCode: Foundation.Stream.Event) {
190265
switch eventCode {
191266
case .openCompleted:
@@ -199,7 +274,7 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
199274
Task { try await writeToOutput() }
200275
case .errorOccurred:
201276
logger.info("FoundationStreamBridge: .errorOccurred event")
202-
logger.info("FoundationStreamBridge: Stream error: \(String(describing: aStream.streamError))")
277+
logger.info("FoundationStreamBridge: Stream error: \(aStream.streamError.debugDescription)")
203278
case .endEncountered:
204279
break
205280
default:

Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ public final class URLSessionHTTPClient: HTTPClient {
410410
// that URLSession can stream its request body from.
411411
// Allow 16kb of in-memory buffer for request body streaming
412412
let streamBridge = requestStream.map {
413-
FoundationStreamBridge(readableStream: $0, bufferSize: 16_384, logger: logger)
413+
FoundationStreamBridge(readableStream: $0, bridgeBufferSize: 16_384, logger: logger)
414414
}
415415

416416
// Create the request (with a streaming body when needed.)

0 commit comments

Comments
 (0)