diff --git a/opentelemetry-datadog/src/exporter/mod.rs b/opentelemetry-datadog/src/exporter/mod.rs index 5514e8a78..452e9ff68 100644 --- a/opentelemetry-datadog/src/exporter/mod.rs +++ b/opentelemetry-datadog/src/exporter/mod.rs @@ -97,7 +97,9 @@ impl DatadogExporter { ) -> Result>, TraceError> { let traces: Vec<&[SpanData]> = group_into_traces(&mut batch); let trace_count = traces.len(); - let data = self.api_version.encode( + let mut buffer = Vec::with_capacity(trace_count * 512); + self.api_version.encode( + &mut buffer, &self.model_config, traces, &self.mapping, @@ -114,7 +116,7 @@ impl DatadogExporter { DATADOG_META_TRACER_VERSION_HEADER, env!("CARGO_PKG_VERSION"), ) - .body(data) + .body(buffer) .map_err::(Into::into)?; Ok(req) diff --git a/opentelemetry-datadog/src/exporter/model/mod.rs b/opentelemetry-datadog/src/exporter/model/mod.rs index 2dbf8a329..8848e8f55 100644 --- a/opentelemetry-datadog/src/exporter/model/mod.rs +++ b/opentelemetry-datadog/src/exporter/model/mod.rs @@ -149,16 +149,18 @@ impl ApiVersion { } } - pub(crate) fn encode( + pub(crate) fn encode( self, + writer: &mut W, model_config: &ModelConfig, traces: Vec<&[trace::SpanData]>, mapping: &Mapping, unified_tags: &UnifiedTags, resource: Option<&Resource>, - ) -> Result, Error> { + ) -> Result<(), Error> { match self { Self::Version03 => v03::encode( + writer, model_config, traces, |span, config| match &mapping.service_name { @@ -176,6 +178,7 @@ impl ApiVersion { resource, ), Self::Version05 => v05::encode( + writer, model_config, traces, |span, config| match &mapping.service_name { @@ -256,13 +259,17 @@ pub(crate) mod tests { ..Default::default() }; let resource = Resource::new(vec![KeyValue::new("host.name", "test")]); - let encoded = base64::encode(ApiVersion::Version03.encode( + + let mut buffer = Vec::new(); + ApiVersion::Version03.encode( + &mut buffer, &model_config, traces.iter().map(|x| &x[..]).collect(), &Mapping::empty(), &UnifiedTags::new(), Some(&resource), - )?); + )?; + let encoded = base64::encode(buffer); assert_eq!(encoded.as_str(), "kZGMpHR5cGWjd2Vip3NlcnZpY2Wsc2VydmljZV9uYW1lpG5hbWWpY29tcG9uZW\ 50qHJlc291cmNlqHJlc291cmNlqHRyYWNlX2lkzwAAAAAAAAAHp3NwYW5faWTPAAAAAAAAAGOpcGFyZW50X2lkzwAAAA\ @@ -286,13 +293,16 @@ pub(crate) mod tests { unified_tags.set_version(Some(String::from("test-version"))); unified_tags.set_service(Some(String::from("test-service"))); - let _encoded = base64::encode(ApiVersion::Version05.encode( + let mut buffer = Vec::new(); + ApiVersion::Version05.encode( + &mut buffer, &model_config, traces.iter().map(|x| &x[..]).collect(), &Mapping::empty(), &unified_tags, Some(&resource), - )?); + )?; + let _encoded = base64::encode(&mut buffer); // TODO: Need someone to generate the expected result or instructions to do so. // assert_eq!(encoded.as_str(), "kp6jd2VirHNlcnZpY2VfbmFtZaljb21wb25lbnSocmVzb3VyY2WpaG9zdC5uYW\ diff --git a/opentelemetry-datadog/src/exporter/model/v03.rs b/opentelemetry-datadog/src/exporter/model/v03.rs index 0cbba9c1a..7a74f63a2 100644 --- a/opentelemetry-datadog/src/exporter/model/v03.rs +++ b/opentelemetry-datadog/src/exporter/model/v03.rs @@ -5,24 +5,24 @@ use opentelemetry_sdk::export::trace::SpanData; use opentelemetry_sdk::Resource; use std::time::SystemTime; -pub(crate) fn encode( +pub(crate) fn encode( + writer: &mut W, model_config: &ModelConfig, traces: Vec<&[SpanData]>, get_service_name: S, get_name: N, get_resource: R, resource: Option<&Resource>, -) -> Result, Error> +) -> Result<(), Error> where for<'a> S: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, for<'a> N: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, for<'a> R: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, { - let mut encoded = Vec::new(); - rmp::encode::write_array_len(&mut encoded, traces.len() as u32)?; + rmp::encode::write_array_len(writer, traces.len() as u32)?; for trace in traces.into_iter() { - rmp::encode::write_array_len(&mut encoded, trace.len() as u32)?; + rmp::encode::write_array_len(writer, trace.len() as u32)?; for span in trace { // Safe until the year 2262 when Datadog will need to change their API @@ -42,81 +42,78 @@ where for kv in &span.attributes { if kv.key.as_str() == "span.type" { span_type_found = true; - rmp::encode::write_map_len(&mut encoded, 12)?; - rmp::encode::write_str(&mut encoded, "type")?; - rmp::encode::write_str(&mut encoded, kv.value.as_str().as_ref())?; + rmp::encode::write_map_len(writer, 12)?; + rmp::encode::write_str(writer, "type")?; + rmp::encode::write_str(writer, kv.value.as_str().as_ref())?; break; } } if !span_type_found { - rmp::encode::write_map_len(&mut encoded, 11)?; + rmp::encode::write_map_len(writer, 11)?; } // Datadog span name is OpenTelemetry component name - see module docs for more information - rmp::encode::write_str(&mut encoded, "service")?; - rmp::encode::write_str(&mut encoded, get_service_name(span, model_config))?; + rmp::encode::write_str(writer, "service")?; + rmp::encode::write_str(writer, get_service_name(span, model_config))?; - rmp::encode::write_str(&mut encoded, "name")?; - rmp::encode::write_str(&mut encoded, get_name(span, model_config))?; + rmp::encode::write_str(writer, "name")?; + rmp::encode::write_str(writer, get_name(span, model_config))?; - rmp::encode::write_str(&mut encoded, "resource")?; - rmp::encode::write_str(&mut encoded, get_resource(span, model_config))?; + rmp::encode::write_str(writer, "resource")?; + rmp::encode::write_str(writer, get_resource(span, model_config))?; - rmp::encode::write_str(&mut encoded, "trace_id")?; + rmp::encode::write_str(writer, "trace_id")?; rmp::encode::write_u64( - &mut encoded, + writer, u128::from_be_bytes(span.span_context.trace_id().to_bytes()) as u64, )?; - rmp::encode::write_str(&mut encoded, "span_id")?; + rmp::encode::write_str(writer, "span_id")?; rmp::encode::write_u64( - &mut encoded, + writer, u64::from_be_bytes(span.span_context.span_id().to_bytes()), )?; - rmp::encode::write_str(&mut encoded, "parent_id")?; - rmp::encode::write_u64( - &mut encoded, - u64::from_be_bytes(span.parent_span_id.to_bytes()), - )?; + rmp::encode::write_str(writer, "parent_id")?; + rmp::encode::write_u64(writer, u64::from_be_bytes(span.parent_span_id.to_bytes()))?; - rmp::encode::write_str(&mut encoded, "start")?; - rmp::encode::write_i64(&mut encoded, start)?; + rmp::encode::write_str(writer, "start")?; + rmp::encode::write_i64(writer, start)?; - rmp::encode::write_str(&mut encoded, "duration")?; - rmp::encode::write_i64(&mut encoded, duration)?; + rmp::encode::write_str(writer, "duration")?; + rmp::encode::write_i64(writer, duration)?; - rmp::encode::write_str(&mut encoded, "error")?; + rmp::encode::write_str(writer, "error")?; rmp::encode::write_i32( - &mut encoded, + writer, match span.status { Status::Error { .. } => 1, _ => 0, }, )?; - rmp::encode::write_str(&mut encoded, "meta")?; + rmp::encode::write_str(writer, "meta")?; rmp::encode::write_map_len( - &mut encoded, + writer, (span.attributes.len() + resource.map(|r| r.len()).unwrap_or(0)) as u32, )?; if let Some(resource) = resource { for (key, value) in resource.iter() { - rmp::encode::write_str(&mut encoded, key.as_str())?; - rmp::encode::write_str(&mut encoded, value.as_str().as_ref())?; + rmp::encode::write_str(writer, key.as_str())?; + rmp::encode::write_str(writer, value.as_str().as_ref())?; } } for kv in span.attributes.iter() { - rmp::encode::write_str(&mut encoded, kv.key.as_str())?; - rmp::encode::write_str(&mut encoded, kv.value.as_str().as_ref())?; + rmp::encode::write_str(writer, kv.key.as_str())?; + rmp::encode::write_str(writer, kv.value.as_str().as_ref())?; } - rmp::encode::write_str(&mut encoded, "metrics")?; - rmp::encode::write_map_len(&mut encoded, 1)?; - rmp::encode::write_str(&mut encoded, SAMPLING_PRIORITY_KEY)?; + rmp::encode::write_str(writer, "metrics")?; + rmp::encode::write_map_len(writer, 1)?; + rmp::encode::write_str(writer, SAMPLING_PRIORITY_KEY)?; rmp::encode::write_f64( - &mut encoded, + writer, if span.span_context.is_sampled() { 1.0 } else { @@ -126,5 +123,5 @@ where } } - Ok(encoded) + Ok(()) } diff --git a/opentelemetry-datadog/src/exporter/model/v05.rs b/opentelemetry-datadog/src/exporter/model/v05.rs index ef3080a52..d1d9e5125 100644 --- a/opentelemetry-datadog/src/exporter/model/v05.rs +++ b/opentelemetry-datadog/src/exporter/model/v05.rs @@ -5,6 +5,8 @@ use crate::propagator::DatadogTraceState; use opentelemetry::trace::Status; use opentelemetry_sdk::export::trace::SpanData; use opentelemetry_sdk::Resource; +use std::cell::RefCell; +use std::ops::DerefMut; use std::time::SystemTime; use super::unified_tags::{UnifiedTagField, UnifiedTags}; @@ -25,7 +27,7 @@ const GIT_META_TAGS_COUNT: u32 = if matches!( // Protocol documentation sourced from https://github.com/DataDog/datadog-agent/blob/c076ea9a1ffbde4c76d35343dbc32aecbbf99cb9/pkg/trace/api/version.go // -// The payload is an array containing exactly 12 elements: +// The payload is an array containing exactly 2 elements: // // 1. An array of all unique strings present in the payload (a dictionary referred to by index). // 2. An array of traces, where each trace is an array of spans. A span is encoded as an array having @@ -68,7 +70,9 @@ const GIT_META_TAGS_COUNT: u32 = if matches!( // // The dictionary in this case would be []string{""}, having only the empty string at index 0. // -pub(crate) fn encode( +#[allow(clippy::too_many_arguments)] +pub(crate) fn encode( + writer: &mut W, model_config: &ModelConfig, traces: Vec<&[SpanData]>, get_service_name: S, @@ -76,53 +80,63 @@ pub(crate) fn encode( get_resource: R, unified_tags: &UnifiedTags, resource: Option<&Resource>, -) -> Result, Error> +) -> Result<(), Error> where for<'a> S: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, for<'a> N: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, for<'a> R: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, { + thread_local! { + static TRACES_BUFFER: RefCell> = RefCell::new(Vec::with_capacity(4096)); + } let mut interner = StringInterner::new(); - let mut encoded_traces = encode_traces( - &mut interner, - model_config, - get_service_name, - get_name, - get_resource, - &traces, - unified_tags, - resource, - )?; + TRACES_BUFFER.with(|buffer| { + let buffer = &mut buffer.borrow_mut(); + buffer.clear(); - let mut payload = Vec::with_capacity(traces.len() * 512); - rmp::encode::write_array_len(&mut payload, 2)?; + encode_traces( + buffer.deref_mut(), + &mut interner, + model_config, + get_service_name, + get_name, + get_resource, + &traces, + unified_tags, + resource, + )?; - interner.write_dictionary(&mut payload)?; + rmp::encode::write_array_len(writer, 2)?; - payload.append(&mut encoded_traces); + interner.write_dictionary(writer)?; - Ok(payload) + writer + .write_all(buffer) + .map_err(|_| Error::MessagePackError)?; + + Ok(()) + }) } -fn write_unified_tags<'a>( - encoded: &mut Vec, +fn write_unified_tags<'a, W: std::io::Write>( + writer: &mut W, interner: &mut StringInterner<'a>, unified_tags: &'a UnifiedTags, ) -> Result<(), Error> { - write_unified_tag(encoded, interner, &unified_tags.service)?; - write_unified_tag(encoded, interner, &unified_tags.env)?; - write_unified_tag(encoded, interner, &unified_tags.version)?; + write_unified_tag(writer, interner, &unified_tags.service)?; + write_unified_tag(writer, interner, &unified_tags.env)?; + write_unified_tag(writer, interner, &unified_tags.version)?; Ok(()) } -fn write_unified_tag<'a>( - encoded: &mut Vec, +fn write_unified_tag<'a, W: std::io::Write>( + writer: &mut W, interner: &mut StringInterner<'a>, tag: &'a UnifiedTagField, ) -> Result<(), Error> { if let Some(tag_value) = &tag.value { - rmp::encode::write_u32(encoded, interner.intern(tag.get_tag_name()))?; - rmp::encode::write_u32(encoded, interner.intern(tag_value.as_str().as_ref()))?; + rmp::encode::write_u32(writer, interner.intern(tag.get_tag_name()))?; + rmp::encode::write_u32(writer, interner.intern(tag_value.as_str().as_ref()))?; } Ok(()) } @@ -150,7 +164,8 @@ fn get_measuring(span: &SpanData) -> f64 { } #[allow(clippy::too_many_arguments)] -fn encode_traces<'interner, S, N, R>( +fn encode_traces<'interner, S, N, R, W: std::io::Write>( + writer: &mut W, interner: &mut StringInterner<'interner>, model_config: &'interner ModelConfig, get_service_name: S, @@ -159,17 +174,16 @@ fn encode_traces<'interner, S, N, R>( traces: &'interner [&[SpanData]], unified_tags: &'interner UnifiedTags, resource: Option<&'interner Resource>, -) -> Result, Error> +) -> Result<(), Error> where for<'a> S: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, for<'a> N: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, for<'a> R: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, { - let mut encoded = Vec::new(); - rmp::encode::write_array_len(&mut encoded, traces.len() as u32)?; + rmp::encode::write_array_len(writer, traces.len() as u32)?; for trace in traces.iter() { - rmp::encode::write_array_len(&mut encoded, trace.len() as u32)?; + rmp::encode::write_array_len(writer, trace.len() as u32)?; for span in trace.iter() { // Safe until the year 2262 when Datadog will need to change their API @@ -194,32 +208,26 @@ where } // Datadog span name is OpenTelemetry component name - see module docs for more information - rmp::encode::write_array_len(&mut encoded, SPAN_NUM_ELEMENTS)?; + rmp::encode::write_array_len(writer, SPAN_NUM_ELEMENTS)?; rmp::encode::write_u32( - &mut encoded, + writer, interner.intern(get_service_name(span, model_config)), )?; - rmp::encode::write_u32(&mut encoded, interner.intern(get_name(span, model_config)))?; - rmp::encode::write_u32( - &mut encoded, - interner.intern(get_resource(span, model_config)), - )?; + rmp::encode::write_u32(writer, interner.intern(get_name(span, model_config)))?; + rmp::encode::write_u32(writer, interner.intern(get_resource(span, model_config)))?; rmp::encode::write_u64( - &mut encoded, + writer, u128::from_be_bytes(span.span_context.trace_id().to_bytes()) as u64, )?; rmp::encode::write_u64( - &mut encoded, + writer, u64::from_be_bytes(span.span_context.span_id().to_bytes()), )?; - rmp::encode::write_u64( - &mut encoded, - u64::from_be_bytes(span.parent_span_id.to_bytes()), - )?; - rmp::encode::write_i64(&mut encoded, start)?; - rmp::encode::write_i64(&mut encoded, duration)?; + rmp::encode::write_u64(writer, u64::from_be_bytes(span.parent_span_id.to_bytes()))?; + rmp::encode::write_i64(writer, start)?; + rmp::encode::write_i64(writer, duration)?; rmp::encode::write_i32( - &mut encoded, + writer, match span.status { Status::Error { .. } => 1, _ => 0, @@ -227,46 +235,46 @@ where )?; rmp::encode::write_map_len( - &mut encoded, + writer, (span.attributes.len() + resource.map(|r| r.len()).unwrap_or(0)) as u32 + unified_tags.compute_attribute_size() + GIT_META_TAGS_COUNT, )?; if let Some(resource) = resource { for (key, value) in resource.iter() { - rmp::encode::write_u32(&mut encoded, interner.intern(key.as_str()))?; - rmp::encode::write_u32(&mut encoded, interner.intern_value(value))?; + rmp::encode::write_u32(writer, interner.intern(key.as_str()))?; + rmp::encode::write_u32(writer, interner.intern_value(value))?; } } - write_unified_tags(&mut encoded, interner, unified_tags)?; + write_unified_tags(writer, interner, unified_tags)?; for kv in span.attributes.iter() { - rmp::encode::write_u32(&mut encoded, interner.intern(kv.key.as_str()))?; - rmp::encode::write_u32(&mut encoded, interner.intern_value(&kv.value))?; + rmp::encode::write_u32(writer, interner.intern(kv.key.as_str()))?; + rmp::encode::write_u32(writer, interner.intern_value(&kv.value))?; } if let (Some(repository_url), Some(commit_sha)) = ( option_env!("DD_GIT_REPOSITORY_URL"), option_env!("DD_GIT_COMMIT_SHA"), ) { - rmp::encode::write_u32(&mut encoded, interner.intern("git.repository_url"))?; - rmp::encode::write_u32(&mut encoded, interner.intern(repository_url))?; - rmp::encode::write_u32(&mut encoded, interner.intern("git.commit.sha"))?; - rmp::encode::write_u32(&mut encoded, interner.intern(commit_sha))?; + rmp::encode::write_u32(writer, interner.intern("git.repository_url"))?; + rmp::encode::write_u32(writer, interner.intern(repository_url))?; + rmp::encode::write_u32(writer, interner.intern("git.commit.sha"))?; + rmp::encode::write_u32(writer, interner.intern(commit_sha))?; } - rmp::encode::write_map_len(&mut encoded, METRICS_LEN)?; - rmp::encode::write_u32(&mut encoded, interner.intern(SAMPLING_PRIORITY_KEY))?; + rmp::encode::write_map_len(writer, METRICS_LEN)?; + rmp::encode::write_u32(writer, interner.intern(SAMPLING_PRIORITY_KEY))?; let sampling_priority = get_sampling_priority(span); - rmp::encode::write_f64(&mut encoded, sampling_priority)?; + rmp::encode::write_f64(writer, sampling_priority)?; - rmp::encode::write_u32(&mut encoded, interner.intern(DD_MEASURED_KEY))?; + rmp::encode::write_u32(writer, interner.intern(DD_MEASURED_KEY))?; let measuring = get_measuring(span); - rmp::encode::write_f64(&mut encoded, measuring)?; - rmp::encode::write_u32(&mut encoded, span_type)?; + rmp::encode::write_f64(writer, measuring)?; + rmp::encode::write_u32(writer, span_type)?; } } - Ok(encoded) + Ok(()) }