Skip to content

Commit f1bc406

Browse files
authored
feat(bigtable): implement async prepare query for DataConnectionImpl (#15667)
* feat(bigtable): implement async prepare query for DataConnectionImpl
1 parent 717cc7b commit f1bc406

File tree

2 files changed

+76
-3
lines changed

2 files changed

+76
-3
lines changed

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -645,9 +645,39 @@ StatusOr<bigtable::PreparedQuery> DataConnectionImpl::PrepareQuery(
645645
}
646646

647647
future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
648-
bigtable::PrepareQueryParams const&) {
649-
return make_ready_future<StatusOr<bigtable::PreparedQuery>>(
650-
Status(StatusCode::kUnimplemented, "not implemented"));
648+
bigtable::PrepareQueryParams const& params) {
649+
auto current = google::cloud::internal::SaveCurrentOptions();
650+
google::bigtable::v2::PrepareQueryRequest request;
651+
request.set_instance_name(params.instance.FullName());
652+
request.set_app_profile_id(app_profile_id(*current));
653+
request.set_query(params.sql_statement.sql());
654+
for (auto const& p : params.sql_statement.params()) {
655+
(*request.mutable_param_types())[p.first] = p.second.type();
656+
}
657+
auto retry = retry_policy(*current);
658+
auto backoff = backoff_policy(*current);
659+
return google::cloud::internal::AsyncRetryLoop(
660+
std::move(retry), std::move(backoff), Idempotency::kNonIdempotent,
661+
background_->cq(),
662+
[this](CompletionQueue& cq,
663+
std::shared_ptr<grpc::ClientContext> context,
664+
google::cloud::internal::ImmutableOptions options,
665+
google::bigtable::v2::PrepareQueryRequest const& request) {
666+
return stub_->AsyncPrepareQuery(cq, std::move(context),
667+
std::move(options), request);
668+
},
669+
std::move(current), request, __func__)
670+
.then([this, params = std::move(params)](
671+
future<StatusOr<google::bigtable::v2::PrepareQueryResponse>>
672+
future) -> StatusOr<bigtable::PreparedQuery> {
673+
auto response = future.get();
674+
if (!response) {
675+
return std::move(response).status();
676+
}
677+
return bigtable::PreparedQuery(background_->cq(), params.instance,
678+
params.sql_statement,
679+
*std::move(response));
680+
});
651681
}
652682

653683
StatusOr<bigtable::RowStream> DataConnectionImpl::ExecuteQuery(

google/cloud/bigtable/internal/data_connection_impl_test.cc

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2771,6 +2771,49 @@ TEST_F(DataConnectionTest, PrepareQueryPermanentError) {
27712771
EXPECT_THAT(result, StatusIs(StatusCode::kPermissionDenied));
27722772
}
27732773

2774+
TEST_F(DataConnectionTest, AsyncPrepareQuerySuccess) {
2775+
auto mock = std::make_shared<MockBigtableStub>();
2776+
EXPECT_CALL(*mock, AsyncPrepareQuery)
2777+
.WillOnce([](CompletionQueue const&, auto, auto,
2778+
v2::PrepareQueryRequest const& request) {
2779+
EXPECT_EQ(kAppProfile, request.app_profile_id());
2780+
EXPECT_EQ("projects/the-project/instances/the-instance",
2781+
request.instance_name());
2782+
EXPECT_EQ("SELECT * FROM the-table", request.query());
2783+
return make_ready_future(make_status_or(v2::PrepareQueryResponse{}));
2784+
});
2785+
2786+
auto conn = TestConnection(std::move(mock));
2787+
internal::OptionsSpan span(CallOptions());
2788+
auto params = bigtable::PrepareQueryParams{
2789+
bigtable::InstanceResource(google::cloud::Project("the-project"),
2790+
"the-instance"),
2791+
bigtable::SqlStatement("SELECT * FROM the-table")};
2792+
auto future = conn->AsyncPrepareQuery(params);
2793+
auto result = future.get();
2794+
ASSERT_STATUS_OK(result);
2795+
}
2796+
2797+
TEST_F(DataConnectionTest, AsyncPrepareQueryPermanentError) {
2798+
auto mock = std::make_shared<MockBigtableStub>();
2799+
EXPECT_CALL(*mock, AsyncPrepareQuery)
2800+
.WillOnce(
2801+
[](CompletionQueue&, auto, auto, v2::PrepareQueryRequest const&) {
2802+
return make_ready_future<StatusOr<v2::PrepareQueryResponse>>(
2803+
PermanentError());
2804+
});
2805+
2806+
auto conn = TestConnection(std::move(mock));
2807+
internal::OptionsSpan span(CallOptions());
2808+
auto params = bigtable::PrepareQueryParams{
2809+
bigtable::InstanceResource(google::cloud::Project("the-project"),
2810+
"the-instance"),
2811+
bigtable::SqlStatement("SELECT * FROM the-table")};
2812+
auto future = conn->AsyncPrepareQuery(params);
2813+
auto result = future.get();
2814+
EXPECT_THAT(result, StatusIs(StatusCode::kPermissionDenied));
2815+
}
2816+
27742817
} // namespace
27752818
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
27762819
} // namespace bigtable_internal

0 commit comments

Comments
 (0)