Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2081,6 +2081,7 @@ dependencies = [
"opentelemetry",
"opentelemetry-proto",
"opentelemetry-semantic-conventions",
"opentelemetry-zipkin",
"opentelemetry_sdk",
"tokio",
"tonic",
Expand Down Expand Up @@ -2602,11 +2603,17 @@ dependencies = [
"futures",
"hex",
"http",
"linkerd-app-core",
"linkerd-error",
"linkerd-http-box",
"linkerd-io",
"linkerd-opentelemetry",
"linkerd-stack",
"linkerd-tracing",
"opentelemetry",
"opentelemetry-http",
"opentelemetry-semantic-conventions",
"opentelemetry_sdk",
"rand 0.9.2",
"thiserror",
"tokio",
Expand Down Expand Up @@ -2946,6 +2953,18 @@ dependencies = [
"tracing",
]

[[package]]
name = "opentelemetry-http"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d"
dependencies = [
"async-trait",
"bytes",
"http",
"opentelemetry",
]

[[package]]
name = "opentelemetry-proto"
version = "0.31.0"
Expand All @@ -2969,6 +2988,23 @@ version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846"

[[package]]
name = "opentelemetry-zipkin"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f27fcd074586dab55936b003c6a499acaabd6debbd539c3f36356bca2ef2fce2"
dependencies = [
"http",
"once_cell",
"opentelemetry",
"opentelemetry-http",
"opentelemetry_sdk",
"serde",
"serde_json",
"thiserror",
"typed-builder",
]

[[package]]
name = "opentelemetry_sdk"
version = "0.31.0"
Expand Down Expand Up @@ -4222,6 +4258,26 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"

[[package]]
name = "typed-builder"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd9d30e3a08026c78f246b173243cf07b3696d274debd26680773b6773c2afc7"
dependencies = [
"typed-builder-macro",
]

[[package]]
name = "typed-builder-macro"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c36781cc0e46a83726d9879608e4cf6c2505237e263a8eb8c24502989cfdb28"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "typenum"
version = "1.19.0"
Expand Down
1 change: 1 addition & 0 deletions linkerd/opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ linkerd-trace-context = { path = "../trace-context" }
opentelemetry = { version = "0.31", default-features = false, features = ["trace"] }
opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace"] }
opentelemetry-proto = { version = "0.31" }
opentelemetry-zipkin = { version = "0.31", default-features = false }
opentelemetry-semantic-conventions = { version = "0.31", default-features = false, features = ["semconv_experimental"] }
tonic = { workspace = true, default-features = false, features = [
"codegen",
Expand Down
60 changes: 25 additions & 35 deletions linkerd/opentelemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
#![forbid(unsafe_code)]

pub mod metrics;
pub mod propagation;

use self::metrics::Registry;
use crate::propagation::OrderedPropagator;
use futures::stream::{Stream, StreamExt};
use http_body::Body;
use linkerd_error::Error;
use linkerd_trace_context::{self as trace_context, export::ExportSpan};
pub use opentelemetry as otel;
use opentelemetry::{
trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
trace::{SpanContext, SpanKind, Status, TraceFlags, TraceState},
KeyValue,
};
pub use opentelemetry_proto as proto;
Expand Down Expand Up @@ -56,6 +58,8 @@ where
)
.build();

opentelemetry::global::set_text_map_propagator(OrderedPropagator::new());

SpanExportTask::new(spans, processor).run().await;
}

Expand Down Expand Up @@ -151,12 +155,12 @@ fn convert_span(span: ExportSpan) -> Result<SpanData, Error> {
for (k, v) in labels.iter() {
attributes.push(KeyValue::new(k.clone(), v.clone()));
}
for (k, v) in span.labels.into_iter() {
attributes.push(KeyValue::new(k, v));
for kv in span.labels.into_iter() {
attributes.push(kv);
}
let is_remote = kind != trace_context::export::SpanKind::Client;
Ok(SpanData {
parent_span_id: SpanId::from_bytes(span.parent_id.into_bytes()?),
parent_span_id: span.parent_id,
parent_span_is_remote: true, // we do not originate any spans locally
span_kind: match kind {
trace_context::export::SpanKind::Server => SpanKind::Server,
Expand All @@ -170,8 +174,8 @@ fn convert_span(span: ExportSpan) -> Result<SpanData, Error> {
links: SpanLinks::default(),
status: Status::Unset, // TODO: this is gRPC status; we must read response trailers to populate this
span_context: SpanContext::new(
TraceId::from_bytes(span.trace_id.into_bytes()?),
SpanId::from_bytes(span.span_id.into_bytes()?),
span.trace_id,
span.span_id,
TraceFlags::default(),
is_remote,
TraceState::NONE,
Expand All @@ -184,38 +188,33 @@ fn convert_span(span: ExportSpan) -> Result<SpanData, Error> {
#[cfg(test)]
mod tests {
use super::*;
use linkerd_trace_context::{export::SpanKind, Id, Span};
use linkerd_trace_context::{export::SpanKind, Span};
use opentelemetry::{SpanId, TraceId};
use opentelemetry_proto::tonic::common::v1::InstrumentationScope;
use std::{collections::HashMap, sync::Arc, time::SystemTime};
use std::{sync::Arc, time::SystemTime};
use tokio::sync::mpsc;
use tonic::codegen::{tokio_stream::wrappers::ReceiverStream, tokio_stream::StreamExt, Bytes};
use tonic::codegen::{tokio_stream::wrappers::ReceiverStream, tokio_stream::StreamExt};
use tonic_prost::ProstDecoder;

#[tokio::test(flavor = "current_thread")]
async fn send_span() {
let trace_id = Id::from(Bytes::from(
hex::decode("0123456789abcedffedcba9876543210").expect("decode"),
));
let span_id = Id::from(Bytes::from(
hex::decode("fedcba9876543210").expect("decode"),
));
let parent_id = Id::from(Bytes::from(
hex::decode("0123456789abcedf").expect("decode"),
));
let trace_id = TraceId::from_hex("0123456789abcedffedcba9876543210").expect("trace id");
let parent_id = SpanId::from_hex("fedcba9876543210").expect("parent id");
let span_id = SpanId::from_hex("0123456789abcedf").expect("span id");
let span_name = "test".to_string();

let start = SystemTime::now();
let end = SystemTime::now();

let span = ExportSpan {
span: Span {
trace_id: trace_id.clone(),
span_id: span_id.clone(),
parent_id: parent_id.clone(),
trace_id,
span_id,
parent_id,
span_name: span_name.clone(),
start,
end,
labels: HashMap::new(),
labels: Vec::new(),
},
kind: SpanKind::Server,
labels: Arc::new(Default::default()),
Expand Down Expand Up @@ -261,18 +260,9 @@ mod tests {
assert_eq!(scope_span.spans.len(), 1);

let span = scope_span.spans.remove(0);
assert_eq!(
span.span_id,
span_id.into_bytes::<8>().expect("into_bytes").to_vec()
);
assert_eq!(
span.parent_span_id,
parent_id.into_bytes::<8>().expect("into_bytes").to_vec()
);
assert_eq!(
span.trace_id,
trace_id.into_bytes::<16>().expect("into_bytes").to_vec()
);
assert_eq!(span.span_id, span_id.to_bytes().to_vec(),);
assert_eq!(span.parent_span_id, parent_id.to_bytes().to_vec(),);
assert_eq!(span.trace_id, trace_id.to_bytes().to_vec(),);
assert_eq!(span.name, span_name);
assert_eq!(
span.start_time_unix_nano,
Expand Down Expand Up @@ -306,7 +296,7 @@ mod tests {
tokio::spawn(export_spans(
inner,
ReceiverStream::new(span_rx),
opentelemetry_sdk::Resource::builder().build(),
Resource::builder().build(),
metrics,
));

Expand Down
79 changes: 79 additions & 0 deletions linkerd/opentelemetry/src/propagation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use opentelemetry::{
propagation::{text_map_propagator::FieldIter, Extractor, Injector, TextMapPropagator},
Context,
};
use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};

#[derive(Copy, Clone, Eq, PartialEq)]
enum PropagationFormat {
W3C,
B3,
}

#[derive(Debug)]
pub struct OrderedPropagator {
w3c: TraceContextPropagator,
b3: opentelemetry_zipkin::Propagator,
baggage: BaggagePropagator,
fields: Vec<String>,
}

impl OrderedPropagator {
pub fn new() -> Self {
let w3c = TraceContextPropagator::new();
let b3 = opentelemetry_zipkin::Propagator::new();
let baggage = BaggagePropagator::new();

Self {
fields: w3c
.fields()
.chain(b3.fields())
.chain(baggage.fields())
.map(|s| s.to_string())
.collect(),
w3c,
b3,
baggage,
}
}
}

impl Default for OrderedPropagator {
fn default() -> Self {
Self::new()
}
}

impl TextMapPropagator for OrderedPropagator {
fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) {
match cx.get::<PropagationFormat>() {
None => {}
Some(PropagationFormat::W3C) => {
self.w3c.inject_context(cx, injector);
}
Some(PropagationFormat::B3) => {
self.b3.inject_context(cx, injector);
}
}
self.baggage.inject_context(cx, injector);
}

fn extract_with_context(&self, cx: &Context, extractor: &dyn Extractor) -> Context {
let cx = if self.w3c.fields().any(|f| extractor.get(f).is_some()) {
self.w3c
.extract_with_context(cx, extractor)
.with_value(PropagationFormat::W3C)
} else if self.b3.fields().any(|f| extractor.get(f).is_some()) {
self.b3
.extract_with_context(cx, extractor)
.with_value(PropagationFormat::B3)
} else {
cx.clone()
};
self.baggage.extract_with_context(&cx, extractor)
}

fn fields(&self) -> FieldIter<'_> {
FieldIter::new(self.fields.as_slice())
}
}
6 changes: 6 additions & 0 deletions linkerd/trace-context/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@ hex = "0.4"
http = { workspace = true }
linkerd-error = { path = "../error" }
linkerd-stack = { path = "../stack" }
opentelemetry = { version = "0.31", default-features = false, features = ["trace"] }
opentelemetry_sdk = { version = "0.31", default-features = false, features = ["trace"] }
opentelemetry-http = { version = "0.31", default-features = false }
opentelemetry-semantic-conventions = { version = "0.31", default-features = false, features = ["semconv_experimental"] }
rand = "0.9"
thiserror = "2"
tower = { workspace = true, default-features = false, features = ["util"] }
tracing = { workspace = true }

[dev-dependencies]
linkerd-app-core = { path = "../app/core" }
linkerd-http-box = { path = "../http/box" }
linkerd-io = { path = "../io" }
linkerd-tracing = { path = "../tracing" }
linkerd-opentelemetry = { path = "../opentelemetry" }
tokio = { version = "1", features = ["test-util"] }
tower-test = { workspace = true }
9 changes: 9 additions & 0 deletions linkerd/trace-context/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ pub enum SpanKind {
Client = 2,
}

impl From<SpanKind> for opentelemetry::trace::SpanKind {
fn from(value: SpanKind) -> Self {
match value {
SpanKind::Server => opentelemetry::trace::SpanKind::Server,
SpanKind::Client => opentelemetry::trace::SpanKind::Client,
}
}
}

pub type SpanLabels = Arc<HashMap<String, String>>;

#[derive(Debug)]
Expand Down
Loading
Loading