Skip to content

Commit 3a7a0bf

Browse files
authored
impl(bigtable): update execute query to retry (#15717)
1 parent 9ea0d34 commit 3a7a0bf

File tree

10 files changed

+213
-98
lines changed

10 files changed

+213
-98
lines changed

google/cloud/bigtable/bound_query.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@
2626

2727
namespace google {
2828
namespace cloud {
29+
namespace bigtable_internal {
30+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
31+
class DataConnectionImpl;
32+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
33+
} // namespace bigtable_internal
34+
2935
namespace bigtable {
3036
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
3137

@@ -48,8 +54,15 @@ class BoundQuery {
4854

4955
google::bigtable::v2::ExecuteQueryRequest ToRequestProto() const;
5056

57+
GOOGLE_CLOUD_CPP_DEPRECATED("use response()")
58+
StatusOr<std::string> prepared_query() const;
59+
GOOGLE_CLOUD_CPP_DEPRECATED("use response()")
60+
StatusOr<google::bigtable::v2::ResultSetMetadata> metadata() const;
61+
5162
private:
5263
friend class PreparedQuery;
64+
friend class bigtable_internal::DataConnectionImpl;
65+
5366
BoundQuery(InstanceResource instance,
5467
std::shared_ptr<bigtable_internal::QueryPlan> query_plan,
5568
std::unordered_map<std::string, Value> parameters)

google/cloud/bigtable/client.cc

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,7 @@ future<StatusOr<PreparedQuery>> Client::AsyncPrepareQuery(
3737

3838
// NOLINTNEXTLINE(performance-unnecessary-value-param)
3939
RowStream Client::ExecuteQuery(BoundQuery&& bound_query, Options) {
40-
ExecuteQueryParams params{std::move(bound_query)};
41-
auto row_stream = conn_->ExecuteQuery(params);
42-
if (!row_stream.ok()) {
43-
return RowStream(
44-
std::make_unique<bigtable_internal::StatusOnlyResultSetSource>(
45-
row_stream.status()));
46-
}
47-
return std::move(row_stream.value());
40+
return conn_->ExecuteQuery({std::move(bound_query)});
4841
}
4942

5043
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/bigtable/client_test.cc

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -132,30 +132,28 @@ TEST(ClientTest, ExecuteQuery) {
132132
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
133133
kResultMetadataText, pq_response.mutable_metadata()));
134134
EXPECT_CALL(*conn_mock, ExecuteQuery)
135-
.WillOnce(
136-
[&](bigtable::ExecuteQueryParams const&) -> StatusOr<RowStream> {
137-
auto mock_source = std::make_unique<MockQueryRowSource>();
138-
EXPECT_CALL(*mock_source, Metadata)
139-
.WillRepeatedly(Return(pq_response.metadata()));
140-
141-
testing::InSequence s;
142-
EXPECT_CALL(*mock_source, NextRow)
143-
.WillOnce(Return(bigtable_mocks::MakeQueryRow(
144-
{{"key", bigtable::Value("r1")},
145-
{"val", bigtable::Value("v1")}})));
146-
EXPECT_CALL(*mock_source, NextRow)
147-
.WillOnce(Return(bigtable_mocks::MakeQueryRow(
148-
{{"key", bigtable::Value("r2")},
149-
{"val", bigtable::Value("v2")}})));
150-
EXPECT_CALL(*mock_source, NextRow)
151-
// Signal end of stream
152-
.WillOnce(
153-
Return(Status(StatusCode::kOutOfRange, "End of stream")));
154-
155-
// Create RowStream with the mock result source
156-
RowStream row_stream(std::move(mock_source));
157-
return StatusOr<RowStream>(std::move(row_stream));
158-
});
135+
.WillOnce([&](bigtable::ExecuteQueryParams) {
136+
auto mock_source = std::make_unique<MockQueryRowSource>();
137+
EXPECT_CALL(*mock_source, Metadata)
138+
.WillRepeatedly(Return(pq_response.metadata()));
139+
140+
testing::InSequence s;
141+
EXPECT_CALL(*mock_source, NextRow)
142+
.WillOnce(Return(bigtable_mocks::MakeQueryRow(
143+
{{"key", bigtable::Value("r1")},
144+
{"val", bigtable::Value("v1")}})));
145+
EXPECT_CALL(*mock_source, NextRow)
146+
.WillOnce(Return(bigtable_mocks::MakeQueryRow(
147+
{{"key", bigtable::Value("r2")},
148+
{"val", bigtable::Value("v2")}})));
149+
EXPECT_CALL(*mock_source, NextRow)
150+
// Signal end of stream
151+
.WillOnce(Return(Status(StatusCode::kOutOfRange, "End of stream")));
152+
153+
// Create RowStream with the mock result source
154+
RowStream row_stream(std::move(mock_source));
155+
return row_stream;
156+
});
159157

160158
Client client(conn_mock);
161159
InstanceResource instance(Project("test-project"), "test-instance");

google/cloud/bigtable/data_connection.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "google/cloud/bigtable/internal/data_tracing_connection.h"
1919
#include "google/cloud/bigtable/internal/defaults.h"
2020
#include "google/cloud/bigtable/internal/mutate_rows_limiter.h"
21+
#include "google/cloud/bigtable/internal/partial_result_set_source.h"
2122
#include "google/cloud/bigtable/internal/row_reader_impl.h"
2223
#include "google/cloud/bigtable/options.h"
2324
#include "google/cloud/bigtable/results.h"
@@ -167,9 +168,10 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnection::AsyncPrepareQuery(
167168
Status(StatusCode::kUnimplemented, "not implemented"));
168169
}
169170

170-
StatusOr<bigtable::RowStream> DataConnection::ExecuteQuery(
171-
bigtable::ExecuteQueryParams const&) {
172-
return Status(StatusCode::kUnimplemented, "not implemented");
171+
bigtable::RowStream DataConnection::ExecuteQuery(bigtable::ExecuteQueryParams) {
172+
return RowStream(
173+
std::make_unique<bigtable_internal::StatusOnlyResultSetSource>(
174+
Status(StatusCode::kUnimplemented, "not implemented")));
173175
}
174176

175177
std::shared_ptr<DataConnection> MakeDataConnection(Options options) {

google/cloud/bigtable/data_connection.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,7 @@ class DataConnection {
160160
bigtable::PrepareQueryParams const& p);
161161
virtual future<StatusOr<bigtable::PreparedQuery>> AsyncPrepareQuery(
162162
bigtable::PrepareQueryParams const& p);
163-
virtual StatusOr<bigtable::RowStream> ExecuteQuery(
164-
bigtable::ExecuteQueryParams const& p);
163+
virtual bigtable::RowStream ExecuteQuery(bigtable::ExecuteQueryParams p);
165164
};
166165

167166
/**

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 79 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@
1919
#include "google/cloud/bigtable/internal/bulk_mutator.h"
2020
#include "google/cloud/bigtable/internal/default_row_reader.h"
2121
#include "google/cloud/bigtable/internal/defaults.h"
22+
#include "google/cloud/bigtable/internal/logging_result_set_reader.h"
2223
#include "google/cloud/bigtable/internal/operation_context.h"
2324
#include "google/cloud/bigtable/internal/partial_result_set_reader.h"
2425
#include "google/cloud/bigtable/internal/partial_result_set_resume.h"
2526
#include "google/cloud/bigtable/internal/partial_result_set_source.h"
27+
#include "google/cloud/bigtable/internal/retry_traits.h"
28+
#include "google/cloud/bigtable/internal/rpc_policy_parameters.h"
2629
#include "google/cloud/bigtable/options.h"
2730
#include "google/cloud/bigtable/results.h"
31+
#include "google/cloud/bigtable/retry_policy.h"
2832
#include "google/cloud/background_threads.h"
2933
#include "google/cloud/grpc_options.h"
3034
#include "google/cloud/idempotency.h"
@@ -65,6 +69,15 @@ inline bool enable_server_retries(Options const& options) {
6569
return options.get<EnableServerRetriesOption>();
6670
}
6771

72+
inline bool RpcStreamTracingEnabled() {
73+
return internal::Contains(
74+
internal::CurrentOptions().get<LoggingComponentsOption>(), "rpc-streams");
75+
}
76+
77+
inline TracingOptions const& RpcTracingOptions() {
78+
return internal::CurrentOptions().get<GrpcTracingOptionsOption>();
79+
}
80+
6881
// This function allows for ReadRow and ReadRowsFull to provide an instance of
6982
// an OperationContext specific to that operation.
7083
bigtable::RowReader ReadRowsHelper(
@@ -138,21 +151,6 @@ class DefaultPartialResultSetReader
138151
Status final_status_;
139152
};
140153

141-
class StatusOnlyResultSetSource : public bigtable::ResultSourceInterface {
142-
public:
143-
explicit StatusOnlyResultSetSource(google::cloud::Status status)
144-
: status_(std::move(status)) {}
145-
~StatusOnlyResultSetSource() override = default;
146-
147-
StatusOr<bigtable::QueryRow> NextRow() override { return status_; }
148-
absl::optional<google::bigtable::v2::ResultSetMetadata> Metadata() override {
149-
return {};
150-
}
151-
152-
private:
153-
google::cloud::Status status_;
154-
};
155-
156154
template <typename ResultType>
157155
ResultType MakeStatusOnlyResult(Status status) {
158156
return ResultType(
@@ -836,44 +834,86 @@ DataConnectionImpl::CreateResumableReader(
836834
std::move(stream));
837835
}
838836

839-
StatusOr<bigtable::RowStream> DataConnectionImpl::ExecuteQuery(
840-
bigtable::ExecuteQueryParams const& params) {
837+
bigtable::RowStream DataConnectionImpl::ExecuteQuery(
838+
bigtable::ExecuteQueryParams params) {
841839
auto current = google::cloud::internal::SaveCurrentOptions();
842-
StatusOr<google::bigtable::v2::ResultSetMetadata> status_or_metadata =
843-
params.bound_query.response()->metadata();
844840
google::bigtable::v2::ExecuteQueryRequest request =
845841
params.bound_query.ToRequestProto();
846842
request.set_app_profile_id(app_profile_id(*current));
847-
if (!status_or_metadata) {
848-
return status_or_metadata.status();
849-
}
850-
google::bigtable::v2::ResultSetMetadata metadata =
851-
*std::move(status_or_metadata);
843+
844+
auto const tracing_enabled = RpcStreamTracingEnabled();
845+
auto const& tracing_options = RpcTracingOptions();
852846

853847
auto retry_resume_fn =
854-
[this, retry_policy_prototype = retry_policy(*current),
855-
backoff_policy_prototype = backoff_policy(*current)](
856-
google::bigtable::v2::ResultSetMetadata metadata,
857-
google::bigtable::v2::ExecuteQueryRequest const& initial_request)
848+
[stub = stub_, retry_policy_prototype = retry_policy(*current),
849+
backoff_policy_prototype = backoff_policy(*current), tracing_enabled,
850+
tracing_options](google::bigtable::v2::ExecuteQueryRequest& request,
851+
google::bigtable::v2::ResultSetMetadata metadata,
852+
std::shared_ptr<OperationContext> const&) mutable
858853
-> StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>> {
859-
auto factory = absl::bind_front(&DataConnectionImpl::CreateResumableReader,
860-
this, initial_request);
854+
auto factory = [stub, request, tracing_enabled,
855+
tracing_options](std::string const& resume_token) mutable {
856+
if (!resume_token.empty()) request.set_resume_token(resume_token);
857+
auto context = std::make_shared<grpc::ClientContext>();
858+
auto const& options = internal::CurrentOptions();
859+
internal::ConfigureContext(*context, options);
860+
auto stream = stub->ExecuteQuery(context, options, request);
861+
std::unique_ptr<PartialResultSetReader> reader =
862+
std::make_unique<DefaultPartialResultSetReader>(std::move(context),
863+
std::move(stream));
864+
if (tracing_enabled) {
865+
reader = std::make_unique<LoggingResultSetReader>(std::move(reader),
866+
tracing_options);
867+
}
868+
return reader;
869+
};
861870

862-
auto resume_reader = std::make_unique<PartialResultSetResume>(
871+
auto rpc = std::make_unique<PartialResultSetResume>(
863872
std::move(factory), Idempotency::kIdempotent,
864873
retry_policy_prototype->clone(), backoff_policy_prototype->clone());
865874

866-
return PartialResultSetSource::Create(std::move(metadata),
867-
std::move(resume_reader));
875+
return PartialResultSetSource::Create(std::move(metadata), std::move(rpc));
868876
};
869877

870-
auto response = retry_resume_fn(metadata, request);
871-
if (!response) {
872-
auto status = std::move(response).status();
873-
return MakeStatusOnlyResult<bigtable::RowStream>(std::move(status));
878+
auto operation_context = std::make_shared<OperationContext>();
879+
880+
auto query_plan = params.bound_query.query_plan_;
881+
auto operation_retry_policy = retry_policy(*current);
882+
auto operation_backoff_policy = backoff_policy(*current);
883+
Status last_status;
884+
885+
while (!operation_retry_policy->IsExhausted()) {
886+
StatusOr<google::bigtable::v2::PrepareQueryResponse> query_plan_data =
887+
query_plan->response();
888+
889+
if (query_plan_data.ok()) {
890+
request.set_prepared_query(query_plan_data->prepared_query());
891+
auto reader = retry_resume_fn(request, query_plan_data->metadata(),
892+
operation_context);
893+
if (reader.ok()) {
894+
return bigtable::RowStream(*std::move(reader));
895+
}
896+
if (SafeGrpcRetryAllowingQueryPlanRefresh::IsQueryPlanExpired(
897+
reader.status())) {
898+
query_plan->Invalidate(reader.status(),
899+
query_plan_data->prepared_query());
900+
}
901+
last_status = reader.status();
902+
} else {
903+
last_status = query_plan_data.status();
904+
}
905+
906+
auto delay =
907+
internal::Backoff(last_status, __func__, *operation_retry_policy,
908+
*operation_backoff_policy, Idempotency::kIdempotent,
909+
false /* enable_server_retries */);
910+
if (!delay) break;
911+
std::this_thread::sleep_for(*delay);
874912
}
875-
return bigtable::RowStream(*std::move(response));
876-
};
913+
return bigtable::RowStream(
914+
std::make_unique<StatusOnlyResultSetSource>(internal::RetryLoopError(
915+
last_status, __func__, operation_retry_policy->IsExhausted())));
916+
}
877917

878918
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
879919
} // namespace bigtable_internal

google/cloud/bigtable/internal/data_connection_impl.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,7 @@ class DataConnectionImpl : public bigtable::DataConnection {
108108
bigtable::PrepareQueryParams const& p) override;
109109
future<StatusOr<bigtable::PreparedQuery>> AsyncPrepareQuery(
110110
bigtable::PrepareQueryParams const& p) override;
111-
StatusOr<bigtable::RowStream> ExecuteQuery(
112-
bigtable::ExecuteQueryParams const& p) override;
111+
bigtable::RowStream ExecuteQuery(bigtable::ExecuteQueryParams p) override;
113112

114113
private:
115114
void AsyncReadRowsHelper(std::string const& table_name,

0 commit comments

Comments
 (0)