Skip to content

Commit b9150e6

Browse files
committed
impl(bigtable): improve query execution refresh and retry
1 parent 45fe5af commit b9150e6

12 files changed

+444
-154
lines changed

google/cloud/bigtable/ci/run_conformance_tests_proxy_bazel.sh

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -47,36 +47,14 @@ go test -v \
4747
-proxy_addr=:9999
4848
exit_status=$?
4949

50-
# Run all the ExecuteQuery tests that either work or we plan to skip such as
51-
# CloseClient
50+
# Run all the ExecuteQuery tests that either work or we plan to skip due to
51+
# unimplemented features or issues with the tests themselves.
5252
go test -v \
53-
-run "TestExecuteQuery|TestExecuteQuery_PlanRefresh$|TestExecuteQuery_PlanRefresh_WithMetadataChange|TestExecuteQuery_PlanRefresh_Retries|TestExecuteQuery_FailsOnSuccesfulStreamWithNoToken" \
54-
-skip "CloseClient|FailsOnInvalidType|FailsOnTypeMismatch|FailsOnTypeMismatchWithinMap|FailsOnTypeMismatchWithinArray|FailsOnTypeMismatchWithinStruct|FailsOnStructMissingField|TestExecuteQuery_PlanRefresh_AfterResumeTokenCausesError|TestExecuteQuery_RetryTest_WithPlanRefresh|TestExecuteQuery_PlanRefresh_RespectsDeadline|TestExecuteQuery_PlanRefresh_RecoversAfterPermanentError" \
53+
-run "TestExecuteQuery" \
54+
-skip "CloseClient|TestExecuteQuery_FailsOnInvalidType|TestExecuteQuery_PlanRefresh_RespectsDeadline" \
5555
-proxy_addr=:9999
5656
exit_status=$?
5757

58-
# These next four go test commands group the currently failing ExecuteQuery
59-
# tests into groups that exercise similar functionality and should be worked on
60-
# together.
61-
62-
# Metadata tests b/461232934
63-
#go test -v \
64-
# -run "FailsOnInvalidType" \
65-
# -proxy_addr=:9999
66-
#exit_status=$?
67-
68-
# Response/Metadata mismatches b/461233335
69-
go test -v \
70-
-run "FailsOnTypeMismatch|FailsOnTypeMismatchWithinMap|FailsOnTypeMismatchWithinArray|FailsOnTypeMismatchWithinStruct|FailsOnStructMissingField" \
71-
-proxy_addr=:9999
72-
exit_status=$?
73-
74-
# QueryPlan refresh tests b/461233613
75-
#go test -v \
76-
# -run "RetryTest_WithPlanRefresh|PlanRefresh|PlanRefresh_RecoversAfterPermanentError" \
77-
# -proxy_addr=:9999
78-
#exit_status=$?
79-
8058
# Remove the entire module cache, including unpacked source code of versioned
8159
# dependencies.
8260
go clean -modcache

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 154 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
#include "google/cloud/internal/random.h"
4040
#include "google/cloud/internal/retry_loop.h"
4141
#include "google/cloud/internal/streaming_read_rpc.h"
42-
#include "absl/functional/bind_front.h"
4342
#include <memory>
4443
#include <string>
4544

@@ -59,9 +58,16 @@ inline std::unique_ptr<bigtable::DataRetryPolicy> retry_policy(
5958
}
6059

6160
inline std::unique_ptr<bigtable::DataRetryPolicy>
62-
query_plan_refresh_retry_policy(Options const& options) {
61+
execute_query_plan_refresh_retry_policy(Options const& options) {
6362
return options
64-
.get<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>()
63+
.get<bigtable::experimental::ExecuteQueryPlanRefreshRetryPolicyOption>()
64+
->clone();
65+
}
66+
67+
inline std::unique_ptr<bigtable::DataRetryPolicy>
68+
query_plan_refresh_function_retry_policy(Options const& options) {
69+
return options
70+
.get<bigtable::experimental::QueryPlanRefreshFunctionRetryPolicyOption>()
6571
->clone();
6672
}
6773

@@ -111,10 +117,6 @@ bool IsStatusMetadataIndicatingRetryPolicyExhausted(Status const& status) {
111117
"retry-policy-exhausted"));
112118
}
113119

