Skip to content

Commit 7450a4a

Browse files
committed
refactor existing code and tests to use updated QueryPlan
1 parent a5d627d commit 7450a4a

File tree

6 files changed

+166
-30
lines changed

6 files changed

+166
-30
lines changed

google/cloud/bigtable/bound_query_test.cc

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "google/cloud/bigtable/prepared_query.h"
1717
#include "google/cloud/bigtable/sql_statement.h"
1818
#include "google/cloud/bigtable/value.h"
19+
#include "google/cloud/testing_util/fake_completion_queue_impl.h"
1920
#include "google/cloud/testing_util/status_matchers.h"
2021
#include <algorithm>
2122

@@ -27,7 +28,7 @@ namespace {
2728
using ::google::bigtable::v2::PrepareQueryResponse;
2829

2930
TEST(BoundQuery, FromPreparedQuery) {
30-
CompletionQueue cq;
31+
auto fake_cq_impl = std::make_shared<testing_util::FakeCompletionQueueImpl>();
3132
Project p("dummy-project");
3233
InstanceResource instance(p, "dummy-instance");
3334
std::string statement_contents(
@@ -47,7 +48,8 @@ TEST(BoundQuery, FromPreparedQuery) {
4748
metadata->set_allocated_proto_schema(schema.release());
4849
response.set_allocated_metadata(metadata.release());
4950

50-
PreparedQuery pq(cq, instance, sql_statement, response);
51+
PreparedQuery pq(CompletionQueue(fake_cq_impl), instance, sql_statement,
52+
response);
5153
auto bq = pq.BindParameters(parameters);
5254
EXPECT_EQ(instance.FullName(), bq.instance().FullName());
5355
EXPECT_STATUS_OK(bq.prepared_query());
@@ -57,10 +59,13 @@ TEST(BoundQuery, FromPreparedQuery) {
5759
EXPECT_TRUE(bq.metadata().value().has_proto_schema());
5860
EXPECT_EQ(1, bq.metadata().value().proto_schema().columns_size());
5961
EXPECT_EQ("col1", bq.metadata().value().proto_schema().columns()[0].name());
62+
63+
// Cancel all pending operations, satisfying any remaining futures.
64+
fake_cq_impl->SimulateCompletion(false);
6065
}
6166

6267
TEST(BoundQuery, ToRequestProto) {
63-
CompletionQueue cq;
68+
auto fake_cq_impl = std::make_shared<testing_util::FakeCompletionQueueImpl>();
6469
Project p("dummy-project");
6570
InstanceResource instance(p, "dummy-instance");
6671
std::string statement_contents(
@@ -70,7 +75,8 @@ TEST(BoundQuery, ToRequestProto) {
7075
std::unordered_map<std::string, Value> parameters = {{"val1", Value(true)},
7176
{"val2", Value(2.0)}};
7277

73-
PreparedQuery pq(cq, instance, sql_statement, response);
78+
PreparedQuery pq(CompletionQueue(fake_cq_impl), instance, sql_statement,
79+
response);
7480
auto bq = pq.BindParameters(parameters);
7581
google::bigtable::v2::ExecuteQueryRequest proto = bq.ToRequestProto();
7682
EXPECT_EQ(instance.FullName(), proto.instance_name());
@@ -90,6 +96,9 @@ TEST(BoundQuery, ToRequestProto) {
9096
auto val2 = proto.params().find("val2")->second;
9197
EXPECT_TRUE(val2.has_float_value());
9298
EXPECT_EQ(2.0, val2.float_value());
99+
100+
// Cancel all pending operations, satisfying any remaining futures.
101+
fake_cq_impl->SimulateCompletion(false);
93102
}
94103
} // namespace
95104
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/bigtable/client_test.cc

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/bigtable/client.h"
16+
#include "google/cloud/testing_util/fake_completion_queue_impl.h"
1617
#include "google/cloud/testing_util/status_matchers.h"
1718
#include "mocks/mock_data_connection.h"
1819

@@ -26,49 +27,58 @@ using ::google::bigtable::v2::PrepareQueryResponse;
2627
using ::google::cloud::testing_util::StatusIs;
2728

2829
TEST(Client, PrepareQuery) {
30+
auto fake_cq_impl = std::make_shared<testing_util::FakeCompletionQueueImpl>();
2931
auto conn_mock = std::make_shared<bigtable_mocks::MockDataConnection>();
3032
InstanceResource instance(Project("the-project"), "the-instance");
3133
SqlStatement sql("SELECT * FROM the-table");
3234
EXPECT_CALL(*conn_mock, PrepareQuery)
33-
.WillOnce([](PrepareQueryParams const& params) {
35+
.WillOnce([&](PrepareQueryParams const& params) {
3436
EXPECT_EQ("projects/the-project/instances/the-instance",
3537
params.instance.FullName());
3638
EXPECT_EQ("SELECT * FROM the-table", params.sql_statement.sql());
37-
PreparedQuery q(CompletionQueue{}, params.instance,
39+
PreparedQuery q(CompletionQueue{fake_cq_impl}, params.instance,
3840
params.sql_statement, PrepareQueryResponse{});
3941
return q;
4042
});
4143

4244
Client client(std::move(conn_mock));
4345
auto prepared_query = client.PrepareQuery(instance, sql);
4446
ASSERT_STATUS_OK(prepared_query);
47+
48+
// Cancel all pending operations, satisfying any remaining futures.
49+
fake_cq_impl->SimulateCompletion(false);
4550
}
4651

4752
TEST(Client, AsyncPrepareQuery) {
53+
auto fake_cq_impl = std::make_shared<testing_util::FakeCompletionQueueImpl>();
4854
auto conn_mock = std::make_shared<bigtable_mocks::MockDataConnection>();
4955
InstanceResource instance(Project("the-project"), "the-instance");
5056
SqlStatement sql("SELECT * FROM the-table");
5157
EXPECT_CALL(*conn_mock, AsyncPrepareQuery)
52-
.WillOnce([](PrepareQueryParams const& params) {
58+
.WillOnce([&](PrepareQueryParams const& params) {
5359
EXPECT_EQ("projects/the-project/instances/the-instance",
5460
params.instance.FullName());
5561
EXPECT_EQ("SELECT * FROM the-table", params.sql_statement.sql());
56-
PreparedQuery q(CompletionQueue{}, params.instance,
62+
PreparedQuery q(CompletionQueue{fake_cq_impl}, params.instance,
5763
params.sql_statement, PrepareQueryResponse{});
5864
return make_ready_future(make_status_or(std::move(q)));
5965
});
6066
Client client(std::move(conn_mock));
6167
auto prepared_query = client.AsyncPrepareQuery(instance, sql);
6268
ASSERT_STATUS_OK(prepared_query.get());
69+
70+
// Cancel all pending operations, satisfying any remaining futures.
71+
fake_cq_impl->SimulateCompletion(false);
6372
}
6473

6574
TEST(Client, ExecuteQuery) {
75+
auto fake_cq_impl = std::make_shared<testing_util::FakeCompletionQueueImpl>();
6676
auto conn = MakeDataConnection();
6777
Client client(conn);
6878
InstanceResource instance(Project("test-project"), "test-instance");
6979
SqlStatement sql("SELECT * FROM `test-table`");
70-
auto prepared_query =
71-
PreparedQuery(CompletionQueue{}, instance, sql, PrepareQueryResponse{});
80+
auto prepared_query = PreparedQuery(CompletionQueue{fake_cq_impl}, instance,
81+
sql, PrepareQueryResponse{});
7282
auto bound_query = prepared_query.BindParameters({});
7383
auto row_stream = client.ExecuteQuery(std::move(bound_query));
7484
// We expect a row stream with a single unimplemented status row while
@@ -78,6 +88,9 @@ TEST(Client, ExecuteQuery) {
7888
StatusIs(StatusCode::kUnimplemented, "not implemented"));
7989
}
8090
EXPECT_EQ(1, std::distance(row_stream.begin(), row_stream.end()));
91+
92+
// Cancel all pending operations, satisfying any remaining futures.
93+
fake_cq_impl->SimulateCompletion(false);
8194
}
8295

8396
} // namespace

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -647,9 +647,26 @@ StatusOr<bigtable::PreparedQuery> DataConnectionImpl::PrepareQuery(
647647
if (!response) {
648648
return std::move(response).status();
649649
}
650-
return bigtable::PreparedQuery(background_->cq(), params.instance,
651-
std::move(params.sql_statement),
652-
*std::move(response));
650+
auto const* func = __func__;
651+
auto refresh_fn = [this, request, current, func]() mutable {
652+
auto retry = retry_policy(*current);
653+
auto backoff = backoff_policy(*current);
654+
return google::cloud::internal::AsyncRetryLoop(
655+
std::move(retry), std::move(backoff), Idempotency::kIdempotent,
656+
background_->cq(),
657+
[this](CompletionQueue& cq,
658+
std::shared_ptr<grpc::ClientContext> context,
659+
google::cloud::internal::ImmutableOptions options,
660+
google::bigtable::v2::PrepareQueryRequest const& request) {
661+
return stub_->AsyncPrepareQuery(cq, std::move(context),
662+
std::move(options), request);
663+
},
664+
std::move(current), request, func);
665+
};
666+
auto query_plan = QueryPlan::Create(background_->cq(), *std::move(response),
667+
std::move(refresh_fn));
668+
return bigtable::PreparedQuery(
669+
params.instance, std::move(params.sql_statement), std::move(query_plan));
653670
}
654671

655672
future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
@@ -667,6 +684,7 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
667684
auto backoff = backoff_policy(*current);
668685
auto operation_context = operation_context_factory_->PrepareQuery(
669686
instance_full_name, app_profile_id(*current));
687+
auto const* func = __func__;
670688
return google::cloud::internal::AsyncRetryLoop(
671689
std::move(retry), std::move(backoff), Idempotency::kIdempotent,
672690
background_->cq(),
@@ -685,18 +703,37 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
685703
return s;
686704
});
687705
},
688-
std::move(current), request, __func__)
689-
.then([this, operation_context, params = std::move(params)](
690-
future<StatusOr<google::bigtable::v2::PrepareQueryResponse>>
691-
future) -> StatusOr<bigtable::PreparedQuery> {
706+
current, request, func)
707+
.then([this, request, operation_context, current,
708+
params = std::move(params),
709+
func](future<StatusOr<google::bigtable::v2::PrepareQueryResponse>>
710+
future) -> StatusOr<bigtable::PreparedQuery> {
692711
auto response = future.get();
693712
operation_context->OnDone(response.status());
694713
if (!response) {
695714
return std::move(response).status();
696715
}
697-
return bigtable::PreparedQuery(background_->cq(), params.instance,
698-
params.sql_statement,
699-
*std::move(response));
716+
717+
auto refresh_fn = [this, request, current, func]() mutable {
718+
auto retry = retry_policy(*current);
719+
auto backoff = backoff_policy(*current);
720+
return google::cloud::internal::AsyncRetryLoop(
721+
std::move(retry), std::move(backoff), Idempotency::kIdempotent,
722+
background_->cq(),
723+
[this](CompletionQueue& cq,
724+
std::shared_ptr<grpc::ClientContext> context,
725+
google::cloud::internal::ImmutableOptions options,
726+
google::bigtable::v2::PrepareQueryRequest const& request) {
727+
return stub_->AsyncPrepareQuery(cq, std::move(context),
728+
std::move(options), request);
729+
},
730+
std::move(current), request, func);
731+
};
732+
733+
auto query_plan = QueryPlan::Create(background_->cq(),
734+
*std::move(response), refresh_fn);
735+
return bigtable::PreparedQuery(params.instance, params.sql_statement,
736+
std::move(query_plan));
700737
});
701738
}
702739

google/cloud/bigtable/internal/data_connection_impl_test.cc

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
#include "google/cloud/credentials.h"
3131
#include "google/cloud/grpc_options.h"
3232
#include "google/cloud/internal/async_streaming_read_rpc_impl.h"
33+
#include "google/cloud/internal/time_utils.h"
34+
#include "google/cloud/testing_util/fake_completion_queue_impl.h"
3335
#include "google/cloud/testing_util/mock_backoff_policy.h"
3436
#include "google/cloud/testing_util/status_matchers.h"
3537
#include "google/cloud/testing_util/validate_metadata.h"
@@ -58,6 +60,7 @@ using ::google::cloud::bigtable::testing::MockMutateRowsLimiter;
5860
using ::google::cloud::bigtable::testing::MockMutateRowsStream;
5961
using ::google::cloud::bigtable::testing::MockReadRowsStream;
6062
using ::google::cloud::bigtable::testing::MockSampleRowKeysStream;
63+
using ::google::cloud::testing_util::FakeCompletionQueueImpl;
6164
using ::google::cloud::testing_util::MockBackoffPolicy;
6265
using ::google::cloud::testing_util::StatusIs;
6366
using ::testing::An;
@@ -217,6 +220,17 @@ std::shared_ptr<DataConnectionImpl> TestConnection(
217220
std::move(background), std::move(stub), std::move(limiter), Options{});
218221
}
219222

223+
std::shared_ptr<DataConnectionImpl> TestConnection(
224+
std::unique_ptr<BackgroundThreads> background,
225+
std::shared_ptr<BigtableStub> stub,
226+
std::unique_ptr<OperationContextFactory> operation_context_factory,
227+
std::shared_ptr<MutateRowsLimiter> limiter =
228+
std::make_shared<NoopMutateRowsLimiter>()) {
229+
return std::make_shared<DataConnectionImpl>(
230+
std::move(background), std::move(stub),
231+
std::move(operation_context_factory), std::move(limiter), Options{});
232+
}
233+
220234
std::shared_ptr<DataConnectionImpl> TestConnection(
221235
std::shared_ptr<BigtableStub> stub,
222236
std::unique_ptr<OperationContextFactory> operation_context_factory,
@@ -266,6 +280,11 @@ TEST(TransformReadModifyWriteRowResponse, Basic) {
266280
MatchCell(c3), MatchCell(c4)));
267281
}
268282

283+
class MockBackgroundThreads : public BackgroundThreads {
284+
public:
285+
MOCK_METHOD(CompletionQueue, cq, (), (const, override));
286+
};
287+
269288
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
270289

271290
class MockMetric : public Metric {
@@ -2736,17 +2755,40 @@ TEST_F(DataConnectionTest, AsyncReadRowFailure) {
27362755
}
27372756

27382757
TEST_F(DataConnectionTest, ExecuteQuery) {
2739-
auto conn = TestConnection(std::make_shared<MockBigtableStub>());
2758+
auto fake_cq_impl = std::make_shared<FakeCompletionQueueImpl>();
2759+
auto mock_bg = std::make_unique<MockBackgroundThreads>();
2760+
EXPECT_CALL(*mock_bg, cq).WillRepeatedly([&]() {
2761+
return CompletionQueue{fake_cq_impl};
2762+
});
2763+
2764+
auto refresh_fn = []() {
2765+
return make_ready_future(
2766+
StatusOr<google::bigtable::v2::PrepareQueryResponse>(
2767+
Status{StatusCode::kUnimplemented, "not implemented"}));
2768+
};
2769+
2770+
v2::PrepareQueryResponse response;
2771+
*response.mutable_valid_until() = internal::ToProtoTimestamp(
2772+
std::chrono::system_clock::now() + std::chrono::seconds(3600));
2773+
2774+
auto query_plan =
2775+
QueryPlan::Create(CompletionQueue(fake_cq_impl), std::move(response),
2776+
std::move(refresh_fn));
2777+
2778+
auto mock_stub = std::make_shared<MockBigtableStub>();
2779+
auto conn = TestConnection(std::move(mock_stub));
27402780
internal::OptionsSpan span(CallOptions());
27412781
bigtable::InstanceResource instance(Project("test-project"), "test-instance");
27422782
bigtable::SqlStatement sql("SELECT * FROM `test-table`");
27432783
auto prepared_query =
2744-
bigtable::PreparedQuery(CompletionQueue{}, instance, sql,
2745-
google::bigtable::v2::PrepareQueryResponse{});
2784+
bigtable::PreparedQuery(instance, sql, std::move(query_plan));
27462785
auto bound_query = prepared_query.BindParameters({});
27472786
EXPECT_THAT(
27482787
conn->ExecuteQuery(bigtable::ExecuteQueryParams{std::move(bound_query)}),
27492788
StatusIs(StatusCode::kUnimplemented));
2789+
2790+
// Cancel all pending operations, satisfying any remaining futures.
2791+
fake_cq_impl->SimulateCompletion(false);
27502792
}
27512793

27522794
TEST_F(DataConnectionTest, PrepareQuerySuccess) {
@@ -2773,10 +2815,19 @@ TEST_F(DataConnectionTest, PrepareQuerySuccess) {
27732815
request.instance_name());
27742816
EXPECT_EQ("SELECT * FROM the-table", request.query());
27752817
v2::PrepareQueryResponse response;
2818+
*response.mutable_valid_until() = internal::ToProtoTimestamp(
2819+
std::chrono::system_clock::now() + std::chrono::seconds(3600));
27762820
return response;
27772821
});
27782822

2779-
auto conn = TestConnection(std::move(mock), std::move(factory));
2823+
auto fake_cq_impl = std::make_shared<FakeCompletionQueueImpl>();
2824+
auto mock_bg = std::make_unique<MockBackgroundThreads>();
2825+
EXPECT_CALL(*mock_bg, cq).WillRepeatedly([&]() {
2826+
return CompletionQueue{fake_cq_impl};
2827+
});
2828+
2829+
auto conn =
2830+
TestConnection(std::move(mock_bg), std::move(mock), std::move(factory));
27802831
internal::OptionsSpan span(CallOptions());
27812832
auto params = bigtable::PrepareQueryParams{
27822833
bigtable::InstanceResource(google::cloud::Project("the-project"),
@@ -2786,6 +2837,9 @@ TEST_F(DataConnectionTest, PrepareQuerySuccess) {
27862837
ASSERT_STATUS_OK(prepared_query);
27872838
EXPECT_EQ(prepared_query->instance(), params.instance);
27882839
EXPECT_EQ(prepared_query->sql_statement(), params.sql_statement);
2840+
2841+
// Cancel all pending operations, satisfying any remaining futures.
2842+
fake_cq_impl->SimulateCompletion(false);
27892843
}
27902844

27912845
TEST_F(DataConnectionTest, PrepareQueryPermanentError) {
@@ -2844,7 +2898,14 @@ TEST_F(DataConnectionTest, AsyncPrepareQuerySuccess) {
28442898
return make_ready_future(make_status_or(v2::PrepareQueryResponse{}));
28452899
});
28462900

2847-
auto conn = TestConnection(std::move(mock), std::move(factory));
2901+
auto fake_cq_impl = std::make_shared<FakeCompletionQueueImpl>();
2902+
auto mock_bg = std::make_unique<MockBackgroundThreads>();
2903+
EXPECT_CALL(*mock_bg, cq).WillRepeatedly([&]() {
2904+
return CompletionQueue{fake_cq_impl};
2905+
});
2906+
2907+
auto conn =
2908+
TestConnection(std::move(mock_bg), std::move(mock), std::move(factory));
28482909
internal::OptionsSpan span(CallOptions());
28492910
auto params = bigtable::PrepareQueryParams{
28502911
bigtable::InstanceResource(google::cloud::Project("the-project"),
@@ -2853,6 +2914,9 @@ TEST_F(DataConnectionTest, AsyncPrepareQuerySuccess) {
28532914
auto future = conn->AsyncPrepareQuery(params);
28542915
auto result = future.get();
28552916
ASSERT_STATUS_OK(result);
2917+
2918+
// Cancel all pending operations, satisfying any remaining futures.
2919+
fake_cq_impl->SimulateCompletion(false);
28562920
}
28572921

28582922
TEST_F(DataConnectionTest, AsyncPrepareQueryPermanentError) {

0 commit comments

Comments
 (0)