Skip to content

Commit e460b63

Browse files
committed
chore(tracing): Use upstream tracing propagation
The proxy used its own custom trace propagation for some time. It was built before the upstream OpenTelemetry libraries were available. This replaces the manual propagation with a combination of the upstream w3c and b3 propagators, modified slightly to only propagate in one format or the other (and not both). Signed-off-by: Scott Fleener <[email protected]>
1 parent fea821b commit e460b63

File tree

11 files changed

+348
-554
lines changed

11 files changed

+348
-554
lines changed

Cargo.lock

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2087,6 +2087,7 @@ dependencies = [
20872087
"opentelemetry",
20882088
"opentelemetry-proto",
20892089
"opentelemetry-semantic-conventions",
2090+
"opentelemetry-zipkin",
20902091
"opentelemetry_sdk",
20912092
"tokio",
20922093
"tonic",
@@ -2608,11 +2609,17 @@ dependencies = [
26082609
"futures",
26092610
"hex",
26102611
"http",
2612+
"linkerd-app-core",
26112613
"linkerd-error",
26122614
"linkerd-http-box",
2615+
"linkerd-io",
2616+
"linkerd-opentelemetry",
26132617
"linkerd-stack",
26142618
"linkerd-tracing",
2619+
"opentelemetry",
2620+
"opentelemetry-http",
26152621
"opentelemetry-semantic-conventions",
2622+
"opentelemetry_sdk",
26162623
"rand 0.9.2",
26172624
"thiserror",
26182625
"tokio",
@@ -2952,6 +2959,18 @@ dependencies = [
29522959
"tracing",
29532960
]
29542961

2962+
[[package]]
2963+
name = "opentelemetry-http"
2964+
version = "0.31.0"
2965+
source = "registry+https://github.com/rust-lang/crates.io-index"
2966+
checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d"
2967+
dependencies = [
2968+
"async-trait",
2969+
"bytes",
2970+
"http",
2971+
"opentelemetry",
2972+
]
2973+
29552974
[[package]]
29562975
name = "opentelemetry-proto"
29572976
version = "0.31.0"
@@ -2975,6 +2994,23 @@ version = "0.31.0"
29752994
source = "registry+https://github.com/rust-lang/crates.io-index"
29762995
checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846"
29772996

2997+
[[package]]
2998+
name = "opentelemetry-zipkin"
2999+
version = "0.31.0"
3000+
source = "registry+https://github.com/rust-lang/crates.io-index"
3001+
checksum = "f27fcd074586dab55936b003c6a499acaabd6debbd539c3f36356bca2ef2fce2"
3002+
dependencies = [
3003+
"http",
3004+
"once_cell",
3005+
"opentelemetry",
3006+
"opentelemetry-http",
3007+
"opentelemetry_sdk",
3008+
"serde",
3009+
"serde_json",
3010+
"thiserror",
3011+
"typed-builder",
3012+
]
3013+
29783014
[[package]]
29793015
name = "opentelemetry_sdk"
29803016
version = "0.31.0"
@@ -4228,6 +4264,26 @@ version = "0.2.5"
42284264
source = "registry+https://github.com/rust-lang/crates.io-index"
42294265
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
42304266

4267+
[[package]]
4268+
name = "typed-builder"
4269+
version = "0.20.1"
4270+
source = "registry+https://github.com/rust-lang/crates.io-index"
4271+
checksum = "cd9d30e3a08026c78f246b173243cf07b3696d274debd26680773b6773c2afc7"
4272+
dependencies = [
4273+
"typed-builder-macro",
4274+
]
4275+
4276+
[[package]]
4277+
name = "typed-builder-macro"
4278+
version = "0.20.1"
4279+
source = "registry+https://github.com/rust-lang/crates.io-index"
4280+
checksum = "3c36781cc0e46a83726d9879608e4cf6c2505237e263a8eb8c24502989cfdb28"
4281+
dependencies = [
4282+
"proc-macro2",
4283+
"quote",
4284+
"syn",
4285+
]
4286+
42314287
[[package]]
42324288
name = "typenum"
42334289
version = "1.19.0"

linkerd/opentelemetry/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ linkerd-trace-context = { path = "../trace-context" }
1515
opentelemetry = { version = "0.31", default-features = false, features = ["trace"] }
1616
opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace"] }
1717
opentelemetry-proto = { version = "0.31" }
18+
opentelemetry-zipkin = { version = "0.31", default-features = false }
1819
opentelemetry-semantic-conventions = { version = "0.31", default-features = false, features = ["semconv_experimental"] }
1920
tonic = { workspace = true, default-features = false, features = [
2021
"codegen",

linkerd/opentelemetry/src/lib.rs

Lines changed: 25 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@
22
#![forbid(unsafe_code)]
33

44
pub mod metrics;
5+
pub mod propagation;
56

67
use self::metrics::Registry;
8+
use crate::propagation::OrderedPropagator;
79
use futures::stream::{Stream, StreamExt};
810
use http_body::Body;
911
use linkerd_error::Error;
1012
use linkerd_trace_context::{self as trace_context, export::ExportSpan};
1113
pub use opentelemetry as otel;
1214
use opentelemetry::{
13-
trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
15+
trace::{SpanContext, SpanKind, Status, TraceFlags, TraceState},
1416
KeyValue,
1517
};
1618
pub use opentelemetry_proto as proto;
@@ -56,6 +58,8 @@ where
5658
)
5759
.build();
5860

61+
opentelemetry::global::set_text_map_propagator(OrderedPropagator::new());
62+
5963
SpanExportTask::new(spans, processor).run().await;
6064
}
6165

@@ -151,12 +155,12 @@ fn convert_span(span: ExportSpan) -> Result<SpanData, Error> {
151155
for (k, v) in labels.iter() {
152156
attributes.push(KeyValue::new(k.clone(), v.clone()));
153157
}
154-
for (k, v) in span.labels.into_iter() {
155-
attributes.push(KeyValue::new(k, v));
158+
for kv in span.labels.into_iter() {
159+
attributes.push(kv);
156160
}
157161
let is_remote = kind != trace_context::export::SpanKind::Client;
158162
Ok(SpanData {
159-
parent_span_id: SpanId::from_bytes(span.parent_id.into_bytes()?),
163+
parent_span_id: span.parent_id,
160164
parent_span_is_remote: true, // we do not originate any spans locally
161165
span_kind: match kind {
162166
trace_context::export::SpanKind::Server => SpanKind::Server,
@@ -170,8 +174,8 @@ fn convert_span(span: ExportSpan) -> Result<SpanData, Error> {
170174
links: SpanLinks::default(),
171175
status: Status::Unset, // TODO: this is gRPC status; we must read response trailers to populate this
172176
span_context: SpanContext::new(
173-
TraceId::from_bytes(span.trace_id.into_bytes()?),
174-
SpanId::from_bytes(span.span_id.into_bytes()?),
177+
span.trace_id,
178+
span.span_id,
175179
TraceFlags::default(),
176180
is_remote,
177181
TraceState::NONE,
@@ -184,40 +188,33 @@ fn convert_span(span: ExportSpan) -> Result<SpanData, Error> {
184188
#[cfg(test)]
185189
mod tests {
186190
use super::*;
187-
use linkerd_trace_context::{export::SpanKind, Id, Span};
188-
use opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue;
189-
use opentelemetry_proto::tonic::common::v1::AnyValue;
191+
use linkerd_trace_context::{export::SpanKind, Span};
192+
use opentelemetry::{SpanId, TraceId};
190193
use opentelemetry_proto::tonic::common::v1::InstrumentationScope;
191-
use std::{collections::HashMap, sync::Arc, time::SystemTime};
194+
use std::{sync::Arc, time::SystemTime};
192195
use tokio::sync::mpsc;
193-
use tonic::codegen::{tokio_stream::wrappers::ReceiverStream, tokio_stream::StreamExt, Bytes};
196+
use tonic::codegen::{tokio_stream::wrappers::ReceiverStream, tokio_stream::StreamExt};
194197
use tonic_prost::ProstDecoder;
195198

196199
#[tokio::test(flavor = "current_thread")]
197200
async fn send_span() {
198-
let trace_id = Id::from(Bytes::from(
199-
hex::decode("0123456789abcedffedcba9876543210").expect("decode"),
200-
));
201-
let span_id = Id::from(Bytes::from(
202-
hex::decode("fedcba9876543210").expect("decode"),
203-
));
204-
let parent_id = Id::from(Bytes::from(
205-
hex::decode("0123456789abcedf").expect("decode"),
206-
));
201+
let trace_id = TraceId::from_hex("0123456789abcedffedcba9876543210").expect("trace id");
202+
let parent_id = SpanId::from_hex("fedcba9876543210").expect("parent id");
203+
let span_id = SpanId::from_hex("0123456789abcedf").expect("span id");
207204
let span_name = "test".to_string();
208205

209206
let start = SystemTime::now();
210207
let end = SystemTime::now();
211208

212209
let span = ExportSpan {
213210
span: Span {
214-
trace_id: trace_id.clone(),
215-
span_id: span_id.clone(),
216-
parent_id: parent_id.clone(),
211+
trace_id,
212+
span_id,
213+
parent_id,
217214
span_name: span_name.clone(),
218215
start,
219216
end,
220-
labels: HashMap::new(),
217+
labels: Vec::new(),
221218
},
222219
kind: SpanKind::Server,
223220
labels: Arc::new(Default::default()),
@@ -263,18 +260,9 @@ mod tests {
263260
assert_eq!(scope_span.spans.len(), 1);
264261

265262
let span = scope_span.spans.remove(0);
266-
assert_eq!(
267-
span.span_id,
268-
span_id.into_bytes::<8>().expect("into_bytes").to_vec()
269-
);
270-
assert_eq!(
271-
span.parent_span_id,
272-
parent_id.into_bytes::<8>().expect("into_bytes").to_vec()
273-
);
274-
assert_eq!(
275-
span.trace_id,
276-
trace_id.into_bytes::<16>().expect("into_bytes").to_vec()
277-
);
263+
assert_eq!(span.span_id, span_id.to_bytes().to_vec(),);
264+
assert_eq!(span.parent_span_id, parent_id.to_bytes().to_vec(),);
265+
assert_eq!(span.trace_id, trace_id.to_bytes().to_vec(),);
278266
assert_eq!(span.name, span_name);
279267
assert_eq!(
280268
span.start_time_unix_nano,
@@ -308,7 +296,7 @@ mod tests {
308296
tokio::spawn(export_spans(
309297
inner,
310298
ReceiverStream::new(span_rx),
311-
opentelemetry_sdk::Resource::builder().build(),
299+
Resource::builder().build(),
312300
metrics,
313301
));
314302

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use opentelemetry::{
2+
propagation::{text_map_propagator::FieldIter, Extractor, Injector, TextMapPropagator},
3+
Context,
4+
};
5+
use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
6+
7+
#[derive(Copy, Clone, Eq, PartialEq)]
8+
enum PropagationFormat {
9+
W3C,
10+
B3,
11+
}
12+
13+
#[derive(Debug)]
14+
pub struct OrderedPropagator {
15+
w3c: TraceContextPropagator,
16+
b3: opentelemetry_zipkin::Propagator,
17+
baggage: BaggagePropagator,
18+
fields: Vec<String>,
19+
}
20+
21+
impl OrderedPropagator {
22+
pub fn new() -> Self {
23+
let w3c = TraceContextPropagator::new();
24+
let b3 = opentelemetry_zipkin::Propagator::new();
25+
let baggage = BaggagePropagator::new();
26+
27+
Self {
28+
fields: w3c
29+
.fields()
30+
.chain(b3.fields())
31+
.chain(baggage.fields())
32+
.map(|s| s.to_string())
33+
.collect(),
34+
w3c,
35+
b3,
36+
baggage,
37+
}
38+
}
39+
}
40+
41+
impl Default for OrderedPropagator {
42+
fn default() -> Self {
43+
Self::new()
44+
}
45+
}
46+
47+
impl TextMapPropagator for OrderedPropagator {
48+
fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) {
49+
match cx.get::<PropagationFormat>() {
50+
None => {}
51+
Some(PropagationFormat::W3C) => {
52+
self.w3c.inject_context(cx, injector);
53+
}
54+
Some(PropagationFormat::B3) => {
55+
self.b3.inject_context(cx, injector);
56+
}
57+
}
58+
self.baggage.inject_context(cx, injector);
59+
}
60+
61+
fn extract_with_context(&self, cx: &Context, extractor: &dyn Extractor) -> Context {
62+
let cx = if self.w3c.fields().any(|f| extractor.get(f).is_some()) {
63+
self.w3c
64+
.extract_with_context(cx, extractor)
65+
.with_value(PropagationFormat::W3C)
66+
} else if self.b3.fields().any(|f| extractor.get(f).is_some()) {
67+
self.b3
68+
.extract_with_context(cx, extractor)
69+
.with_value(PropagationFormat::B3)
70+
} else {
71+
cx.clone()
72+
};
73+
self.baggage.extract_with_context(&cx, extractor)
74+
}
75+
76+
fn fields(&self) -> FieldIter<'_> {
77+
FieldIter::new(self.fields.as_slice())
78+
}
79+
}

linkerd/trace-context/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,20 @@ hex = "0.4"
1515
http = { workspace = true }
1616
linkerd-error = { path = "../error" }
1717
linkerd-stack = { path = "../stack" }
18+
opentelemetry = { version = "0.31", default-features = false, features = ["trace"] }
19+
opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace"] }
20+
opentelemetry-http = { version = "0.31", default-features = false }
1821
opentelemetry-semantic-conventions = { version = "0.31", default-features = false, features = ["semconv_experimental"] }
1922
rand = "0.9"
2023
thiserror = "2"
2124
tower = { workspace = true, default-features = false, features = ["util"] }
2225
tracing = { workspace = true }
2326

2427
[dev-dependencies]
28+
linkerd-app-core = { path = "../app/core" }
2529
linkerd-http-box = { path = "../http/box" }
30+
linkerd-io = { path = "../io" }
2631
linkerd-tracing = { path = "../tracing" }
32+
linkerd-opentelemetry = { path = "../opentelemetry" }
2733
tokio = { version = "1", features = ["test-util"] }
2834
tower-test = { workspace = true }

linkerd/trace-context/src/export.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,15 @@ pub enum SpanKind {
88
Client = 2,
99
}
1010

11+
impl From<SpanKind> for opentelemetry::trace::SpanKind {
12+
fn from(value: SpanKind) -> Self {
13+
match value {
14+
SpanKind::Server => opentelemetry::trace::SpanKind::Server,
15+
SpanKind::Client => opentelemetry::trace::SpanKind::Client,
16+
}
17+
}
18+
}
19+
1120
pub type SpanLabels = Arc<HashMap<String, String>>;
1221

1322
#[derive(Debug)]

0 commit comments

Comments
 (0)