Skip to content

Commit 50ed7f8

Browse files
authored
impl(bigtable): add query plan refresh policy and option (#15723)
1 parent 3a7a0bf commit 50ed7f8

File tree

7 files changed

+320
-33
lines changed

7 files changed

+320
-33
lines changed

google/cloud/bigtable/bound_query.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ namespace cloud {
1919
namespace bigtable {
2020
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2121

22-
StatusOr<google::bigtable::v2::PrepareQueryResponse> BoundQuery::response()
23-
const {
22+
StatusOr<google::bigtable::v2::PrepareQueryResponse> BoundQuery::response() {
2423
return query_plan_->response();
2524
}
2625

google/cloud/bigtable/bound_query.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class BoundQuery {
4848
BoundQuery& operator=(BoundQuery&&) = default;
4949

5050
// Accessors
51-
StatusOr<google::bigtable::v2::PrepareQueryResponse> response() const;
51+
StatusOr<google::bigtable::v2::PrepareQueryResponse> response();
5252
std::unordered_map<std::string, Value> const& parameters() const;
5353
InstanceResource const& instance() const;
5454

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ inline std::unique_ptr<bigtable::DataRetryPolicy> retry_policy(
5656
return options.get<bigtable::DataRetryPolicyOption>()->clone();
5757
}
5858

59+
inline std::unique_ptr<bigtable::DataRetryPolicy>
60+
query_plan_refresh_retry_policy(Options const& options) {
61+
return options
62+
.get<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>()
63+
->clone();
64+
}
65+
5966
inline std::unique_ptr<BackoffPolicy> backoff_policy(Options const& options) {
6067
return options.get<bigtable::DataBackoffPolicyOption>()->clone();
6168
}
@@ -817,23 +824,6 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
817824
});
818825
}
819826

820-
std::unique_ptr<PartialResultSetReader>
821-
DataConnectionImpl::CreateResumableReader(
822-
google::bigtable::v2::ExecuteQueryRequest request,
823-
std::string const& resume_token) {
824-
if (!resume_token.empty()) {
825-
request.set_resume_token(resume_token);
826-
}
827-
auto context = std::make_shared<grpc::ClientContext>();
828-
auto const& options = google::cloud::internal::CurrentOptions();
829-
830-
google::cloud::internal::ConfigureContext(*context, options);
831-
auto stream = stub_->ExecuteQuery(context, options, request);
832-
833-
return std::make_unique<DefaultPartialResultSetReader>(std::move(context),
834-
std::move(stream));
835-
}
836-
837827
bigtable::RowStream DataConnectionImpl::ExecuteQuery(
838828
bigtable::ExecuteQueryParams params) {
839829
auto current = google::cloud::internal::SaveCurrentOptions();
@@ -845,11 +835,12 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
845835
auto const& tracing_options = RpcTracingOptions();
846836

847837
auto retry_resume_fn =
848-
[stub = stub_, retry_policy_prototype = retry_policy(*current),
849-
backoff_policy_prototype = backoff_policy(*current), tracing_enabled,
850-
tracing_options](google::bigtable::v2::ExecuteQueryRequest& request,
851-
google::bigtable::v2::ResultSetMetadata metadata,
852-
std::shared_ptr<OperationContext> const&) mutable
838+
[stub = stub_, tracing_enabled, tracing_options](
839+
google::bigtable::v2::ExecuteQueryRequest& request,
840+
google::bigtable::v2::ResultSetMetadata metadata,
841+
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_prototype,
842+
std::unique_ptr<BackoffPolicy> backoff_policy_prototype,
843+
std::shared_ptr<OperationContext> const&) mutable
853844
-> StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>> {
854845
auto factory = [stub, request, tracing_enabled,
855846
tracing_options](std::string const& resume_token) mutable {
@@ -878,18 +869,23 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
878869
auto operation_context = std::make_shared<OperationContext>();
879870

880871
auto query_plan = params.bound_query.query_plan_;
881-
auto operation_retry_policy = retry_policy(*current);
882-
auto operation_backoff_policy = backoff_policy(*current);
872+
auto query_plan_retry_policy = query_plan_refresh_retry_policy(*current);
873+
auto query_plan_backoff_policy = backoff_policy(*current);
883874
Status last_status;
884875

885-
while (!operation_retry_policy->IsExhausted()) {
876+
// TODO(sdhart): OperationContext needs to be plumbed through the QueryPlan
877+
// refresh_fn so that it's shared with the ExecuteQuery attempts.
878+
while (!query_plan_retry_policy->IsExhausted()) {
879+
// Snapshot query_plan data.
880+
// This access could cause a query plan refresh to occur.
886881
StatusOr<google::bigtable::v2::PrepareQueryResponse> query_plan_data =
887882
query_plan->response();
888883

889884
if (query_plan_data.ok()) {
890885
request.set_prepared_query(query_plan_data->prepared_query());
891-
auto reader = retry_resume_fn(request, query_plan_data->metadata(),
892-
operation_context);
886+
auto reader = retry_resume_fn(
887+
request, query_plan_data->metadata(), retry_policy(*current),
888+
backoff_policy(*current), operation_context);
893889
if (reader.ok()) {
894890
return bigtable::RowStream(*std::move(reader));
895891
}
@@ -904,15 +900,15 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
904900
}
905901

906902
auto delay =
907-
internal::Backoff(last_status, __func__, *operation_retry_policy,
908-
*operation_backoff_policy, Idempotency::kIdempotent,
903+
internal::Backoff(last_status, __func__, *query_plan_retry_policy,
904+
*query_plan_backoff_policy, Idempotency::kIdempotent,
909905
false /* enable_server_retries */);
910906
if (!delay) break;
911907
std::this_thread::sleep_for(*delay);
912908
}
913909
return bigtable::RowStream(
914910
std::make_unique<StatusOnlyResultSetSource>(internal::RetryLoopError(
915-
last_status, __func__, operation_retry_policy->IsExhausted())));
911+
last_status, __func__, query_plan_retry_policy->IsExhausted())));
916912
}
917913

918914
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/bigtable/internal/data_connection_impl_test.cc

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ Status PermanentError() {
9696
return Status(StatusCode::kPermissionDenied, "fail");
9797
}
9898

99+
Status QueryPlanError() {
100+
return Status(StatusCode::kFailedPrecondition,
101+
"oops! PREPARED_QUERY_EXPIRED");
102+
}
103+
99104
bigtable::SingleRowMutation IdempotentMutation(std::string const& row_key) {
100105
return bigtable::SingleRowMutation(
101106
row_key, {bigtable::SetCell("fam", "col", ms(0), "val")});
@@ -204,6 +209,12 @@ DataLimitedErrorCountRetryPolicy TestRetryPolicy() {
204209
return DataLimitedErrorCountRetryPolicy(kNumRetries);
205210
}
206211

212+
bigtable::experimental::QueryPlanRefreshLimitedErrorCountRetryPolicy
213+
TestQueryPlanRefreshRetryPolicy() {
214+
return bigtable::experimental::QueryPlanRefreshLimitedErrorCountRetryPolicy(
215+
kNumRetries);
216+
}
217+
207218
ExponentialBackoffPolicy TestBackoffPolicy() {
208219
return ExponentialBackoffPolicy(ms(0), ms(0), 2.0);
209220
}
@@ -239,6 +250,8 @@ Options CallOptionsWithoutClientContextSetup() {
239250
Options{}
240251
.set<bigtable::AppProfileIdOption>(kAppProfile)
241252
.set<DataRetryPolicyOption>(TestRetryPolicy().clone())
253+
.set<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>(
254+
TestQueryPlanRefreshRetryPolicy().clone())
242255
.set<DataBackoffPolicyOption>(TestBackoffPolicy().clone()));
243256
}
244257

@@ -3108,6 +3121,7 @@ TEST_F(DataConnectionTest, ExecuteQueryOperationRetryExhausted) {
31083121
std::move(refresh_fn));
31093122

31103123
EXPECT_CALL(*mock, ExecuteQuery)
3124+
.Times(9)
31113125
.WillRepeatedly([&](auto, auto const&, auto const&) {
31123126
auto stream = std::make_unique<MockExecuteQueryStream>();
31133127
EXPECT_CALL(*stream, Read).WillOnce(Return(TransientError()));
@@ -3129,6 +3143,136 @@ TEST_F(DataConnectionTest, ExecuteQueryOperationRetryExhausted) {
31293143
fake_cq_impl->SimulateCompletion(false);
31303144
}
31313145

3146+
TEST_F(DataConnectionTest, ExecuteQuerySuccessWithQueryPlanRefresh) {
3147+
auto mock = std::make_shared<MockBigtableStub>();
3148+
auto fake_cq_impl = std::make_shared<FakeCompletionQueueImpl>();
3149+
auto mock_bg = std::make_unique<MockBackgroundThreads>();
3150+
EXPECT_CALL(*mock_bg, cq).WillRepeatedly([&]() {
3151+
return CompletionQueue{fake_cq_impl};
3152+
});
3153+
3154+
auto constexpr kInitialResultMetadataText = R"pb(
3155+
proto_schema {
3156+
columns {
3157+
name: "row_key"
3158+
type { string_type {} }
3159+
}
3160+
columns {
3161+
name: "value"
3162+
type { string_type {} }
3163+
}
3164+
columns {
3165+
name: "other_value"
3166+
type { string_type {} }
3167+
}
3168+
}
3169+
)pb";
3170+
v2::PrepareQueryResponse initial_pq_response;
3171+
initial_pq_response.set_prepared_query("test-pq-id-initial");
3172+
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
3173+
kInitialResultMetadataText, initial_pq_response.mutable_metadata()));
3174+
*initial_pq_response.mutable_valid_until() = internal::ToProtoTimestamp(
3175+
std::chrono::system_clock::now() + std::chrono::seconds(3600));
3176+
3177+
auto constexpr kRefreshResultMetadataText = R"pb(
3178+
proto_schema {
3179+
columns {
3180+
name: "row_key"
3181+
type { string_type {} }
3182+
}
3183+
columns {
3184+
name: "value"
3185+
type { string_type {} }
3186+
}
3187+
}
3188+
)pb";
3189+
v2::PrepareQueryResponse refresh_pq_response;
3190+
refresh_pq_response.set_prepared_query("test-pq-id-refresh");
3191+
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
3192+
kRefreshResultMetadataText, refresh_pq_response.mutable_metadata()));
3193+
*refresh_pq_response.mutable_valid_until() = internal::ToProtoTimestamp(
3194+
std::chrono::system_clock::now() + std::chrono::seconds(3600));
3195+
3196+
using RefreshReturnType =
3197+
StatusOr<google::bigtable::v2::PrepareQueryResponse>;
3198+
MockFunction<future<RefreshReturnType>()> refresh_fn;
3199+
EXPECT_CALL(refresh_fn, Call)
3200+
.WillOnce([&]() {
3201+
return make_ready_future(RefreshReturnType(TransientError()));
3202+
})
3203+
.WillOnce([&]() {
3204+
return make_ready_future(make_status_or(refresh_pq_response));
3205+
});
3206+
3207+
EXPECT_CALL(*mock, ExecuteQuery)
3208+
.WillOnce([&](auto, auto const&,
3209+
google::bigtable::v2::ExecuteQueryRequest const& request) {
3210+
EXPECT_EQ(request.prepared_query(), "test-pq-id-initial");
3211+
auto error_stream = std::make_unique<MockExecuteQueryStream>();
3212+
EXPECT_CALL(*error_stream, Read).WillOnce(Return(QueryPlanError()));
3213+
return error_stream;
3214+
})
3215+
.WillOnce([&](auto, auto const&,
3216+
google::bigtable::v2::ExecuteQueryRequest const& request) {
3217+
EXPECT_EQ(request.app_profile_id(), kAppProfile);
3218+
EXPECT_EQ(request.instance_name(),
3219+
"projects/test-project/instances/test-instance");
3220+
EXPECT_EQ(request.prepared_query(), "test-pq-id-refresh");
3221+
3222+
auto stream = std::make_unique<MockExecuteQueryStream>();
3223+
EXPECT_CALL(*stream, Read)
3224+
.WillOnce([&](google::bigtable::v2::ExecuteQueryResponse* r) {
3225+
*r->mutable_metadata() = refresh_pq_response.metadata();
3226+
return absl::nullopt;
3227+
})
3228+
.WillOnce([&](google::bigtable::v2::ExecuteQueryResponse* r) {
3229+
MakeResponse(*r->mutable_results(), {"r1", "v1"}, absl::nullopt,
3230+
false);
3231+
return absl::nullopt;
3232+
})
3233+
.WillOnce([&](google::bigtable::v2::ExecuteQueryResponse* r) {
3234+
MakeResponse(*r->mutable_results(), {"r2", "v2"},
3235+
"sentinel-token", false);
3236+
return absl::nullopt;
3237+
})
3238+
// End of stream
3239+
.WillOnce(Return(google::cloud::Status()));
3240+
3241+
return stream;
3242+
});
3243+
3244+
auto conn = TestConnection(std::move(mock));
3245+
internal::OptionsSpan span(CallOptions());
3246+
Project p("test-project");
3247+
bigtable::SqlStatement statement("SELECT * FROM the-table");
3248+
bigtable::InstanceResource instance(p, "test-instance");
3249+
auto query_plan = QueryPlan::Create(CompletionQueue(fake_cq_impl),
3250+
std::move(initial_pq_response),
3251+
refresh_fn.AsStdFunction());
3252+
auto prepared_query =
3253+
bigtable::PreparedQuery(instance, statement, std::move(query_plan));
3254+
auto bq = prepared_query.BindParameters({});
3255+
bigtable::ExecuteQueryParams params{std::move(bq)};
3256+
auto row_stream = conn->ExecuteQuery(std::move(params));
3257+
std::vector<StatusOr<bigtable::QueryRow>> rows;
3258+
for (auto const& row : row_stream) {
3259+
ASSERT_STATUS_OK(row);
3260+
rows.push_back(row);
3261+
}
3262+
3263+
ASSERT_EQ(rows.size(), 2);
3264+
ASSERT_STATUS_OK(rows[0]);
3265+
auto const& row1 = *rows[0];
3266+
EXPECT_EQ(row1.columns().at(0), "row_key");
3267+
EXPECT_EQ(row1.columns().at(1), "value");
3268+
EXPECT_THAT(row1.values().at(0).get<std::string>(), IsOkAndHolds("r1"));
3269+
EXPECT_THAT(row1.values().at(1).get<std::string>(), IsOkAndHolds("v1"));
3270+
auto const& row2 = *rows[1];
3271+
EXPECT_THAT(row2.values().at(0).get<std::string>(), IsOkAndHolds("r2"));
3272+
EXPECT_THAT(row2.values().at(1).get<std::string>(), IsOkAndHolds("v2"));
3273+
fake_cq_impl->SimulateCompletion(false);
3274+
}
3275+
31323276
} // namespace
31333277
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
31343278
} // namespace bigtable_internal

