Skip to content

Commit 7abc525

Browse files
committed
rebase with main
2 parents 938c7ce + 8e3250e commit 7abc525

12 files changed

+443
-148
lines changed

google/cloud/bigtable/ci/run_conformance_tests_proxy_bazel.sh

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,30 +47,14 @@ go test -v \
4747
-proxy_addr=:9999
4848
exit_status=$?
4949

50-
# Run all the ExecuteQuery tests that either work or we plan to skip such as
51-
# CloseClient
50+
# Run all the ExecuteQuery tests that either work or we plan to skip due to
51+
# unimplemented features or issues with the tests themselves.
5252
go test -v \
53-
-run "TestExecuteQuery|TestExecuteQuery_PlanRefresh$|TestExecuteQuery_PlanRefresh_WithMetadataChange|TestExecuteQuery_PlanRefresh_Retries|TestExecuteQuery_FailsOnSuccesfulStreamWithNoToken" \
54-
-skip "CloseClient|FailsOnTypeMismatch|FailsOnTypeMismatchWithinMap|FailsOnTypeMismatchWithinArray|FailsOnTypeMismatchWithinStruct|FailsOnStructMissingField|TestExecuteQuery_PlanRefresh_AfterResumeTokenCausesError|TestExecuteQuery_RetryTest_WithPlanRefresh|TestExecuteQuery_PlanRefresh_RespectsDeadline|TestExecuteQuery_PlanRefresh_RecoversAfterPermanentError" \
53+
-run "TestExecuteQuery" \
54+
-skip "CloseClient|TestExecuteQuery_PlanRefresh_RespectsDeadline" \
5555
-proxy_addr=:9999
5656
exit_status=$?
5757

58-
# These next four go test commands group the currently failing ExecuteQuery
59-
# tests into groups that exercise similar functionality and should be worked on
60-
# together.
61-
62-
# Response/Metadata mismatches b/461233335
63-
go test -v \
64-
-run "FailsOnTypeMismatch|FailsOnTypeMismatchWithinMap|FailsOnTypeMismatchWithinArray|FailsOnTypeMismatchWithinStruct|FailsOnStructMissingField" \
65-
-proxy_addr=:9999
66-
exit_status=$?
67-
68-
# QueryPlan refresh tests b/461233613
69-
#go test -v \
70-
# -run "RetryTest_WithPlanRefresh|PlanRefresh|PlanRefresh_RecoversAfterPermanentError" \
71-
# -proxy_addr=:9999
72-
#exit_status=$?
73-
7458
# Remove the entire module cache, including unpacked source code of versioned
7559
# dependencies.
7660
go clean -modcache

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 153 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
#include "google/cloud/internal/random.h"
4040
#include "google/cloud/internal/retry_loop.h"
4141
#include "google/cloud/internal/streaming_read_rpc.h"
42-
#include "absl/functional/bind_front.h"
4342
#include <memory>
4443
#include <string>
4544

@@ -59,9 +58,16 @@ inline std::unique_ptr<bigtable::DataRetryPolicy> retry_policy(
5958
}
6059

6160
inline std::unique_ptr<bigtable::DataRetryPolicy>
62-
query_plan_refresh_retry_policy(Options const& options) {
61+
execute_query_plan_refresh_retry_policy(Options const& options) {
6362
return options
64-
.get<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>()
63+
.get<bigtable::experimental::ExecuteQueryPlanRefreshRetryPolicyOption>()
64+
->clone();
65+
}
66+
67+
inline std::unique_ptr<bigtable::DataRetryPolicy>
68+
query_plan_refresh_function_retry_policy(Options const& options) {
69+
return options
70+
.get<bigtable::experimental::QueryPlanRefreshFunctionRetryPolicyOption>()
6571
->clone();
6672
}
6773

@@ -111,10 +117,6 @@ bool IsStatusMetadataIndicatingRetryPolicyExhausted(Status const& status) {
111117
"retry-policy-exhausted"));
112118
}
113119

