Skip to content

Commit 05ab352

Browse files
authored
fix: streaming response (#293)
1 parent b7ed7ee commit 05ab352

File tree

10 files changed

+39
-105
lines changed

10 files changed

+39
-105
lines changed

Packages/ClientRuntime/Sources/Networking/Http/CRT/CRTClientEngine.swift

Lines changed: 9 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -86,18 +86,11 @@ public class CRTClientEngine: HttpClientEngine {
8686
}
8787

8888
public func execute(request: SdkHttpRequest) -> Future<HttpResponse> {
89-
let isStreaming = { () -> Bool in
90-
switch request.body {
91-
case .stream: return true
92-
default: return false
93-
}
94-
}()
9589
let connectionMgr = getOrCreateConnectionPool(endpoint: request.endpoint)
9690
let httpResponseFuture: Future<HttpResponse> = connectionMgr.acquireConnection()
9791
.chained { (connectionResult) -> Future<HttpResponse> in
9892
self.logger.debug("Connection was acquired to: \(String(describing: request.endpoint.url?.absoluteString))")
99-
let (requestOptions, future) = isStreaming ?
100-
self.makeHttpRequestStreamOptions(request): self.makeHttpRequestOptions(request)
93+
let (requestOptions, future) = self.makeHttpRequestStreamOptions(request)
10194
switch connectionResult {
10295
case .failure(let error):
10396
future.fail(error)
@@ -132,11 +125,7 @@ public class CRTClientEngine: HttpClientEngine {
132125
let crtRequest = request.toHttpRequest(bufferSize: windowSize)
133126
let response = HttpResponse()
134127

135-
var streamReader: StreamReader?
136-
if case let HttpBody.stream(unwrappedStream) = request.body,
137-
case let ByteStream.reader(reader) = unwrappedStream {
138-
streamReader = reader
139-
}
128+
let streamReader: StreamReader = DataStreamReader()
140129

141130
let requestOptions = HttpRequestOptions(request: crtRequest) { [self] (stream, _, httpHeaders) in
142131
logger.debug("headers were received")
@@ -150,60 +139,21 @@ public class CRTClientEngine: HttpClientEngine {
150139
} onIncomingBody: { [self] (_, data) in
151140
logger.debug("incoming data")
152141

153-
if let streamReader = streamReader {
154-
let byteBuffer = ByteBuffer(data: data)
155-
streamReader.write(buffer: byteBuffer)
156-
}
157-
} onStreamComplete: { [self] (_, error) in
158-
logger.debug("stream completed")
159-
if case let CRTError.crtError(unwrappedError) = error {
160-
if unwrappedError.errorCode != 0 {
161-
logger.error("Response encountered an error: \(error)")
162-
if let streamReader = streamReader {
163-
streamReader.onError(error: ClientError.crtError(error))
164-
}
165-
future.fail(error)
166-
}
167-
}
168-
if let streamReader = streamReader {
169-
streamReader.hasFinishedWriting = true
170-
response.body = .stream(.reader(streamReader))
171-
}
172-
future.fulfill(response)
173-
}
174-
175-
return (requestOptions, future)
176-
}
177-
178-
public func makeHttpRequestOptions(_ request: SdkHttpRequest) -> (HttpRequestOptions, Future<HttpResponse>) {
179-
let future = Future<HttpResponse>()
180-
let crtRequest = request.toHttpRequest(bufferSize: windowSize)
181-
182-
let response = HttpResponse()
183-
var incomingData = Data()
184-
185-
let requestOptions = HttpRequestOptions(request: crtRequest) { [self] (stream, _, httpHeaders) in
186-
logger.debug("headers were received")
187-
response.statusCode = HttpStatusCode(rawValue: Int(stream.getResponseStatusCode()))
188-
?? HttpStatusCode.notFound
189-
response.headers.addAll(httpHeaders: httpHeaders)
190-
} onIncomingHeadersBlockDone: { [self] (stream, _) in
191-
logger.debug("header block is done")
192-
response.statusCode = HttpStatusCode(rawValue: Int(stream.getResponseStatusCode()))
193-
?? HttpStatusCode.notFound
194-
} onIncomingBody: { [self] (_, data) in
195-
logger.debug("incoming data: \(data.count) bytes")
196-
incomingData.append(data)
142+
let byteBuffer = ByteBuffer(data: data)
143+
streamReader.write(buffer: byteBuffer)
197144
} onStreamComplete: { [self] (_, error) in
198145
logger.debug("stream completed")
146+
streamReader.hasFinishedWriting = true
199147
if case let CRTError.crtError(unwrappedError) = error {
200148
if unwrappedError.errorCode != 0 {
201149
logger.error("Response encountered an error: \(error)")
150+
streamReader.onError(error: ClientError.crtError(error))
202151
future.fail(error)
152+
return
203153
}
204154
}
205-
206-
response.body = HttpBody.data(incomingData)
155+
156+
response.body = .stream(.reader(streamReader))
207157
future.fulfill(response)
208158
}
209159

smithy-swift-codegen/src/main/kotlin/software/amazon/smithy/swift/codegen/integration/HttpHeaderMiddleware.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ class HttpHeaderMiddleware(
7474
renderDoCatch(memberNameWithExtension, paramName)
7575
} else {
7676
if (member.needsDefaultValueCheck(ctx.model, ctx.symbolProvider) && !inCollection) {
77-
writer.write("let needsToBeSentAcrossTheWire = $memberName != ${member.defaultValue(ctx.symbolProvider)}")
78-
writer.openBlock("if needsToBeSentAcrossTheWire {", "}") {
77+
writer.openBlock("if $memberName != ${member.defaultValue(ctx.symbolProvider)} {", "}") {
7978
writer.write("input.builder.withHeader(name: \"$paramName\", value: String($memberNameWithExtension))")
8079
}
8180
} else {

smithy-swift-codegen/src/main/kotlin/software/amazon/smithy/swift/codegen/integration/HttpProtocolUnitTestRequestGenerator.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ open class HttpProtocolUnitTestRequestGenerator protected constructor(builder: B
6767
decoderProperty?.renderInstantiation(writer)
6868
decoderProperty?.renderConfiguration(writer)
6969

70-
// TODO:: handle streaming inputs
71-
// isStreamingRequest = inputShape.asStructureShape().get().hasStreamingMember(model)
7270
writer.writeInline("\nlet input = ")
7371
.call {
7472
ShapeValueGenerator(model, symbolProvider).writeShapeValueInline(writer, inputShape, test.params)

smithy-swift-codegen/src/main/kotlin/software/amazon/smithy/swift/codegen/integration/HttpProtocolUnitTestResponseGenerator.kt

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import software.amazon.smithy.swift.codegen.RecursiveShapeBoxer
1515
import software.amazon.smithy.swift.codegen.ShapeValueGenerator
1616
import software.amazon.smithy.swift.codegen.SwiftWriter
1717
import software.amazon.smithy.swift.codegen.getOrNull
18-
import software.amazon.smithy.swift.codegen.hasStreamingMember
1918
import software.amazon.smithy.swift.codegen.isBoxed
2019

2120
/**
@@ -53,21 +52,10 @@ open class HttpProtocolUnitTestResponseGenerator protected constructor(builder:
5352
renderHeadersInHttpResponse(test)
5453
test.body.ifPresent { body ->
5554
if (body.isNotBlank() && body.isNotEmpty()) {
56-
val isStreamingResponse = operation.output.map {
57-
val outputShape = model.expectShape(it)
58-
outputShape.asStructureShape().get().hasStreamingMember(model)
59-
}.orElse(false)
60-
if (isStreamingResponse) {
61-
writer.write(
62-
"content: HttpBody.stream(ByteStream.from(data: \"\"\"\n\$L\n\"\"\".data(using: .utf8)!)),",
63-
body.replace(".000", "")
64-
)
65-
} else {
66-
writer.write(
67-
"content: HttpBody.data(\"\"\"\n\$L\n\"\"\".data(using: .utf8)),",
68-
body.replace(".000", "")
69-
)
70-
}
55+
writer.write(
56+
"content: HttpBody.stream(ByteStream.from(data: \"\"\"\n\$L\n\"\"\".data(using: .utf8)!)),",
57+
body.replace(".000", "")
58+
)
7159
}
7260
}
7361
writer.write("host: host")

smithy-swift-codegen/src/main/kotlin/software/amazon/smithy/swift/codegen/integration/HttpQueryItemMiddleware.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,7 @@ class HttpQueryItemMiddleware(
126126
renderDoCatch(memberName, paramName)
127127
} else {
128128
if (member.needsDefaultValueCheck(ctx.model, ctx.symbolProvider)) {
129-
writer.write("let needsToBeSentAcrossTheWire = $memberName != ${member.defaultValue(ctx.symbolProvider)}")
130-
writer.openBlock("if needsToBeSentAcrossTheWire {", "}") {
129+
writer.openBlock("if $memberName != ${member.defaultValue(ctx.symbolProvider)} {", "}") {
131130
val queryItemName = "${ctx.symbolProvider.toMemberNames(member).second}QueryItem"
132131
writer.write("let $queryItemName = URLQueryItem(name: \"$paramName\".urlPercentEncoding(), value: String($memberName).urlPercentEncoding())")
133132
writer.write("input.builder.withQueryItem($queryItemName)")

smithy-swift-codegen/src/main/kotlin/software/amazon/smithy/swift/codegen/integration/httpResponse/bindingTraits/HttpResponseTraitWithHttpPayload.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ class HttpResponseTraitWithHttpPayload(
2222
// TODO: properly support event streams and other binary stream types besides blob
2323
val isBinaryStream =
2424
ctx.model.getShape(binding.member.target).get().hasTrait<StreamingTrait>() && target.type == ShapeType.BLOB
25-
val bodyType = if (isBinaryStream) ".stream" else ".data"
26-
val additionalUnwrap = if (!isBinaryStream) ",\n let data = data" else ""
27-
writer.openBlock("if case $bodyType(let data) = httpResponse.body$additionalUnwrap {", "} else {") {
25+
writer.openBlock("if case .stream(let reader) = httpResponse.body {", "} else {") {
26+
val extension = if (!isBinaryStream) ".toBytes().toData()" else ""
27+
writer.write("let data = reader$extension")
2828
when (target.type) {
2929
ShapeType.DOCUMENT -> {
3030
writer.openBlock("if let responseDecoder = decoder {", "} else {") {

smithy-swift-codegen/src/main/kotlin/software/amazon/smithy/swift/codegen/integration/httpResponse/bindingTraits/HttpResponseTraitWithoutHttpPayload.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ class HttpResponseTraitWithoutHttpPayload(
4040
val bodyMembersWithoutQueryTraitMemberNames = bodyMembersWithoutQueryTrait.map { ctx.symbolProvider.toMemberName(it.member) }
4141

4242
if (bodyMembersWithoutQueryTrait.isNotEmpty()) {
43-
writer.write("if case .data(let data) = httpResponse.body,")
43+
writer.write("if case .stream(let reader) = httpResponse.body,")
4444
writer.indent()
45-
writer.write("let unwrappedData = data,")
4645
writer.write("let responseDecoder = decoder {")
47-
writer.write("let output: ${outputShapeName}Body = try responseDecoder.decode(responseBody: unwrappedData)")
46+
writer.write("let data = reader.toBytes().toData()")
47+
writer.write("let output: ${outputShapeName}Body = try responseDecoder.decode(responseBody: data)")
4848
bodyMembersWithoutQueryTraitMemberNames.sorted().forEach {
4949
writer.write("self.$it = output.$it")
5050
}

smithy-swift-codegen/src/test/kotlin/HttpBindingProtocolGeneratorTests.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ class HttpBindingProtocolGeneratorTests {
7171
"""
7272
extension ExplicitStructOutputResponse: HttpResponseBinding {
7373
public init (httpResponse: HttpResponse, decoder: ResponseDecoder? = nil) throws {
74-
if case .data(let data) = httpResponse.body,
75-
let data = data {
74+
if case .stream(let reader) = httpResponse.body {
75+
let data = reader.toBytes().toData()
7676
if let responseDecoder = decoder {
7777
let output: Nested2 = try responseDecoder.decode(responseBody: data)
7878
self.payload1 = output
@@ -118,8 +118,8 @@ extension HttpResponseCodeOutputResponse: HttpResponseBinding {
118118
"""
119119
extension InlineDocumentAsPayloadOutputResponse: HttpResponseBinding {
120120
public init (httpResponse: HttpResponse, decoder: ResponseDecoder? = nil) throws {
121-
if case .data(let data) = httpResponse.body,
122-
let data = data {
121+
if case .stream(let reader) = httpResponse.body {
122+
let data = reader.toBytes().toData()
123123
if let responseDecoder = decoder {
124124
let output: Document = try responseDecoder.decode(responseBody: data)
125125
self.documentValue = output

smithy-swift-codegen/src/test/kotlin/HttpProtocolUnitTestErrorGeneratorTests.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ class GreetingWithErrorsComplexErrorTest: HttpResponseTestBase {
2727
"X-Amzn-Errortype": "ComplexError",
2828
"X-Header": "Header"
2929
],
30-
content: HttpBody.data(""${'"'}
30+
content: HttpBody.stream(ByteStream.from(data: ""${'"'}
3131
{
3232
"TopLevel": "Top level",
3333
"Nested": {
3434
"Fooooo": "bar"
3535
}
3636
}
37-
""${'"'}.data(using: .utf8)),
37+
""${'"'}.data(using: .utf8)!)),
3838
host: host
3939
) else {
4040
XCTFail("Something is wrong with the created http response")
@@ -91,14 +91,14 @@ class GreetingWithErrorsComplexErrorTest: HttpResponseTestBase {
9191
"X-Amzn-Errortype": "ComplexError",
9292
"X-Header": "Header"
9393
],
94-
content: HttpBody.data(""${'"'}
94+
content: HttpBody.stream(ByteStream.from(data: ""${'"'}
9595
{
9696
"TopLevel": "Top level",
9797
"Nested": {
9898
"Fooooo": "bar"
9999
}
100100
}
101-
""${'"'}.data(using: .utf8)),
101+
""${'"'}.data(using: .utf8)!)),
102102
host: host
103103
) else {
104104
XCTFail("Something is wrong with the created http response")

smithy-swift-codegen/src/test/kotlin/HttpProtocolUnitTestResponseGeneratorTests.kt

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ open class HttpProtocolUnitTestResponseGeneratorTests {
3737
"X-Int": "1",
3838
"X-String": "Hello"
3939
],
40-
content: HttpBody.data(""${'"'}
40+
content: HttpBody.stream(ByteStream.from(data: ""${'"'}
4141
{
4242
"payload1": "explicit string",
4343
"payload2": 1,
@@ -47,7 +47,7 @@ open class HttpProtocolUnitTestResponseGeneratorTests {
4747
}
4848
}
4949
50-
""${'"'}.data(using: .utf8)),
50+
""${'"'}.data(using: .utf8)!)),
5151
host: host
5252
) else {
5353
XCTFail("Something is wrong with the created http response")
@@ -165,13 +165,13 @@ open class HttpProtocolUnitTestResponseGeneratorTests {
165165
headers: [
166166
"Content-Type": "application/json"
167167
],
168-
content: HttpBody.data(""${'"'}
168+
content: HttpBody.stream(ByteStream.from(data: ""${'"'}
169169
{
170170
"contents": {
171171
"stringValue": "foo"
172172
}
173173
}
174-
""${'"'}.data(using: .utf8)),
174+
""${'"'}.data(using: .utf8)!)),
175175
host: host
176176
) else {
177177
XCTFail("Something is wrong with the created http response")
@@ -208,7 +208,7 @@ open class HttpProtocolUnitTestResponseGeneratorTests {
208208
headers: [
209209
"Content-Type": "application/json"
210210
],
211-
content: HttpBody.data(""${'"'}
211+
content: HttpBody.stream(ByteStream.from(data: ""${'"'}
212212
{
213213
"nested": {
214214
"foo": "Foo1",
@@ -223,7 +223,7 @@ open class HttpProtocolUnitTestResponseGeneratorTests {
223223
}
224224
}
225225
}
226-
""${'"'}.data(using: .utf8)),
226+
""${'"'}.data(using: .utf8)!)),
227227
host: host
228228
) else {
229229
XCTFail("Something is wrong with the created http response")
@@ -276,14 +276,14 @@ open class HttpProtocolUnitTestResponseGeneratorTests {
276276
headers: [
277277
"Content-Type": "application/json"
278278
],
279-
content: HttpBody.data(""${'"'}
279+
content: HttpBody.stream(ByteStream.from(data: ""${'"'}
280280
{
281281
"stringValue": "string",
282282
"documentValue": {
283283
"foo": "bar"
284284
}
285285
}
286-
""${'"'}.data(using: .utf8)),
286+
""${'"'}.data(using: .utf8)!)),
287287
host: host
288288
) else {
289289
XCTFail("Something is wrong with the created http response")
@@ -330,11 +330,11 @@ open class HttpProtocolUnitTestResponseGeneratorTests {
330330
headers: [
331331
"Content-Type": "application/json"
332332
],
333-
content: HttpBody.data(""${'"'}
333+
content: HttpBody.stream(ByteStream.from(data: ""${'"'}
334334
{
335335
"foo": "bar"
336336
}
337-
""${'"'}.data(using: .utf8)),
337+
""${'"'}.data(using: .utf8)!)),
338338
host: host
339339
) else {
340340
XCTFail("Something is wrong with the created http response")

0 commit comments

Comments
 (0)