Skip to content

Commit d925933

Browse files
committed
chore(tracing): Use upstream tracing propagation
The proxy used its own custom trace propagation for some time. It was built before the upstream OpenTelemetry libraries were available. This replaces the manual propagation with a combination of the upstream w3c and b3 propagators, modified slightly to only propagate in one format or the other (and not both). Signed-off-by: Scott Fleener <[email protected]>
1 parent 6d66554 commit d925933

File tree

11 files changed

+480
-485
lines changed

11 files changed

+480
-485
lines changed

Cargo.lock

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2084,6 +2084,7 @@ dependencies = [
20842084
"opentelemetry",
20852085
"opentelemetry-proto",
20862086
"opentelemetry-semantic-conventions",
2087+
"opentelemetry-zipkin",
20872088
"opentelemetry_sdk",
20882089
"tokio",
20892090
"tonic",
@@ -2603,12 +2604,22 @@ dependencies = [
26032604
"futures",
26042605
"hex",
26052606
"http",
2607+
"linkerd-app-core",
26062608
"linkerd-error",
2609+
"linkerd-http-box",
2610+
"linkerd-io",
2611+
"linkerd-opentelemetry",
26072612
"linkerd-stack",
2613+
"linkerd-tracing",
2614+
"opentelemetry",
2615+
"opentelemetry-http",
26082616
"opentelemetry-semantic-conventions",
2617+
"opentelemetry_sdk",
26092618
"rand 0.9.2",
26102619
"thiserror",
2620+
"tokio",
26112621
"tower",
2622+
"tower-test",
26122623
"tracing",
26132624
]
26142625

@@ -2943,6 +2954,18 @@ dependencies = [
29432954
"tracing",
29442955
]
29452956

2957+
[[package]]
2958+
name = "opentelemetry-http"
2959+
version = "0.31.0"
2960+
source = "registry+https://github.com/rust-lang/crates.io-index"
2961+
checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d"
2962+
dependencies = [
2963+
"async-trait",
2964+
"bytes",
2965+
"http",
2966+
"opentelemetry",
2967+
]
2968+
29462969
[[package]]
29472970
name = "opentelemetry-proto"
29482971
version = "0.31.0"
@@ -2966,6 +2989,23 @@ version = "0.31.0"
29662989
source = "registry+https://github.com/rust-lang/crates.io-index"
29672990
checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846"
29682991

2992+
[[package]]
2993+
name = "opentelemetry-zipkin"
2994+
version = "0.31.0"
2995+
source = "registry+https://github.com/rust-lang/crates.io-index"
2996+
checksum = "f27fcd074586dab55936b003c6a499acaabd6debbd539c3f36356bca2ef2fce2"
2997+
dependencies = [
2998+
"http",
2999+
"once_cell",
3000+
"opentelemetry",
3001+
"opentelemetry-http",
3002+
"opentelemetry_sdk",
3003+
"serde",
3004+
"serde_json",
3005+
"thiserror",
3006+
"typed-builder",
3007+
]
3008+
29693009
[[package]]
29703010
name = "opentelemetry_sdk"
29713011
version = "0.31.0"
@@ -4220,6 +4260,26 @@ version = "0.2.5"
42204260
source = "registry+https://github.com/rust-lang/crates.io-index"
42214261
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
42224262

4263+
[[package]]
4264+
name = "typed-builder"
4265+
version = "0.20.1"
4266+
source = "registry+https://github.com/rust-lang/crates.io-index"
4267+
checksum = "cd9d30e3a08026c78f246b173243cf07b3696d274debd26680773b6773c2afc7"
4268+
dependencies = [
4269+
"typed-builder-macro",
4270+
]
4271+
4272+
[[package]]
4273+
name = "typed-builder-macro"
4274+
version = "0.20.1"
4275+
source = "registry+https://github.com/rust-lang/crates.io-index"
4276+
checksum = "3c36781cc0e46a83726d9879608e4cf6c2505237e263a8eb8c24502989cfdb28"
4277+
dependencies = [
4278+
"proc-macro2",
4279+
"quote",
4280+
"syn",
4281+
]
4282+
42234283
[[package]]
42244284
name = "typenum"
42254285
version = "1.19.0"

linkerd/opentelemetry/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ linkerd-trace-context = { path = "../trace-context" }
1515
opentelemetry = { version = "0.31", default-features = false, features = ["trace"] }
1616
opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace"] }
1717
opentelemetry-proto = { version = "0.31" }
18+
opentelemetry-zipkin = { version = "0.31", default-features = false }
1819
opentelemetry-semantic-conventions = { version = "0.31", default-features = false, features = ["semconv_experimental"] }
1920
tonic = { workspace = true, default-features = false, features = [
2021
"codegen",

linkerd/opentelemetry/src/lib.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@
22
#![forbid(unsafe_code)]
33

44
pub mod metrics;
5+
pub mod propagation;
56

67
use self::metrics::Registry;
8+
use crate::propagation::OrderedPropagator;
79
use futures::stream::{Stream, StreamExt};
810
use http_body::Body;
911
use linkerd_error::Error;
1012
use linkerd_trace_context::{self as trace_context, export::ExportSpan};
1113
pub use opentelemetry as otel;
1214
use opentelemetry::{
13-
trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
15+
trace::{SpanContext, SpanKind, Status, TraceFlags, TraceState},
1416
KeyValue,
1517
};
1618
pub use opentelemetry_proto as proto;
@@ -56,6 +58,8 @@ where
5658
)
5759
.build();
5860

61+
opentelemetry::global::set_text_map_propagator(OrderedPropagator::new());
62+
5963
SpanExportTask::new(spans, processor).run().await;
6064
}
6165

@@ -151,12 +155,12 @@ fn convert_span(span: ExportSpan) -> Result<SpanData, Error> {
151155
for (k, v) in labels.iter() {
152156
attributes.push(KeyValue::new(k.clone(), v.clone()));
153157
}
154-
for (k, v) in span.labels.into_iter() {
155-
attributes.push(KeyValue::new(k, v));
158+
for kv in span.labels.into_iter() {
159+
attributes.push(kv);
156160
}
157161
let is_remote = kind != trace_context::export::SpanKind::Client;
158162
Ok(SpanData {
159-
parent_span_id: SpanId::from_bytes(span.parent_id.into_bytes()?),
163+
parent_span_id: span.parent_id,
160164
parent_span_is_remote: true, // we do not originate any spans locally
161165
span_kind: match kind {
162166
trace_context::export::SpanKind::Server => SpanKind::Server,
@@ -170,8 +174,8 @@ fn convert_span(span: ExportSpan) -> Result<SpanData, Error> {
170174
links: SpanLinks::default(),
171175
status: Status::Unset, // TODO: this is gRPC status; we must read response trailers to populate this
172176
span_context: SpanContext::new(
173-
TraceId::from_bytes(span.trace_id.into_bytes()?),
174-
SpanId::from_bytes(span.span_id.into_bytes()?),
177+
span.trace_id,
178+
span.span_id,
175179
TraceFlags::default(),
176180
is_remote,
177181
TraceState::NONE,
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use opentelemetry::{
2+
propagation::{text_map_propagator::FieldIter, Extractor, Injector, TextMapPropagator},
3+
Context,
4+
};
5+
use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
6+
7+
#[derive(Copy, Clone, Eq, PartialEq)]
8+
enum PropagationFormat {
9+
W3C,
10+
B3,
11+
}
12+
13+
#[derive(Debug)]
14+
pub struct OrderedPropagator {
15+
w3c: TraceContextPropagator,
16+
b3: opentelemetry_zipkin::Propagator,
17+
baggage: BaggagePropagator,
18+
fields: Vec<String>,
19+
}
20+
21+
impl OrderedPropagator {
22+
pub fn new() -> Self {
23+
let w3c = TraceContextPropagator::new();
24+
let b3 = opentelemetry_zipkin::Propagator::new();
25+
let baggage = BaggagePropagator::new();
26+
27+
Self {
28+
fields: w3c
29+
.fields()
30+
.chain(b3.fields())
31+
.chain(baggage.fields())
32+
.map(|s| s.to_string())
33+
.collect(),
34+
w3c,
35+
b3,
36+
baggage,
37+
}
38+
}
39+
}
40+
41+
impl Default for OrderedPropagator {
42+
fn default() -> Self {
43+
Self::new()
44+
}
45+
}
46+
47+
impl TextMapPropagator for OrderedPropagator {
48+
fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) {
49+
match cx.get::<PropagationFormat>() {
50+
None => {}
51+
Some(PropagationFormat::W3C) => {
52+
self.w3c.inject_context(cx, injector);
53+
}
54+
Some(PropagationFormat::B3) => {
55+
self.b3.inject_context(cx, injector);
56+
}
57+
}
58+
self.baggage.inject_context(cx, injector);
59+
}
60+
61+
fn extract_with_context(&self, cx: &Context, extractor: &dyn Extractor) -> Context {
62+
let cx = if self.w3c.fields().any(|f| extractor.get(f).is_some()) {
63+
self.w3c
64+
.extract_with_context(cx, extractor)
65+
.with_value(PropagationFormat::W3C)
66+
} else if self.b3.fields().any(|f| extractor.get(f).is_some()) {
67+
self.b3
68+
.extract_with_context(cx, extractor)
69+
.with_value(PropagationFormat::B3)
70+
} else {
71+
cx.clone()
72+
};
73+
self.baggage.extract_with_context(&cx, extractor)
74+
}
75+
76+
fn fields(&self) -> FieldIter<'_> {
77+
FieldIter::new(self.fields.as_slice())
78+
}
79+
}

linkerd/trace-context/Cargo.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,20 @@ hex = "0.4"
1515
http = { workspace = true }
1616
linkerd-error = { path = "../error" }
1717
linkerd-stack = { path = "../stack" }
18+
opentelemetry = { version = "0.31", default-features = false, features = ["trace"] }
19+
opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace"] }
20+
opentelemetry-http = { version = "0.31", default-features = false }
1821
opentelemetry-semantic-conventions = { version = "0.31", default-features = false, features = ["semconv_experimental"] }
1922
rand = "0.9"
2023
thiserror = "2"
2124
tower = { workspace = true, default-features = false, features = ["util"] }
2225
tracing = { workspace = true }
26+
27+
[dev-dependencies]
28+
linkerd-app-core = { path = "../app/core" }
29+
linkerd-http-box = { path = "../http/box" }
30+
linkerd-io = { path = "../io" }
31+
linkerd-tracing = { path = "../tracing" }
32+
linkerd-opentelemetry = { path = "../opentelemetry" }
33+
tokio = { version = "1", features = ["test-util"] }
34+
tower-test = { workspace = true }

linkerd/trace-context/src/export.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,15 @@ pub enum SpanKind {
88
Client = 2,
99
}
1010

11+
impl From<SpanKind> for opentelemetry::trace::SpanKind {
12+
fn from(value: SpanKind) -> Self {
13+
match value {
14+
SpanKind::Server => opentelemetry::trace::SpanKind::Server,
15+
SpanKind::Client => opentelemetry::trace::SpanKind::Client,
16+
}
17+
}
18+
}
19+
1120
pub type SpanLabels = Arc<HashMap<String, String>>;
1221

1322
#[derive(Debug)]

linkerd/trace-context/src/lib.rs

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,16 @@
22
#![forbid(unsafe_code)]
33

44
pub mod export;
5-
mod propagation;
65
mod service;
76

87
pub use self::service::TraceContext;
98
use bytes::Bytes;
109
use linkerd_error::Error;
11-
use rand::Rng;
12-
use std::collections::HashMap;
10+
use opentelemetry::{KeyValue, SpanId, TraceId};
1311
use std::fmt;
1412
use std::time::SystemTime;
1513
use thiserror::Error;
1614

17-
const SPAN_ID_LEN: usize = 8;
18-
1915
#[derive(Debug, Default)]
2016
pub struct Id(Vec<u8>);
2117

@@ -49,13 +45,13 @@ pub struct InsufficientBytes;
4945

5046
#[derive(Debug)]
5147
pub struct Span {
52-
pub trace_id: Id,
53-
pub span_id: Id,
54-
pub parent_id: Id,
48+
pub trace_id: TraceId,
49+
pub span_id: SpanId,
50+
pub parent_id: SpanId,
5551
pub span_name: String,
5652
pub start: SystemTime,
5753
pub end: SystemTime,
58-
pub labels: HashMap<&'static str, String>,
54+
pub labels: Vec<KeyValue>,
5955
}
6056

6157
pub trait SpanSink {
@@ -78,14 +74,6 @@ impl<K: SpanSink> SpanSink for Option<K> {
7874

7975
// === impl Id ===
8076

81-
impl Id {
82-
fn new_span_id<R: Rng>(rng: &mut R) -> Self {
83-
let mut bytes = vec![0; SPAN_ID_LEN];
84-
rng.fill(bytes.as_mut_slice());
85-
Self(bytes)
86-
}
87-
}
88-
8977
impl From<Id> for Vec<u8> {
9078
fn from(Id(bytes): Id) -> Vec<u8> {
9179
bytes

0 commit comments

Comments
 (0)