114-
bool IsStatusIndicatingInternalError(Status const& status) {
115-
return status.code() == StatusCode::kInternal;
116-
}
117-
118120
class DefaultPartialResultSetReader
119121
: public bigtable_internal::PartialResultSetReader {
120122
public:
@@ -773,7 +775,7 @@ StatusOr<bigtable::PreparedQuery> DataConnectionImpl::PrepareQuery(
773775
auto const* func = __func__;
774776
auto refresh_fn = [this, request, func]() mutable {
775777
auto current = google::cloud::internal::SaveCurrentOptions();
776-
auto retry = retry_policy(*current);
778+
auto retry = query_plan_refresh_function_retry_policy(*current);
777779
auto backoff = backoff_policy(*current);
778780
auto operation_context = operation_context_factory_->PrepareQuery(
779781
request.instance_name(), app_profile_id(*current));
@@ -869,7 +871,7 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
869871
}
870872
auto refresh_fn = [this, request, func]() mutable {
871873
auto current = google::cloud::internal::SaveCurrentOptions();
872-
auto retry = retry_policy(*current);
874+
auto retry = query_plan_refresh_function_retry_policy(*current);
873875
auto backoff = backoff_policy(*current);
874876
auto operation_context = operation_context_factory_->PrepareQuery(
875877
request.instance_name(), app_profile_id(*current));
@@ -911,6 +913,139 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
911913
});
912914
}
913915

