Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
613 changes: 298 additions & 315 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ datadog-trace-utils = { git = "https://github.com/Datadog/libdatadog", rev = "54
tinybytes = { git = "https://github.com/Datadog/libdatadog", rev = "540ea1e63ab403774325ffadb76f870747f11f6c" }

futures = "0.3.31"
opentelemetry_sdk = { version = "0.28.0", features = [
opentelemetry_sdk = { git = "https://github.com/open-telemetry/opentelemetry-rust", features = [
"trace",
], default-features = false }
opentelemetry = { version = "0.28.0", features = [
opentelemetry = { git = "https://github.com/open-telemetry/opentelemetry-rust", features = [
"trace",
], default-features = false }
opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] }
opentelemetry-semantic-conventions = { git = "https://github.com/open-telemetry/opentelemetry-rust", features = ["semconv_experimental"] }
tokio = { version = "1.44.1" }
rand = { version = "0.8" }
rand = { version = "0.8", features = ["small_rng"] }

[profile.dev]
debug = 2 # full debug info
Expand Down
12 changes: 6 additions & 6 deletions datadog-opentelemetry/examples/propagator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ hyper = { version = "1.3", default-features = false, features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }
tokio = { version = "1", default-features = false, features = ["full"] }
tokio-stream = "0.1"
opentelemetry = "0.28.0"
opentelemetry_sdk = "0.28.0"
opentelemetry-http = "0.28.0"
opentelemetry-stdout = { version = "0.28.0", features = ["trace", "logs"] }
opentelemetry-semantic-conventions = "0.28.0"
opentelemetry-appender-tracing = "0.28.0"
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
opentelemetry-semantic-conventions = { workspace = true }
opentelemetry-http = { git = "https://github.com/open-telemetry/opentelemetry-rust" }
opentelemetry-stdout = { git = "https://github.com/open-telemetry/opentelemetry-rust", features = ["trace", "logs"] }
opentelemetry-appender-tracing = { git = "https://github.com/open-telemetry/opentelemetry-rust" }

tracing = { version = ">=0.1.40", default-features = false, features = ["std"] }
# `tracing-core >=0.1.33` is required for compatibility with `tracing >=0.1.40`.
Expand Down
4 changes: 4 additions & 0 deletions datadog-opentelemetry/examples/propagator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ impl SpanProcessor for EnrichWithBaggageSpanProcessor {
Ok(())
}

fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
Ok(())
}

fn on_start(&self, span: &mut opentelemetry_sdk::trace::Span, cx: &Context) {
for (kk, vv) in cx.baggage().iter() {
span.set_attribute(KeyValue::new(kk.clone(), vv.0.clone()));
Expand Down
6 changes: 5 additions & 1 deletion datadog-opentelemetry/src/span_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,14 @@ impl DatadogExporter {
}

pub fn shutdown(&self) -> OTelSdkResult {
self.shutdown_with_timeout(SPAN_EXPORTER_SHUTDOWN_TIMEOUT)
}

pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
match self
.tx
.trigger_shutdown()
.and_then(|()| self.tx.wait_shutdown_done(SPAN_EXPORTER_SHUTDOWN_TIMEOUT))
.and_then(|()| self.tx.wait_shutdown_done(timeout))
{
Ok(()) | Err(SenderError::BatchFull(_)) => {}
Err(SenderError::AlreadyShutdown) => {
Expand Down
7 changes: 7 additions & 0 deletions datadog-opentelemetry/src/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,4 +461,11 @@ impl opentelemetry_sdk::trace::SpanProcessor for DatadogSpanProcessor {
// set the shared resource in the DatadogSpanProcessor
*self.resource.write().unwrap() = resource.clone();
}

fn shutdown_with_timeout(
&self,
timeout: std::time::Duration,
) -> opentelemetry_sdk::error::OTelSdkResult {
self.span_exporter.shutdown_with_timeout(timeout)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"span.kind": "server",
"telemetry.sdk.language": "rust",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "0.28.0"
"telemetry.sdk.version": "0.30.0"
},
"metrics": {
"_sampling_priority_v1": 2.0,
Expand All @@ -43,7 +43,7 @@
"span.kind": "client",
"telemetry.sdk.language": "rust",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "0.28.0"
"telemetry.sdk.version": "0.30.0"
},
"metrics": {
"_dd.measured": 1.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"span.kind": "client",
"telemetry.sdk.language": "rust",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "0.28.0"
"telemetry.sdk.version": "0.30.0"
},
"metrics": {
"_dd.measured": 1.0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"span.kind": "server",
"telemetry.sdk.language": "rust",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "0.28.0"
"telemetry.sdk.version": "0.30.0"
},
"metrics": {
"_dd.rule.psr": 1.0,
Expand All @@ -41,7 +41,7 @@
"span.kind": "client",
"telemetry.sdk.language": "rust",
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.version": "0.28.0"
"telemetry.sdk.version": "0.30.0"
},
"metrics": {
"_dd.measured": 1.0,
Expand Down
3 changes: 2 additions & 1 deletion dd-trace-propagation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ thiserror = { version = "1.0", default-features = false}
opentelemetry = { workspace = true, optional = true }

[dev-dependencies]
pretty_assertions = "1.4.1"
assert_unordered = "0.3"
multimap = "0.10.1"
pretty_assertions = "1.4.1"

[features]
default = ["opentelemetry"]
Expand Down
46 changes: 46 additions & 0 deletions dd-trace-propagation/src/carrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
/// <https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry/src/propagation/mod.rs>
use std::collections::HashMap;

use crate::error::Error;

/// Injector provides an interface for a carrier to be used
/// with a Propagator to inject a Context into the carrier.
pub trait Injector {
Expand All @@ -16,10 +18,45 @@ pub trait Extractor {
/// Get a value from the carrier.
fn get(&self, key: &str) -> Option<&str>;

/// Get all values for a key from the carrier
fn get_all(&self, key: &str) -> Option<Vec<&str>>;

/// Get all keys from the carrier.
fn keys(&self) -> Vec<&str>;
}

pub fn get_single_value_from_extractor<'a>(
extractor: &'a dyn Extractor,
key: &'a str,
) -> Result<Option<&'a str>, Error> {
let all = extractor.get_all(key);
if let Some(all) = all {
if all.iter().len() > 1 {
return Err(Error::extract(
"Multiple values while getting a single value",
"generic",
));
} else {
return Ok(all.first().map(|v| &**v));
}
}

Ok(None)
}

pub fn get_comma_separated_value_from_extractor<'a>(
extractor: &'a dyn Extractor,
key: &'a str,
) -> Option<String> {
extractor.get_all(key).map(|all| {
all.iter()
.filter(|part| !part.is_empty())
.copied()
.collect::<Vec<_>>()
.join(",")
})
}

impl<S: std::hash::BuildHasher> Injector for HashMap<String, String, S> {
/// Set a key and value in the `HashMap`.
fn set(&mut self, key: &str, value: String) {
Expand All @@ -33,6 +70,11 @@ impl<S: std::hash::BuildHasher> Extractor for HashMap<String, String, S> {
self.get(&key.to_lowercase()).map(String::as_str)
}

/// Get all values for a key from the `HashMap`.
fn get_all(&self, key: &str) -> Option<Vec<&str>> {
Extractor::get(self, key).map(|value| vec![value])
}

/// Collect all the keys from the `HashMap`.
fn keys(&self) -> Vec<&str> {
self.keys().map(String::as_str).collect::<Vec<_>>()
Expand All @@ -45,6 +87,10 @@ impl Extractor for &dyn opentelemetry::propagation::Extractor {
opentelemetry::propagation::Extractor::get(*self, key)
}

fn get_all(&self, key: &str) -> Option<Vec<&str>> {
opentelemetry::propagation::Extractor::get_all(*self, key)
}

fn keys(&self) -> Vec<&str> {
opentelemetry::propagation::Extractor::keys(*self)
}
Expand Down
29 changes: 14 additions & 15 deletions dd-trace-propagation/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl FromStr for Tracestate {
fn from_str(tracestate: &str) -> Result<Self, Self::Err> {
let ts_v = tracestate.split(',');

let mut dd: Option<HashMap<String, String>> = None;
let mut dd: HashMap<String, String> = HashMap::new();
let mut additional_values = vec![];

for v in ts_v {
Expand All @@ -160,21 +160,20 @@ impl FromStr for Tracestate {
return Err(String::from("Invalid tracestate"));
}

// datadog tracestate part is as 'dd=key1:value1;key2:value2'
if key == "dd" {
dd = Some(
value
.trim()
.split(';')
.filter_map(|item| {
if INVALID_ASCII_CHARACTERS_REGEX.is_match(item) {
None
} else {
let mut parts = item.splitn(2, ':');
Some((parts.next()?.to_string(), decode_tag_value(parts.next()?)))
value
.trim()
.split(';')
.filter(|item| !INVALID_ASCII_CHARACTERS_REGEX.is_match(item))
.for_each(|item| {
let mut parts = item.splitn(2, ':');
if let Some(key) = parts.next() {
if let Some(value) = parts.next() {
dd.insert(key.to_string(), decode_tag_value(value));
}
})
.collect(),
);
}
});
} else {
additional_values.push((key.to_string(), value.to_string()));
}
Expand All @@ -193,7 +192,7 @@ impl FromStr for Tracestate {
tracestate.additional_values = Some(additional_values);
}

let propagation_tags = if let Some(dd) = dd {
let propagation_tags = if !dd.is_empty() {
let mut tags = HashMap::new();
let mut priority = None;
let mut mechanism = None;
Expand Down
Loading
Loading