Skip to content

Commit fcda1af

Browse files
authored
feat(bigtable): implement execute query in data connection impl (#15694)
* feat(bigtable): implement execute query in data connection impl
1 parent 18073cc commit fcda1af

File tree

10 files changed

+425
-64
lines changed

10 files changed

+425
-64
lines changed

google/cloud/bigtable/bound_query.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ std::unordered_map<std::string, Value> const& BoundQuery::parameters() const {
3333

3434
InstanceResource const& BoundQuery::instance() const { return instance_; }
3535

36-
google::bigtable::v2::ExecuteQueryRequest BoundQuery::ToRequestProto() {
36+
google::bigtable::v2::ExecuteQueryRequest BoundQuery::ToRequestProto() const {
3737
google::bigtable::v2::ExecuteQueryRequest result;
3838
*result.mutable_instance_name() = instance_.FullName();
3939
auto prepared_query = query_plan_->prepared_query();

google/cloud/bigtable/bound_query.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class BoundQuery {
4747
std::unordered_map<std::string, Value> const& parameters() const;
4848
InstanceResource const& instance() const;
4949

50-
google::bigtable::v2::ExecuteQueryRequest ToRequestProto();
50+
google::bigtable::v2::ExecuteQueryRequest ToRequestProto() const;
5151

5252
private:
5353
friend class PreparedQuery;

google/cloud/bigtable/client_test.cc

Lines changed: 90 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,14 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/bigtable/client.h"
16+
#include "google/cloud/bigtable/mocks/mock_data_connection.h"
17+
#include "google/cloud/bigtable/mocks/mock_query_row.h"
18+
#include "google/cloud/bigtable/mocks/mock_row_reader.h"
19+
#include "google/cloud/bigtable/query_row.h"
20+
#include "google/cloud/stream_range.h"
1621
#include "google/cloud/testing_util/fake_completion_queue_impl.h"
1722
#include "google/cloud/testing_util/status_matchers.h"
18-
#include "mocks/mock_data_connection.h"
23+
#include <google/protobuf/text_format.h>
1924

2025
namespace google {
2126
namespace cloud {
@@ -24,7 +29,10 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2429
namespace {
2530

2631
using ::google::bigtable::v2::PrepareQueryResponse;
32+
using ::google::cloud::testing_util::FakeCompletionQueueImpl;
33+
using ::google::cloud::testing_util::IsOkAndHolds;
2734
using ::google::cloud::testing_util::StatusIs;
35+
using ::testing::Return;
2836

2937
TEST(Client, PrepareQuery) {
3038
auto fake_cq_impl = std::make_shared<testing_util::FakeCompletionQueueImpl>();
@@ -71,23 +79,91 @@ TEST(Client, AsyncPrepareQuery) {
7179
fake_cq_impl->SimulateCompletion(false);
7280
}
7381

74-
TEST(Client, ExecuteQuery) {
75-
auto fake_cq_impl = std::make_shared<testing_util::FakeCompletionQueueImpl>();
76-
auto conn = MakeDataConnection();
77-
Client client(conn);
82+
class MockQueryRowSource : public ResultSourceInterface {
83+
public:
84+
MOCK_METHOD(StatusOr<QueryRow>, NextRow, (), (override));
85+
MOCK_METHOD(absl::optional<google::bigtable::v2::ResultSetMetadata>, Metadata,
86+
(), (override));
87+
};
88+
89+
TEST(ClientTest, ExecuteQuery) {
90+
auto conn_mock = std::make_shared<bigtable_mocks::MockDataConnection>();
91+
auto fake_cq_impl = std::make_shared<FakeCompletionQueueImpl>();
92+
auto refresh_fn = []() {
93+
return make_ready_future(
94+
StatusOr<google::bigtable::v2::PrepareQueryResponse>(
95+
Status{StatusCode::kUnimplemented, "not implemented"}));
96+
};
97+
PrepareQueryResponse pq_response;
98+
pq_response.set_prepared_query("test-pq-id-54321");
99+
auto constexpr kResultMetadataText = R"pb(
100+
proto_schema {
101+
columns {
102+
name: "key"
103+
type { string_type {} }
104+
}
105+
columns {
106+
name: "val"
107+
type { string_type {} }
108+
}
109+
}
110+
)pb";
111+
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
112+
kResultMetadataText, pq_response.mutable_metadata()));
113+
EXPECT_CALL(*conn_mock, ExecuteQuery)
114+
.WillOnce(
115+
[&](bigtable::ExecuteQueryParams const&) -> StatusOr<RowStream> {
116+
auto mock_source = std::make_unique<MockQueryRowSource>();
117+
EXPECT_CALL(*mock_source, Metadata)
118+
.WillRepeatedly(Return(pq_response.metadata()));
119+
120+
testing::InSequence s;
121+
EXPECT_CALL(*mock_source, NextRow)
122+
.WillOnce(Return(bigtable_mocks::MakeQueryRow(
123+
{{"key", bigtable::Value("r1")},
124+
{"val", bigtable::Value("v1")}})));
125+
EXPECT_CALL(*mock_source, NextRow)
126+
.WillOnce(Return(bigtable_mocks::MakeQueryRow(
127+
{{"key", bigtable::Value("r2")},
128+
{"val", bigtable::Value("v2")}})));
129+
EXPECT_CALL(*mock_source, NextRow)
130+
// Signal end of stream
131+
.WillOnce(
132+
Return(Status(StatusCode::kOutOfRange, "End of stream")));
133+
134+
// Create RowStream with the mock result source
135+
RowStream row_stream(std::move(mock_source));
136+
return StatusOr<RowStream>(std::move(row_stream));
137+
});
138+
139+
Client client(conn_mock);
78140
InstanceResource instance(Project("test-project"), "test-instance");
79141
SqlStatement sql("SELECT * FROM `test-table`");
80-
auto prepared_query = PreparedQuery(CompletionQueue{fake_cq_impl}, instance,
81-
sql, PrepareQueryResponse{});
142+
auto query_plan = bigtable_internal::QueryPlan::Create(
143+
CompletionQueue(fake_cq_impl), std::move(pq_response),
144+
std::move(refresh_fn));
145+
auto prepared_query = PreparedQuery(instance, sql, std::move(query_plan));
82146
auto bound_query = prepared_query.BindParameters({});
83-
auto row_stream = client.ExecuteQuery(std::move(bound_query));
84-
// We expect a row stream with a single unimplemented status row while
85-
// this is not implemented.
86-
for (auto const& row : row_stream) {
87-
EXPECT_THAT(row.status(),
88-
StatusIs(StatusCode::kUnimplemented, "not implemented"));
147+
RowStream row_stream = client.ExecuteQuery(std::move(bound_query));
148+
std::vector<StatusOr<bigtable::QueryRow>> rows;
149+
for (auto const& row : std::move(row_stream)) {
150+
rows.push_back(row);
89151
}
90-
EXPECT_EQ(1, std::distance(row_stream.begin(), row_stream.end()));
152+
153+
ASSERT_EQ(rows.size(), 3);
154+
ASSERT_STATUS_OK(rows[0]);
155+
auto const& row1 = *rows[0];
156+
EXPECT_EQ(row1.columns().size(), 2);
157+
EXPECT_THAT(row1.values()[0].get<std::string>(), IsOkAndHolds("r1"));
158+
EXPECT_THAT(row1.values()[1].get<std::string>(), IsOkAndHolds("v1"));
159+
160+
ASSERT_STATUS_OK(rows[1]);
161+
auto const& row2 = *rows[1];
162+
EXPECT_EQ(row2.columns().size(), 2);
163+
EXPECT_THAT(row2.values()[0].get<std::string>(), IsOkAndHolds("r2"));
164+
EXPECT_THAT(row2.values()[1].get<std::string>(), IsOkAndHolds("v2"));
165+
166+
EXPECT_THAT(rows[2], StatusIs(StatusCode::kOutOfRange, "End of stream"));
91167

92168
// Cancel all pending operations, satisfying any remaining futures.
93169
fake_cq_impl->SimulateCompletion(false);

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 135 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
#include "google/cloud/bigtable/internal/default_row_reader.h"
2121
#include "google/cloud/bigtable/internal/defaults.h"
2222
#include "google/cloud/bigtable/internal/operation_context.h"
23+
#include "google/cloud/bigtable/internal/partial_result_set_reader.h"
24+
#include "google/cloud/bigtable/internal/partial_result_set_resume.h"
25+
#include "google/cloud/bigtable/internal/partial_result_set_source.h"
2326
#include "google/cloud/bigtable/options.h"
2427
#include "google/cloud/bigtable/results.h"
2528
#include "google/cloud/background_threads.h"
@@ -29,6 +32,8 @@
2932
#include "google/cloud/internal/make_status.h"
3033
#include "google/cloud/internal/random.h"
3134
#include "google/cloud/internal/retry_loop.h"
35+
#include "google/cloud/internal/streaming_read_rpc.h"
36+
#include "absl/functional/bind_front.h"
3237
#include <memory>
3338
#include <string>
3439

@@ -77,6 +82,83 @@ bigtable::RowReader ReadRowsHelper(
7782
return MakeRowReader(std::move(impl));
7883
}
7984

85+
class DefaultPartialResultSetReader
86+
: public bigtable_internal::PartialResultSetReader {
87+
public:
88+
DefaultPartialResultSetReader(std::shared_ptr<grpc::ClientContext> context,
89+
std::unique_ptr<internal::StreamingReadRpc<
90+
google::bigtable::v2::ExecuteQueryResponse>>
91+
reader)
92+
: context_(std::move(context)), reader_(std::move(reader)) {}
93+
94+
~DefaultPartialResultSetReader() override = default;
95+
96+
void TryCancel() override { context_->TryCancel(); }
97+
98+
bool Read(absl::optional<std::string> const&,
99+
bigtable_internal::UnownedPartialResultSet& result_set) override {
100+
while (true) {
101+
google::bigtable::v2::ExecuteQueryResponse response;
102+
absl::optional<google::cloud::Status> status = reader_->Read(&response);
103+
104+
if (status.has_value()) {
105+
// Stream has ended or an error occurred.
106+
final_status_ = *std::move(status);
107+
return false;
108+
}
109+
110+
// Message successfully read into response.
111+
if (response.has_results()) {
112+
result_set.result = std::move(*response.mutable_results());
113+
result_set.resumption = false;
114+
return true;
115+
}
116+
117+
// Ignore metadata from the stream because PartialResultSetSource already
118+
// has it set (in ExecuteQuery).
119+
// TODO(#15701): Investigate expected behavior for processing metadata.
120+
if (response.has_metadata()) {
121+
continue;
122+
}
123+
124+
final_status_ = google::cloud::Status(
125+
google::cloud::StatusCode::kInternal,
126+
"Empty ExecuteQueryResponse received from stream");
127+
return false;
128+
}
129+
}
130+
131+
Status Finish() override { return final_status_; }
132+
133+
private:
134+
std::shared_ptr<grpc::ClientContext> context_;
135+
std::unique_ptr<
136+
internal::StreamingReadRpc<google::bigtable::v2::ExecuteQueryResponse>>
137+
reader_;
138+
Status final_status_;
139+
};
140+
141+
class StatusOnlyResultSetSource : public bigtable::ResultSourceInterface {
142+
public:
143+
explicit StatusOnlyResultSetSource(google::cloud::Status status)
144+
: status_(std::move(status)) {}
145+
~StatusOnlyResultSetSource() override = default;
146+
147+
StatusOr<bigtable::QueryRow> NextRow() override { return status_; }
148+
absl::optional<google::bigtable::v2::ResultSetMetadata> Metadata() override {
149+
return {};
150+
}
151+
152+
private:
153+
google::cloud::Status status_;
154+
};
155+
156+
template <typename ResultType>
157+
ResultType MakeStatusOnlyResult(Status status) {
158+
return ResultType(
159+
std::make_unique<StatusOnlyResultSetSource>(std::move(status)));
160+
}
161+
80162
} // namespace
81163

82164
bigtable::Row TransformReadModifyWriteRowResponse(
@@ -737,9 +819,60 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
737819
});
738820
}
739821

822+
std::unique_ptr<PartialResultSetReader>
823+
DataConnectionImpl::CreateResumableReader(
824+
google::bigtable::v2::ExecuteQueryRequest request,
825+
std::string const& resume_token) {
826+
if (!resume_token.empty()) {
827+
request.set_resume_token(resume_token);
828+
}
829+
auto context = std::make_shared<grpc::ClientContext>();
830+
auto const& options = google::cloud::internal::CurrentOptions();
831+
832+
google::cloud::internal::ConfigureContext(*context, options);
833+
auto stream = stub_->ExecuteQuery(context, options, request);
834+
835+
return std::make_unique<DefaultPartialResultSetReader>(std::move(context),
836+
std::move(stream));
837+
}
838+
740839
StatusOr<bigtable::RowStream> DataConnectionImpl::ExecuteQuery(
741-
bigtable::ExecuteQueryParams const&) {
742-
return Status(StatusCode::kUnimplemented, "not implemented");
840+
bigtable::ExecuteQueryParams const& params) {
841+
auto current = google::cloud::internal::SaveCurrentOptions();
842+
StatusOr<google::bigtable::v2::ResultSetMetadata> status_or_metadata =
843+
params.bound_query.metadata();
844+
google::bigtable::v2::ExecuteQueryRequest request =
845+
params.bound_query.ToRequestProto();
846+
request.set_app_profile_id(app_profile_id(*current));
847+
if (!status_or_metadata) {
848+
return status_or_metadata.status();
849+
}
850+
google::bigtable::v2::ResultSetMetadata metadata =
851+
*std::move(status_or_metadata);
852+
853+
auto retry_resume_fn =
854+
[this, retry_policy_prototype = retry_policy(*current),
855+
backoff_policy_prototype = backoff_policy(*current)](
856+
google::bigtable::v2::ResultSetMetadata metadata,
857+
google::bigtable::v2::ExecuteQueryRequest const& initial_request)
858+
-> StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>> {
859+
auto factory = absl::bind_front(&DataConnectionImpl::CreateResumableReader,
860+
this, initial_request);
861+
862+
auto resume_reader = std::make_unique<PartialResultSetResume>(
863+
std::move(factory), Idempotency::kIdempotent,
864+
retry_policy_prototype->clone(), backoff_policy_prototype->clone());
865+
866+
return PartialResultSetSource::Create(std::move(metadata),
867+
std::move(resume_reader));
868+
};
869+
870+
auto response = retry_resume_fn(metadata, request);
871+
if (!response) {
872+
auto status = std::move(response).status();
873+
return MakeStatusOnlyResult<bigtable::RowStream>(std::move(status));
874+
}
875+
return bigtable::RowStream(*std::move(response));
743876
};
744877

745878
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/bigtable/internal/data_connection_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "google/cloud/bigtable/internal/bigtable_stub.h"
2020
#include "google/cloud/bigtable/internal/mutate_rows_limiter.h"
2121
#include "google/cloud/bigtable/internal/operation_context_factory.h"
22+
#include "google/cloud/bigtable/internal/partial_result_set_reader.h"
2223
#include "google/cloud/bigtable/prepared_query.h"
2324
#include "google/cloud/bigtable/results.h"
2425
#include "google/cloud/background_threads.h"
@@ -118,6 +119,9 @@ class DataConnectionImpl : public bigtable::DataConnection {
118119
bigtable::Filter filter,
119120
internal::ImmutableOptions const& current,
120121
std::shared_ptr<OperationContext> operation_context);
122+
std::unique_ptr<PartialResultSetReader> CreateResumableReader(
123+
google::bigtable::v2::ExecuteQueryRequest request,
124+
std::string const& resume_token);
121125

122126
std::unique_ptr<BackgroundThreads> background_;
123127
std::shared_ptr<BigtableStub> stub_;

0 commit comments

Comments
 (0)