diff --git a/opentelemetry-datadog/benches/datadog_exporter.rs b/opentelemetry-datadog/benches/datadog_exporter.rs index 13d16f178..9115c0f57 100644 --- a/opentelemetry-datadog/benches/datadog_exporter.rs +++ b/opentelemetry-datadog/benches/datadog_exporter.rs @@ -195,10 +195,10 @@ fn generate_traces(number_of_traces: usize, spans_per_trace: usize) -> Vec f64 { + 1.0 +} + +#[cfg(feature = "agent-sampling")] +fn get_sampling_priority(span: &SpanData) -> f64 { + if span.span_context.trace_state().priority_sampling_enabled() { + 1.0 + } else { + 0.0 + } +} + // https://github.com/DataDog/datadog-agent/blob/ec96f3c24173ec66ba235bda7710504400d9a000/pkg/trace/traceutil/span.go#L20 static DD_MEASURED_KEY: &str = "_dd.measured"; +fn get_measuring(span: &SpanData) -> f64 { + if span.span_context.trace_state().measuring_enabled() { + 1.0 + } else { + 0.0 + } +} + /// Custom mapping between opentelemetry spans and datadog spans. /// /// User can provide custom function to change the mapping. It currently supports customizing the following @@ -77,6 +101,16 @@ fn default_resource_mapping<'a>(span: &'a SpanData, _config: &'a ModelConfig) -> span.name.as_ref() } +fn get_span_type(span: &SpanData) -> Option<&Value> { + for kv in &span.attributes { + if kv.key.as_str() == "span.type" { + return Some(&kv.value); + } + } + + None +} + /// Wrap type for errors from opentelemetry datadog exporter #[derive(Debug, thiserror::Error)] pub enum Error { @@ -129,6 +163,8 @@ pub enum ApiVersion { Version03, /// Version 0.5 - requires datadog-agent v7.22.0 or above Version05, + /// Version 0.7 + Version07, } impl ApiVersion { @@ -136,6 +172,7 @@ impl ApiVersion { match self { ApiVersion::Version03 => "/v0.3/traces", ApiVersion::Version05 => "/v0.5/traces", + ApiVersion::Version07 => "/v0.7/traces", } } @@ -143,6 +180,7 @@ impl ApiVersion { match self { ApiVersion::Version03 => "application/msgpack", ApiVersion::Version05 => "application/msgpack", + ApiVersion::Version07 => "application/msgpack", } } @@ -190,6 +228,24 @@ impl ApiVersion { unified_tags, resource, ), + Self::Version07 => v07::encode( + model_config, + traces, + |span, config| match &mapping.service_name { + Some(f) => f(span, config), + None => default_service_name_mapping(span, config), + }, + |span, config| match &mapping.name { + Some(f) => f(span, config), + None => default_name_mapping(span, config), + }, + |span, config| match &mapping.resource { + Some(f) => f(span, config), + None => default_resource_mapping(span, config), + }, + unified_tags, + resource, + ), } } } @@ -198,6 +254,7 @@ impl ApiVersion { pub(crate) mod tests { use super::*; use base64::{engine::general_purpose::STANDARD, Engine}; + use opentelemetry::trace::Event; use opentelemetry::InstrumentationScope; use opentelemetry::{ trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState}, @@ -213,7 +270,43 @@ pub(crate) mod tests { vec![vec![get_span(7, 1, 99)]] } + fn get_traces_with_events() -> Vec> { + let event = Event::new( + "myevent", + SystemTime::UNIX_EPOCH + .checked_add(Duration::from_secs(5)) + .unwrap(), + vec![ + KeyValue::new("mykey", 1), + KeyValue::new( + "myarray", + Value::Array(opentelemetry::Array::String(vec![ + "myvalue1".into(), + "myvalue2".into(), + ])), + ), + KeyValue::new("mybool", true), + KeyValue::new("myint", 2.5), + KeyValue::new("myboolfalse", false), + ], + 0, + ); + let mut events = SpanEvents::default(); + events.events.push(event); + + vec![vec![get_span_with_events(7, 1, 99, events)]] + } + pub(crate) fn get_span(trace_id: u128, parent_span_id: u64, span_id: u64) -> trace::SpanData { + get_span_with_events(trace_id, parent_span_id, span_id, SpanEvents::default()) + } + + pub(crate) fn get_span_with_events( + trace_id: u128, + parent_span_id: u64, + span_id: u64, + events: SpanEvents, + ) -> trace::SpanData { let span_context = SpanContext::new( TraceId::from_u128(trace_id), SpanId::from_u64(span_id), @@ -226,7 +319,6 @@ pub(crate) mod tests { let end_time = start_time.checked_add(Duration::from_secs(1)).unwrap(); let attributes = vec![KeyValue::new("span.type", "web")]; - let events = SpanEvents::default(); let links = SpanLinks::default(); let instrumentation_scope = InstrumentationScope::builder("component").build(); @@ -305,4 +397,71 @@ pub(crate) mod tests { Ok(()) } + + #[test] + fn test_encode_v07() { + let traces = get_traces_with_events(); + let model_config = ModelConfig { + service_name: "service_name".to_string(), + ..Default::default() + }; + + // we use an empty builder with a single attribute because the attributes are in a hashmap + // which causes the order to change every test + let resource = Resource::builder_empty() + .with_attribute(KeyValue::new("host.name", "test")) + .build(); + + let mut unified_tags = UnifiedTags::new(); + unified_tags.set_env(Some(String::from("test-env"))); + unified_tags.set_version(Some(String::from("test-version"))); + unified_tags.set_service(Some(String::from("test-service"))); + + let encoded = STANDARD.encode( + ApiVersion::Version07 + .encode( + &model_config, + traces.iter().map(|x| &x[..]).collect(), + &Mapping::empty(), + &unified_tags, + Some(&resource), + ) + .unwrap(), + ); + + // A very nice way to check the encoded values is to use + // https://github.com/DataDog/dd-apm-test-agent + // Which is a test http server that receives and validates sent traces + let expected = "ha1sYW5ndWFnZV9uYW1lpHJ1c3SmY2h1bmtzkYOocHJpb3JpdHnSAAAAAaZvcmlnaW6gpXNwY\ + W5zkY6kbmFtZaljb21wb25lbnSnc3Bhbl9pZM8AAAAAAAAAY6h0cmFjZV9pZM8AAAAAAAAAB6VzdGFydNMAAAAAAAAAAKhk\ + dXJhdGlvbtMAAAAAO5rKAKlwYXJlbnRfaWTPAAAAAAAAAAGnc2VydmljZaxzZXJ2aWNlX25hbWWocmVzb3VyY2WocmVzb3V\ + yY2WkdHlwZaN3ZWKlZXJyb3LSAAAAAKRtZXRhgqlob3N0Lm5hbWWkdGVzdKlzcGFuLnR5cGWjd2Vip21ldHJpY3OCtV9zYW\ + 1wbGluZ19wcmlvcml0eV92Mcs/8AAAAAAAAKxfZGQubWVhc3VyZWTLAAAAAAAAAACqc3Bhbl9saW5rc5Crc3Bhbl9ldmVud\ + HORg6RuYW1lp215ZXZlbnSudGltZV91bml4X25hbm/TAAAAASoF8gCqYXR0cmlidXRlc4WlbXlrZXmCpHR5cGXSAAAAAqlp\ + bnRfdmFsdWXTAAAAAAAAAAGnbXlhcnJheYKkdHlwZdIAAAAEq2FycmF5X3ZhbHVlkoKkdHlwZQCsc3RyaW5nX3ZhbHVlqG1\ + 5dmFsdWUxgqR0eXBlAKxzdHJpbmdfdmFsdWWobXl2YWx1ZTKmbXlib29sgqR0eXBl0gAAAAGqYm9vbF92YWx1ZcOlbXlpbn\ + SCpHR5cGXSAAAAA6xkb3VibGVfdmFsdWXLQAQAAAAAAACrbXlib29sZmFsc2WCpHR5cGXSAAAAAapib29sX3ZhbHVlwqR0Y\ + Wdzg6dzZXJ2aWNlrHRlc3Qtc2VydmljZad2ZXJzaW9urHRlc3QtdmVyc2lvbqNlbnaodGVzdC1lbnajZW52qHRlc3QtZW52\ + q2FwcF92ZXJzaW9urHRlc3QtdmVyc2lvbg=="; + assert_eq!(encoded.as_str(), expected); + + // change to a different resource and make sure the encoded value changes and that we actually encode stuff + let other_resource = Resource::builder_empty() + .with_attribute(KeyValue::new("host.name", "thisissometingelse")) + .build(); + + let encoded = STANDARD.encode( + ApiVersion::Version07 + .encode( + &model_config, + traces.iter().map(|x| &x[..]).collect(), + &Mapping::empty(), + &unified_tags, + Some(&other_resource), + ) + .unwrap(), + ); + + assert_ne!(encoded.as_str(), expected); + } } diff --git a/opentelemetry-datadog/src/exporter/model/v05.rs b/opentelemetry-datadog/src/exporter/model/v05.rs index cef89a8d7..f928088b4 100644 --- a/opentelemetry-datadog/src/exporter/model/v05.rs +++ b/opentelemetry-datadog/src/exporter/model/v05.rs @@ -1,13 +1,13 @@ use crate::exporter::intern::StringInterner; use crate::exporter::model::{DD_MEASURED_KEY, SAMPLING_PRIORITY_KEY}; use crate::exporter::{Error, ModelConfig}; -use crate::propagator::DatadogTraceState; use opentelemetry::trace::Status; use opentelemetry_sdk::trace::SpanData; use opentelemetry_sdk::Resource; use std::time::SystemTime; use super::unified_tags::{UnifiedTagField, UnifiedTags}; +use super::{get_measuring, get_sampling_priority, get_span_type}; const SPAN_NUM_ELEMENTS: u32 = 12; const METRICS_LEN: u32 = 2; @@ -127,28 +127,6 @@ fn write_unified_tag<'a>( Ok(()) } -#[cfg(not(feature = "agent-sampling"))] -fn get_sampling_priority(_span: &SpanData) -> f64 { - 1.0 -} - -#[cfg(feature = "agent-sampling")] -fn get_sampling_priority(span: &SpanData) -> f64 { - if span.span_context.trace_state().priority_sampling_enabled() { - 1.0 - } else { - 0.0 - } -} - -fn get_measuring(span: &SpanData) -> f64 { - if span.span_context.trace_state().measuring_enabled() { - 1.0 - } else { - 0.0 - } -} - #[allow(clippy::too_many_arguments)] fn encode_traces<'interner, S, N, R>( interner: &mut StringInterner<'interner>, @@ -186,11 +164,8 @@ where .unwrap_or(0); let mut span_type = interner.intern(""); - for kv in &span.attributes { - if kv.key.as_str() == "span.type" { - span_type = interner.intern_value(&kv.value); - break; - } + if let Some(value) = get_span_type(span) { + span_type = interner.intern_value(value); } // Datadog span name is OpenTelemetry component name - see module docs for more information diff --git a/opentelemetry-datadog/src/exporter/model/v07.rs b/opentelemetry-datadog/src/exporter/model/v07.rs new file mode 100644 index 000000000..32cb6cdd3 --- /dev/null +++ b/opentelemetry-datadog/src/exporter/model/v07.rs @@ -0,0 +1,541 @@ +use crate::exporter::model::{DD_MEASURED_KEY, SAMPLING_PRIORITY_KEY}; +use crate::exporter::{Error, ModelConfig}; +use opentelemetry::trace::{Event, Status}; +use opentelemetry::KeyValue; +use opentelemetry_sdk::trace::SpanData; +use opentelemetry_sdk::Resource; +use rmp::encode::ValueWriteError; +use std::time::SystemTime; + +use super::unified_tags::UnifiedTags; +use super::{get_measuring, get_sampling_priority, get_span_type}; + +// Documentation for all versions sourced from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/version.go +// Specifically, the v0.7 versions is described with a protobuf definition but is still encoded using message pack. +// +// message TraceChunk { +// // priority specifies sampling priority of the trace. +// // @gotags: json:"priority" msg:"priority" +// int32 priority = 1; +// // origin specifies origin product ("lambda", "rum", etc.) of the trace. +// // @gotags: json:"origin" msg:"origin" +// string origin = 2; +// // spans specifies list of containing spans. +// // @gotags: json:"spans" msg:"spans" +// repeated Span spans = 3; +// // tags specifies tags common in all `spans`. +// // @gotags: json:"tags" msg:"tags" +// map tags = 4; +// // droppedTrace specifies whether the trace was dropped by samplers or not. +// // @gotags: json:"dropped_trace" msg:"dropped_trace" +// bool droppedTrace = 5; +// } +// +// // TracerPayload represents a payload the trace agent receives from tracers. +// message TracerPayload { +// // containerID specifies the ID of the container where the tracer is running on. +// // @gotags: json:"container_id" msg:"container_id" +// string containerID = 1; +// // languageName specifies language of the tracer. +// // @gotags: json:"language_name" msg:"language_name" +// string languageName = 2; +// // languageVersion specifies language version of the tracer. +// // @gotags: json:"language_version" msg:"language_version" +// string languageVersion = 3; +// // tracerVersion specifies version of the tracer. +// // @gotags: json:"tracer_version" msg:"tracer_version" +// string tracerVersion = 4; +// // runtimeID specifies V4 UUID representation of a tracer session. +// // @gotags: json:"runtime_id" msg:"runtime_id" +// string runtimeID = 5; +// // chunks specifies list of containing trace chunks. +// // @gotags: json:"chunks" msg:"chunks" +// repeated TraceChunk chunks = 6; +// // tags specifies tags common in all `chunks`. +// // @gotags: json:"tags" msg:"tags" +// map tags = 7; +// // env specifies `env` tag that set with the tracer. +// // @gotags: json:"env" msg:"env" +// string env = 8; +// // hostname specifies hostname of where the tracer is running. +// // @gotags: json:"hostname" msg:"hostname" +// string hostname = 9; +// // version specifies `version` tag that set with the tracer. +// // @gotags: json:"app_version" msg:"app_version" +// string appVersion = 10; +// } + +pub(crate) fn encode( + model_config: &ModelConfig, + traces: Vec<&[SpanData]>, + get_service_name: S, + get_name: N, + get_resource: R, + unified_tags: &UnifiedTags, + resource: Option<&Resource>, +) -> Result, Error> +where + for<'a> S: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, + for<'a> N: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, + for<'a> R: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, +{ + let mut encoded = Vec::with_capacity(traces.len() * 512); + + rmp::encode::write_map_len( + &mut encoded, + 3 + unified_tags.env.len() + unified_tags.version.len(), + )?; + + // note we still don't support sending the container_id, language_version, runtime_id, tracer_version, hostname + rmp::encode::write_str(&mut encoded, "language_name")?; + rmp::encode::write_str(&mut encoded, "rust")?; + + encode_chunks( + &mut encoded, + traces, + get_name, + get_resource, + get_service_name, + model_config, + resource, + )?; + + encode_tags(&mut encoded, unified_tags)?; + + if let Some(env) = &unified_tags.env.value { + rmp::encode::write_str(&mut encoded, "env")?; + rmp::encode::write_str(&mut encoded, env)?; + } + + if let Some(version) = &unified_tags.version.value { + rmp::encode::write_str(&mut encoded, "app_version")?; + rmp::encode::write_str(&mut encoded, version)?; + } + + Ok(encoded) +} + +fn encode_chunks( + encoded: &mut Vec, + traces: Vec<&[SpanData]>, + get_name: N, + get_resource: R, + get_service_name: S, + model_config: &ModelConfig, + resource: Option<&Resource>, +) -> Result<(), Error> +where + for<'a> N: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, + for<'a> R: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, + for<'a> S: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, +{ + rmp::encode::write_str(encoded, "chunks")?; + rmp::encode::write_array_len(encoded, traces.len() as u32)?; + for trace in traces.into_iter() { + rmp::encode::write_map_len(encoded, 3)?; + // This field isn't set on spans that didn't originate from a datadog agent so we default to 1. + // https://github.com/vectordotdev/vector/blob/3ea8c86f9461f1e3d403c3c6820fdf19b280fe75/src/sinks/datadog/traces/request_builder.rs#L289 + rmp::encode::write_str(encoded, "priority")?; + rmp::encode::write_i32(encoded, 1)?; + rmp::encode::write_str(encoded, "origin")?; + rmp::encode::write_str(encoded, "")?; + + encode_spans( + encoded, + trace, + &get_name, + &get_resource, + &get_service_name, + model_config, + resource, + )?; + + // I assume the tags here are some common values that can be extracted and deduplicated. + // maybe support this in the future? + // rmp::encode::write_str(payload, "tags")?; + + // todo: how to find it the trace was dropped? + // for now assume it wasn't + // rmp::encode::write_str(payload, "dropped_trace")?; + } + + Ok(()) +} + +fn encode_spans( + encoded: &mut Vec, + trace: &[SpanData], + get_name: N, + get_resource: R, + get_service_name: S, + model_config: &ModelConfig, + resource: Option<&Resource>, +) -> Result<(), Error> +where + for<'a> N: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, + for<'a> R: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, + for<'a> S: Fn(&'a SpanData, &'a ModelConfig) -> &'a str, +{ + rmp::encode::write_str(encoded, "spans")?; + rmp::encode::write_array_len(encoded, trace.len() as u32)?; + for span in trace { + rmp::encode::write_map_len(encoded, 14)?; + + rmp::encode::write_str(encoded, "name")?; + rmp::encode::write_str(encoded, get_name(span, model_config))?; + + rmp::encode::write_str(encoded, "span_id")?; + rmp::encode::write_u64( + encoded, + u64::from_be_bytes(span.span_context.span_id().to_bytes()), + )?; + + rmp::encode::write_str(encoded, "trace_id")?; + rmp::encode::write_u64( + encoded, + u128::from_be_bytes(span.span_context.trace_id().to_bytes()) as u64, + )?; + + rmp::encode::write_str(encoded, "start")?; + let start = span + .start_time + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64; + rmp::encode::write_i64(encoded, start)?; + + rmp::encode::write_str(encoded, "duration")?; + let duration = span + .end_time + .duration_since(span.start_time) + .map(|x| x.as_nanos() as i64) + .unwrap_or(0); + rmp::encode::write_i64(encoded, duration)?; + + rmp::encode::write_str(encoded, "parent_id")?; + rmp::encode::write_u64(encoded, u64::from_be_bytes(span.parent_span_id.to_bytes()))?; + + rmp::encode::write_str(encoded, "service")?; + rmp::encode::write_str(encoded, get_service_name(span, model_config))?; + + rmp::encode::write_str(encoded, "resource")?; + rmp::encode::write_str(encoded, get_resource(span, model_config))?; + + rmp::encode::write_str(encoded, "type")?; + let span_type = match get_span_type(span) { + Some(value) => value.as_str(), + None => "".into(), + }; + rmp::encode::write_str(encoded, &span_type)?; + + rmp::encode::write_str(encoded, "error")?; + rmp::encode::write_i32( + encoded, + match span.status { + Status::Error { .. } => 1, + _ => 0, + }, + )?; + + rmp::encode::write_str(encoded, "meta")?; + rmp::encode::write_map_len( + encoded, + (span.attributes.len() + resource.map(|r| r.len()).unwrap_or(0)) as u32, + )?; + if let Some(resource) = resource { + for (key, value) in resource.iter() { + rmp::encode::write_str(encoded, key.as_str())?; + rmp::encode::write_str(encoded, value.as_str().as_ref())?; + } + } + for kv in span.attributes.iter() { + rmp::encode::write_str(encoded, kv.key.as_str())?; + rmp::encode::write_str(encoded, kv.value.as_str().as_ref())?; + } + + encode_metrics(encoded, span)?; + + // the meta struct is usually set by datadog trace libraries and not otel so we ignore and don't serialize it + // rmp::encode::write_str(payload, "meta_struct")?; + + encode_span_links(encoded, span)?; + encode_span_events(encoded, span)?; + } + + Ok(()) +} + +fn encode_metrics(encoded: &mut Vec, span: &SpanData) -> Result<(), Error> { + rmp::encode::write_str(encoded, "metrics")?; + rmp::encode::write_map_len(encoded, 2)?; + + rmp::encode::write_str(encoded, SAMPLING_PRIORITY_KEY)?; + let sampling_priority = get_sampling_priority(span); + rmp::encode::write_f64(encoded, sampling_priority)?; + + rmp::encode::write_str(encoded, DD_MEASURED_KEY)?; + let measuring = get_measuring(span); + rmp::encode::write_f64(encoded, measuring)?; + + Ok(()) +} + +fn encode_span_events(encoded: &mut Vec, span: &SpanData) -> Result<(), Error> { + rmp::encode::write_str(encoded, "span_events")?; + rmp::encode::write_array_len(encoded, span.events.len() as u32)?; + for event in span.events.iter() { + rmp::encode::write_map_len(encoded, 3)?; + rmp::encode::write_str(encoded, "name")?; + rmp::encode::write_str(encoded, event.name.to_string().as_str())?; + + rmp::encode::write_str(encoded, "time_unix_nano")?; + let timestamp = event + .timestamp + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64; + rmp::encode::write_i64(encoded, timestamp)?; + + rmp::encode::write_str(encoded, "attributes")?; + rmp::encode::write_map_len(encoded, event.attributes.len() as u32)?; + for kv in event.attributes.iter() { + encode_attribute_any_value(encoded, kv)?; + } + } + + Ok(()) +} + +// https://github.com/DataDog/datadog-agent/blob/main/pkg/proto/datadog/trace/span.proto#L38 +// AttributeAnyValue is used to represent any type of attribute value. AttributeAnyValue may contain a +// primitive value such as a string or integer or it may contain an arbitrary nested +// object containing arrays, key-value lists and primitives. +// message AttributeAnyValue { +// // We implement a union manually here because Go's MessagePack generator does not support +// // Protobuf `oneof` unions: https://github.com/tinylib/msgp/issues/184 +// // Despite this, the format represented here is binary compatible with `oneof`, if we choose +// // to migrate to that in the future. +// // @gotags: json:"type" msg:"type" +// AttributeAnyValueType type = 1; + +// enum AttributeAnyValueType { +// STRING_VALUE = 0; +// BOOL_VALUE = 1; +// INT_VALUE = 2; +// DOUBLE_VALUE = 3; +// ARRAY_VALUE = 4; +// } +// // @gotags: json:"string_value" msg:"string_value" +// string string_value = 2; +// // @gotags: json:"bool_value" msg:"bool_value" +// bool bool_value = 3; +// // @gotags: json:"int_value" msg:"int_value" +// int64 int_value = 4; +// // @gotags: json:"double_value" msg:"double_value" +// double double_value = 5; +// // @gotags: json:"array_value" msg:"array_value" +// AttributeArray array_value = 6; +// } + +// // AttributeArray is a list of AttributeArrayValue messages. We need this as a message since `oneof` in AttributeAnyValue does not allow repeated fields. +// message AttributeArray { +// // Array of values. The array may be empty (contain 0 elements). +// // @gotags: json:"values" msg:"values" +// repeated AttributeArrayValue values = 1; +// } + +// // An element in the homogeneous AttributeArray. +// // Compared to AttributeAnyValue, it only supports scalar values. +// message AttributeArrayValue { +// // We implement a union manually here because Go's MessagePack generator does not support +// // Protobuf `oneof` unions: https://github.com/tinylib/msgp/issues/184 +// // Despite this, the format represented here is binary compatible with `oneof`, if we choose +// // to migrate to that in the future. +// // @gotags: json:"type" msg:"type" +// AttributeArrayValueType type = 1; + +// enum AttributeArrayValueType { +// STRING_VALUE = 0; +// BOOL_VALUE = 1; +// INT_VALUE = 2; +// DOUBLE_VALUE = 3; +// } + +// // @gotags: json:"string_value" msg:"string_value" +// string string_value = 2; +// // @gotags: json:"bool_value" msg:"bool_value" +// bool bool_value = 3; +// // @gotags: json:"int_value" msg:"int_value" +// int64 int_value = 4; +// // @gotags: json:"double_value" msg:"double_value" +// double double_value = 5; +// } +fn encode_attribute_any_value(encoded: &mut Vec, kv: &KeyValue) -> Result<(), Error> { + rmp::encode::write_str(encoded, kv.key.as_str())?; + + rmp::encode::write_map_len(encoded, 2)?; + + let (enum_type, value_str) = match &kv.value { + opentelemetry::Value::String(_) => (0, "string_value"), + opentelemetry::Value::Bool(_) => (1, "bool_value"), + opentelemetry::Value::I64(_) => (2, "int_value"), + opentelemetry::Value::F64(_) => (3, "double_value"), + opentelemetry::Value::Array(_) => (4, "array_value"), + unknown_value => { + return Err(Error::Other(format!( + "Unsupported value type: {:?}", + unknown_value + ))); + } + }; + rmp::encode::write_str(encoded, "type")?; + rmp::encode::write_i32(encoded, enum_type)?; + + rmp::encode::write_str(encoded, value_str)?; + match &kv.value { + opentelemetry::Value::String(value) => rmp::encode::write_str(encoded, value.as_str())?, + // I think writing bool can't fail with writing data so we convert it to the invalid marker write to match the other errors + opentelemetry::Value::Bool(value) => rmp::encode::write_bool(encoded, *value) + .map_err(|e| ValueWriteError::InvalidMarkerWrite(e))?, + opentelemetry::Value::I64(value) => rmp::encode::write_i64(encoded, *value)?, + opentelemetry::Value::F64(value) => rmp::encode::write_f64(encoded, *value)?, + opentelemetry::Value::Array(array_value) => { + encode_attribute_array(encoded, array_value)?; + } + _ => { + return Err(Error::Other(format!( + "Unsupported value type: {:?}", + kv.value + ))); + } + } + + Ok(()) +} + +fn encode_attribute_array( + encoded: &mut Vec, + array_value: &opentelemetry::Array, +) -> Result<(), Error> { + match array_value { + opentelemetry::Array::String(string_values) => { + rmp::encode::write_array_len(encoded, string_values.len() as u32)?; + for value in string_values.iter() { + rmp::encode::write_map_len(encoded, 2)?; + + rmp::encode::write_str(encoded, "type")?; + rmp::encode::write_uint8(encoded, 0)?; + + rmp::encode::write_str(encoded, "string_value")?; + rmp::encode::write_str(encoded, value.as_str())?; + } + } + opentelemetry::Array::Bool(items) => { + rmp::encode::write_array_len(encoded, items.len() as u32)?; + for item in items.iter() { + rmp::encode::write_map_len(encoded, 2)?; + + rmp::encode::write_str(encoded, "type")?; + rmp::encode::write_uint8(encoded, 1)?; + + rmp::encode::write_str(encoded, "bool_value")?; + rmp::encode::write_bool(encoded, *item) + .map_err(|e| ValueWriteError::InvalidMarkerWrite(e))?; + } + } + opentelemetry::Array::I64(items) => { + rmp::encode::write_array_len(encoded, items.len() as u32)?; + for item in items.iter() { + rmp::encode::write_map_len(encoded, 2)?; + + rmp::encode::write_str(encoded, "type")?; + rmp::encode::write_uint8(encoded, 2)?; + + rmp::encode::write_str(encoded, "int_value")?; + rmp::encode::write_i64(encoded, *item)?; + } + } + opentelemetry::Array::F64(items) => { + rmp::encode::write_array_len(encoded, items.len() as u32)?; + for item in items.iter() { + rmp::encode::write_map_len(encoded, 2)?; + + rmp::encode::write_str(encoded, "type")?; + rmp::encode::write_uint8(encoded, 3)?; + + rmp::encode::write_str(encoded, "double_value")?; + rmp::encode::write_f64(encoded, *item)?; + } + } + unknown => { + return Err(Error::Other(format!( + "Unsupported array type: {:?}", + unknown + ))) + } + } + Ok(()) +} + +fn encode_span_links(encoded: &mut Vec, span: &SpanData) -> Result<(), Error> { + rmp::encode::write_str(encoded, "span_links")?; + rmp::encode::write_array_len(encoded, span.links.len() as u32)?; + for link in span.links.as_ref() { + rmp::encode::write_map_len(encoded, 6)?; + rmp::encode::write_str(encoded, "trace_id")?; + rmp::encode::write_u64( + encoded, + u128::from_be_bytes(link.span_context.trace_id().to_bytes()) as u64, + )?; + + rmp::encode::write_str(encoded, "trace_id_high")?; + rmp::encode::write_u64( + encoded, + (u128::from_be_bytes(link.span_context.trace_id().to_bytes()) >> 64) as u64, + )?; + rmp::encode::write_str(encoded, "span_id")?; + rmp::encode::write_u64( + encoded, + u64::from_be_bytes(link.span_context.span_id().to_bytes()), + )?; + + rmp::encode::write_str(encoded, "attributes")?; + rmp::encode::write_map_len(encoded, link.attributes.len() as u32)?; + for kv in link.attributes.iter() { + rmp::encode::write_str(encoded, kv.key.as_str())?; + rmp::encode::write_str(encoded, kv.value.as_str().as_ref())?; + } + rmp::encode::write_str(encoded, "tracestate")?; + rmp::encode::write_str(encoded, link.span_context.trace_state().header().as_str())?; + + rmp::encode::write_str(encoded, "flags")?; + rmp::encode::write_u8(encoded, link.span_context.trace_flags().to_u8())?; + } + + Ok(()) +} + +fn encode_tags(encoded: &mut Vec, unified_tags: &UnifiedTags) -> Result<(), Error> { + // Not too sure about this, but to support unified tagging we encode the service, version and env in the tags. + // Some of them like service and version are also encoded explicitly in the payload in their own fields + let length = unified_tags.service.len() + unified_tags.version.len() + unified_tags.env.len(); + + rmp::encode::write_str(encoded, "tags")?; + rmp::encode::write_map_len(encoded, length)?; + if let Some(value) = &unified_tags.service.value { + rmp::encode::write_str(encoded, unified_tags.service.get_tag_name())?; + rmp::encode::write_str(encoded, value)?; + } + + if let Some(value) = &unified_tags.version.value { + rmp::encode::write_str(encoded, unified_tags.version.get_tag_name())?; + rmp::encode::write_str(encoded, value)?; + } + + if let Some(value) = &unified_tags.env.value { + rmp::encode::write_str(encoded, unified_tags.env.get_tag_name())?; + rmp::encode::write_str(encoded, value)?; + } + + Ok(()) +}