114-
bool IsStatusIndicatingInternalError(Status const& status) {
115-
return status.code() == StatusCode::kInternal;
116-
}
117-
118120
class DefaultPartialResultSetReader
119121
: public bigtable_internal::PartialResultSetReader {
120122
public:
@@ -760,7 +762,7 @@ StatusOr<bigtable::PreparedQuery> DataConnectionImpl::PrepareQuery(
760762
auto const* func = __func__;
761763
auto refresh_fn = [this, request, func]() mutable {
762764
auto current = google::cloud::internal::SaveCurrentOptions();
763-
auto retry = retry_policy(*current);
765+
auto retry = query_plan_refresh_function_retry_policy(*current);
764766
auto backoff = backoff_policy(*current);
765767
auto operation_context = operation_context_factory_->PrepareQuery(
766768
request.instance_name(), app_profile_id(*current));
@@ -843,7 +845,7 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
843845

844846
auto refresh_fn = [this, request, func]() mutable {
845847
auto current = google::cloud::internal::SaveCurrentOptions();
846-
auto retry = retry_policy(*current);
848+
auto retry = query_plan_refresh_function_retry_policy(*current);
847849
auto backoff = backoff_policy(*current);
848850
auto operation_context = operation_context_factory_->PrepareQuery(
849851
request.instance_name(), app_profile_id(*current));
@@ -882,6 +884,140 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
882884
});
883885
}
884886

887+
class QueryPlanRefreshingPartialResultSource
888+
: public bigtable::ResultSourceInterface {
889+
public:
890+
using SourceFactoryFn =
891+
std::function<StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>>(
892+
google::bigtable::v2::ExecuteQueryRequest& request,
893+
google::bigtable::v2::ResultSetMetadata metadata,
894+
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_prototype,
895+
std::unique_ptr<BackoffPolicy> backoff_policy_prototype,
896+
std::shared_ptr<OperationContext> const& operation_context)>;
897+
static std::unique_ptr<QueryPlanRefreshingPartialResultSource> Create(
898+
google::bigtable::v2::ExecuteQueryRequest request,
899+
SourceFactoryFn source_factory, std::shared_ptr<QueryPlan> query_plan,
900+
std::shared_ptr<OperationContext> operation_context,
901+
std::unique_ptr<BackoffPolicy> backoff_policy,
902+
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
903+
std::unique_ptr<bigtable::DataRetryPolicy>
904+
query_plan_refresh_retry_policy,
905+
internal::ImmutableOptions options) {
906+
auto foo = std::unique_ptr<QueryPlanRefreshingPartialResultSource>(
907+
new QueryPlanRefreshingPartialResultSource(
908+
std::move(request), std::move(source_factory),
909+
std::move(query_plan), std::move(operation_context),
910+
std::move(backoff_policy), std::move(retry_policy),
911+
std::move(query_plan_refresh_retry_policy), std::move(options)));
912+
return foo;
913+
}
914+
915+
~QueryPlanRefreshingPartialResultSource() override = default;
916+
917+
StatusOr<bigtable::QueryRow> NextRow() override {
918+
StatusOr<bigtable::QueryRow> row;
919+
do {
920+
if (!query_plan_valid_) source_ = absl::nullopt;
921+
if (!source_.has_value() || !source_->ok()) {
922+
UpdateSource();
923+
}
924+
row = (**source_)->NextRow();
925+
if (ExecuteQueryPlanRefreshRetry::IsQueryPlanExpired(row.status())) {
926+
query_plan_valid_ = false;
927+
query_plan_->Invalidate(row.status(),
928+
query_plan_data_->prepared_query());
929+
}
930+
} while (!query_plan_refresh_retry_policy_->IsExhausted() &&
931+
ExecuteQueryPlanRefreshRetry::IsQueryPlanExpired(row.status()));
932+
return row;
933+
}
934+
935+
absl::optional<google::bigtable::v2::ResultSetMetadata> Metadata() override {
936+
if (!source_.has_value()) return absl::nullopt;
937+
return (**source_)->Metadata();
938+
}
939+
940+
private:
941+
QueryPlanRefreshingPartialResultSource(
942+
google::bigtable::v2::ExecuteQueryRequest request,
943+
SourceFactoryFn source_factory, std::shared_ptr<QueryPlan> query_plan,
944+
std::shared_ptr<OperationContext> operation_context,
945+
std::unique_ptr<BackoffPolicy> backoff_policy,
946+
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
947+
std::unique_ptr<bigtable::DataRetryPolicy>
948+
query_plan_refresh_retry_policy,
949+
internal::ImmutableOptions options)
950+
: request_(std::move(request)),
951+
source_factory_(std::move(source_factory)),
952+
query_plan_(std::move(query_plan)),
953+
operation_context_(std::move(operation_context)),
954+
backoff_policy_(std::move(backoff_policy)),
955+
retry_policy_(std::move(retry_policy)),
956+
query_plan_refresh_retry_policy_(
957+
std::move(query_plan_refresh_retry_policy)),
958+
query_plan_backoff_policy_(backoff_policy_->clone()),
959+
options_(std::move(options)) {}
960+
961+
void UpdateSource() {
962+
internal::OptionsSpan options_span(options_);
963+
while (!query_plan_refresh_retry_policy_->IsExhausted()) {
964+
query_plan_data_ = query_plan_->response();
965+
if (query_plan_data_.ok()) {
966+
query_plan_valid_ = true;
967+
request_.set_prepared_query(query_plan_data_->prepared_query());
968+
source_ = source_factory_(
969+
request_, query_plan_data_->metadata(),
970+
options_->get<bigtable::DataRetryPolicyOption>()->clone(),
971+
options_->get<bigtable::DataBackoffPolicyOption>()->clone(),
972+
operation_context_);
973+
if (source_->ok()) return;
974+
975+
last_status_ = source_->status();
976+
if (ExecuteQueryPlanRefreshRetry::IsQueryPlanExpired(
977+
source_->status())) {
978+
query_plan_valid_ = false;
979+
query_plan_->Invalidate(source_->status(),
980+
query_plan_data_->prepared_query());
981+
}
982+
if (IsStatusMetadataIndicatingRetryPolicyExhausted(source_->status())) {
983+
source_ = std::make_unique<StatusOnlyResultSetSource>(last_status_);
984+
return;
985+
}
986+
} else {
987+
last_status_ = query_plan_data_.status();
988+
}
989+
990+
auto delay = internal::Backoff(
991+
last_status_, __func__, *query_plan_refresh_retry_policy_,
992+
*query_plan_backoff_policy_, Idempotency::kIdempotent,
993+
false /* enable_server_retries */);
994+
if (!delay) break;
995+
std::this_thread::sleep_for(*delay);
996+
}
997+
auto retry_loop_error = internal::RetryLoopError(
998+
last_status_, __func__,
999+
query_plan_refresh_retry_policy_->IsExhausted());
1000+
source_ = std::make_unique<StatusOnlyResultSetSource>(retry_loop_error);
1001+
last_status_ = retry_loop_error;
1002+
}
1003+
1004+
google::bigtable::v2::ExecuteQueryRequest request_;
1005+
SourceFactoryFn source_factory_;
1006+
std::shared_ptr<QueryPlan> query_plan_;
1007+
std::shared_ptr<OperationContext> operation_context_;
1008+
std::unique_ptr<BackoffPolicy> backoff_policy_;
1009+
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_;
1010+
std::unique_ptr<bigtable::DataRetryPolicy> query_plan_refresh_retry_policy_;
1011+
std::unique_ptr<BackoffPolicy> query_plan_backoff_policy_;
1012+
internal::ImmutableOptions options_;
1013+
1014+
absl::optional<StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>>>
1015+
source_;
1016+
StatusOr<google::bigtable::v2::PrepareQueryResponse> query_plan_data_;
1017+
bool query_plan_valid_ = false;
1018+
Status last_status_;
1019+
};
1020+
8851021
bigtable::RowStream DataConnectionImpl::ExecuteQuery(
8861022
bigtable::ExecuteQueryParams params) {
8871023
auto current = google::cloud::internal::SaveCurrentOptions();
@@ -892,7 +1028,7 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
8921028
auto const tracing_enabled = RpcStreamTracingEnabled();
8931029
auto const& tracing_options = RpcTracingOptions();
8941030

895-
auto retry_resume_fn =
1031+
auto source_fn =
8961032
[stub = stub_, tracing_enabled, tracing_options](
8971033
google::bigtable::v2::ExecuteQueryRequest& request,
8981034
google::bigtable::v2::ResultSetMetadata metadata,
@@ -930,56 +1066,15 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
9301066
auto operation_context = operation_context_factory_->ExecuteQuery(
9311067
request.instance_name(), app_profile_id(*current));
9321068

933-
auto query_plan = params.bound_query.query_plan_;
934-
auto query_plan_retry_policy = query_plan_refresh_retry_policy(*current);
935-
auto query_plan_backoff_policy = backoff_policy(*current);
936-
Status last_status;
937-
938-
// TODO(sdhart): OperationContext needs to be plumbed through the QueryPlan
939-
// refresh_fn so that it's shared with the ExecuteQuery attempts.
940-
while (!query_plan_retry_policy->IsExhausted()) {
941-
// Snapshot query_plan data.
942-
// This access could cause a query plan refresh to occur.
943-
StatusOr<google::bigtable::v2::PrepareQueryResponse> query_plan_data =
944-
query_plan->response();
945-
946-
if (query_plan_data.ok()) {
947-
request.set_prepared_query(query_plan_data->prepared_query());
948-
auto source = retry_resume_fn(
949-
request, query_plan_data->metadata(), retry_policy(*current),
950-
backoff_policy(*current), operation_context);
951-
if (source.ok()) {
952-
return bigtable::RowStream(*std::move(source));
953-
}
954-
last_status = source.status();
955-
956-
if (IsStatusIndicatingInternalError(source.status())) {
957-
return bigtable::RowStream(std::make_unique<StatusOnlyResultSetSource>(
958-
std::move(last_status)));
959-
}
1069+
auto query_plan_refreshing_source =
1070+
QueryPlanRefreshingPartialResultSource::Create(
1071+
std::move(request), std::move(source_fn),
1072+
std::move(params.bound_query.query_plan_),
1073+
std::move(operation_context), backoff_policy(*current),
1074+
retry_policy(*current),
1075+
execute_query_plan_refresh_retry_policy(*current), current);
9601076

961-
if (QueryPlanRefreshRetry::IsQueryPlanExpired(source.status())) {
962-
query_plan->Invalidate(source.status(),
963-
query_plan_data->prepared_query());
964-
}
965-
if (IsStatusMetadataIndicatingRetryPolicyExhausted(source.status())) {
966-
return bigtable::RowStream(std::make_unique<StatusOnlyResultSetSource>(
967-
std::move(last_status)));
968-
}
969-
} else {
970-
last_status = query_plan_data.status();
971-
}
972-
973-
auto delay =
974-
internal::Backoff(last_status, __func__, *query_plan_retry_policy,
975-
*query_plan_backoff_policy, Idempotency::kIdempotent,
976-
false /* enable_server_retries */);
977-
if (!delay) break;
978-
std::this_thread::sleep_for(*delay);
979-
}
980-
return bigtable::RowStream(
981-
std::make_unique<StatusOnlyResultSetSource>(internal::RetryLoopError(
982-
last_status, __func__, query_plan_retry_policy->IsExhausted())));
1077+
return bigtable::RowStream(std::move(query_plan_refreshing_source));
9831078
}
9841079

9851080
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/bigtable/internal/data_connection_impl_test.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,10 +212,10 @@ DataLimitedErrorCountRetryPolicy TestRetryPolicy() {
212212
return DataLimitedErrorCountRetryPolicy(kNumRetries);
213213
}
214214

215-
bigtable::experimental::QueryPlanRefreshLimitedErrorCountRetryPolicy
215+
bigtable::experimental::ExecuteQueryPlanRefreshLimitedErrorCountRetryPolicy
216216
TestQueryPlanRefreshRetryPolicy() {
217-
return bigtable::experimental::QueryPlanRefreshLimitedErrorCountRetryPolicy(
218-
kNumRetries);
217+
return bigtable::experimental::
218+
ExecuteQueryPlanRefreshLimitedErrorCountRetryPolicy(kNumRetries);
219219
}
220220

221221
ExponentialBackoffPolicy TestBackoffPolicy() {
@@ -255,7 +255,8 @@ Options CallOptionsWithoutClientContextSetup() {
255255
Options{}
256256
.set<bigtable::AppProfileIdOption>(kAppProfile)
257257
.set<DataRetryPolicyOption>(TestRetryPolicy().clone())
258-
.set<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>(
258+
.set<
259+
bigtable::experimental::ExecuteQueryPlanRefreshRetryPolicyOption>(
259260
TestQueryPlanRefreshRetryPolicy().clone())
260261
.set<DataBackoffPolicyOption>(TestBackoffPolicy().clone()));
261262
}

google/cloud/bigtable/internal/defaults.cc

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,12 +251,21 @@ Options DefaultDataOptions(Options opts) {
251251
kBigtableLimits.maximum_retry_period)
252252
.clone());
253253
}
254-
if (!opts.has<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>()) {
255-
opts.set<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>(
256-
bigtable::experimental::QueryPlanRefreshLimitedTimeRetryPolicy(
254+
if (!opts.has<
255+
bigtable::experimental::ExecuteQueryPlanRefreshRetryPolicyOption>()) {
256+
opts.set<bigtable::experimental::ExecuteQueryPlanRefreshRetryPolicyOption>(
257+
bigtable::experimental::ExecuteQueryPlanRefreshLimitedTimeRetryPolicy(
257258
kBigtableLimits.maximum_retry_period)
258259
.clone());
259260
}
261+
if (!opts.has<bigtable::experimental::
262+
QueryPlanRefreshFunctionRetryPolicyOption>()) {
263+
opts.set<bigtable::experimental::QueryPlanRefreshFunctionRetryPolicyOption>(
264+
bigtable::experimental::QueryPlanRefreshFunctionLimitedTimeRetryPolicy(
265+
kBigtableLimits.maximum_retry_period)
266+
.clone());
267+
}
268+
260269
if (!opts.has<bigtable::DataBackoffPolicyOption>()) {
261270
opts.set<bigtable::DataBackoffPolicyOption>(
262271
ExponentialBackoffPolicy(kBigtableLimits.initial_delay / 2,

0 commit comments

Comments
 (0)