2 changes: 1 addition & 1 deletion data-pipeline-ffi/src/trace_exporter.rs
@@ -544,7 +544,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_send(
mod tests {
use super::*;
use crate::error::ddog_trace_exporter_error_free;
use datadog_trace_utils::span::SpanSlice;
use datadog_trace_utils::span::v04::SpanSlice;
use httpmock::prelude::*;
use httpmock::MockServer;
use std::{borrow::Borrow, mem::MaybeUninit};
2 changes: 1 addition & 1 deletion data-pipeline/benches/span_concentrator_bench.rs
@@ -7,7 +7,7 @@ use std::{

use criterion::{criterion_group, Criterion};
use data_pipeline::span_concentrator::SpanConcentrator;
use datadog_trace_utils::span::SpanBytes;
use datadog_trace_utils::span::v04::SpanBytes;

fn get_bucket_start(now: SystemTime, n: u64) -> i64 {
let start = now.duration_since(time::UNIX_EPOCH).unwrap() + Duration::from_secs(10 * n);
21 changes: 9 additions & 12 deletions data-pipeline/src/span_concentrator/aggregation.rs
@@ -4,10 +4,10 @@
//! This includes the aggregation key to group spans together and the computation of stats from a
//! span.
use datadog_trace_protobuf::pb;
use datadog_trace_utils::span::trace_utils;
use datadog_trace_utils::span::Span;
use datadog_trace_utils::span::SpanText;
use datadog_trace_utils::span::v04::Span;
use datadog_trace_utils::span::{trace_utils, SpanText, TraceData};
use hashbrown::HashMap;
use std::borrow::Borrow;

const TAG_STATUS_CODE: &str = "http.status_code";
const TAG_SYNTHETICS: &str = "synthetics";
@@ -101,10 +101,7 @@ impl<'a> BorrowedAggregationKey<'a> {
///
/// If `peer_tag_keys` is not empty then the peer tags of the span will be included in the
/// key.
pub(super) fn from_span<T>(span: &'a Span<T>, peer_tag_keys: &'a [String]) -> Self
where
T: SpanText,
{
pub(super) fn from_span<T: TraceData>(span: &'a Span<T>, peer_tag_keys: &'a [String]) -> Self {
let span_kind = span
.meta
.get(TAG_SPANKIND)
@@ -176,7 +173,7 @@ impl From<pb::ClientGroupedStats> for OwnedAggregationKey {
/// Return the status code of a span based on the metrics and meta tags.
fn get_status_code<T>(span: &Span<T>) -> u32
where
T: SpanText,
T: TraceData,
{
if let Some(status_code) = span.metrics.get(TAG_STATUS_CODE) {
*status_code as u32
@@ -205,7 +202,7 @@ fn get_peer_tags<'k, 'v, T>(
peer_tag_keys: &'k [String],
) -> Vec<(&'k str, &'v str)>
where
T: SpanText,
T: TraceData,
{
peer_tag_keys
.iter()
@@ -228,7 +225,7 @@ impl GroupedStats {
/// Update the stats of a GroupedStats by inserting a span.
fn insert<T>(&mut self, value: &Span<T>)
where
T: SpanText,
T: TraceData,
{
self.hits += 1;
self.duration += value.duration as u64;
@@ -266,7 +263,7 @@ impl StatsBucket {
/// not exist it creates it.
pub(super) fn insert<T>(&mut self, key: BorrowedAggregationKey<'_>, value: &Span<T>)
where
T: SpanText,
T: TraceData,
{
self.data.entry_ref(&key).or_default().insert(value);
}
@@ -326,7 +323,7 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl

#[cfg(test)]
mod tests {
use datadog_trace_utils::span::{SpanBytes, SpanSlice};
use datadog_trace_utils::span::v04::{SpanBytes, SpanSlice};

use super::*;
use std::{collections::HashMap, hash::Hash};
9 changes: 5 additions & 4 deletions data-pipeline/src/span_concentrator/mod.rs
@@ -1,11 +1,12 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
//! This module implements the SpanConcentrator used to aggregate spans into stats
use std::borrow::Borrow;
use std::collections::HashMap;
use std::time::{self, Duration, SystemTime};

use datadog_trace_protobuf::pb;
use datadog_trace_utils::span::{trace_utils, Span, SpanText};
use datadog_trace_utils::span::{trace_utils, v04::Span, TraceData};

use aggregation::{BorrowedAggregationKey, StatsBucket};

@@ -27,7 +28,7 @@ fn align_timestamp(t: u64, bucket_size: u64) -> u64 {
/// Return true if the span has a span.kind that is eligible for stats computation
fn compute_stats_for_span_kind<T>(span: &Span<T>, span_kinds_stats_computed: &[String]) -> bool
where
T: SpanText,
T: TraceData,
{
!span_kinds_stats_computed.is_empty()
&& span.meta.get("span.kind").is_some_and(|span_kind| {
@@ -38,7 +39,7 @@ where
/// Return true if the span should be ignored for stats computation
fn should_ignore_span<T>(span: &Span<T>, span_kinds_stats_computed: &[String]) -> bool
where
T: SpanText,
T: TraceData,
{
!(trace_utils::has_top_level(span)
|| trace_utils::is_measured(span)
@@ -121,7 +122,7 @@ impl SpanConcentrator {
/// computation.
pub fn add_span<T>(&mut self, span: &Span<T>)
where
T: SpanText,
T: TraceData,
{
// If the span is eligible for stats computation
if !should_ignore_span(span, self.span_kinds_stats_computed.as_slice()) {
2 changes: 1 addition & 1 deletion data-pipeline/src/span_concentrator/tests.rs
@@ -4,7 +4,7 @@
use crate::span_concentrator::aggregation::OwnedAggregationKey;

use super::*;
use datadog_trace_utils::span::{trace_utils::compute_top_level_span, SpanSlice};
use datadog_trace_utils::span::{trace_utils::compute_top_level_span, v04::SpanSlice};
use rand::{thread_rng, Rng};

const BUCKET_SIZE: u64 = Duration::from_secs(2).as_nanos() as u64;
2 changes: 1 addition & 1 deletion data-pipeline/src/stats_exporter.rs
@@ -188,7 +188,7 @@ pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result<hyper::Uri> {
#[cfg(test)]
mod tests {
use super::*;
use datadog_trace_utils::span::{trace_utils, SpanSlice};
use datadog_trace_utils::span::{trace_utils, v04::SpanSlice};
use datadog_trace_utils::test_utils::poll_for_mock_hit;
use httpmock::prelude::*;
use httpmock::MockServer;
10 changes: 5 additions & 5 deletions data-pipeline/src/trace_exporter/mod.rs
@@ -34,7 +34,7 @@ use datadog_trace_utils::msgpack_decoder;
use datadog_trace_utils::send_with_retry::{
send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult,
};
use datadog_trace_utils::span::{Span, SpanText};
use datadog_trace_utils::span::{v04::Span, TraceData};
use datadog_trace_utils::trace_utils::TracerHeaderTags;
use ddcommon::MutexExt;
use ddcommon::{hyper_migration, Endpoint};
@@ -577,7 +577,7 @@ impl TraceExporter {
/// # Returns
/// * Ok(String): The response from the agent
/// * Err(TraceExporterError): An error detailing what went wrong in the process
pub fn send_trace_chunks<T: SpanText>(
pub fn send_trace_chunks<T: TraceData>(
&self,
trace_chunks: Vec<Vec<Span<T>>>,
) -> Result<AgentResponse, TraceExporterError> {
@@ -658,7 +658,7 @@ impl TraceExporter {
self.handle_send_result(result, chunks, payload_len).await
}

fn send_trace_chunks_inner<T: SpanText>(
fn send_trace_chunks_inner<T: TraceData>(
&self,
mut traces: Vec<Vec<Span<T>>>,
) -> Result<AgentResponse, TraceExporterError> {
@@ -982,8 +982,8 @@ mod tests {
use self::error::AgentErrorKind;
use super::*;
use datadog_trace_utils::msgpack_encoder;
use datadog_trace_utils::span::v04::SpanBytes;
use datadog_trace_utils::span::v05;
use datadog_trace_utils::span::SpanBytes;
use httpmock::prelude::*;
use httpmock::MockServer;
use std::collections::HashMap;
@@ -1993,7 +1993,7 @@ mod single_threaded_tests {
use super::*;
use crate::agent_info;
use datadog_trace_utils::msgpack_encoder;
use datadog_trace_utils::span::SpanBytes;
use datadog_trace_utils::span::v04::SpanBytes;
use httpmock::prelude::*;
use std::time::Duration;
use tokio::time::sleep;
8 changes: 4 additions & 4 deletions data-pipeline/src/trace_exporter/stats.rs
@@ -200,9 +200,9 @@ pub(crate) fn handle_stats_enabled(
/// Add all spans from the given iterator into the stats concentrator
/// # Panic
/// Will panic if another thread panicked while holding the lock on `stats_concentrator`
fn add_spans_to_stats<T: datadog_trace_utils::span::SpanText>(
fn add_spans_to_stats<T: datadog_trace_utils::span::TraceData>(
stats_concentrator: &Mutex<SpanConcentrator>,
traces: &[Vec<datadog_trace_utils::span::Span<T>>],
traces: &[Vec<datadog_trace_utils::span::v04::Span<T>>],
) {
let mut stats_concentrator = stats_concentrator.lock_or_panic();

@@ -213,8 +213,8 @@ fn add_spans_to_stats<T: datadog_trace_utils::span::SpanText>(
}

/// Process traces for stats computation and update header tags accordingly
pub(crate) fn process_traces_for_stats<T: datadog_trace_utils::span::SpanText>(
traces: &mut Vec<Vec<datadog_trace_utils::span::Span<T>>>,
pub(crate) fn process_traces_for_stats<T: datadog_trace_utils::span::TraceData>(
traces: &mut Vec<Vec<datadog_trace_utils::span::v04::Span<T>>>,
header_tags: &mut datadog_trace_utils::trace_utils::TracerHeaderTags,
client_side_stats: &ArcSwap<StatsComputationStatus>,
client_computed_top_level: bool,
10 changes: 5 additions & 5 deletions data-pipeline/src/trace_exporter/trace_serializer.rs
@@ -8,7 +8,7 @@ use crate::trace_exporter::error::TraceExporterError;
use crate::trace_exporter::TraceExporterOutputFormat;
use datadog_trace_utils::msgpack_decoder::decode::error::DecodeError;
use datadog_trace_utils::msgpack_encoder;
use datadog_trace_utils::span::{Span, SpanText};
use datadog_trace_utils::span::{v04::Span, TraceData};
use datadog_trace_utils::trace_utils::{self, TracerHeaderTags};
use datadog_trace_utils::tracer_payload;
use ddcommon::header::{
@@ -46,7 +46,7 @@ impl<'a> TraceSerializer<'a> {
}

/// Prepare traces payload and HTTP headers for sending to agent
pub(super) fn prepare_traces_payload<T: SpanText>(
pub(super) fn prepare_traces_payload<T: TraceData>(
&self,
traces: Vec<Vec<Span<T>>>,
header_tags: TracerHeaderTags,
@@ -64,7 +64,7 @@ impl<'a> TraceSerializer<'a> {
}

/// Collect trace chunks based on output format
fn collect_and_process_traces<T: SpanText>(
fn collect_and_process_traces<T: TraceData>(
&self,
traces: Vec<Vec<Span<T>>>,
) -> Result<tracer_payload::TraceChunks<T>, TraceExporterError> {
@@ -97,7 +97,7 @@ impl<'a> TraceSerializer<'a> {
}

/// Serialize payload to msgpack format
fn serialize_payload<T: SpanText>(
fn serialize_payload<T: TraceData>(
&self,
payload: &tracer_payload::TraceChunks<T>,
) -> Result<Vec<u8>, TraceExporterError> {
@@ -114,7 +114,7 @@ impl<'a> TraceSerializer<'a> {
mod tests {
use super::*;
use crate::trace_exporter::agent_response::AgentResponsePayloadVersion;
use datadog_trace_utils::span::SpanBytes;
use datadog_trace_utils::span::v04::SpanBytes;
use datadog_trace_utils::trace_utils::TracerHeaderTags;
use ddcommon::header::{
APPLICATION_MSGPACK_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR, DATADOG_TRACE_COUNT_STR,
2 changes: 1 addition & 1 deletion datadog-sidecar-ffi/src/span.rs
@@ -1,7 +1,7 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use datadog_trace_utils::span::{
use datadog_trace_utils::span::v04::{
AttributeAnyValueBytes, AttributeArrayValueBytes, SpanBytes, SpanEventBytes, SpanLinkBytes,
};
use ddcommon_ffi::slice::{AsBytes, CharSlice};
2 changes: 1 addition & 1 deletion datadog-sidecar-ffi/tests/span.rs
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use datadog_sidecar_ffi::span::*;
use datadog_trace_utils::span::*;
use datadog_trace_utils::span::v04::*;
use ddcommon_ffi::slice::*;
use std::collections::HashMap;
use tinybytes::*;
5 changes: 2 additions & 3 deletions datadog-sidecar/src/service/tracing/trace_flusher.rs
@@ -352,9 +352,8 @@ mod tests {
};

let send_data_1 = create_send_data(size, &target_endpoint);

let send_data_2 = send_data_1.clone();
let send_data_3 = send_data_1.clone();
let send_data_2 = create_send_data(size, &target_endpoint);
let send_data_3 = create_send_data(size, &target_endpoint);

trace_flusher.enqueue(send_data_1);
trace_flusher.enqueue(send_data_2);
63 changes: 63 additions & 0 deletions datadog-trace-utils/src/msgpack_decoder/decode/buffer.rs
@@ -0,0 +1,63 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::decode::error::DecodeError;
use crate::span::TraceData;
use rmp::decode;
use rmp::decode::DecodeStringError;

use std::borrow::Borrow;
use std::ops::Deref;

/// Read a string from `buf`.
///
/// # Errors
/// Fails if the buffer doesn't contain a valid utf8 msgpack string.
#[inline]
pub fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> {
decode::read_str_from_slice(buf).map_err(|e| match e {
DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()),
DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()),
DecodeStringError::TypeMismatch(marker) => {
DecodeError::InvalidType(format!("Type mismatch at marker {marker:?}"))
}
DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()),
_ => DecodeError::IOError,
})
}

/// Internal Buffer used to wrap msgpack data for decoding.
/// Provides a couple accessors to extract data from the buffer.
pub struct Buffer<T: TraceData>(T::Bytes);
Contributor:

Why can't we use &[u8] for the buffer?

Contributor Author:

We need to at least store the Bytes reference to construct the Bytes/BytesString object.
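
To make the reply above concrete, here is a minimal, hedged sketch of the idea. `OwnedBytes` and `slice_of` are illustrative stand-ins, not the real `tinybytes::Bytes`/`BytesString` API: holding an owned, ref-counted handle inside the buffer lets the decoder hand back owned string values that share the original allocation, whereas a plain `&[u8]` would limit the decoder to borrowed output or force a copy per string.

// Illustrative sketch only: `OwnedBytes` stands in for a ref-counted byte
// buffer such as tinybytes::Bytes; it is NOT the real API.
use std::sync::Arc;

#[derive(Clone)]
struct OwnedBytes {
    data: Arc<[u8]>, // shared allocation
    start: usize,    // view window into `data`
    end: usize,
}

impl OwnedBytes {
    fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        OwnedBytes { data: data.into(), start: 0, end: len }
    }

    /// Zero-copy sub-view: only the window changes, the allocation is shared.
    fn slice_of(&self, start: usize, end: usize) -> OwnedBytes {
        assert!(start <= end && self.start + end <= self.end);
        OwnedBytes {
            data: Arc::clone(&self.data),
            start: self.start + start,
            end: self.start + end,
        }
    }

    fn as_slice(&self) -> &[u8] {
        &self.data[self.start..self.end]
    }
}

fn main() {
    // Pretend this is a msgpack payload containing the string "hello".
    let payload = OwnedBytes::new(b"\x00\x00\x00\x00\xa5hello\x00\x00".to_vec());

    // With only a borrowed `&[u8]`, producing an owned string value means copying:
    let copied = String::from_utf8(payload.as_slice()[5..10].to_vec()).unwrap();

    // Holding the owned handle lets the decoder instead return an owned view
    // that shares the original allocation (what Bytes/BytesString enable):
    let shared = payload.slice_of(5, 10);

    assert_eq!(copied.as_bytes(), shared.as_slice());
}

In the real crate that role is presumably played by `tinybytes::Bytes` (with `BytesString` as a UTF-8 view over it); the sketch only shows why the owned handle has to live inside `Buffer` rather than being a plain borrowed slice.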


impl<T: TraceData> Buffer<T> {
pub fn new(data: T::Bytes) -> Self {
Buffer(data)
}

/// Returns a mutable reference to the underlying slice.
pub fn as_mut_slice(&mut self) -> &mut &'static [u8] {
T::get_mut_slice(&mut self.0)
}

/// Tries to extract a slice of `bytes` bytes from the buffer and advances the buffer.
pub fn try_slice_and_advance(&mut self, bytes: usize) -> Option<T::Bytes> {
T::try_slice_and_advance(&mut self.0, bytes)
}

/// Read a string from the underlying buffer.
///
/// # Errors
/// Fails if the buffer doesn't contain a valid utf8 msgpack string.
pub fn read_string(&mut self) -> Result<T::Text, DecodeError> {
T::read_string(&mut self.0)
}
}

impl<T: TraceData> Deref for Buffer<T> {
type Target = [u8];

fn deref(&self) -> &Self::Target {
self.0.borrow()
}
}
12 changes: 7 additions & 5 deletions datadog-trace-utils/src/msgpack_decoder/decode/map.rs
@@ -1,7 +1,8 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::decode::error::DecodeError;
use crate::msgpack_decoder::decode::{buffer::Buffer, error::DecodeError};
use crate::span::TraceData;
use rmp::{decode, decode::RmpRead, Marker};
use std::collections::HashMap;

@@ -33,14 +34,14 @@ use std::collections::HashMap;
/// * `V` - The type of the values in the map.
/// * `F` - The type of the function used to read key-value pairs from the buffer.
#[inline]
pub fn read_map<'a, K, V, F>(
pub fn read_map<K, V, F, B>(
len: usize,
buf: &mut &'a [u8],
buf: &mut B,
read_pair: F,
) -> Result<HashMap<K, V>, DecodeError>
where
K: std::hash::Hash + Eq,
F: Fn(&mut &'a [u8]) -> Result<(K, V), DecodeError>,
F: Fn(&mut B) -> Result<(K, V), DecodeError>,
{
let mut map = HashMap::with_capacity(len);
for _ in 0..len {
@@ -67,7 +68,8 @@ where
/// - The buffer does not contain a map.
/// - There is an error reading from the buffer.
#[inline]
pub fn read_map_len(buf: &mut &[u8]) -> Result<usize, DecodeError> {
pub fn read_map_len<T: TraceData>(buf: &mut Buffer<T>) -> Result<usize, DecodeError> {
let buf = buf.as_mut_slice();
match decode::read_marker(buf)
.map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))?
{