Skip to content

Commit 3130e3c

Browse files
committed
Integrate OpenTelemetry into the proxy
OpenCensus is a deprecated protocol and is slated to be removed from upstream collectors soon. This wires up the proxy to optionally use OpenTelmetry as the format for exported traces. Currently, this defaults to the existing OpenCensus exporter, and we can switch the default later. [#10111](linkerd/linkerd2#10111) Signed-off-by: Scott Fleener <[email protected]>
1 parent 642c2af commit 3130e3c

File tree

24 files changed

+601
-276
lines changed

24 files changed

+601
-276
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,7 @@ dependencies = [
11261126
"linkerd-app-outbound",
11271127
"linkerd-error",
11281128
"linkerd-opencensus",
1129+
"linkerd-opentelemetry",
11291130
"linkerd-tonic-stream",
11301131
"rangemap",
11311132
"regex",
@@ -1184,6 +1185,7 @@ dependencies = [
11841185
"linkerd-meshtls",
11851186
"linkerd-metrics",
11861187
"linkerd-opencensus",
1188+
"linkerd-opentelemetry",
11871189
"linkerd-proxy-api-resolve",
11881190
"linkerd-proxy-balance",
11891191
"linkerd-proxy-client-policy",
@@ -1743,6 +1745,7 @@ dependencies = [
17431745
"http-body",
17441746
"linkerd-error",
17451747
"linkerd-metrics",
1748+
"linkerd-trace-context",
17461749
"opencensus-proto",
17471750
"tokio",
17481751
"tokio-stream",
@@ -1759,6 +1762,7 @@ dependencies = [
17591762
"http-body",
17601763
"linkerd-error",
17611764
"linkerd-metrics",
1765+
"linkerd-trace-context",
17621766
"opentelemetry",
17631767
"opentelemetry-proto",
17641768
"opentelemetry_sdk",

linkerd/app/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ linkerd-app-inbound = { path = "./inbound" }
2525
linkerd-app-outbound = { path = "./outbound" }
2626
linkerd-error = { path = "../error" }
2727
linkerd-opencensus = { path = "../opencensus" }
28+
linkerd-opentelemetry = { path = "../opentelemetry" }
2829
linkerd-tonic-stream = { path = "../tonic-stream" }
2930
rangemap = "1"
3031
regex = "1"

linkerd/app/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ linkerd-io = { path = "../../io" }
4747
linkerd-meshtls = { path = "../../meshtls", default-features = false }
4848
linkerd-metrics = { path = "../../metrics", features = ["process", "stack"] }
4949
linkerd-opencensus = { path = "../../opencensus" }
50+
linkerd-opentelemetry = { path = "../../opentelemetry" }
5051
linkerd-proxy-api-resolve = { path = "../../proxy/api-resolve" }
5152
linkerd-proxy-balance = { path = "../../proxy/balance" }
5253
linkerd-proxy-core = { path = "../../proxy/core" }
Lines changed: 51 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -1,139 +1,76 @@
11
use linkerd_error::Error;
2-
use linkerd_opencensus::proto::trace::v1 as oc;
32
use linkerd_stack::layer;
4-
use linkerd_trace_context::{self as trace_context, TraceContext};
5-
use std::{collections::HashMap, sync::Arc};
6-
use thiserror::Error;
3+
use linkerd_trace_context::{
4+
self as trace_context,
5+
export::{ExportSpan, SpanKind, SpanLabels},
6+
Span, TraceContext,
7+
};
8+
use std::{str::FromStr, sync::Arc};
79
use tokio::sync::mpsc;
810

9-
pub type OpenCensusSink = Option<mpsc::Sender<oc::Span>>;
10-
pub type Labels = Arc<HashMap<String, String>>;
11-
12-
/// SpanConverter converts trace_context::Span objects into OpenCensus agent
13-
/// protobuf span objects. SpanConverter receives trace_context::Span objects by
14-
/// implmenting the SpanSink trait. For each span that it receives, it converts
15-
/// it to an OpenCensus span and then sends it on the provided mpsc::Sender.
16-
#[derive(Clone)]
17-
pub struct SpanConverter {
18-
kind: Kind,
19-
sink: mpsc::Sender<oc::Span>,
20-
labels: Labels,
11+
#[derive(Debug, Copy, Clone, Default)]
12+
pub enum CollectorProtocol {
13+
#[default]
14+
OpenCensus,
15+
OpenTelemetry,
2116
}
2217

23-
#[derive(Debug, Error)]
24-
#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)]
25-
pub struct IdLengthError {
26-
id: Vec<u8>,
27-
expected_size: usize,
28-
actual_size: usize,
18+
impl FromStr for CollectorProtocol {
19+
type Err = ();
20+
21+
fn from_str(s: &str) -> Result<Self, Self::Err> {
22+
if s.eq_ignore_ascii_case("opencensus") {
23+
Ok(Self::OpenCensus)
24+
} else if s.eq_ignore_ascii_case("opentelemetry") {
25+
Ok(Self::OpenTelemetry)
26+
} else {
27+
Err(())
28+
}
29+
}
2930
}
3031

32+
pub type SpanSink = mpsc::Sender<ExportSpan>;
33+
3134
pub fn server<S>(
32-
sink: OpenCensusSink,
33-
labels: impl Into<Labels>,
35+
sink: Option<SpanSink>,
36+
labels: impl Into<SpanLabels>,
3437
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
35-
SpanConverter::layer(Kind::Server, sink, labels)
38+
TraceContext::layer(sink.map(move |sink| SpanConverter {
39+
kind: SpanKind::Server,
40+
sink,
41+
labels: labels.into(),
42+
}))
3643
}
3744

3845
pub fn client<S>(
39-
sink: OpenCensusSink,
40-
labels: impl Into<Labels>,
46+
sink: Option<SpanSink>,
47+
labels: impl Into<SpanLabels>,
4148
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
42-
SpanConverter::layer(Kind::Client, sink, labels)
43-
}
44-
45-
#[derive(Copy, Clone, Debug, PartialEq)]
46-
enum Kind {
47-
Server = 1,
48-
Client = 2,
49+
TraceContext::layer(sink.map(move |sink| SpanConverter {
50+
kind: SpanKind::Client,
51+
sink,
52+
labels: labels.into(),
53+
}))
4954
}
5055

51-
impl SpanConverter {
52-
fn layer<S>(
53-
kind: Kind,
54-
sink: OpenCensusSink,
55-
labels: impl Into<Labels>,
56-
) -> impl layer::Layer<S, Service = TraceContext<Option<Self>, S>> + Clone {
57-
TraceContext::layer(sink.map(move |sink| Self {
58-
kind,
59-
sink,
60-
labels: labels.into(),
61-
}))
62-
}
63-
64-
fn mk_span(&self, mut span: trace_context::Span) -> Result<oc::Span, IdLengthError> {
65-
let mut attributes = HashMap::<String, oc::AttributeValue>::new();
66-
for (k, v) in self.labels.iter() {
67-
attributes.insert(
68-
k.clone(),
69-
oc::AttributeValue {
70-
value: Some(oc::attribute_value::Value::StringValue(truncatable(
71-
v.clone(),
72-
))),
73-
},
74-
);
75-
}
76-
for (k, v) in span.labels.drain() {
77-
attributes.insert(
78-
k.to_string(),
79-
oc::AttributeValue {
80-
value: Some(oc::attribute_value::Value::StringValue(truncatable(v))),
81-
},
82-
);
83-
}
84-
Ok(oc::Span {
85-
trace_id: into_bytes(span.trace_id, 16)?,
86-
span_id: into_bytes(span.span_id, 8)?,
87-
tracestate: None,
88-
parent_span_id: into_bytes(span.parent_id, 8)?,
89-
name: Some(truncatable(span.span_name)),
90-
kind: self.kind as i32,
91-
start_time: Some(span.start.into()),
92-
end_time: Some(span.end.into()),
93-
attributes: Some(oc::span::Attributes {
94-
attribute_map: attributes,
95-
dropped_attributes_count: 0,
96-
}),
97-
stack_trace: None,
98-
time_events: None,
99-
links: None,
100-
status: None, // TODO: this is gRPC status; we must read response trailers to populate this
101-
resource: None,
102-
same_process_as_parent_span: Some(self.kind == Kind::Client),
103-
child_span_count: None,
104-
})
105-
}
56+
#[derive(Clone)]
57+
pub struct SpanConverter {
58+
kind: SpanKind,
59+
sink: SpanSink,
60+
labels: SpanLabels,
10661
}
10762

10863
impl trace_context::SpanSink for SpanConverter {
109-
#[inline]
11064
fn is_enabled(&self) -> bool {
11165
true
11266
}
11367

114-
fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> {
115-
let span = self.mk_span(span)?;
116-
self.sink.try_send(span).map_err(Into::into)
117-
}
118-
}
119-
120-
fn into_bytes(id: trace_context::Id, size: usize) -> Result<Vec<u8>, IdLengthError> {
121-
let bytes: Vec<u8> = id.into();
122-
if bytes.len() == size {
123-
Ok(bytes)
124-
} else {
125-
let actual_size = bytes.len();
126-
Err(IdLengthError {
127-
id: bytes,
128-
expected_size: size,
129-
actual_size,
130-
})
131-
}
132-
}
133-
134-
fn truncatable(value: String) -> oc::TruncatableString {
135-
oc::TruncatableString {
136-
value,
137-
truncated_byte_count: 0,
68+
fn try_send(&mut self, span: Span) -> Result<(), Error> {
69+
self.sink.try_send(ExportSpan {
70+
span,
71+
kind: self.kind,
72+
labels: Arc::clone(&self.labels),
73+
})?;
74+
Ok(())
13875
}
13976
}

linkerd/app/core/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub use linkerd_http_metrics as http_metrics;
4040
pub use linkerd_idle_cache as idle_cache;
4141
pub use linkerd_io as io;
4242
pub use linkerd_opencensus as opencensus;
43+
pub use linkerd_opentelemetry as opentelemetry;
4344
pub use linkerd_service_profiles as profiles;
4445
pub use linkerd_stack_metrics as stack_metrics;
4546
pub use linkerd_stack_tracing as stack_tracing;
@@ -65,7 +66,7 @@ pub struct ProxyRuntime {
6566
pub identity: identity::creds::Receiver,
6667
pub metrics: metrics::Proxy,
6768
pub tap: proxy::tap::Registry,
68-
pub span_sink: http_tracing::OpenCensusSink,
69+
pub span_sink: Option<http_tracing::SpanSink>,
6970
pub drain: drain::Watch,
7071
}
7172

linkerd/app/core/src/metrics.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
pub use crate::transport::labels::{TargetAddr, TlsAccept};
1010
use crate::{
1111
classify::Class,
12-
control, http_metrics, opencensus, profiles, stack_metrics,
12+
control, http_metrics, opencensus, opentelemetry, profiles, stack_metrics,
1313
svc::Param,
1414
tls,
1515
transport::{self, labels::TlsConnect},
@@ -39,6 +39,7 @@ pub struct Metrics {
3939
pub proxy: Proxy,
4040
pub control: ControlHttp,
4141
pub opencensus: opencensus::metrics::Registry,
42+
pub opentelemetry: opentelemetry::metrics::Registry,
4243
}
4344

4445
#[derive(Clone, Debug)]
@@ -191,11 +192,13 @@ impl Metrics {
191192
};
192193

193194
let (opencensus, opencensus_report) = opencensus::metrics::new();
195+
let (opentelemetry, opentelemetry_report) = opentelemetry::metrics::new();
194196

195197
let metrics = Metrics {
196198
proxy,
197199
control,
198200
opencensus,
201+
opentelemetry,
199202
};
200203

201204
let report = endpoint_report
@@ -205,6 +208,7 @@ impl Metrics {
205208
.and_report(control_report)
206209
.and_report(transport_report)
207210
.and_report(opencensus_report)
211+
.and_report(opentelemetry_report)
208212
.and_report(stack);
209213

210214
(metrics, report)

linkerd/app/inbound/src/lib.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ mod server;
1818
#[cfg(any(test, feature = "test-util", fuzzing))]
1919
pub mod test_util;
2020

21+
#[cfg(fuzzing)]
22+
pub use self::http::fuzz as http_fuzz;
2123
pub use self::{metrics::InboundMetrics, policy::DefaultPolicy};
2224
use linkerd_app_core::{
2325
config::{ConnectConfig, ProxyConfig, QueueConfig},
2426
drain,
25-
http_tracing::OpenCensusSink,
27+
http_tracing::SpanSink,
2628
identity, io,
2729
proxy::{tap, tcp},
2830
svc,
@@ -33,9 +35,6 @@ use std::{fmt::Debug, time::Duration};
3335
use thiserror::Error;
3436
use tracing::debug_span;
3537

36-
#[cfg(fuzzing)]
37-
pub use self::http::fuzz as http_fuzz;
38-
3938
#[derive(Clone, Debug)]
4039
pub struct Config {
4140
pub allow_discovery: NameMatch,
@@ -67,7 +66,7 @@ struct Runtime {
6766
metrics: InboundMetrics,
6867
identity: identity::creds::Receiver,
6968
tap: tap::Registry,
70-
span_sink: OpenCensusSink,
69+
span_sink: Option<SpanSink>,
7170
drain: drain::Watch,
7271
}
7372

linkerd/app/outbound/src/http/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ impl<T> Outbound<svc::ArcNewCloneHttp<T>> {
4747
.check_new_service::<T, http::Request<_>>()
4848
.push(ServerRescue::layer(config.emit_headers))
4949
.check_new_service::<T, http::Request<_>>()
50-
// Initiates OpenCensus tracing.
50+
// Initiates OpenTelemetry tracing.
5151
.push_on_service(http_tracing::server(rt.span_sink.clone(), trace_labels()))
5252
.push_on_service(http::BoxResponse::layer())
5353
// Convert origin form HTTP/1 URIs to absolute form for Hyper's

linkerd/app/outbound/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
#![allow(opaque_hidden_inferred_bound)]
77
#![forbid(unsafe_code)]
88

9+
use linkerd_app_core::http_tracing::SpanSink;
910
use linkerd_app_core::{
1011
config::{ProxyConfig, QueueConfig},
1112
drain,
1213
exp_backoff::ExponentialBackoff,
13-
http_tracing::OpenCensusSink,
1414
identity, io,
1515
metrics::prom,
1616
profiles,
@@ -95,7 +95,7 @@ struct Runtime {
9595
metrics: OutboundMetrics,
9696
identity: identity::NewClient,
9797
tap: tap::Registry,
98-
span_sink: OpenCensusSink,
98+
span_sink: Option<SpanSink>,
9999
drain: drain::Watch,
100100
}
101101

0 commit comments

Comments
 (0)