Skip to content

Commit 29b7679

Browse files
authored
refactor(pubsub): add a tracing batch sink (#13204)
1 parent b3f5e98 commit 29b7679

File tree

6 files changed

+789
-0
lines changed

6 files changed

+789
-0
lines changed

google/cloud/pubsub/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ add_library(
210210
internal/subscription_message_source.h
211211
internal/subscription_session.cc
212212
internal/subscription_session.h
213+
internal/tracing_batch_sink.cc
214+
internal/tracing_batch_sink.h
213215
internal/tracing_message_batch.cc
214216
internal/tracing_message_batch.h
215217
message.cc
@@ -397,6 +399,7 @@ function (google_cloud_cpp_pubsub_client_define_tests)
397399
internal/subscription_lease_management_test.cc
398400
internal/subscription_message_queue_test.cc
399401
internal/subscription_session_test.cc
402+
internal/tracing_batch_sink_test.cc
400403
internal/tracing_message_batch_test.cc
401404
message_test.cc
402405
options_test.cc

google/cloud/pubsub/google_cloud_cpp_pubsub.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ google_cloud_cpp_pubsub_hdrs = [
109109
"internal/subscription_message_queue.h",
110110
"internal/subscription_message_source.h",
111111
"internal/subscription_session.h",
112+
"internal/tracing_batch_sink.h",
112113
"internal/tracing_message_batch.h",
113114
"message.h",
114115
"options.h",
@@ -220,6 +221,7 @@ google_cloud_cpp_pubsub_srcs = [
220221
"internal/subscription_lease_management.cc",
221222
"internal/subscription_message_queue.cc",
222223
"internal/subscription_session.cc",
224+
"internal/tracing_batch_sink.cc",
223225
"internal/tracing_message_batch.cc",
224226
"message.cc",
225227
"options.cc",
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
// Copyright 2023 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/pubsub/internal/tracing_batch_sink.h"
16+
#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
17+
#include "google/cloud/pubsub/internal/publisher_stub.h"
18+
#include "google/cloud/pubsub/options.h"
19+
#include "google/cloud/future.h"
20+
#include "google/cloud/internal/opentelemetry.h"
21+
#include "opentelemetry/context/runtime_context.h"
22+
#include "opentelemetry/trace/context.h"
23+
#include "opentelemetry/trace/semantic_conventions.h"
24+
#include "opentelemetry/trace/span.h"
25+
#include <algorithm>
26+
#include <string>
27+
#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
28+
29+
namespace google {
30+
namespace cloud {
31+
namespace pubsub_internal {
32+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
33+
34+
#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
35+
36+
namespace {
37+
using Spans =
38+
std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>>;
39+
40+
using Attributes =
41+
std::vector<std::pair<opentelemetry::nostd::string_view,
42+
opentelemetry::common::AttributeValue>>;
43+
using Links =
44+
std::vector<std::pair<opentelemetry::trace::SpanContext, Attributes>>;
45+
46+
/// Creates a link for each sampled span in the range @p begin to @p end.
47+
auto MakeLinks(Spans::const_iterator begin, Spans::const_iterator end) {
48+
Links links;
49+
Spans sampled_spans;
50+
std::copy_if(begin, end, std::back_inserter(sampled_spans),
51+
[](auto const& span) { return span->GetContext().IsSampled(); });
52+
std::transform(sampled_spans.begin(), sampled_spans.end(),
53+
std::back_inserter(links),
54+
[i = static_cast<std::int64_t>(0)](auto const& span) mutable {
55+
return std::make_pair(
56+
span->GetContext(),
57+
Attributes{{"messaging.gcp_pubsub.message.link", i++}});
58+
});
59+
return links;
60+
}
61+
62+
auto MakeParent(Links const& links, Spans const& message_spans) {
63+
namespace sc = ::opentelemetry::trace::SemanticConventions;
64+
opentelemetry::trace::StartSpanOptions options;
65+
options.kind = opentelemetry::trace::SpanKind::kProducer;
66+
auto batch_sink_parent =
67+
internal::MakeSpan("publish",
68+
/*attributes=*/
69+
{{sc::kMessagingBatchMessageCount,
70+
static_cast<std::int64_t>(message_spans.size())},
71+
{sc::kCodeFunction, "BatchSink::AsyncPublish"},
72+
{/*sc::kMessagingOperation=*/
73+
"messaging.operation", "publish"},
74+
{sc::kThreadId, internal::CurrentThreadId()}},
75+
/*links*/ std::move(links), options);
76+
77+
// Add metadata to the message spans about the batch sink span.
78+
auto context = batch_sink_parent->GetContext();
79+
auto trace_id = internal::ToString(context.trace_id());
80+
auto span_id = internal::ToString(context.span_id());
81+
for (auto const& message_span : message_spans) {
82+
#if OPENTELEMETRY_ABI_VERSION_NO >= 2
83+
message_span->AddEvent("gl-cpp.publish_start");
84+
message_span->AddLink(context, {{}});
85+
#else
86+
message_span->AddEvent("gl-cpp.publish_start",
87+
Attributes{{"gcp_pubsub.publish.trace_id", trace_id},
88+
{"gcp_pubsub.publish.span_id", span_id}});
89+
#endif
90+
}
91+
return batch_sink_parent;
92+
}
93+
94+
auto MakeChild(
95+
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> const& parent,
96+
int count, Links const& links) {
97+
opentelemetry::trace::StartSpanOptions options;
98+
options.parent = parent->GetContext();
99+
options.kind = opentelemetry::trace::SpanKind::kClient;
100+
return internal::MakeSpan("publish #" + std::to_string(count),
101+
/*attributes=*/{{}},
102+
/*links=*/links, options);
103+
}
104+
105+
Spans MakeBatchSinkSpans(Spans const& message_spans, Options const& options) {
106+
auto const max_otel_links = options.get<pubsub::MaxOtelLinkCountOption>();
107+
Spans batch_sink_spans;
108+
// If the batch size is less than the max size, add the links to a single
109+
// span. If the batch size is greater than the max size, create a parent
110+
// span with no links and each child spans will contain links.
111+
if (message_spans.size() <= max_otel_links) {
112+
batch_sink_spans.push_back(MakeParent(
113+
MakeLinks(message_spans.begin(), message_spans.end()), message_spans));
114+
return batch_sink_spans;
115+
}
116+
batch_sink_spans.push_back(MakeParent({{}}, message_spans));
117+
auto batch_sink_parent = batch_sink_spans.front();
118+
119+
auto cut = [&message_spans, max_otel_links](auto i) {
120+
auto const batch_size = static_cast<std::ptrdiff_t>(max_otel_links);
121+
return std::next(
122+
i, std::min(batch_size, std::distance(i, message_spans.end())));
123+
};
124+
int count = 0;
125+
for (auto i = message_spans.begin(); i != message_spans.end(); i = cut(i)) {
126+
// Generates child spans with links between [i, min(i + batch_size, end))
127+
// such that each child span will have exactly batch_size elements or less.
128+
batch_sink_spans.push_back(
129+
MakeChild(batch_sink_parent, count++, MakeLinks(i, cut(i))));
130+
}
131+
132+
return batch_sink_spans;
133+
}
134+
135+
/**
136+
* Records spans related to a batch of messages across calls and
137+
* callbacks in the `BatchingPublisherConnection`.
138+
*/
139+
class TracingBatchSink : public BatchSink {
140+
public:
141+
explicit TracingBatchSink(std::shared_ptr<BatchSink> child, Options opts)
142+
: child_(std::move(child)), options_(std::move(opts)) {}
143+
144+
~TracingBatchSink() override = default;
145+
146+
void AddMessage(pubsub::Message const& m) override {
147+
auto active_span = opentelemetry::trace::GetSpan(
148+
opentelemetry::context::RuntimeContext::GetCurrent());
149+
active_span->AddEvent("gl-cpp.added_to_batch");
150+
{
151+
std::lock_guard<std::mutex> lk(mu_);
152+
message_spans_.push_back(std::move(active_span));
153+
}
154+
child_->AddMessage(std::move(m));
155+
}
156+
157+
future<StatusOr<google::pubsub::v1::PublishResponse>> AsyncPublish(
158+
google::pubsub::v1::PublishRequest request) override {
159+
decltype(message_spans_) message_spans;
160+
{
161+
std::lock_guard<std::mutex> lk(mu_);
162+
message_spans.swap(message_spans_);
163+
}
164+
165+
auto batch_sink_spans = MakeBatchSinkSpans(message_spans, options_);
166+
167+
// The first span in `batch_sink_spans` is the parent to the other spans in
168+
// the vector.
169+
internal::OTelScope scope(batch_sink_spans.front());
170+
return child_->AsyncPublish(std::move(request))
171+
.then([oc = opentelemetry::context::RuntimeContext::GetCurrent(),
172+
spans = std::move(batch_sink_spans),
173+
message_spans = std::move(message_spans)](auto f) mutable {
174+
for (auto& span : message_spans) {
175+
span->AddEvent("gl-cpp.publish_end");
176+
}
177+
for (auto& span : spans) {
178+
internal::EndSpan(*span);
179+
}
180+
internal::DetachOTelContext(oc);
181+
return f;
182+
});
183+
}
184+
185+
void ResumePublish(std::string const& ordering_key) override {
186+
child_->ResumePublish(ordering_key);
187+
};
188+
189+
private:
190+
std::shared_ptr<BatchSink> child_;
191+
std::mutex mu_;
192+
std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>>
193+
message_spans_; // ABSL_GUARDED_BY(mu_)
194+
Options options_;
195+
};
196+
197+
} // namespace
198+
199+
std::shared_ptr<BatchSink> MakeTracingBatchSink(
200+
std::shared_ptr<BatchSink> batch_sink, Options opts) {
201+
return std::make_shared<TracingBatchSink>(std::move(batch_sink),
202+
std::move(opts));
203+
}
204+
205+
#else // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
206+
207+
std::shared_ptr<BatchSink> MakeTracingBatchSink(
208+
std::shared_ptr<BatchSink> batch_sink, Options) {
209+
return batch_sink;
210+
}
211+
212+
#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
213+
214+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
215+
} // namespace pubsub_internal
216+
} // namespace cloud
217+
} // namespace google
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2023 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_TRACING_BATCH_SINK_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_TRACING_BATCH_SINK_H
17+
18+
#include "google/cloud/pubsub/internal/batch_sink.h"
19+
#include "google/cloud/version.h"
20+
#include <memory>
21+
22+
namespace google {
23+
namespace cloud {
24+
namespace pubsub_internal {
25+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
26+
27+
std::shared_ptr<BatchSink> MakeTracingBatchSink(
28+
std::shared_ptr<BatchSink> batch_sink, Options opts);
29+
30+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
31+
} // namespace pubsub_internal
32+
} // namespace cloud
33+
} // namespace google
34+
35+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_TRACING_BATCH_SINK_H

0 commit comments

Comments
 (0)