Skip to content

Commit d2cda87

Browse files
authored
fix: Streaming round trip (#284)
1 parent d91d213 commit d2cda87

28 files changed

+404
-419
lines changed

Packages/ClientRuntime/Sources/Networking/Http/CRT/CRTClientEngine.swift

Lines changed: 21 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -79,28 +79,6 @@ public class CRTClientEngine: HttpClientEngine {
7979
return connectionPool
8080
}
8181

82-
private func addHttpHeaders(endpoint: Endpoint, request: SdkHttpRequest) -> HttpRequest {
83-
84-
var headers = request.headers
85-
86-
let contentLength: Int64 = {
87-
switch request.body {
88-
case .data(let data):
89-
return Int64(data?.count ?? 0)
90-
case .streamSource(let stream):
91-
// TODO: implement dynamic streaming with transfer-encoded-chunk header
92-
return stream.unwrap().contentLength
93-
case .none, .streamSink:
94-
return 0
95-
}
96-
}()
97-
98-
headers.update(name: CONTENT_LENGTH_HEADER, value: "\(contentLength)")
99-
100-
request.headers = headers
101-
return request.toHttpRequest(bufferSize: windowSize)
102-
}
103-
10482
public func executeWithClosure(request: SdkHttpRequest, completion: @escaping NetworkResult) {
10583
execute(request: request).then { (result) in
10684
completion(result)
@@ -110,7 +88,7 @@ public class CRTClientEngine: HttpClientEngine {
11088
public func execute(request: SdkHttpRequest) -> Future<HttpResponse> {
11189
let isStreaming = { () -> Bool in
11290
switch request.body {
113-
case .streamSink, .streamSource: return true
91+
case .stream: return true
11492
default: return false
11593
}
11694
}()
@@ -151,15 +129,16 @@ public class CRTClientEngine: HttpClientEngine {
151129

152130
public func makeHttpRequestStreamOptions(_ request: SdkHttpRequest) -> (HttpRequestOptions, Future<HttpResponse>) {
153131
let future = Future<HttpResponse>()
154-
let requestWithHeaders = addHttpHeaders(endpoint: request.endpoint, request: request)
132+
let crtRequest = request.toHttpRequest(bufferSize: windowSize)
155133
let response = HttpResponse()
156134

157-
var streamSink: StreamSink?
158-
if case let HttpBody.streamSink(unwrappedStream) = request.body {
159-
// we know they want to receive a stream via their request body type
160-
streamSink = unwrappedStream.unwrap()
135+
var streamReader: StreamReader?
136+
if case let HttpBody.stream(unwrappedStream) = request.body,
137+
case let ByteStream.reader(reader) = unwrappedStream {
138+
streamReader = reader
161139
}
162-
let requestOptions = HttpRequestOptions(request: requestWithHeaders) { [self] (stream, _, httpHeaders) in
140+
141+
let requestOptions = HttpRequestOptions(request: crtRequest) { [self] (stream, _, httpHeaders) in
163142
logger.debug("headers were received")
164143
response.statusCode = HttpStatusCode(rawValue: Int(stream.getResponseStatusCode()))
165144
?? HttpStatusCode.notFound
@@ -171,28 +150,25 @@ public class CRTClientEngine: HttpClientEngine {
171150
} onIncomingBody: { [self] (_, data) in
172151
logger.debug("incoming data")
173152

174-
if let streamSink = streamSink {
153+
if let streamReader = streamReader {
175154
let byteBuffer = ByteBuffer(data: data)
176-
streamSink.receiveData(readFrom: byteBuffer)
155+
streamReader.write(buffer: byteBuffer)
177156
}
178157
} onStreamComplete: { [self] (_, error) in
179158
logger.debug("stream completed")
180159
if case let CRTError.crtError(unwrappedError) = error {
181160
if unwrappedError.errorCode != 0 {
182161
logger.error("Response encountered an error: \(error)")
183-
if let streamSink = streamSink {
184-
streamSink.onError(error: StreamError.unknown(error))
162+
if let streamReader = streamReader {
163+
streamReader.onError(error: ClientError.crtError(error))
185164
}
186165
future.fail(error)
187166
}
188167
}
189-
190-
if let streamSink = streamSink {
191-
response.body = HttpBody.streamSink(.provider(streamSink))
192-
} else {
193-
response.body = HttpBody.none
168+
if let streamReader = streamReader {
169+
streamReader.hasFinishedWriting = true
170+
response.body = .stream(.reader(streamReader))
194171
}
195-
196172
future.fulfill(response)
197173
}
198174

@@ -201,12 +177,12 @@ public class CRTClientEngine: HttpClientEngine {
201177

202178
public func makeHttpRequestOptions(_ request: SdkHttpRequest) -> (HttpRequestOptions, Future<HttpResponse>) {
203179
let future = Future<HttpResponse>()
204-
let requestWithHeaders = addHttpHeaders(endpoint: request.endpoint, request: request)
180+
let crtRequest = request.toHttpRequest(bufferSize: windowSize)
205181

206182
let response = HttpResponse()
207-
let incomingByteBuffer = ByteBuffer(size: 0)
183+
var incomingData = Data()
208184

209-
let requestOptions = HttpRequestOptions(request: requestWithHeaders) { [self] (stream, _, httpHeaders) in
185+
let requestOptions = HttpRequestOptions(request: crtRequest) { [self] (stream, _, httpHeaders) in
210186
logger.debug("headers were received")
211187
response.statusCode = HttpStatusCode(rawValue: Int(stream.getResponseStatusCode()))
212188
?? HttpStatusCode.notFound
@@ -216,8 +192,8 @@ public class CRTClientEngine: HttpClientEngine {
216192
response.statusCode = HttpStatusCode(rawValue: Int(stream.getResponseStatusCode()))
217193
?? HttpStatusCode.notFound
218194
} onIncomingBody: { [self] (_, data) in
219-
logger.debug("incoming data")
220-
incomingByteBuffer.put(data)
195+
logger.debug("incoming data: \(data.count) bytes")
196+
incomingData.append(data)
221197
} onStreamComplete: { [self] (_, error) in
222198
logger.debug("stream completed")
223199
if case let CRTError.crtError(unwrappedError) = error {
@@ -227,7 +203,7 @@ public class CRTClientEngine: HttpClientEngine {
227203
}
228204
}
229205

230-
response.body = HttpBody.data(incomingByteBuffer.toData())
206+
response.body = HttpBody.data(incomingData)
231207
future.fulfill(response)
232208
}
233209

Packages/ClientRuntime/Sources/Networking/Http/DefaultMiddlewares/ContentLengthMiddleware.swift

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,24 @@ public struct ContentLengthMiddleware<OperationStackOutput: HttpResponseBinding,
1313
input: MInput,
1414
next: H) -> Result<MOutput, MError>
1515
where H: Handler,
16-
Self.MInput == H.Input,
17-
Self.MOutput == H.Output,
18-
Self.Context == H.Context,
19-
Self.MError == H.MiddlewareError {
16+
Self.MInput == H.Input,
17+
Self.MOutput == H.Output,
18+
Self.Context == H.Context,
19+
Self.MError == H.MiddlewareError {
2020

21-
let contentLength: Int64 = {
22-
switch input.body {
23-
case .data(let data):
24-
return Int64(data?.count ?? 0)
25-
case .streamSource(let stream):
26-
// TODO: implement dynamic streaming with transfer-encoded-chunk header
27-
return stream.unwrap().contentLength
28-
case .none, .streamSink:
29-
return 0
21+
switch input.body {
22+
case .data(let data):
23+
input.headers.update(name: "Content-Length", value: String(data?.count ?? 0))
24+
case .stream(let stream):
25+
switch stream {
26+
case .buffer(let bytes):
27+
input.headers.update(name: "Content-Length", value: String(bytes.length))
28+
case .reader(_):
29+
input.headers.update(name: "Transfer-Encoded", value: "Chunked")
3030
}
31-
}()
32-
33-
input.headers.update(name: "Content-Length", value: String(contentLength))
31+
default:
32+
input.headers.update(name: "Content-Length", value: "0")
33+
}
3434

3535
return next.handle(context: context, input: input)
3636
}

Packages/ClientRuntime/Sources/Networking/Http/HttpBody.swift

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,39 @@
44
*/
55
import AwsCommonRuntimeKit
66

7-
public enum HttpBody {
7+
public enum HttpBody: Equatable {
88
case data(Data?)
9-
case streamSource(StreamSourceProvider)
10-
case streamSink(StreamSinkProvider)
9+
case stream(ByteStream)
1110
case none
1211
}
1312

14-
extension HttpBody: Equatable {
15-
public static func == (lhs: HttpBody, rhs: HttpBody) -> Bool {
16-
switch (lhs, rhs) {
17-
case (let .data(unwrappedlhsData), let .data(unwrappedRhsData)):
18-
return unwrappedlhsData == unwrappedRhsData
19-
case (.streamSource, .streamSource):
20-
return false
21-
case (.streamSink, .streamSink):
22-
return false
23-
case (.none, .none):
24-
return true
25-
default:
26-
return false
27-
}
28-
}
29-
}
30-
3113
public extension HttpBody {
3214
static var empty: HttpBody {
3315
.data(nil)
3416
}
3517
}
3618

19+
extension HttpBody {
20+
func toAwsInputStream() -> AwsInputStream? {
21+
switch self {
22+
case .data(let data):
23+
guard let data = data else {
24+
return nil
25+
}
26+
return AwsInputStream(ByteBuffer(data: data))
27+
case .stream(let stream):
28+
switch stream {
29+
case .reader(let reader):
30+
return AwsInputStream(reader.read(maxBytes: nil))
31+
case .buffer(let byteBuffer):
32+
return AwsInputStream(byteBuffer)
33+
}
34+
case .none:
35+
return nil
36+
}
37+
}
38+
}
39+
3740
extension HttpBody: CustomDebugStringConvertible {
3841
public var debugDescription: String {
3942
var bodyAsString: String?
@@ -42,10 +45,8 @@ extension HttpBody: CustomDebugStringConvertible {
4245
if let data = data {
4346
bodyAsString = String(data: data, encoding: .utf8)
4447
}
45-
case .streamSource(let stream):
46-
let byteBuffer = ByteBuffer(size: 1024)
47-
stream.unwrap().sendData(writeTo: byteBuffer)
48-
bodyAsString = String(data: byteBuffer.toData(), encoding: .utf8)
48+
case .stream(let stream):
49+
bodyAsString = String(data: stream.toBytes().toData(), encoding: .utf8)
4950
default:
5051
bodyAsString = nil
5152
}

Packages/ClientRuntime/Sources/Networking/Http/SdkHttpRequest.swift

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,24 +36,7 @@ extension SdkHttpRequest {
3636
httpRequest.method = method.rawValue
3737
httpRequest.path = "\(endpoint.path)\(endpoint.queryItemString)"
3838
httpRequest.addHeaders(headers: httpHeaders)
39-
var awsInputStream: AwsInputStream?
40-
switch body {
41-
case .data(let data):
42-
if let data = data {
43-
let byteBuffer = ByteBuffer(data: data)
44-
awsInputStream = AwsInputStream(byteBuffer)
45-
}
46-
case .streamSource(let stream):
47-
let byteBuffer = ByteBuffer(size: bufferSize)
48-
stream.unwrap().sendData(writeTo: byteBuffer)
49-
awsInputStream = AwsInputStream(byteBuffer)
50-
case .none, .streamSink:
51-
awsInputStream = nil
52-
}
53-
if let inputStream = awsInputStream {
54-
httpRequest.body = inputStream
55-
}
56-
39+
httpRequest.body = body.toAwsInputStream()
5740
return httpRequest
5841
}
5942
}

Packages/ClientRuntime/Sources/Networking/StreamError.swift

Lines changed: 0 additions & 8 deletions
This file was deleted.

Packages/ClientRuntime/Sources/Networking/StreamSink.swift

Lines changed: 0 additions & 86 deletions
This file was deleted.

0 commit comments

Comments
 (0)