3232#include " google/cloud/background_threads.h"
3333#include " google/cloud/grpc_options.h"
3434#include " google/cloud/idempotency.h"
35+ #include " google/cloud/internal/algorithm.h"
3536#include " google/cloud/internal/async_retry_loop.h"
3637#include " google/cloud/internal/make_status.h"
3738#include " google/cloud/internal/random.h"
@@ -102,14 +103,25 @@ bigtable::RowReader ReadRowsHelper(
102103 return MakeRowReader (std::move (impl));
103104}
104105
106+ bool IsStatusMetadataIndicatingRetryPolicyExhausted (Status const & status) {
107+ return internal::Contains (
108+ status.error_info ().metadata (),
109+ std::pair<std::string const , std::string>(" gcloud-cpp.retry.reason" ,
110+ " retry-policy-exhausted" ));
111+ }
112+
105113class DefaultPartialResultSetReader
106114 : public bigtable_internal::PartialResultSetReader {
107115 public:
108- DefaultPartialResultSetReader (std::shared_ptr<grpc::ClientContext> context,
109- std::unique_ptr<internal::StreamingReadRpc<
110- google::bigtable::v2::ExecuteQueryResponse>>
111- reader)
112- : context_(std::move(context)), reader_(std::move(reader)) {}
116+ DefaultPartialResultSetReader (
117+ std::shared_ptr<grpc::ClientContext> context,
118+ std::shared_ptr<OperationContext> operation_context,
119+ std::unique_ptr<internal::StreamingReadRpc<
120+ google::bigtable::v2::ExecuteQueryResponse>>
121+ reader)
122+ : context_(std::move(context)),
123+ operation_context_ (std::move(operation_context)),
124+ reader_(std::move(reader)) {}
113125
114126 ~DefaultPartialResultSetReader () override = default ;
115127
@@ -123,6 +135,7 @@ class DefaultPartialResultSetReader
123135
124136 if (status.has_value ()) {
125137 // Stream has ended or an error occurred.
138+ operation_context_->PostCall (*context_, status.value ());
126139 final_status_ = *std::move (status);
127140 return false ;
128141 }
@@ -144,19 +157,18 @@ class DefaultPartialResultSetReader
144157 final_status_ = google::cloud::Status (
145158 google::cloud::StatusCode::kInternal ,
146159 " Empty ExecuteQueryResponse received from stream" );
160+ operation_context_->PostCall (*context_, final_status_);
147161 return false ;
148162 }
149163 }
150164
151165 grpc::ClientContext const & context () const override { return *context_; }
152166
153- Status Finish () override {
154- // operation_context_->OnDone(final_status_);
155- return final_status_;
156- }
167+ Status Finish () override { return final_status_; }
157168
158169 private:
159170 std::shared_ptr<grpc::ClientContext> context_;
171+ std::shared_ptr<OperationContext> operation_context_;
160172 std::unique_ptr<
161173 internal::StreamingReadRpc<google::bigtable::v2::ExecuteQueryResponse>>
162174 reader_;
@@ -743,17 +755,33 @@ StatusOr<bigtable::PreparedQuery> DataConnectionImpl::PrepareQuery(
743755 auto refresh_fn = [this , request, current, func]() mutable {
744756 auto retry = retry_policy (*current);
745757 auto backoff = backoff_policy (*current);
758+ auto operation_context = operation_context_factory_->PrepareQuery (
759+ request.instance_name (), app_profile_id (*current));
746760 return google::cloud::internal::AsyncRetryLoop (
747- std::move (retry), std::move (backoff), Idempotency::kIdempotent ,
748- background_->cq (),
749- [this ](CompletionQueue& cq,
750- std::shared_ptr<grpc::ClientContext> context,
751- google::cloud::internal::ImmutableOptions options,
752- google::bigtable::v2::PrepareQueryRequest const & request) {
753- return stub_->AsyncPrepareQuery (cq, std::move (context),
754- std::move (options), request);
755- },
756- std::move (current), request, func);
761+ std::move (retry), std::move (backoff), Idempotency::kIdempotent ,
762+ background_->cq (),
763+ [this , operation_context](
764+ CompletionQueue& cq,
765+ std::shared_ptr<grpc::ClientContext> context,
766+ google::cloud::internal::ImmutableOptions options,
767+ google::bigtable::v2::PrepareQueryRequest const & request) {
768+ operation_context->PreCall (*context);
769+ auto f = stub_->AsyncPrepareQuery (cq, context,
770+ std::move (options), request);
771+ return f.then (
772+ [operation_context, context = std::move (context)](auto f) {
773+ auto s = f.get ();
774+ operation_context->PostCall (*context, s.status ());
775+ return s;
776+ });
777+ },
778+ std::move (current), request, func)
779+ .then ([operation_context](auto f) mutable {
780+ StatusOr<google::bigtable::v2::PrepareQueryResponse> response =
781+ f.get ();
782+ operation_context->OnDone (response.status ());
783+ return response;
784+ });
757785 };
758786 auto query_plan = QueryPlan::Create (background_->cq (), *std::move (response),
759787 std::move (refresh_fn));
@@ -809,17 +837,34 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
809837 auto refresh_fn = [this , request, current, func]() mutable {
810838 auto retry = retry_policy (*current);
811839 auto backoff = backoff_policy (*current);
840+ auto operation_context = operation_context_factory_->PrepareQuery (
841+ request.instance_name (), app_profile_id (*current));
812842 return google::cloud::internal::AsyncRetryLoop (
813- std::move (retry), std::move (backoff), Idempotency::kIdempotent ,
814- background_->cq (),
815- [this ](CompletionQueue& cq,
816- std::shared_ptr<grpc::ClientContext> context,
817- google::cloud::internal::ImmutableOptions options,
818- google::bigtable::v2::PrepareQueryRequest const & request) {
819- return stub_->AsyncPrepareQuery (cq, std::move (context),
820- std::move (options), request);
821- },
822- std::move (current), request, func);
843+ std::move (retry), std::move (backoff),
844+ Idempotency::kIdempotent , background_->cq (),
845+ [this , operation_context](
846+ CompletionQueue& cq,
847+ std::shared_ptr<grpc::ClientContext> context,
848+ google::cloud::internal::ImmutableOptions options,
849+ google::bigtable::v2::PrepareQueryRequest const &
850+ request) {
851+ operation_context->PreCall (*context);
852+ auto f = stub_->AsyncPrepareQuery (
853+ cq, context, std::move (options), request);
854+ return f.then ([operation_context,
855+ context = std::move (context)](auto f) {
856+ auto s = f.get ();
857+ operation_context->PostCall (*context, s.status ());
858+ return s;
859+ });
860+ },
861+ std::move (current), request, func)
862+ .then ([operation_context](auto f) mutable {
863+ StatusOr<google::bigtable::v2::PrepareQueryResponse> response =
864+ f.get ();
865+ operation_context->OnDone (response.status ());
866+ return response;
867+ });
823868 };
824869
825870 auto query_plan = QueryPlan::Create (background_->cq (),
@@ -854,26 +899,28 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
854899 auto context = std::make_shared<grpc::ClientContext>();
855900 auto const & options = internal::CurrentOptions ();
856901 internal::ConfigureContext (*context, options);
902+ operation_context->PreCall (*context);
857903 auto stream = stub->ExecuteQuery (context, options, request);
858904 std::unique_ptr<PartialResultSetReader> reader =
859905 std::make_unique<DefaultPartialResultSetReader>(
860- std::move (context), std::move (stream));
906+ std::move (context), operation_context, std::move (stream));
861907 if (tracing_enabled) {
862908 reader = std::make_unique<LoggingResultSetReader>(std::move (reader),
863909 tracing_options);
864910 }
865911 return reader;
866912 };
867913
868- auto rpc = std::make_unique<PartialResultSetResume>(
914+ auto resume = std::make_unique<PartialResultSetResume>(
869915 std::move (factory), Idempotency::kIdempotent ,
870916 retry_policy_prototype->clone (), backoff_policy_prototype->clone ());
871917
872918 return PartialResultSetSource::Create (std::move (metadata),
873- operation_context, std::move (rpc ));
919+ operation_context, std::move (resume ));
874920 };
875921
876- auto operation_context = std::make_shared<OperationContext>();
922+ auto operation_context = operation_context_factory_->ExecuteQuery (
923+ request.instance_name (), app_profile_id (*current));
877924
878925 auto query_plan = params.bound_query .query_plan_ ;
879926 auto query_plan_retry_policy = query_plan_refresh_retry_policy (*current);
@@ -890,18 +937,23 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
890937
891938 if (query_plan_data.ok ()) {
892939 request.set_prepared_query (query_plan_data->prepared_query ());
893- auto reader = retry_resume_fn (
940+ auto source = retry_resume_fn (
894941 request, query_plan_data->metadata (), retry_policy (*current),
895942 backoff_policy (*current), operation_context);
896- if (reader .ok ()) {
897- return bigtable::RowStream (*std::move (reader ));
943+ if (source .ok ()) {
944+ return bigtable::RowStream (*std::move (source ));
898945 }
946+ last_status = source.status ();
947+
899948 if (SafeGrpcRetryAllowingQueryPlanRefresh::IsQueryPlanExpired (
900- reader .status ())) {
901- query_plan->Invalidate (reader .status (),
949+ source .status ())) {
950+ query_plan->Invalidate (source .status (),
902951 query_plan_data->prepared_query ());
903952 }
904- last_status = reader.status ();
953+ if (IsStatusMetadataIndicatingRetryPolicyExhausted (source.status ())) {
954+ return bigtable::RowStream (std::make_unique<StatusOnlyResultSetSource>(
955+ std::move (last_status)));
956+ }
905957 } else {
906958 last_status = query_plan_data.status ();
907959 }
0 commit comments