Skip to content

Commit 313620e

Browse files
authored
impl(bigtable): update OperationContext to support metrics (#15280)
1 parent c89cb71 commit 313620e

File tree

10 files changed

+328
-25
lines changed

10 files changed

+328
-25
lines changed

google/cloud/bigtable/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ cc_library(
113113
[cc_test(
114114
name = test.replace("/", "_").replace(".cc", ""),
115115
srcs = [test],
116+
local_defines = select({
117+
":metrics_enabled": ["GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS"],
118+
"//conditions:default": [],
119+
}),
116120
deps = [
117121
":bigtable_client_testing",
118122
":google_cloud_cpp_bigtable",

google/cloud/bigtable/internal/async_bulk_apply.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ void AsyncBulkApplier::OnFinish(Status const& status) {
105105
return;
106106
}
107107

108-
operation_context_->PostCall(*context_);
108+
operation_context_->PostCall(*context_, {});
109109
context_.reset();
110110
auto self = this->shared_from_this();
111111
internal::TracedAsyncBackoff(cq_, *call_context_.options, *delay,

google/cloud/bigtable/internal/async_row_reader.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ void AsyncRowReader::OnStreamFinished(Status status) {
220220
TryGiveRowToUser();
221221
return;
222222
}
223-
operation_context_->PostCall(*context_);
223+
operation_context_->PostCall(*context_, {});
224224
context_.reset();
225225
auto self = this->shared_from_this();
226226
internal::TracedAsyncBackoff(cq_, *call_context_.options, *delay,

google/cloud/bigtable/internal/async_row_sampler.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ void AsyncRowSampler::OnFinish(Status const& status) {
9696
return;
9797
}
9898

99-
operation_context_->PostCall(*context_);
99+
operation_context_->PostCall(*context_, {});
100100
context_.reset();
101101
samples_.clear();
102102
auto self = this->shared_from_this();

google/cloud/bigtable/internal/bulk_mutator.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ Status BulkMutator::MakeOneRequest(BigtableStub& stub,
245245
while (absl::visit(UnpackVariant{state_, limiter, enable_server_retries},
246246
stream->Read())) {
247247
}
248-
operation_context_.PostCall(*context);
248+
operation_context_.PostCall(*context, {});
249249
return state_.last_status();
250250
}
251251

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ Status DataConnectionImpl::Apply(std::string const& table_name,
114114
google::bigtable::v2::MutateRowRequest const& request) {
115115
operation_context.PreCall(context);
116116
auto s = stub_->MutateRow(context, options, request);
117-
operation_context.PostCall(context);
117+
operation_context.PostCall(context, s.status());
118118
return s;
119119
},
120120
*current, request, __func__);
@@ -156,7 +156,7 @@ future<Status> DataConnectionImpl::AsyncApply(std::string const& table_name,
156156
return f.then(
157157
[operation_context, context = std::move(context)](auto f) {
158158
auto s = f.get();
159-
operation_context->PostCall(*context);
159+
operation_context->PostCall(*context, s.status());
160160
return s;
161161
});
162162
},
@@ -264,7 +264,7 @@ StatusOr<bigtable::MutationBranch> DataConnectionImpl::CheckAndMutateRow(
264264
google::bigtable::v2::CheckAndMutateRowRequest const& request) {
265265
operation_context.PreCall(context);
266266
auto s = stub_->CheckAndMutateRow(context, options, request);
267-
operation_context.PostCall(context);
267+
operation_context.PostCall(context, s.status());
268268
return s;
269269
},
270270
*current, request, __func__);
@@ -314,7 +314,7 @@ DataConnectionImpl::AsyncCheckAndMutateRow(
314314
return f.then(
315315
[operation_context, context = std::move(context)](auto f) {
316316
auto s = f.get();
317-
operation_context->PostCall(*context);
317+
operation_context->PostCall(*context, s.status());
318318
return s;
319319
});
320320
},
@@ -374,7 +374,7 @@ StatusOr<std::vector<bigtable::RowKeySample>> DataConnectionImpl::SampleRows(
374374
Idempotency::kIdempotent,
375375
enable_server_retries(*current));
376376
if (!delay) return std::move(delay).status();
377-
operation_context.PostCall(*context);
377+
operation_context.PostCall(*context, status);
378378
// A new stream invalidates previously returned samples.
379379
samples.clear();
380380
std::this_thread::sleep_for(*delay);

google/cloud/bigtable/internal/default_row_reader.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ bool DefaultRowReader::NextChunk() {
7979
if (absl::holds_alternative<Status>(v)) {
8080
last_status_ = absl::get<Status>(std::move(v));
8181
response_ = {};
82-
operation_context_.PostCall(*context_);
82+
operation_context_.PostCall(*context_, {});
8383
context_.reset();
8484
return false;
8585
}

google/cloud/bigtable/internal/operation_context.cc

Lines changed: 113 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,32 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/bigtable/internal/operation_context.h"
16+
#include "google/cloud/bigtable/internal/metrics.h"
1617
#include "absl/strings/match.h"
18+
#include "absl/strings/str_split.h"
19+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
20+
#include <opentelemetry/context/runtime_context.h>
21+
#endif
1722

1823
namespace google {
1924
namespace cloud {
2025
namespace bigtable_internal {
2126
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2227

23-
void OperationContext::PreCall(grpc::ClientContext& context) {
24-
for (auto const& h : cookies_) {
25-
context.AddMetadata(h.first, h.second);
28+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
29+
namespace {
30+
std::vector<std::shared_ptr<Metric>> CloneMetrics(
31+
ResourceLabels const& resource_labels, DataLabels const& data_labels,
32+
std::vector<std::shared_ptr<Metric const>> const& metrics) {
33+
std::vector<std::shared_ptr<Metric>> v;
34+
v.reserve(metrics.size());
35+
for (auto const& m : metrics) {
36+
v.emplace_back(m->clone(resource_labels, data_labels));
2637
}
27-
context.AddMetadata("bigtable-attempt", std::to_string(attempt_number_++));
28-
}
29-
30-
void OperationContext::PostCall(grpc::ClientContext const& context) {
31-
ProcessMetadata(context.GetServerInitialMetadata());
32-
ProcessMetadata(context.GetServerTrailingMetadata());
38+
return v;
3339
}
40+
} // namespace
41+
#endif
3442

3543
void OperationContext::ProcessMetadata(
3644
std::multimap<grpc::string_ref, grpc::string_ref> const& metadata) {
@@ -43,6 +51,102 @@ void OperationContext::ProcessMetadata(
4351
}
4452
}
4553

54+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
55+
56+
OperationContext::OperationContext(
57+
ResourceLabels const& resource_labels, DataLabels const& data_labels,
58+
std::vector<std::shared_ptr<Metric const>> const& metrics,
59+
std::shared_ptr<Clock> clock)
60+
: cloned_metrics_(CloneMetrics(resource_labels, data_labels, metrics)),
61+
clock_(std::move(clock)) {}
62+
63+
void OperationContext::PreCall(grpc::ClientContext& client_context) {
64+
auto otel_context = opentelemetry::context::RuntimeContext::GetCurrent();
65+
auto attempt_start = clock_->Now();
66+
if (attempt_number_ == 0) {
67+
operation_start_ = attempt_start;
68+
}
69+
for (auto& m : cloned_metrics_) {
70+
m->PreCall(otel_context,
71+
PreCallParams{attempt_start, attempt_number_ == 0});
72+
}
73+
74+
for (auto const& h : cookies_) {
75+
client_context.AddMetadata(h.first, h.second);
76+
}
77+
client_context.AddMetadata("bigtable-attempt",
78+
std::to_string(attempt_number_++));
79+
}
80+
81+
void OperationContext::PostCall(grpc::ClientContext const& client_context,
82+
google::cloud::Status const& status) {
83+
ProcessMetadata(client_context.GetServerInitialMetadata());
84+
ProcessMetadata(client_context.GetServerTrailingMetadata());
85+
auto attempt_end = clock_->Now();
86+
auto otel_context = opentelemetry::context::RuntimeContext::GetCurrent();
87+
for (auto& m : cloned_metrics_) {
88+
m->PostCall(otel_context, client_context,
89+
PostCallParams{attempt_end, status});
90+
}
91+
}
92+
93+
void OperationContext::OnDone(Status const& s) {
94+
auto operation_end = clock_->Now();
95+
auto otel_context = opentelemetry::context::RuntimeContext::GetCurrent();
96+
for (auto& m : cloned_metrics_) {
97+
m->OnDone(otel_context, OnDoneParams{operation_end, s});
98+
}
99+
}
100+
101+
void OperationContext::ElementRequest(grpc::ClientContext const&) {
102+
auto element_request = clock_->Now();
103+
auto otel_context = opentelemetry::context::RuntimeContext::GetCurrent();
104+
for (auto& m : cloned_metrics_) {
105+
m->ElementRequest(otel_context, ElementRequestParams{element_request});
106+
}
107+
}
108+
109+
void OperationContext::ElementDelivery(grpc::ClientContext const&) {
110+
auto otel_context = opentelemetry::context::RuntimeContext::GetCurrent();
111+
auto first_response = clock_->Now();
112+
for (auto& m : cloned_metrics_) {
113+
m->ElementDelivery(otel_context,
114+
ElementDeliveryParams{first_response, first_response_});
115+
}
116+
if (first_response_) {
117+
first_response_ = false;
118+
}
119+
}
120+
121+
#else // GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
122+
123+
OperationContext::OperationContext(
124+
ResourceLabels const&, DataLabels const&,
125+
std::vector<std::shared_ptr<Metric const>> const&, std::shared_ptr<Clock>) {
126+
}
127+
128+
void OperationContext::PreCall(grpc::ClientContext& client_context) {
129+
for (auto const& h : cookies_) {
130+
client_context.AddMetadata(h.first, h.second);
131+
}
132+
client_context.AddMetadata("bigtable-attempt",
133+
std::to_string(attempt_number_++));
134+
}
135+
136+
void OperationContext::PostCall(grpc::ClientContext const& client_context,
137+
google::cloud::Status const&) {
138+
ProcessMetadata(client_context.GetServerInitialMetadata());
139+
ProcessMetadata(client_context.GetServerTrailingMetadata());
140+
}
141+
142+
void OperationContext::OnDone(Status const&) {}
143+
144+
void OperationContext::ElementRequest(grpc::ClientContext const&) {}
145+
146+
void OperationContext::ElementDelivery(grpc::ClientContext const&) {}
147+
148+
#endif // GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
149+
46150
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
47151
} // namespace bigtable_internal
48152
} // namespace cloud

google/cloud/bigtable/internal/operation_context.h

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,23 @@
1717

1818
#include "google/cloud/bigtable/version.h"
1919
#include "google/cloud/internal/clock.h"
20+
#include "google/cloud/status.h"
2021
#include <grpcpp/grpcpp.h>
2122
#include <map>
23+
#include <memory>
2224
#include <string>
2325
#include <unordered_map>
26+
#include <vector>
2427

2528
namespace google {
2629
namespace cloud {
2730
namespace bigtable_internal {
2831
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2932

33+
struct DataLabels;
34+
struct ResourceLabels;
35+
class Metric;
36+
3037
/**
3138
* A Bigtable-specific context that persists across retries in an operation.
3239
*
@@ -54,18 +61,39 @@ class OperationContext {
5461
public:
5562
using Clock = ::google::cloud::internal::SteadyClock;
5663

57-
// Adds stored bigtable cookies as client metadata.
58-
void PreCall(grpc::ClientContext& context);
59-
// Stores bigtable cookies returned as server metadata.
60-
void PostCall(grpc::ClientContext const& context);
64+
// The default constructor is used when metric support is unavailable or
65+
// disabled.
66+
OperationContext() = default;
67+
OperationContext(ResourceLabels const& resource_labels,
68+
DataLabels const& data_labels,
69+
std::vector<std::shared_ptr<Metric const>> const& metrics,
70+
std::shared_ptr<Clock> clock);
71+
72+
// Called before each RPC attempt.
73+
void PreCall(grpc::ClientContext& client_context);
74+
// Called after receiving RPC attempt response.
75+
void PostCall(grpc::ClientContext const& client_context,
76+
google::cloud::Status const& status);
77+
// A hook that executes at the end of a client operation.
78+
void OnDone(Status const& status);
79+
// Called during operations that allow the user to iterate over data
80+
// synchronously or asynchronously.
81+
void ElementRequest(grpc::ClientContext const& client_context);
82+
void ElementDelivery(grpc::ClientContext const& client_context);
6183

6284
private:
63-
// Adds cookies that start with "x-goog-cbt-cookie" to the cookie jar.
6485
void ProcessMetadata(
6586
std::multimap<grpc::string_ref, grpc::string_ref> const& metadata);
6687

6788
std::unordered_map<std::string, std::string> cookies_;
6889
int attempt_number_ = 0;
90+
#ifdef GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
91+
std::vector<std::shared_ptr<Metric>> cloned_metrics_;
92+
std::shared_ptr<Clock> clock_ = std::make_shared<Clock>();
93+
Clock::time_point operation_start_;
94+
Clock::time_point attempt_start_;
95+
bool first_response_ = true;
96+
#endif // GOOGLE_CLOUD_CPP_BIGTABLE_WITH_OTEL_METRICS
6997
};
7098

7199
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

0 commit comments

Comments
 (0)