3939#include " google/cloud/internal/random.h"
4040#include " google/cloud/internal/retry_loop.h"
4141#include " google/cloud/internal/streaming_read_rpc.h"
42- #include " absl/functional/bind_front.h"
4342#include < memory>
4443#include < string>
4544
@@ -59,9 +58,16 @@ inline std::unique_ptr<bigtable::DataRetryPolicy> retry_policy(
5958}
6059
6160inline std::unique_ptr<bigtable::DataRetryPolicy>
62- query_plan_refresh_retry_policy (Options const & options) {
61+ execute_query_plan_refresh_retry_policy (Options const & options) {
6362 return options
64- .get <bigtable::experimental::QueryPlanRefreshRetryPolicyOption>()
63+ .get <bigtable::experimental::ExecuteQueryPlanRefreshRetryPolicyOption>()
64+ ->clone ();
65+ }
66+
67+ inline std::unique_ptr<bigtable::DataRetryPolicy>
68+ query_plan_refresh_function_retry_policy (Options const & options) {
69+ return options
70+ .get <bigtable::experimental::QueryPlanRefreshFunctionRetryPolicyOption>()
6571 ->clone ();
6672}
6773
@@ -111,10 +117,6 @@ bool IsStatusMetadataIndicatingRetryPolicyExhausted(Status const& status) {
111117 " retry-policy-exhausted" ));
112118}
113119
114- bool IsStatusIndicatingInternalError (Status const & status) {
115- return status.code () == StatusCode::kInternal ;
116- }
117-
118120class DefaultPartialResultSetReader
119121 : public bigtable_internal::PartialResultSetReader {
120122 public:
@@ -760,7 +762,7 @@ StatusOr<bigtable::PreparedQuery> DataConnectionImpl::PrepareQuery(
760762 auto const * func = __func__;
761763 auto refresh_fn = [this , request, func]() mutable {
762764 auto current = google::cloud::internal::SaveCurrentOptions ();
763- auto retry = retry_policy (*current);
765+ auto retry = query_plan_refresh_function_retry_policy (*current);
764766 auto backoff = backoff_policy (*current);
765767 auto operation_context = operation_context_factory_->PrepareQuery (
766768 request.instance_name (), app_profile_id (*current));
@@ -843,7 +845,7 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
843845
844846 auto refresh_fn = [this , request, func]() mutable {
845847 auto current = google::cloud::internal::SaveCurrentOptions ();
846- auto retry = retry_policy (*current);
848+ auto retry = query_plan_refresh_function_retry_policy (*current);
847849 auto backoff = backoff_policy (*current);
848850 auto operation_context = operation_context_factory_->PrepareQuery (
849851 request.instance_name (), app_profile_id (*current));
@@ -882,6 +884,139 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
882884 });
883885}
884886
887+ class QueryPlanRefreshingPartialResultSource
888+ : public bigtable::ResultSourceInterface {
889+ public:
890+ using SourceFactoryFn =
891+ std::function<StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>>(
892+ google::bigtable::v2::ExecuteQueryRequest& request,
893+ google::bigtable::v2::ResultSetMetadata metadata,
894+ std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_prototype,
895+ std::unique_ptr<BackoffPolicy> backoff_policy_prototype,
896+ std::shared_ptr<OperationContext> const & operation_context)>;
897+ static std::unique_ptr<QueryPlanRefreshingPartialResultSource> Create (
898+ google::bigtable::v2::ExecuteQueryRequest request,
899+ SourceFactoryFn source_factory, std::shared_ptr<QueryPlan> query_plan,
900+ std::shared_ptr<OperationContext> operation_context,
901+ std::unique_ptr<BackoffPolicy> backoff_policy,
902+ std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
903+ std::unique_ptr<bigtable::DataRetryPolicy>
904+ query_plan_refresh_retry_policy,
905+ internal::ImmutableOptions options) {
906+ return std::unique_ptr<QueryPlanRefreshingPartialResultSource>(
907+ new QueryPlanRefreshingPartialResultSource (
908+ std::move (request), std::move (source_factory),
909+ std::move (query_plan), std::move (operation_context),
910+ std::move (backoff_policy), std::move (retry_policy),
911+ std::move (query_plan_refresh_retry_policy), std::move (options)));
912+ }
913+
914+ ~QueryPlanRefreshingPartialResultSource () override = default ;
915+
916+ StatusOr<bigtable::QueryRow> NextRow () override {
917+ StatusOr<bigtable::QueryRow> row;
918+ do {
919+ if (!query_plan_valid_) source_ = absl::nullopt ;
920+ if (!source_.has_value () || !source_->ok ()) {
921+ UpdateSource ();
922+ }
923+ row = (**source_)->NextRow ();
924+ if (ExecuteQueryPlanRefreshRetry::IsQueryPlanExpired (row.status ())) {
925+ query_plan_valid_ = false ;
926+ query_plan_->Invalidate (row.status (),
927+ query_plan_data_->prepared_query ());
928+ }
929+ } while (!query_plan_refresh_retry_policy_->IsExhausted () &&
930+ ExecuteQueryPlanRefreshRetry::IsQueryPlanExpired (row.status ()));
931+ return row;
932+ }
933+
934+ absl::optional<google::bigtable::v2::ResultSetMetadata> Metadata () override {
935+ if (!source_.has_value ()) return absl::nullopt ;
936+ return (**source_)->Metadata ();
937+ }
938+
939+ private:
940+ QueryPlanRefreshingPartialResultSource (
941+ google::bigtable::v2::ExecuteQueryRequest request,
942+ SourceFactoryFn source_factory, std::shared_ptr<QueryPlan> query_plan,
943+ std::shared_ptr<OperationContext> operation_context,
944+ std::unique_ptr<BackoffPolicy> backoff_policy,
945+ std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
946+ std::unique_ptr<bigtable::DataRetryPolicy>
947+ query_plan_refresh_retry_policy,
948+ internal::ImmutableOptions options)
949+ : request_(std::move(request)),
950+ source_factory_ (std::move(source_factory)),
951+ query_plan_(std::move(query_plan)),
952+ operation_context_(std::move(operation_context)),
953+ backoff_policy_(std::move(backoff_policy)),
954+ retry_policy_(std::move(retry_policy)),
955+ query_plan_refresh_retry_policy_(
956+ std::move (query_plan_refresh_retry_policy)),
957+ query_plan_backoff_policy_(backoff_policy_->clone ()),
958+ options_(std::move(options)) {}
959+
960+ void UpdateSource () {
961+ internal::OptionsSpan options_span (options_);
962+ while (!query_plan_refresh_retry_policy_->IsExhausted ()) {
963+ query_plan_data_ = query_plan_->response ();
964+ if (query_plan_data_.ok ()) {
965+ query_plan_valid_ = true ;
966+ request_.set_prepared_query (query_plan_data_->prepared_query ());
967+ source_ = source_factory_ (
968+ request_, query_plan_data_->metadata (),
969+ options_->get <bigtable::DataRetryPolicyOption>()->clone (),
970+ options_->get <bigtable::DataBackoffPolicyOption>()->clone (),
971+ operation_context_);
972+ if (source_->ok ()) return ;
973+
974+ last_status_ = source_->status ();
975+ if (ExecuteQueryPlanRefreshRetry::IsQueryPlanExpired (
976+ source_->status ())) {
977+ query_plan_valid_ = false ;
978+ query_plan_->Invalidate (source_->status (),
979+ query_plan_data_->prepared_query ());
980+ }
981+ if (IsStatusMetadataIndicatingRetryPolicyExhausted (source_->status ())) {
982+ source_ = std::make_unique<StatusOnlyResultSetSource>(last_status_);
983+ return ;
984+ }
985+ } else {
986+ last_status_ = query_plan_data_.status ();
987+ }
988+
989+ auto delay = internal::Backoff (
990+ last_status_, __func__, *query_plan_refresh_retry_policy_,
991+ *query_plan_backoff_policy_, Idempotency::kIdempotent ,
992+ false /* enable_server_retries */ );
993+ if (!delay) break ;
994+ std::this_thread::sleep_for (*delay);
995+ }
996+ auto retry_loop_error = internal::RetryLoopError (
997+ last_status_, __func__,
998+ query_plan_refresh_retry_policy_->IsExhausted ());
999+ source_ = std::make_unique<StatusOnlyResultSetSource>(retry_loop_error);
1000+ last_status_ = retry_loop_error;
1001+ }
1002+
1003+ google::bigtable::v2::ExecuteQueryRequest request_;
1004+ SourceFactoryFn source_factory_;
1005+ std::shared_ptr<QueryPlan> query_plan_;
1006+ std::shared_ptr<OperationContext> operation_context_;
1007+ std::unique_ptr<BackoffPolicy> backoff_policy_;
1008+ std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_;
1009+ std::unique_ptr<bigtable::DataRetryPolicy> query_plan_refresh_retry_policy_;
1010+ std::unique_ptr<BackoffPolicy> query_plan_backoff_policy_;
1011+ internal::ImmutableOptions options_;
1012+
1013+ absl::optional<StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>>>
1014+ source_;
1015+ StatusOr<google::bigtable::v2::PrepareQueryResponse> query_plan_data_;
1016+ bool query_plan_valid_ = false ;
1017+ Status last_status_;
1018+ };
1019+
8851020bigtable::RowStream DataConnectionImpl::ExecuteQuery (
8861021 bigtable::ExecuteQueryParams params) {
8871022 auto current = google::cloud::internal::SaveCurrentOptions ();
@@ -892,7 +1027,7 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
8921027 auto const tracing_enabled = RpcStreamTracingEnabled ();
8931028 auto const & tracing_options = RpcTracingOptions ();
8941029
895- auto retry_resume_fn =
1030+ auto source_fn =
8961031 [stub = stub_, tracing_enabled, tracing_options](
8971032 google::bigtable::v2::ExecuteQueryRequest& request,
8981033 google::bigtable::v2::ResultSetMetadata metadata,
@@ -930,56 +1065,15 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
9301065 auto operation_context = operation_context_factory_->ExecuteQuery (
9311066 request.instance_name (), app_profile_id (*current));
9321067
933- auto query_plan = params.bound_query .query_plan_ ;
934- auto query_plan_retry_policy = query_plan_refresh_retry_policy (*current);
935- auto query_plan_backoff_policy = backoff_policy (*current);
936- Status last_status;
937-
938- // TODO(sdhart): OperationContext needs to be plumbed through the QueryPlan
939- // refresh_fn so that it's shared with the ExecuteQuery attempts.
940- while (!query_plan_retry_policy->IsExhausted ()) {
941- // Snapshot query_plan data.
942- // This access could cause a query plan refresh to occur.
943- StatusOr<google::bigtable::v2::PrepareQueryResponse> query_plan_data =
944- query_plan->response ();
945-
946- if (query_plan_data.ok ()) {
947- request.set_prepared_query (query_plan_data->prepared_query ());
948- auto source = retry_resume_fn (
949- request, query_plan_data->metadata (), retry_policy (*current),
950- backoff_policy (*current), operation_context);
951- if (source.ok ()) {
952- return bigtable::RowStream (*std::move (source));
953- }
954- last_status = source.status ();
955-
956- if (IsStatusIndicatingInternalError (source.status ())) {
957- return bigtable::RowStream (std::make_unique<StatusOnlyResultSetSource>(
958- std::move (last_status)));
959- }
1068+ auto query_plan_refreshing_source =
1069+ QueryPlanRefreshingPartialResultSource::Create (
1070+ std::move (request), std::move (source_fn),
1071+ std::move (params.bound_query .query_plan_ ),
1072+ std::move (operation_context), backoff_policy (*current),
1073+ retry_policy (*current),
1074+ execute_query_plan_refresh_retry_policy (*current), current);
9601075
961- if (QueryPlanRefreshRetry::IsQueryPlanExpired (source.status ())) {
962- query_plan->Invalidate (source.status (),
963- query_plan_data->prepared_query ());
964- }
965- if (IsStatusMetadataIndicatingRetryPolicyExhausted (source.status ())) {
966- return bigtable::RowStream (std::make_unique<StatusOnlyResultSetSource>(
967- std::move (last_status)));
968- }
969- } else {
970- last_status = query_plan_data.status ();
971- }
972-
973- auto delay =
974- internal::Backoff (last_status, __func__, *query_plan_retry_policy,
975- *query_plan_backoff_policy, Idempotency::kIdempotent ,
976- false /* enable_server_retries */ );
977- if (!delay) break ;
978- std::this_thread::sleep_for (*delay);
979- }
980- return bigtable::RowStream (
981- std::make_unique<StatusOnlyResultSetSource>(internal::RetryLoopError (
982- last_status, __func__, query_plan_retry_policy->IsExhausted ())));
1076+ return bigtable::RowStream (std::move (query_plan_refreshing_source));
9831077}
9841078
9851079GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
0 commit comments