Skip to content

Commit 50f3795

Browse files
Copy bi-directional event stream request (#3311)
* Copy bi-directional event stream operation request * Code generation example
1 parent 3a8bb67 commit 50f3795

File tree

46 files changed

+496
-189
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+496
-189
lines changed

generated/src/aws-cpp-sdk-kinesis/include/aws/kinesis/model/SubscribeToShardHandler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ namespace Model
3636
public:
3737
AWS_KINESIS_API SubscribeToShardHandler();
3838
AWS_KINESIS_API SubscribeToShardHandler& operator=(const SubscribeToShardHandler&) = default;
39+
AWS_KINESIS_API SubscribeToShardHandler(const SubscribeToShardHandler&) = default;
3940

4041
AWS_KINESIS_API virtual void OnEvent() override;
4142

generated/src/aws-cpp-sdk-kinesis/include/aws/kinesis/model/SubscribeToShardRequest.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ namespace Model
4545
/**
4646
* Underlying Event Stream Handler which is used to define callback functions.
4747
*/
48-
inline const SubscribeToShardHandler& GetEventStreamHandler() const { return m_handler; }
48+
inline SubscribeToShardHandler& GetEventStreamHandler() { return m_handler; }
4949

5050
/**
5151
* Underlying Event Stream Handler which is used to define callback functions.

generated/src/aws-cpp-sdk-kinesis/source/KinesisClient.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,14 @@ SubscribeToShardOutcome KinesisClient::SubscribeToShard(SubscribeToShardRequest&
836836
request.SetResponseStreamFactory(
837837
[&] { request.GetEventStreamDecoder().Reset(); return Aws::New<Aws::Utils::Event::EventDecoderStream>(ALLOCATION_TAG, request.GetEventStreamDecoder()); }
838838
);
839+
if (!request.GetHeadersReceivedEventHandler()) {
840+
request.SetHeadersReceivedEventHandler([&request](const Http::HttpRequest*, Http::HttpResponse* response) {
841+
AWS_CHECK_PTR("SubscribeToShard", response);
842+
if (const auto initialResponseHandler = request.GetEventStreamHandler().GetInitialResponseCallbackEx()) {
843+
initialResponseHandler({response->GetHeaders()}, Utils::Event::InitialResponseType::ON_RESPONSE);
844+
}
845+
});
846+
}
839847
return SubscribeToShardOutcome(MakeRequestDeserialize(&request, request.GetServiceRequestName(), Aws::Http::HttpMethod::HTTP_POST, [&](Aws::Endpoint::AWSEndpoint& resolvedEndpoint) -> void {
840848
AWS_UNREFERENCED_PARAM(resolvedEndpoint);
841849
}));

generated/src/aws-cpp-sdk-lambda/include/aws/lambda/model/InvokeWithResponseStreamHandler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ namespace Model
3939
public:
4040
AWS_LAMBDA_API InvokeWithResponseStreamHandler();
4141
AWS_LAMBDA_API InvokeWithResponseStreamHandler& operator=(const InvokeWithResponseStreamHandler&) = default;
42+
AWS_LAMBDA_API InvokeWithResponseStreamHandler(const InvokeWithResponseStreamHandler&) = default;
4243

4344
AWS_LAMBDA_API virtual void OnEvent() override;
4445

generated/src/aws-cpp-sdk-lambda/include/aws/lambda/model/InvokeWithResponseStreamRequest.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ namespace Model
5151
/**
5252
* Underlying Event Stream Handler which is used to define callback functions.
5353
*/
54-
inline const InvokeWithResponseStreamHandler& GetEventStreamHandler() const { return m_handler; }
54+
inline InvokeWithResponseStreamHandler& GetEventStreamHandler() { return m_handler; }
5555

5656
/**
5757
* Underlying Event Stream Handler which is used to define callback functions.

generated/src/aws-cpp-sdk-lambda/source/LambdaClient.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1467,6 +1467,14 @@ InvokeWithResponseStreamOutcome LambdaClient::InvokeWithResponseStream(InvokeWit
14671467
request.SetResponseStreamFactory(
14681468
[&] { request.GetEventStreamDecoder().Reset(); return Aws::New<Aws::Utils::Event::EventDecoderStream>(ALLOCATION_TAG, request.GetEventStreamDecoder()); }
14691469
);
1470+
if (!request.GetHeadersReceivedEventHandler()) {
1471+
request.SetHeadersReceivedEventHandler([&request](const Http::HttpRequest*, Http::HttpResponse* response) {
1472+
AWS_CHECK_PTR("InvokeWithResponseStream", response);
1473+
if (const auto initialResponseHandler = request.GetEventStreamHandler().GetInitialResponseCallbackEx()) {
1474+
initialResponseHandler({response->GetHeaders()}, Utils::Event::InitialResponseType::ON_RESPONSE);
1475+
}
1476+
});
1477+
}
14701478
return InvokeWithResponseStreamOutcome(MakeRequest(request, endpointResolutionOutcome.GetResult(), Aws::Http::HttpMethod::HTTP_POST));
14711479
},
14721480
TracingUtils::SMITHY_CLIENT_DURATION_METRIC,

generated/src/aws-cpp-sdk-lambda/source/model/InvokeWithResponseStreamRequest.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,6 @@ InvokeWithResponseStreamRequest::InvokeWithResponseStreamRequest() :
2727
m_qualifierHasBeenSet(false),
2828
m_handler(), m_decoder(Aws::Utils::Event::EventStreamDecoder(&m_handler))
2929
{
30-
AmazonWebServiceRequest::SetHeadersReceivedEventHandler([this](const Http::HttpRequest*, Http::HttpResponse* response)
31-
{
32-
auto& initialResponseHandler = m_handler.GetInitialResponseCallbackEx();
33-
if (initialResponseHandler) {
34-
initialResponseHandler(InvokeWithResponseStreamInitialResponse(response->GetHeaders()), Utils::Event::InitialResponseType::ON_RESPONSE);
35-
}
36-
});
3730
}
3831

3932

generated/src/aws-cpp-sdk-lexv2-runtime/include/aws/lexv2-runtime/model/StartConversationHandler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ namespace Model
5151
public:
5252
AWS_LEXRUNTIMEV2_API StartConversationHandler();
5353
AWS_LEXRUNTIMEV2_API StartConversationHandler& operator=(const StartConversationHandler&) = default;
54+
AWS_LEXRUNTIMEV2_API StartConversationHandler(const StartConversationHandler&) = default;
5455

5556
AWS_LEXRUNTIMEV2_API virtual void OnEvent() override;
5657

generated/src/aws-cpp-sdk-lexv2-runtime/include/aws/lexv2-runtime/model/StartConversationRequest.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ namespace Model
5050
/**
5151
* Underlying Event Stream Handler which is used to define callback functions.
5252
*/
53-
inline const StartConversationHandler& GetEventStreamHandler() const { return m_handler; }
53+
inline StartConversationHandler& GetEventStreamHandler() { return m_handler; }
5454

5555
/**
5656
* Underlying Event Stream Handler which is used to define callback functions.

generated/src/aws-cpp-sdk-lexv2-runtime/source/LexRuntimeV2Client.cpp

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <aws/core/utils/DNS.h>
1818
#include <aws/core/utils/logging/LogMacros.h>
1919
#include <aws/core/utils/logging/ErrorMacros.h>
20+
#include <aws/core/client/AWSClientEventStreamingAsyncTask.h>
2021
#include <aws/core/utils/event/EventStream.h>
2122

2223
#include <aws/lexv2-runtime/LexRuntimeV2Client.h>
@@ -502,29 +503,21 @@ void LexRuntimeV2Client::StartConversationAsync(Model::StartConversationRequest&
502503
endpointResolutionOutcome.GetResult().AddPathSegments("/sessions/");
503504
endpointResolutionOutcome.GetResult().AddPathSegment(request.GetSessionId());
504505
endpointResolutionOutcome.GetResult().AddPathSegments("/conversation");
505-
request.SetResponseStreamFactory(
506-
[&] { request.GetEventStreamDecoder().Reset(); return Aws::New<Aws::Utils::Event::EventDecoderStream>(ALLOCATION_TAG, request.GetEventStreamDecoder()); }
507-
);
508506

509507
auto eventEncoderStream = Aws::MakeShared<Model::StartConversationRequestEventStream>(ALLOCATION_TAG);
510508
eventEncoderStream->SetSigner(GetSignerByName(Aws::Auth::EVENTSTREAM_SIGV4_SIGNER));
511-
request.SetRequestEventStream(eventEncoderStream); // this becomes the body of the request
512-
auto sem = Aws::MakeShared<Aws::Utils::Threading::Semaphore>(ALLOCATION_TAG, 0, 1);
513-
request.SetRequestSignedHandler([eventEncoderStream, sem](const Aws::Http::HttpRequest& httpRequest) { eventEncoderStream->SetSignatureSeed(Aws::Client::GetAuthorizationHeader(httpRequest)); sem->ReleaseAll(); });
509+
auto requestCopy = Aws::MakeShared<StartConversationRequest>("StartConversation", request);
510+
requestCopy->SetRequestEventStream(eventEncoderStream); // this becomes the body of the request
511+
request.SetRequestEventStream(eventEncoderStream);
514512

515-
m_clientConfiguration.executor->Submit([this, endpointResolutionOutcome, &request, handler, handlerContext] () mutable {
516-
JsonOutcome outcome = MakeRequest(request, endpointResolutionOutcome.GetResult(), Aws::Http::HttpMethod::HTTP_POST, Aws::Auth::EVENTSTREAM_SIGV4_SIGNER);
517-
if(outcome.IsSuccess())
518-
{
519-
handler(this, request, StartConversationOutcome(NoResult()), handlerContext);
520-
}
521-
else
522-
{
523-
request.GetRequestEventStream()->Close();
524-
handler(this, request, StartConversationOutcome(outcome.GetError()), handlerContext);
525-
}
526-
return StartConversationOutcome(NoResult());
527-
});
513+
auto asyncTask = CreateBidirectionalEventStreamTask<StartConversationOutcome>(this,
514+
endpointResolutionOutcome.GetResultWithOwnership(),
515+
requestCopy,
516+
handler,
517+
handlerContext,
518+
eventEncoderStream);
519+
auto sem = asyncTask.GetSemaphore();
520+
m_clientConfiguration.executor->Submit(std::move(asyncTask));
528521
sem->WaitOne();
529-
streamReadyHandler(*request.GetRequestEventStream());
522+
streamReadyHandler(*eventEncoderStream);
530523
}

0 commit comments

Comments
 (0)