Skip to content

Commit cfa8052

Browse files
feat(bigtable): add suport for OTEL in [Async]PrepareQuery (#15670)
* feat(bigtable): add suport for OTEL * chore: implement context factory side * chore: metrics in conn impl for prepare query rpc * chore: clang-tidy fixes * chore: linter fixes * feat(bigtable): add suport for OTEL in [Async]PrepareQuery * chore: implement context factory side * chore: metrics in conn impl for prepare query rpc * chore: clang-tidy fixes * chore: linter fixes * chore: cleanup * chore: address review comments
1 parent ed85fbe commit cfa8052

File tree

5 files changed

+245
-16
lines changed

5 files changed

+245
-16
lines changed

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -622,20 +622,28 @@ StatusOr<bigtable::PreparedQuery> DataConnectionImpl::PrepareQuery(
622622
bigtable::PrepareQueryParams const& params) {
623623
auto current = google::cloud::internal::SaveCurrentOptions();
624624
google::bigtable::v2::PrepareQueryRequest request;
625-
request.set_instance_name(params.instance.FullName());
625+
auto instance_full_name = params.instance.FullName();
626+
request.set_instance_name(instance_full_name);
626627
request.set_app_profile_id(app_profile_id(*current));
627628
request.set_query(params.sql_statement.sql());
628629
for (auto const& p : params.sql_statement.params()) {
629630
(*request.mutable_param_types())[p.first] = p.second.type();
630631
}
632+
auto operation_context = operation_context_factory_->PrepareQuery(
633+
instance_full_name, app_profile_id(*current));
631634
auto response = google::cloud::internal::RetryLoop(
632635
retry_policy(*current), backoff_policy(*current),
633636
Idempotency::kIdempotent,
634-
[this](grpc::ClientContext& context, Options const& options,
635-
google::bigtable::v2::PrepareQueryRequest const& request) {
636-
return stub_->PrepareQuery(context, options, request);
637+
[this, operation_context](
638+
grpc::ClientContext& context, Options const& options,
639+
google::bigtable::v2::PrepareQueryRequest const& request) {
640+
operation_context->PreCall(context);
641+
auto const& result = stub_->PrepareQuery(context, options, request);
642+
operation_context->PostCall(context, result.status());
643+
return result;
637644
},
638645
*current, request, __func__);
646+
operation_context->OnDone(response.status());
639647
if (!response) {
640648
return std::move(response).status();
641649
}
@@ -648,29 +656,41 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
648656
bigtable::PrepareQueryParams const& params) {
649657
auto current = google::cloud::internal::SaveCurrentOptions();
650658
google::bigtable::v2::PrepareQueryRequest request;
651-
request.set_instance_name(params.instance.FullName());
659+
auto instance_full_name = params.instance.FullName();
660+
request.set_instance_name(instance_full_name);
652661
request.set_app_profile_id(app_profile_id(*current));
653662
request.set_query(params.sql_statement.sql());
654663
for (auto const& p : params.sql_statement.params()) {
655664
(*request.mutable_param_types())[p.first] = p.second.type();
656665
}
657666
auto retry = retry_policy(*current);
658667
auto backoff = backoff_policy(*current);
668+
auto operation_context = operation_context_factory_->PrepareQuery(
669+
instance_full_name, app_profile_id(*current));
659670
return google::cloud::internal::AsyncRetryLoop(
660671
std::move(retry), std::move(backoff), Idempotency::kIdempotent,
661672
background_->cq(),
662-
[this](CompletionQueue& cq,
663-
std::shared_ptr<grpc::ClientContext> context,
664-
google::cloud::internal::ImmutableOptions options,
665-
google::bigtable::v2::PrepareQueryRequest const& request) {
666-
return stub_->AsyncPrepareQuery(cq, std::move(context),
667-
std::move(options), request);
673+
[this, operation_context](
674+
CompletionQueue& cq,
675+
std::shared_ptr<grpc::ClientContext> context,
676+
google::cloud::internal::ImmutableOptions options,
677+
google::bigtable::v2::PrepareQueryRequest const& request) {
678+
operation_context->PreCall(*context);
679+
auto f = stub_->AsyncPrepareQuery(cq, context,
680+
std::move(options), request);
681+
return f.then(
682+
[operation_context, context = std::move(context)](auto f) {
683+
auto s = f.get();
684+
operation_context->PostCall(*context, s.status());
685+
return s;
686+
});
668687
},
669688
std::move(current), request, __func__)
670-
.then([this, params = std::move(params)](
689+
.then([this, operation_context, params = std::move(params)](
671690
future<StatusOr<google::bigtable::v2::PrepareQueryResponse>>
672691
future) -> StatusOr<bigtable::PreparedQuery> {
673692
auto response = future.get();
693+
operation_context->OnDone(response.status());
674694
if (!response) {
675695
return std::move(response).status();
676696
}

google/cloud/bigtable/internal/data_connection_impl_test.cc

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,16 @@ class FakeOperationContextFactory : public OperationContextFactory {
354354
return Helper(name, app_profile);
355355
}
356356

357+
std::shared_ptr<OperationContext> PrepareQuery(
358+
std::string const&, std::string const& app_profile) override {
359+
return Helper("", app_profile);
360+
}
361+
362+
std::shared_ptr<OperationContext> ExecuteQuery(
363+
std::string const&, std::string const& app_profile) override {
364+
return Helper("", app_profile);
365+
}
366+
357367
private:
358368
std::shared_ptr<OperationContext> Helper(std::string const& name,
359369
std::string const& app_profile) {
@@ -2740,6 +2750,20 @@ TEST_F(DataConnectionTest, ExecuteQuery) {
27402750
}
27412751

27422752
TEST_F(DataConnectionTest, PrepareQuerySuccess) {
2753+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
2754+
auto mock_metric = std::make_unique<MockMetric>();
2755+
EXPECT_CALL(*mock_metric, PreCall).Times(1);
2756+
EXPECT_CALL(*mock_metric, PostCall).Times(1);
2757+
EXPECT_CALL(*mock_metric, OnDone).Times(1);
2758+
EXPECT_CALL(*mock_metric, ElementRequest).Times(0);
2759+
EXPECT_CALL(*mock_metric, ElementDelivery).Times(0);
2760+
auto fake_metric = std::make_shared<CloningMetric>(std::move(mock_metric));
2761+
auto clock = std::make_shared<testing_util::FakeSteadyClock>();
2762+
auto factory = std::make_unique<FakeOperationContextFactory>(
2763+
ResourceLabels{}, DataLabels{}, fake_metric, clock);
2764+
#else
2765+
auto factory = std::make_unique<SimpleOperationContextFactory>();
2766+
#endif
27432767
auto mock = std::make_shared<MockBigtableStub>();
27442768
EXPECT_CALL(*mock, PrepareQuery)
27452769
.WillOnce([](grpc::ClientContext&, Options const&,
@@ -2752,7 +2776,7 @@ TEST_F(DataConnectionTest, PrepareQuerySuccess) {
27522776
return response;
27532777
});
27542778

2755-
auto conn = TestConnection(std::move(mock));
2779+
auto conn = TestConnection(std::move(mock), std::move(factory));
27562780
internal::OptionsSpan span(CallOptions());
27572781
auto params = bigtable::PrepareQueryParams{
27582782
bigtable::InstanceResource(google::cloud::Project("the-project"),
@@ -2765,13 +2789,27 @@ TEST_F(DataConnectionTest, PrepareQuerySuccess) {
27652789
}
27662790

27672791
TEST_F(DataConnectionTest, PrepareQueryPermanentError) {
2792+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
2793+
auto mock_metric = std::make_unique<MockMetric>();
2794+
EXPECT_CALL(*mock_metric, PreCall).Times(1);
2795+
EXPECT_CALL(*mock_metric, PostCall).Times(1);
2796+
EXPECT_CALL(*mock_metric, OnDone).Times(1);
2797+
EXPECT_CALL(*mock_metric, ElementRequest).Times(0);
2798+
EXPECT_CALL(*mock_metric, ElementDelivery).Times(0);
2799+
auto fake_metric = std::make_shared<CloningMetric>(std::move(mock_metric));
2800+
auto clock = std::make_shared<testing_util::FakeSteadyClock>();
2801+
auto factory = std::make_unique<FakeOperationContextFactory>(
2802+
ResourceLabels{}, DataLabels{}, fake_metric, clock);
2803+
#else
2804+
auto factory = std::make_unique<SimpleOperationContextFactory>();
2805+
#endif
27682806
auto mock = std::make_shared<MockBigtableStub>();
27692807
EXPECT_CALL(*mock, PrepareQuery)
27702808
.WillOnce(
27712809
[](grpc::ClientContext&, Options const&,
27722810
v2::PrepareQueryRequest const&) { return PermanentError(); });
27732811

2774-
auto conn = TestConnection(std::move(mock));
2812+
auto conn = TestConnection(std::move(mock), std::move(factory));
27752813
internal::OptionsSpan span(CallOptions());
27762814
auto result = conn->PrepareQuery(bigtable::PrepareQueryParams{
27772815
bigtable::InstanceResource(google::cloud::Project("the-project"),
@@ -2781,6 +2819,20 @@ TEST_F(DataConnectionTest, PrepareQueryPermanentError) {
27812819
}
27822820

27832821
TEST_F(DataConnectionTest, AsyncPrepareQuerySuccess) {
2822+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
2823+
auto mock_metric = std::make_unique<MockMetric>();
2824+
EXPECT_CALL(*mock_metric, PreCall).Times(1);
2825+
EXPECT_CALL(*mock_metric, PostCall).Times(1);
2826+
EXPECT_CALL(*mock_metric, OnDone).Times(1);
2827+
EXPECT_CALL(*mock_metric, ElementRequest).Times(0);
2828+
EXPECT_CALL(*mock_metric, ElementDelivery).Times(0);
2829+
auto fake_metric = std::make_shared<CloningMetric>(std::move(mock_metric));
2830+
auto clock = std::make_shared<testing_util::FakeSteadyClock>();
2831+
auto factory = std::make_unique<FakeOperationContextFactory>(
2832+
ResourceLabels{}, DataLabels{}, fake_metric, clock);
2833+
#else
2834+
auto factory = std::make_unique<SimpleOperationContextFactory>();
2835+
#endif
27842836
auto mock = std::make_shared<MockBigtableStub>();
27852837
EXPECT_CALL(*mock, AsyncPrepareQuery)
27862838
.WillOnce([](CompletionQueue const&, auto, auto,
@@ -2792,7 +2844,7 @@ TEST_F(DataConnectionTest, AsyncPrepareQuerySuccess) {
27922844
return make_ready_future(make_status_or(v2::PrepareQueryResponse{}));
27932845
});
27942846

2795-
auto conn = TestConnection(std::move(mock));
2847+
auto conn = TestConnection(std::move(mock), std::move(factory));
27962848
internal::OptionsSpan span(CallOptions());
27972849
auto params = bigtable::PrepareQueryParams{
27982850
bigtable::InstanceResource(google::cloud::Project("the-project"),
@@ -2804,6 +2856,20 @@ TEST_F(DataConnectionTest, AsyncPrepareQuerySuccess) {
28042856
}
28052857

28062858
TEST_F(DataConnectionTest, AsyncPrepareQueryPermanentError) {
2859+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
2860+
auto mock_metric = std::make_unique<MockMetric>();
2861+
EXPECT_CALL(*mock_metric, PreCall).Times(1);
2862+
EXPECT_CALL(*mock_metric, PostCall).Times(1);
2863+
EXPECT_CALL(*mock_metric, OnDone).Times(1);
2864+
EXPECT_CALL(*mock_metric, ElementRequest).Times(0);
2865+
EXPECT_CALL(*mock_metric, ElementDelivery).Times(0);
2866+
auto fake_metric = std::make_shared<CloningMetric>(std::move(mock_metric));
2867+
auto clock = std::make_shared<testing_util::FakeSteadyClock>();
2868+
auto factory = std::make_unique<FakeOperationContextFactory>(
2869+
ResourceLabels{}, DataLabels{}, fake_metric, clock);
2870+
#else
2871+
auto factory = std::make_unique<SimpleOperationContextFactory>();
2872+
#endif
28072873
auto mock = std::make_shared<MockBigtableStub>();
28082874
EXPECT_CALL(*mock, AsyncPrepareQuery)
28092875
.WillOnce(
@@ -2812,7 +2878,7 @@ TEST_F(DataConnectionTest, AsyncPrepareQueryPermanentError) {
28122878
PermanentError());
28132879
});
28142880

2815-
auto conn = TestConnection(std::move(mock));
2881+
auto conn = TestConnection(std::move(mock), std::move(factory));
28162882
internal::OptionsSpan span(CallOptions());
28172883
auto params = bigtable::PrepareQueryParams{
28182884
bigtable::InstanceResource(google::cloud::Project("the-project"),

google/cloud/bigtable/internal/operation_context_factory.cc

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,19 @@ ResourceLabels ResourceLabelsFromTableName(std::string const& table_name) {
4848
return resource_labels;
4949
}
5050

51+
ResourceLabels ResourceLabelsFromInstanceName(
52+
std::string const& instance_name) {
53+
// split instance_name into component pieces
54+
// projects/<project>/instances/<instance>
55+
std::vector<absl::string_view> name_parts =
56+
absl::StrSplit(instance_name, '/');
57+
if (name_parts.size() < 4) return {};
58+
ResourceLabels resource_labels = {std::string(name_parts[1]),
59+
std::string(name_parts[3]), "",
60+
"" /*=cluster*/, "" /*=zone*/};
61+
return resource_labels;
62+
}
63+
5164
} // namespace
5265
#endif // GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
5366

@@ -82,6 +95,16 @@ std::shared_ptr<OperationContext> OperationContextFactory::ReadModifyWriteRow(
8295
return std::make_shared<OperationContext>();
8396
}
8497

98+
std::shared_ptr<OperationContext> OperationContextFactory::PrepareQuery(
99+
std::string const&, std::string const&) {
100+
return std::make_shared<OperationContext>();
101+
}
102+
103+
std::shared_ptr<OperationContext> OperationContextFactory::ExecuteQuery(
104+
std::string const&, std::string const&) {
105+
return std::make_shared<OperationContext>();
106+
}
107+
85108
std::shared_ptr<OperationContext> SimpleOperationContextFactory::ReadRow(
86109
std::string const&, std::string const&) {
87110
return std::make_shared<OperationContext>();
@@ -113,6 +136,16 @@ SimpleOperationContextFactory::ReadModifyWriteRow(std::string const&,
113136
return std::make_shared<OperationContext>();
114137
}
115138

139+
std::shared_ptr<OperationContext> SimpleOperationContextFactory::PrepareQuery(
140+
std::string const&, std::string const&) {
141+
return std::make_shared<OperationContext>();
142+
}
143+
144+
std::shared_ptr<OperationContext> SimpleOperationContextFactory::ExecuteQuery(
145+
std::string const&, std::string const&) {
146+
return std::make_shared<OperationContext>();
147+
}
148+
116149
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
117150

118151
MetricsOperationContextFactory::MetricsOperationContextFactory(
@@ -154,6 +187,12 @@ MetricsOperationContextFactory::MetricsOperationContextFactory(
154187
absl::call_once(read_modify_write_row_metrics_.once, [this, metric]() {
155188
read_modify_write_row_metrics_.metrics.push_back(metric);
156189
});
190+
absl::call_once(prepare_query_metrics_.once, [this, metric]() {
191+
prepare_query_metrics_.metrics.push_back(metric);
192+
});
193+
absl::call_once(execute_query_metrics_.once, [this, metric]() {
194+
execute_query_metrics_.metrics.push_back(metric);
195+
});
157196
}
158197

159198
void MetricsOperationContextFactory::InitializeProvider(
@@ -424,6 +463,57 @@ MetricsOperationContextFactory::ReadModifyWriteRow(
424463
clock_);
425464
}
426465

466+
std::shared_ptr<OperationContext> MetricsOperationContextFactory::PrepareQuery(
467+
std::string const& instance_name, std::string const& app_profile) {
468+
auto constexpr kRpc = "PrepareQuery";
469+
absl::call_once(prepare_query_metrics_.once, [this, kRpc]() {
470+
std::vector<std::shared_ptr<Metric const>> v;
471+
v.emplace_back(std::make_shared<OperationLatency>(kRpc, provider_));
472+
v.emplace_back(std::make_shared<AttemptLatency>(kRpc, provider_));
473+
v.emplace_back(std::make_shared<RetryCount>(kRpc, provider_));
474+
v.emplace_back(std::make_shared<ServerLatency>(kRpc, provider_));
475+
v.emplace_back(std::make_shared<ConnectivityErrorCount>(kRpc, provider_));
476+
swap(prepare_query_metrics_.metrics, v);
477+
});
478+
479+
auto resource_labels = ResourceLabelsFromInstanceName(instance_name);
480+
DataLabels data_labels = {kRpc,
481+
"false", /*=streaming*/
482+
"cpp.Bigtable/" + version_string(),
483+
client_uid_,
484+
app_profile,
485+
"" /*=status*/};
486+
487+
return std::make_shared<OperationContext>(
488+
resource_labels, data_labels, prepare_query_metrics_.metrics, clock_);
489+
}
490+
491+
std::shared_ptr<OperationContext> MetricsOperationContextFactory::ExecuteQuery(
492+
std::string const& instance_name, std::string const& app_profile) {
493+
auto constexpr kRpc = "ExecuteQuery";
494+
absl::call_once(execute_query_metrics_.once, [this, kRpc]() {
495+
std::vector<std::shared_ptr<Metric const>> v;
496+
v.emplace_back(std::make_shared<OperationLatency>(kRpc, provider_));
497+
v.emplace_back(std::make_shared<AttemptLatency>(kRpc, provider_));
498+
v.emplace_back(std::make_shared<RetryCount>(kRpc, provider_));
499+
v.emplace_back(std::make_shared<FirstResponseLatency>(kRpc, provider_));
500+
v.emplace_back(std::make_shared<ServerLatency>(kRpc, provider_));
501+
v.emplace_back(std::make_shared<ConnectivityErrorCount>(kRpc, provider_));
502+
swap(execute_query_metrics_.metrics, v);
503+
});
504+
505+
auto resource_labels = ResourceLabelsFromInstanceName(instance_name);
506+
DataLabels data_labels = {kRpc,
507+
"true", /*=streaming*/
508+
"cpp.Bigtable/" + version_string(),
509+
client_uid_,
510+
app_profile,
511+
"" /*=status*/};
512+
513+
return std::make_shared<OperationContext>(
514+
resource_labels, data_labels, execute_query_metrics_.metrics, clock_);
515+
}
516+
427517
#endif // GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
428518

429519
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/bigtable/internal/operation_context_factory.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ class OperationContextFactory {
5454
std::string const& name, std::string const& app_profile);
5555
virtual std::shared_ptr<OperationContext> ReadModifyWriteRow(
5656
std::string const& name, std::string const& app_profile);
57+
virtual std::shared_ptr<OperationContext> PrepareQuery(
58+
std::string const& instance_name, std::string const& app_profile);
59+
virtual std::shared_ptr<OperationContext> ExecuteQuery(
60+
std::string const& instance_name, std::string const& app_profile);
5761
};
5862

5963
class SimpleOperationContextFactory : public OperationContextFactory {
@@ -72,6 +76,12 @@ class SimpleOperationContextFactory : public OperationContextFactory {
7276
std::string const& name, std::string const& app_profile) override;
7377
std::shared_ptr<OperationContext> ReadModifyWriteRow(
7478
std::string const& name, std::string const& app_profile) override;
79+
std::shared_ptr<OperationContext> PrepareQuery(
80+
std::string const& instance_name,
81+
std::string const& app_profile) override;
82+
std::shared_ptr<OperationContext> ExecuteQuery(
83+
std::string const& instance_name,
84+
std::string const& app_profile) override;
7585
};
7686

7787
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
@@ -115,6 +125,13 @@ class MetricsOperationContextFactory : public OperationContextFactory {
115125
std::shared_ptr<OperationContext> ReadModifyWriteRow(
116126
std::string const& table_name, std::string const& app_profile) override;
117127

128+
std::shared_ptr<OperationContext> PrepareQuery(
129+
std::string const& instance_name,
130+
std::string const& app_profile) override;
131+
std::shared_ptr<OperationContext> ExecuteQuery(
132+
std::string const& instance_name,
133+
std::string const& app_profile) override;
134+
118135
private:
119136
void InitializeProvider(
120137
std::shared_ptr<monitoring_v3::MetricServiceConnection> conn,
@@ -137,6 +154,8 @@ class MetricsOperationContextFactory : public OperationContextFactory {
137154
MetricHolder check_and_mutate_row_metrics_;
138155
MetricHolder sample_row_keys_metrics_;
139156
MetricHolder read_modify_write_row_metrics_;
157+
MetricHolder prepare_query_metrics_;
158+
MetricHolder execute_query_metrics_;
140159
};
141160

142161
#endif // GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS

0 commit comments

Comments
 (0)