google/cloud/bigtable/internal/defaults.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,12 @@ Options DefaultDataOptions(Options opts) {
251251
kBigtableLimits.maximum_retry_period)
252252
.clone());
253253
}
254+
if (!opts.has<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>()) {
255+
opts.set<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>(
256+
bigtable::experimental::QueryPlanRefreshLimitedTimeRetryPolicy(
257+
kBigtableLimits.maximum_retry_period)
258+
.clone());
259+
}
254260
if (!opts.has<bigtable::DataBackoffPolicyOption>()) {
255261
opts.set<bigtable::DataBackoffPolicyOption>(
256262
ExponentialBackoffPolicy(kBigtableLimits.initial_delay / 2,

google/cloud/bigtable/options.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,15 @@ struct BulkApplyThrottlingOption {
187187
using Type = bool;
188188
};
189189

190+
/**
191+
* Option to configure the retry policy used in `bigtable::Client::ExecuteQuery`
192+
* to automatically refresh expired or invalid query plans encountered during
193+
* execution.
194+
*/
195+
struct QueryPlanRefreshRetryPolicyOption {
196+
using Type = std::shared_ptr<DataRetryPolicy>;
197+
};
198+
190199
} // namespace experimental
191200

192201
/// The complete list of options accepted by `bigtable::*Client`

0 commit comments

Comments
 (0)