Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions opentelemetry-datadog/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@
) -> Result<http::Request<Vec<u8>>, TraceError> {
let traces: Vec<&[SpanData]> = group_into_traces(&mut batch);
let trace_count = traces.len();
let data = self.api_version.encode(
let mut buffer = Vec::with_capacity(trace_count * 512);
self.api_version.encode(
&mut buffer,

Check warning on line 102 in opentelemetry-datadog/src/exporter/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-datadog/src/exporter/mod.rs#L100-L102

Added lines #L100 - L102 were not covered by tests
&self.model_config,
traces,
&self.mapping,
Expand All @@ -114,7 +116,7 @@
DATADOG_META_TRACER_VERSION_HEADER,
env!("CARGO_PKG_VERSION"),
)
.body(data)
.body(buffer)

Check warning on line 119 in opentelemetry-datadog/src/exporter/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-datadog/src/exporter/mod.rs#L119

Added line #L119 was not covered by tests
.map_err::<Error, _>(Into::into)?;

Ok(req)
Expand Down
22 changes: 16 additions & 6 deletions opentelemetry-datadog/src/exporter/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,18 @@ impl ApiVersion {
}
}

pub(crate) fn encode(
pub(crate) fn encode<W: std::io::Write>(
self,
writer: &mut W,
model_config: &ModelConfig,
traces: Vec<&[trace::SpanData]>,
mapping: &Mapping,
unified_tags: &UnifiedTags,
resource: Option<&Resource>,
) -> Result<Vec<u8>, Error> {
) -> Result<(), Error> {
match self {
Self::Version03 => v03::encode(
writer,
model_config,
traces,
|span, config| match &mapping.service_name {
Expand All @@ -176,6 +178,7 @@ impl ApiVersion {
resource,
),
Self::Version05 => v05::encode(
writer,
model_config,
traces,
|span, config| match &mapping.service_name {
Expand Down Expand Up @@ -256,13 +259,17 @@ pub(crate) mod tests {
..Default::default()
};
let resource = Resource::new(vec![KeyValue::new("host.name", "test")]);
let encoded = base64::encode(ApiVersion::Version03.encode(

let mut buffer = Vec::new();
ApiVersion::Version03.encode(
&mut buffer,
&model_config,
traces.iter().map(|x| &x[..]).collect(),
&Mapping::empty(),
&UnifiedTags::new(),
Some(&resource),
)?);
)?;
let encoded = base64::encode(buffer);

assert_eq!(encoded.as_str(), "kZGMpHR5cGWjd2Vip3NlcnZpY2Wsc2VydmljZV9uYW1lpG5hbWWpY29tcG9uZW\
50qHJlc291cmNlqHJlc291cmNlqHRyYWNlX2lkzwAAAAAAAAAHp3NwYW5faWTPAAAAAAAAAGOpcGFyZW50X2lkzwAAAA\
Expand All @@ -286,13 +293,16 @@ pub(crate) mod tests {
unified_tags.set_version(Some(String::from("test-version")));
unified_tags.set_service(Some(String::from("test-service")));

let _encoded = base64::encode(ApiVersion::Version05.encode(
let mut buffer = Vec::new();
ApiVersion::Version05.encode(
&mut buffer,
&model_config,
traces.iter().map(|x| &x[..]).collect(),
&Mapping::empty(),
&unified_tags,
Some(&resource),
)?);
)?;
let _encoded = base64::encode(&mut buffer);

// TODO: Need someone to generate the expected result or instructions to do so.
// assert_eq!(encoded.as_str(), "kp6jd2VirHNlcnZpY2VfbmFtZaljb21wb25lbnSocmVzb3VyY2WpaG9zdC5uYW\
Expand Down
79 changes: 38 additions & 41 deletions opentelemetry-datadog/src/exporter/model/v03.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@
use opentelemetry_sdk::Resource;
use std::time::SystemTime;

pub(crate) fn encode<S, N, R>(
pub(crate) fn encode<S, N, R, W: std::io::Write>(
writer: &mut W,
model_config: &ModelConfig,
traces: Vec<&[SpanData]>,
get_service_name: S,
get_name: N,
get_resource: R,
resource: Option<&Resource>,
) -> Result<Vec<u8>, Error>
) -> Result<(), Error>
where
for<'a> S: Fn(&'a SpanData, &'a ModelConfig) -> &'a str,
for<'a> N: Fn(&'a SpanData, &'a ModelConfig) -> &'a str,
for<'a> R: Fn(&'a SpanData, &'a ModelConfig) -> &'a str,
{
let mut encoded = Vec::new();
rmp::encode::write_array_len(&mut encoded, traces.len() as u32)?;
rmp::encode::write_array_len(writer, traces.len() as u32)?;

for trace in traces.into_iter() {
rmp::encode::write_array_len(&mut encoded, trace.len() as u32)?;
rmp::encode::write_array_len(writer, trace.len() as u32)?;

for span in trace {
// Safe until the year 2262 when Datadog will need to change their API
Expand All @@ -42,81 +42,78 @@
for kv in &span.attributes {
if kv.key.as_str() == "span.type" {
span_type_found = true;
rmp::encode::write_map_len(&mut encoded, 12)?;
rmp::encode::write_str(&mut encoded, "type")?;
rmp::encode::write_str(&mut encoded, kv.value.as_str().as_ref())?;
rmp::encode::write_map_len(writer, 12)?;
rmp::encode::write_str(writer, "type")?;
rmp::encode::write_str(writer, kv.value.as_str().as_ref())?;
break;
}
}

if !span_type_found {
rmp::encode::write_map_len(&mut encoded, 11)?;
rmp::encode::write_map_len(writer, 11)?;

Check warning on line 53 in opentelemetry-datadog/src/exporter/model/v03.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-datadog/src/exporter/model/v03.rs#L53

Added line #L53 was not covered by tests
}

// Datadog span name is OpenTelemetry component name - see module docs for more information
rmp::encode::write_str(&mut encoded, "service")?;
rmp::encode::write_str(&mut encoded, get_service_name(span, model_config))?;
rmp::encode::write_str(writer, "service")?;
rmp::encode::write_str(writer, get_service_name(span, model_config))?;

rmp::encode::write_str(&mut encoded, "name")?;
rmp::encode::write_str(&mut encoded, get_name(span, model_config))?;
rmp::encode::write_str(writer, "name")?;
rmp::encode::write_str(writer, get_name(span, model_config))?;

rmp::encode::write_str(&mut encoded, "resource")?;
rmp::encode::write_str(&mut encoded, get_resource(span, model_config))?;
rmp::encode::write_str(writer, "resource")?;
rmp::encode::write_str(writer, get_resource(span, model_config))?;

rmp::encode::write_str(&mut encoded, "trace_id")?;
rmp::encode::write_str(writer, "trace_id")?;
rmp::encode::write_u64(
&mut encoded,
writer,
u128::from_be_bytes(span.span_context.trace_id().to_bytes()) as u64,
)?;

rmp::encode::write_str(&mut encoded, "span_id")?;
rmp::encode::write_str(writer, "span_id")?;
rmp::encode::write_u64(
&mut encoded,
writer,
u64::from_be_bytes(span.span_context.span_id().to_bytes()),
)?;

rmp::encode::write_str(&mut encoded, "parent_id")?;
rmp::encode::write_u64(
&mut encoded,
u64::from_be_bytes(span.parent_span_id.to_bytes()),
)?;
rmp::encode::write_str(writer, "parent_id")?;
rmp::encode::write_u64(writer, u64::from_be_bytes(span.parent_span_id.to_bytes()))?;

rmp::encode::write_str(&mut encoded, "start")?;
rmp::encode::write_i64(&mut encoded, start)?;
rmp::encode::write_str(writer, "start")?;
rmp::encode::write_i64(writer, start)?;

rmp::encode::write_str(&mut encoded, "duration")?;
rmp::encode::write_i64(&mut encoded, duration)?;
rmp::encode::write_str(writer, "duration")?;
rmp::encode::write_i64(writer, duration)?;

rmp::encode::write_str(&mut encoded, "error")?;
rmp::encode::write_str(writer, "error")?;
rmp::encode::write_i32(
&mut encoded,
writer,
match span.status {
Status::Error { .. } => 1,
_ => 0,
},
)?;

rmp::encode::write_str(&mut encoded, "meta")?;
rmp::encode::write_str(writer, "meta")?;
rmp::encode::write_map_len(
&mut encoded,
writer,
(span.attributes.len() + resource.map(|r| r.len()).unwrap_or(0)) as u32,
)?;
if let Some(resource) = resource {
for (key, value) in resource.iter() {
rmp::encode::write_str(&mut encoded, key.as_str())?;
rmp::encode::write_str(&mut encoded, value.as_str().as_ref())?;
rmp::encode::write_str(writer, key.as_str())?;
rmp::encode::write_str(writer, value.as_str().as_ref())?;
}
}
for kv in span.attributes.iter() {
rmp::encode::write_str(&mut encoded, kv.key.as_str())?;
rmp::encode::write_str(&mut encoded, kv.value.as_str().as_ref())?;
rmp::encode::write_str(writer, kv.key.as_str())?;
rmp::encode::write_str(writer, kv.value.as_str().as_ref())?;
}

rmp::encode::write_str(&mut encoded, "metrics")?;
rmp::encode::write_map_len(&mut encoded, 1)?;
rmp::encode::write_str(&mut encoded, SAMPLING_PRIORITY_KEY)?;
rmp::encode::write_str(writer, "metrics")?;
rmp::encode::write_map_len(writer, 1)?;
rmp::encode::write_str(writer, SAMPLING_PRIORITY_KEY)?;
rmp::encode::write_f64(
&mut encoded,
writer,
if span.span_context.is_sampled() {
1.0
} else {
Expand All @@ -126,5 +123,5 @@
}
}

Ok(encoded)
Ok(())
}
Loading
Loading