916+
class QueryPlanRefreshingPartialResultSource
917+
: public bigtable::ResultSourceInterface {
918+
public:
919+
using SourceFactoryFn =
920+
std::function<StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>>(
921+
google::bigtable::v2::ExecuteQueryRequest& request,
922+
google::bigtable::v2::ResultSetMetadata metadata,
923+
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_prototype,
924+
std::unique_ptr<BackoffPolicy> backoff_policy_prototype,
925+
std::shared_ptr<OperationContext> const& operation_context)>;
926+
static std::unique_ptr<QueryPlanRefreshingPartialResultSource> Create(
927+
google::bigtable::v2::ExecuteQueryRequest request,
928+
SourceFactoryFn source_factory, std::shared_ptr<QueryPlan> query_plan,
929+
std::shared_ptr<OperationContext> operation_context,
930+
std::unique_ptr<BackoffPolicy> backoff_policy,
931+
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
932+
std::unique_ptr<bigtable::DataRetryPolicy>
933+
query_plan_refresh_retry_policy,
934+
internal::ImmutableOptions options) {
935+
return std::unique_ptr<QueryPlanRefreshingPartialResultSource>(
936+
new QueryPlanRefreshingPartialResultSource(
937+
std::move(request), std::move(source_factory),
938+
std::move(query_plan), std::move(operation_context),
939+
std::move(backoff_policy), std::move(retry_policy),
940+
std::move(query_plan_refresh_retry_policy), std::move(options)));
941+
}
942+
943+
~QueryPlanRefreshingPartialResultSource() override = default;
944+
945+
StatusOr<bigtable::QueryRow> NextRow() override {
946+
StatusOr<bigtable::QueryRow> row;
947+
do {
948+
if (!query_plan_valid_) source_ = absl::nullopt;
949+
if (!source_.has_value() || !source_->ok()) {
950+
UpdateSource();
951+
}
952+
row = (**source_)->NextRow();
953+
if (ExecuteQueryPlanRefreshRetry::IsQueryPlanExpired(row.status())) {
954+
query_plan_valid_ = false;
955+
query_plan_->Invalidate(row.status(),
956+
query_plan_data_->prepared_query());
957+
}
958+
} while (!query_plan_refresh_retry_policy_->IsExhausted() &&
959+
ExecuteQueryPlanRefreshRetry::IsQueryPlanExpired(row.status()));
960+
return row;
961+
}
962+
963+
absl::optional<google::bigtable::v2::ResultSetMetadata> Metadata() override {
964+
if (!source_.has_value()) return absl::nullopt;
965+
return (**source_)->Metadata();
966+
}
967+
968+
private:
969+
QueryPlanRefreshingPartialResultSource(
970+
google::bigtable::v2::ExecuteQueryRequest request,
971+
SourceFactoryFn source_factory, std::shared_ptr<QueryPlan> query_plan,
972+
std::shared_ptr<OperationContext> operation_context,
973+
std::unique_ptr<BackoffPolicy> backoff_policy,
974+
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
975+
std::unique_ptr<bigtable::DataRetryPolicy>
976+
query_plan_refresh_retry_policy,
977+
internal::ImmutableOptions options)
978+
: request_(std::move(request)),
979+
source_factory_(std::move(source_factory)),
980+
query_plan_(std::move(query_plan)),
981+
operation_context_(std::move(operation_context)),
982+
backoff_policy_(std::move(backoff_policy)),
983+
retry_policy_(std::move(retry_policy)),
984+
query_plan_refresh_retry_policy_(
985+
std::move(query_plan_refresh_retry_policy)),
986+
query_plan_backoff_policy_(backoff_policy_->clone()),
987+
options_(std::move(options)) {}
988+
989+
void UpdateSource() {
990+
internal::OptionsSpan options_span(options_);
991+
while (!query_plan_refresh_retry_policy_->IsExhausted()) {
992+
query_plan_data_ = query_plan_->response();
993+
if (query_plan_data_.ok()) {
994+
query_plan_valid_ = true;
995+
request_.set_prepared_query(query_plan_data_->prepared_query());
996+
source_ = source_factory_(
997+
request_, query_plan_data_->metadata(),
998+
options_->get<bigtable::DataRetryPolicyOption>()->clone(),
999+
options_->get<bigtable::DataBackoffPolicyOption>()->clone(),
1000+
operation_context_);
1001+
if (source_->ok()) return;
1002+
1003+
last_status_ = source_->status();
1004+
if (ExecuteQueryPlanRefreshRetry::IsQueryPlanExpired(
1005+
source_->status())) {
1006+
query_plan_valid_ = false;
1007+
query_plan_->Invalidate(source_->status(),
1008+
query_plan_data_->prepared_query());
1009+
}
1010+
if (IsStatusMetadataIndicatingRetryPolicyExhausted(source_->status())) {
1011+
source_ = std::make_unique<StatusOnlyResultSetSource>(last_status_);
1012+
return;
1013+
}
1014+
} else {
1015+
last_status_ = query_plan_data_.status();
1016+
}
1017+
1018+
auto delay = internal::Backoff(
1019+
last_status_, __func__, *query_plan_refresh_retry_policy_,
1020+
*query_plan_backoff_policy_, Idempotency::kIdempotent,
1021+
false /* enable_server_retries */);
1022+
if (!delay) break;
1023+
std::this_thread::sleep_for(*delay);
1024+
}
1025+
auto retry_loop_error = internal::RetryLoopError(
1026+
last_status_, __func__,
1027+
query_plan_refresh_retry_policy_->IsExhausted());
1028+
source_ = std::make_unique<StatusOnlyResultSetSource>(retry_loop_error);
1029+
last_status_ = retry_loop_error;
1030+
}
1031+
1032+
google::bigtable::v2::ExecuteQueryRequest request_;
1033+
SourceFactoryFn source_factory_;
1034+
std::shared_ptr<QueryPlan> query_plan_;
1035+
std::shared_ptr<OperationContext> operation_context_;
1036+
std::unique_ptr<BackoffPolicy> backoff_policy_;
1037+
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_;
1038+
std::unique_ptr<bigtable::DataRetryPolicy> query_plan_refresh_retry_policy_;
1039+
std::unique_ptr<BackoffPolicy> query_plan_backoff_policy_;
1040+
internal::ImmutableOptions options_;
1041+
1042+
absl::optional<StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>>>
1043+
source_;
1044+
StatusOr<google::bigtable::v2::PrepareQueryResponse> query_plan_data_;
1045+
bool query_plan_valid_ = false;
1046+
Status last_status_;
1047+
};
1048+
9141049
bigtable::RowStream DataConnectionImpl::ExecuteQuery(
9151050
bigtable::ExecuteQueryParams params) {
9161051
auto current = google::cloud::internal::SaveCurrentOptions();
@@ -921,7 +1056,7 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
9211056
auto const tracing_enabled = RpcStreamTracingEnabled();
9221057
auto const& tracing_options = RpcTracingOptions();
9231058

924-
auto retry_resume_fn =
1059+
auto source_fn =
9251060
[stub = stub_, tracing_enabled, tracing_options](
9261061
google::bigtable::v2::ExecuteQueryRequest& request,
9271062
google::bigtable::v2::ResultSetMetadata metadata,
@@ -959,56 +1094,15 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
9591094
auto operation_context = operation_context_factory_->ExecuteQuery(
9601095
request.instance_name(), app_profile_id(*current));
9611096

962-
auto query_plan = params.bound_query.query_plan_;
963-
auto query_plan_retry_policy = query_plan_refresh_retry_policy(*current);
964-
auto query_plan_backoff_policy = backoff_policy(*current);
965-
Status last_status;
966-
967-
// TODO(sdhart): OperationContext needs to be plumbed through the QueryPlan
968-
// refresh_fn so that it's shared with the ExecuteQuery attempts.
969-
while (!query_plan_retry_policy->IsExhausted()) {
970-
// Snapshot query_plan data.
971-
// This access could cause a query plan refresh to occur.
972-
StatusOr<google::bigtable::v2::PrepareQueryResponse> query_plan_data =
973-
query_plan->response();
974-
975-
if (query_plan_data.ok()) {
976-
request.set_prepared_query(query_plan_data->prepared_query());
977-
auto source = retry_resume_fn(
978-
request, query_plan_data->metadata(), retry_policy(*current),
979-
backoff_policy(*current), operation_context);
980-
if (source.ok()) {
981-
return bigtable::RowStream(*std::move(source));
982-
}
983-
last_status = source.status();
984-
985-
if (IsStatusIndicatingInternalError(source.status())) {
986-
return bigtable::RowStream(std::make_unique<StatusOnlyResultSetSource>(
987-
std::move(last_status)));
988-
}
1097+
auto query_plan_refreshing_source =
1098+
QueryPlanRefreshingPartialResultSource::Create(
1099+
std::move(request), std::move(source_fn),
1100+
std::move(params.bound_query.query_plan_),
1101+
std::move(operation_context), backoff_policy(*current),
1102+
retry_policy(*current),
1103+
execute_query_plan_refresh_retry_policy(*current), current);
9891104

990-
if (QueryPlanRefreshRetry::IsQueryPlanExpired(source.status())) {
991-
query_plan->Invalidate(source.status(),
992-
query_plan_data->prepared_query());
993-
}
994-
if (IsStatusMetadataIndicatingRetryPolicyExhausted(source.status())) {
995-
return bigtable::RowStream(std::make_unique<StatusOnlyResultSetSource>(
996-
std::move(last_status)));
997-
}
998-
} else {
999-
last_status = query_plan_data.status();
1000-
}
1001-
1002-
auto delay =
1003-
internal::Backoff(last_status, __func__, *query_plan_retry_policy,
1004-
*query_plan_backoff_policy, Idempotency::kIdempotent,
1005-
false /* enable_server_retries */);
1006-
if (!delay) break;
1007-
std::this_thread::sleep_for(*delay);
1008-
}
1009-
return bigtable::RowStream(
1010-
std::make_unique<StatusOnlyResultSetSource>(internal::RetryLoopError(
1011-
last_status, __func__, query_plan_retry_policy->IsExhausted())));
1105+
return bigtable::RowStream(std::move(query_plan_refreshing_source));
10121106
}
10131107

10141108
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/bigtable/internal/data_connection_impl_test.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,10 +212,10 @@ DataLimitedErrorCountRetryPolicy TestRetryPolicy() {
212212
return DataLimitedErrorCountRetryPolicy(kNumRetries);
213213
}
214214

215-
bigtable::experimental::QueryPlanRefreshLimitedErrorCountRetryPolicy
215+
bigtable::experimental::ExecuteQueryPlanRefreshLimitedErrorCountRetryPolicy
216216
TestQueryPlanRefreshRetryPolicy() {
217-
return bigtable::experimental::QueryPlanRefreshLimitedErrorCountRetryPolicy(
218-
kNumRetries);
217+
return bigtable::experimental::
218+
ExecuteQueryPlanRefreshLimitedErrorCountRetryPolicy(kNumRetries);
219219
}
220220

221221
ExponentialBackoffPolicy TestBackoffPolicy() {
@@ -255,7 +255,8 @@ Options CallOptionsWithoutClientContextSetup() {
255255
Options{}
256256
.set<bigtable::AppProfileIdOption>(kAppProfile)
257257
.set<DataRetryPolicyOption>(TestRetryPolicy().clone())
258-
.set<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>(
258+
.set<
259+
bigtable::experimental::ExecuteQueryPlanRefreshRetryPolicyOption>(
259260
TestQueryPlanRefreshRetryPolicy().clone())
260261
.set<DataBackoffPolicyOption>(TestBackoffPolicy().clone()));
261262
}

google/cloud/bigtable/internal/defaults.cc

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,12 +251,21 @@ Options DefaultDataOptions(Options opts) {
251251
kBigtableLimits.maximum_retry_period)
252252
.clone());
253253
}
254-
if (!opts.has<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>()) {
255-
opts.set<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>(
256-
bigtable::experimental::QueryPlanRefreshLimitedTimeRetryPolicy(
254+
if (!opts.has<
255+
bigtable::experimental::ExecuteQueryPlanRefreshRetryPolicyOption>()) {
256+
opts.set<bigtable::experimental::ExecuteQueryPlanRefreshRetryPolicyOption>(
257+
bigtable::experimental::ExecuteQueryPlanRefreshLimitedTimeRetryPolicy(
257258
kBigtableLimits.maximum_retry_period)
258259
.clone());
259260
}
261+
if (!opts.has<bigtable::experimental::
262+
QueryPlanRefreshFunctionRetryPolicyOption>()) {
263+
opts.set<bigtable::experimental::QueryPlanRefreshFunctionRetryPolicyOption>(
264+
bigtable::experimental::QueryPlanRefreshFunctionLimitedTimeRetryPolicy(
265+
kBigtableLimits.maximum_retry_period)
266+
.clone());
267+
}
268+
260269
if (!opts.has<bigtable::DataBackoffPolicyOption>()) {
261270
opts.set<bigtable::DataBackoffPolicyOption>(
262271
ExponentialBackoffPolicy(kBigtableLimits.initial_delay / 2,

google/cloud/bigtable/internal/partial_result_set_source.cc

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,7 @@ PartialResultSetSource::Create(
5353
auto status = source->ReadFromStream();
5454

5555
// Any error during parsing will be returned.
56-
if (!status.ok()) {
57-
return status;
58-
}
56+
if (!status.ok()) return status;
5957

6058
return {std::move(source)};
6159
}
@@ -142,15 +140,23 @@ Status PartialResultSetSource::ReadFromStream() {
142140
if (reader_->Read(resume_token_, result_set)) {
143141
return ProcessDataFromStream(result_set.result);
144142
}
143+
144+
last_status_ = reader_->Finish();
145145
state_ = State::kFinished;
146-
// buffered_rows_ and read_buffer_ are expected to be empty because the last
147-
// successful read would have had a sentinel resume_token, causing
148-
// ProcessDataFromStream to commit them.
149-
if (!buffered_rows_.empty() || !read_buffer_.empty()) {
146+
147+
if (ExecuteQueryPlanRefreshRetry::IsQueryPlanExpired(last_status_)) {
148+
if (resume_token_ && !resume_token_->empty()) {
149+
return internal::InternalError(
150+
"Query plan expired during a retry attempt", GCP_ERROR_INFO());
151+
}
152+
} else if (!buffered_rows_.empty() || !read_buffer_.empty()) {
153+
// buffered_rows_ and read_buffer_ are expected to be empty because the last
154+
// successful read would have had a sentinel resume_token, causing
155+
// ProcessDataFromStream to commit them.
150156
return internal::InternalError("Stream ended with uncommitted rows.",
151157
GCP_ERROR_INFO());
152158
}
153-
last_status_ = reader_->Finish();
159+
154160
return last_status_;
155161
}
156162

0 commit comments

Comments
 (0)