Skip to content

Commit dd0d5d9

Browse files
committed
chore(tracing): Use upstream tracing batch exporter
The proxy used its own custom trace exporter for some time. It was built before the upstream OpenTelemetry libraries were available, and they have outlived their usefulness. This replaces the custom exporter with a batch exporter configured to export to the same endpoint with the same service configuration. In the future, this exporter can be installed as a global default trace processor, which would decouple it from the service layer where the proxy generates spans for requests. Signed-off-by: Scott Fleener <[email protected]>
1 parent 432f06b commit dd0d5d9

File tree

2 files changed

+140
-275
lines changed

2 files changed

+140
-275
lines changed
Lines changed: 26 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,6 @@
11
use super::EnabledCollector;
22
use linkerd_app_core::{control::ControlAddr, proxy::http::Body, Error};
3-
use linkerd_opentelemetry::{
4-
self as opentelemetry, metrics,
5-
proto::{
6-
tonic::common::v1::{any_value, AnyValue, KeyValue},
7-
transform::common::tonic::ResourceAttributesWithSchema,
8-
},
9-
semconv,
10-
};
3+
use linkerd_opentelemetry::{self as opentelemetry, metrics, otel::KeyValue, sdk, semconv};
114
use std::{
125
collections::HashMap,
136
time::{SystemTime, UNIX_EPOCH},
@@ -29,7 +22,7 @@ pub(super) fn create_collector<S>(
2922
legacy_metrics: metrics::Registry,
3023
) -> EnabledCollector
3124
where
32-
S: GrpcService<TonicBody> + Clone + Send + 'static,
25+
S: GrpcService<TonicBody> + Clone + Send + Sync + 'static,
3326
S::Error: Into<Error>,
3427
S::Future: Send,
3528
S::ResponseBody: Body<Data = tonic::codegen::Bytes> + Send + 'static,
@@ -38,37 +31,33 @@ where
3831
let (span_sink, spans_rx) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY);
3932
let spans_rx = ReceiverStream::new(spans_rx);
4033

41-
let mut resources = ResourceAttributesWithSchema::default();
42-
43-
resources
44-
.attributes
45-
.0
46-
.push((std::process::id() as i64).with_key(semconv::attribute::PROCESS_PID));
47-
48-
resources.attributes.0.push(
49-
SystemTime::now()
50-
.duration_since(UNIX_EPOCH)
51-
.map(|d| d.as_secs() as i64)
52-
.unwrap_or_else(|e| -(e.duration().as_secs() as i64))
53-
.with_key("process.start_timestamp"),
54-
);
55-
resources.attributes.0.push(
56-
attributes
57-
.hostname
58-
.unwrap_or_default()
59-
.with_key(semconv::attribute::HOST_NAME),
60-
);
61-
62-
resources.attributes.0.extend(
63-
attributes
64-
.extra
65-
.into_iter()
66-
.map(|(key, value)| value.with_key(&key)),
67-
);
34+
let resource = sdk::Resource::builder()
35+
.with_attribute(KeyValue::new(
36+
semconv::attribute::PROCESS_PID,
37+
std::process::id() as i64,
38+
))
39+
.with_attribute(KeyValue::new(
40+
"process.start_timestamp",
41+
SystemTime::now()
42+
.duration_since(UNIX_EPOCH)
43+
.map(|d| d.as_secs() as i64)
44+
.unwrap_or_else(|e| -(e.duration().as_secs() as i64)),
45+
))
46+
.with_attribute(KeyValue::new(
47+
semconv::attribute::HOST_NAME,
48+
attributes.hostname.unwrap_or_default(),
49+
))
50+
.with_attributes(
51+
attributes
52+
.extra
53+
.into_iter()
54+
.map(|(k, v)| KeyValue::new(k, v)),
55+
)
56+
.build();
6857

6958
let addr = addr.clone();
7059
let task = Box::pin(
71-
opentelemetry::export_spans(svc, spans_rx, resources, legacy_metrics)
60+
opentelemetry::export_spans(svc, spans_rx, resource, legacy_metrics)
7261
.instrument(tracing::debug_span!("opentelemetry", peer.addr = %addr).or_current()),
7362
);
7463

@@ -78,41 +67,3 @@ where
7867
span_sink,
7968
}
8069
}
81-
82-
trait IntoAnyValue
83-
where
84-
Self: Sized,
85-
{
86-
fn into_any_value(self) -> AnyValue;
87-
88-
fn with_key(self, key: &str) -> KeyValue {
89-
KeyValue {
90-
key: key.to_string(),
91-
value: Some(self.into_any_value()),
92-
}
93-
}
94-
}
95-
96-
impl IntoAnyValue for String {
97-
fn into_any_value(self) -> AnyValue {
98-
AnyValue {
99-
value: Some(any_value::Value::StringValue(self)),
100-
}
101-
}
102-
}
103-
104-
impl IntoAnyValue for &str {
105-
fn into_any_value(self) -> AnyValue {
106-
AnyValue {
107-
value: Some(any_value::Value::StringValue(self.to_string())),
108-
}
109-
}
110-
}
111-
112-
impl IntoAnyValue for i64 {
113-
fn into_any_value(self) -> AnyValue {
114-
AnyValue {
115-
value: Some(any_value::Value::IntValue(self)),
116-
}
117-
}
118-
}

0 commit comments

Comments
 (0)