Skip to content

Commit 9b3e236

Browse files
committed
Introduce TraceData to unify text and binary data
Also move Span structures to v04, to make space for v1 spans (which will eventually become the new default). TraceData is also going to be used in the V1 implementation, to carry around byte arrays and strings alike, separate from the indexed offsets into the big vector. Signed-off-by: Bob Weinand <[email protected]>
1 parent 0b59f64 commit 9b3e236

File tree

34 files changed

+919
-895
lines changed

34 files changed

+919
-895
lines changed

data-pipeline-ffi/src/trace_exporter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_send(
544544
mod tests {
545545
use super::*;
546546
use crate::error::ddog_trace_exporter_error_free;
547-
use datadog_trace_utils::span::SpanSlice;
547+
use datadog_trace_utils::span::v04::SpanSlice;
548548
use httpmock::prelude::*;
549549
use httpmock::MockServer;
550550
use std::{borrow::Borrow, mem::MaybeUninit};

data-pipeline/benches/span_concentrator_bench.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77

88
use criterion::{criterion_group, Criterion};
99
use data_pipeline::span_concentrator::SpanConcentrator;
10-
use datadog_trace_utils::span::SpanBytes;
10+
use datadog_trace_utils::span::v04::SpanBytes;
1111

1212
fn get_bucket_start(now: SystemTime, n: u64) -> i64 {
1313
let start = now.duration_since(time::UNIX_EPOCH).unwrap() + Duration::from_secs(10 * n);

data-pipeline/src/span_concentrator/aggregation.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
//! This includes the aggregation key to group spans together and the computation of stats from a
55
//! span.
66
use datadog_trace_protobuf::pb;
7-
use datadog_trace_utils::span::trace_utils;
8-
use datadog_trace_utils::span::Span;
9-
use datadog_trace_utils::span::SpanText;
7+
use datadog_trace_utils::span::v04::Span;
8+
use datadog_trace_utils::span::{trace_utils, SpanText, TraceData};
109
use hashbrown::HashMap;
10+
use std::borrow::Borrow;
1111

1212
const TAG_STATUS_CODE: &str = "http.status_code";
1313
const TAG_SYNTHETICS: &str = "synthetics";
@@ -101,10 +101,7 @@ impl<'a> BorrowedAggregationKey<'a> {
101101
///
102102
/// If `peer_tags_keys` is not empty then the peer tags of the span will be included in the
103103
/// key.
104-
pub(super) fn from_span<T>(span: &'a Span<T>, peer_tag_keys: &'a [String]) -> Self
105-
where
106-
T: SpanText,
107-
{
104+
pub(super) fn from_span<T: TraceData>(span: &'a Span<T>, peer_tag_keys: &'a [String]) -> Self {
108105
let span_kind = span
109106
.meta
110107
.get(TAG_SPANKIND)
@@ -176,7 +173,7 @@ impl From<pb::ClientGroupedStats> for OwnedAggregationKey {
176173
/// Return the status code of a span based on the metrics and meta tags.
177174
fn get_status_code<T>(span: &Span<T>) -> u32
178175
where
179-
T: SpanText,
176+
T: TraceData,
180177
{
181178
if let Some(status_code) = span.metrics.get(TAG_STATUS_CODE) {
182179
*status_code as u32
@@ -205,7 +202,7 @@ fn get_peer_tags<'k, 'v, T>(
205202
peer_tag_keys: &'k [String],
206203
) -> Vec<(&'k str, &'v str)>
207204
where
208-
T: SpanText,
205+
T: TraceData,
209206
{
210207
peer_tag_keys
211208
.iter()
@@ -228,7 +225,7 @@ impl GroupedStats {
228225
/// Update the stats of a GroupedStats by inserting a span.
229226
fn insert<T>(&mut self, value: &Span<T>)
230227
where
231-
T: SpanText,
228+
T: TraceData,
232229
{
233230
self.hits += 1;
234231
self.duration += value.duration as u64;
@@ -266,7 +263,7 @@ impl StatsBucket {
266263
/// not exist it creates it.
267264
pub(super) fn insert<T>(&mut self, key: BorrowedAggregationKey<'_>, value: &Span<T>)
268265
where
269-
T: SpanText,
266+
T: TraceData,
270267
{
271268
self.data.entry_ref(&key).or_default().insert(value);
272269
}
@@ -326,7 +323,7 @@ fn encode_grouped_stats(key: OwnedAggregationKey, group: GroupedStats) -> pb::Cl
326323

327324
#[cfg(test)]
328325
mod tests {
329-
use datadog_trace_utils::span::{SpanBytes, SpanSlice};
326+
use datadog_trace_utils::span::v04::{SpanBytes, SpanSlice};
330327

331328
use super::*;
332329
use std::{collections::HashMap, hash::Hash};

data-pipeline/src/span_concentrator/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33
//! This module implements the SpanConcentrator used to aggregate spans into stats
4+
use std::borrow::Borrow;
45
use std::collections::HashMap;
56
use std::time::{self, Duration, SystemTime};
67

78
use datadog_trace_protobuf::pb;
8-
use datadog_trace_utils::span::{trace_utils, Span, SpanText};
9+
use datadog_trace_utils::span::{trace_utils, v04::Span, TraceData};
910

1011
use aggregation::{BorrowedAggregationKey, StatsBucket};
1112

@@ -27,7 +28,7 @@ fn align_timestamp(t: u64, bucket_size: u64) -> u64 {
2728
/// Return true if the span has a span.kind that is eligible for stats computation
2829
fn compute_stats_for_span_kind<T>(span: &Span<T>, span_kinds_stats_computed: &[String]) -> bool
2930
where
30-
T: SpanText,
31+
T: TraceData,
3132
{
3233
!span_kinds_stats_computed.is_empty()
3334
&& span.meta.get("span.kind").is_some_and(|span_kind| {
@@ -38,7 +39,7 @@ where
3839
/// Return true if the span should be ignored for stats computation
3940
fn should_ignore_span<T>(span: &Span<T>, span_kinds_stats_computed: &[String]) -> bool
4041
where
41-
T: SpanText,
42+
T: TraceData,
4243
{
4344
!(trace_utils::has_top_level(span)
4445
|| trace_utils::is_measured(span)
@@ -121,7 +122,7 @@ impl SpanConcentrator {
121122
/// computation.
122123
pub fn add_span<T>(&mut self, span: &Span<T>)
123124
where
124-
T: SpanText,
125+
T: TraceData,
125126
{
126127
// If the span is eligible for stats computation
127128
if !should_ignore_span(span, self.span_kinds_stats_computed.as_slice()) {

data-pipeline/src/span_concentrator/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use crate::span_concentrator::aggregation::OwnedAggregationKey;
55

66
use super::*;
7-
use datadog_trace_utils::span::{trace_utils::compute_top_level_span, SpanSlice};
7+
use datadog_trace_utils::span::{trace_utils::compute_top_level_span, v04::SpanSlice};
88
use rand::{thread_rng, Rng};
99

1010
const BUCKET_SIZE: u64 = Duration::from_secs(2).as_nanos() as u64;

data-pipeline/src/stats_exporter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result<hyper::Uri> {
188188
#[cfg(test)]
189189
mod tests {
190190
use super::*;
191-
use datadog_trace_utils::span::{trace_utils, SpanSlice};
191+
use datadog_trace_utils::span::{trace_utils, v04::SpanSlice};
192192
use datadog_trace_utils::test_utils::poll_for_mock_hit;
193193
use httpmock::prelude::*;
194194
use httpmock::MockServer;

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datadog_trace_utils::msgpack_decoder;
3434
use datadog_trace_utils::send_with_retry::{
3535
send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult,
3636
};
37-
use datadog_trace_utils::span::{Span, SpanText};
37+
use datadog_trace_utils::span::{v04::Span, TraceData};
3838
use datadog_trace_utils::trace_utils::TracerHeaderTags;
3939
use ddcommon::MutexExt;
4040
use ddcommon::{hyper_migration, Endpoint};
@@ -577,7 +577,7 @@ impl TraceExporter {
577577
/// # Returns
578578
/// * Ok(String): The response from the agent
579579
/// * Err(TraceExporterError): An error detailing what went wrong in the process
580-
pub fn send_trace_chunks<T: SpanText>(
580+
pub fn send_trace_chunks<T: TraceData>(
581581
&self,
582582
trace_chunks: Vec<Vec<Span<T>>>,
583583
) -> Result<AgentResponse, TraceExporterError> {
@@ -658,7 +658,7 @@ impl TraceExporter {
658658
self.handle_send_result(result, chunks, payload_len).await
659659
}
660660

661-
fn send_trace_chunks_inner<T: SpanText>(
661+
fn send_trace_chunks_inner<T: TraceData>(
662662
&self,
663663
mut traces: Vec<Vec<Span<T>>>,
664664
) -> Result<AgentResponse, TraceExporterError> {
@@ -982,8 +982,8 @@ mod tests {
982982
use self::error::AgentErrorKind;
983983
use super::*;
984984
use datadog_trace_utils::msgpack_encoder;
985+
use datadog_trace_utils::span::v04::SpanBytes;
985986
use datadog_trace_utils::span::v05;
986-
use datadog_trace_utils::span::SpanBytes;
987987
use httpmock::prelude::*;
988988
use httpmock::MockServer;
989989
use std::collections::HashMap;
@@ -1993,7 +1993,7 @@ mod single_threaded_tests {
19931993
use super::*;
19941994
use crate::agent_info;
19951995
use datadog_trace_utils::msgpack_encoder;
1996-
use datadog_trace_utils::span::SpanBytes;
1996+
use datadog_trace_utils::span::v04::SpanBytes;
19971997
use httpmock::prelude::*;
19981998
use std::time::Duration;
19991999
use tokio::time::sleep;

data-pipeline/src/trace_exporter/stats.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,9 @@ pub(crate) fn handle_stats_enabled(
200200
/// Add all spans from the given iterator into the stats concentrator
201201
/// # Panic
202202
/// Will panic if another thread panicked will holding the lock on `stats_concentrator`
203-
fn add_spans_to_stats<T: datadog_trace_utils::span::SpanText>(
203+
fn add_spans_to_stats<T: datadog_trace_utils::span::TraceData>(
204204
stats_concentrator: &Mutex<SpanConcentrator>,
205-
traces: &[Vec<datadog_trace_utils::span::Span<T>>],
205+
traces: &[Vec<datadog_trace_utils::span::v04::Span<T>>],
206206
) {
207207
let mut stats_concentrator = stats_concentrator.lock_or_panic();
208208

@@ -213,8 +213,8 @@ fn add_spans_to_stats<T: datadog_trace_utils::span::SpanText>(
213213
}
214214

215215
/// Process traces for stats computation and update header tags accordingly
216-
pub(crate) fn process_traces_for_stats<T: datadog_trace_utils::span::SpanText>(
217-
traces: &mut Vec<Vec<datadog_trace_utils::span::Span<T>>>,
216+
pub(crate) fn process_traces_for_stats<T: datadog_trace_utils::span::TraceData>(
217+
traces: &mut Vec<Vec<datadog_trace_utils::span::v04::Span<T>>>,
218218
header_tags: &mut datadog_trace_utils::trace_utils::TracerHeaderTags,
219219
client_side_stats: &ArcSwap<StatsComputationStatus>,
220220
client_computed_top_level: bool,

data-pipeline/src/trace_exporter/trace_serializer.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::trace_exporter::error::TraceExporterError;
88
use crate::trace_exporter::TraceExporterOutputFormat;
99
use datadog_trace_utils::msgpack_decoder::decode::error::DecodeError;
1010
use datadog_trace_utils::msgpack_encoder;
11-
use datadog_trace_utils::span::{Span, SpanText};
11+
use datadog_trace_utils::span::{v04::Span, TraceData};
1212
use datadog_trace_utils::trace_utils::{self, TracerHeaderTags};
1313
use datadog_trace_utils::tracer_payload;
1414
use ddcommon::header::{
@@ -46,7 +46,7 @@ impl<'a> TraceSerializer<'a> {
4646
}
4747

4848
/// Prepare traces payload and HTTP headers for sending to agent
49-
pub(super) fn prepare_traces_payload<T: SpanText>(
49+
pub(super) fn prepare_traces_payload<T: TraceData>(
5050
&self,
5151
traces: Vec<Vec<Span<T>>>,
5252
header_tags: TracerHeaderTags,
@@ -64,7 +64,7 @@ impl<'a> TraceSerializer<'a> {
6464
}
6565

6666
/// Collect trace chunks based on output format
67-
fn collect_and_process_traces<T: SpanText>(
67+
fn collect_and_process_traces<T: TraceData>(
6868
&self,
6969
traces: Vec<Vec<Span<T>>>,
7070
) -> Result<tracer_payload::TraceChunks<T>, TraceExporterError> {
@@ -97,7 +97,7 @@ impl<'a> TraceSerializer<'a> {
9797
}
9898

9999
/// Serialize payload to msgpack format
100-
fn serialize_payload<T: SpanText>(
100+
fn serialize_payload<T: TraceData>(
101101
&self,
102102
payload: &tracer_payload::TraceChunks<T>,
103103
) -> Result<Vec<u8>, TraceExporterError> {
@@ -114,7 +114,7 @@ impl<'a> TraceSerializer<'a> {
114114
mod tests {
115115
use super::*;
116116
use crate::trace_exporter::agent_response::AgentResponsePayloadVersion;
117-
use datadog_trace_utils::span::SpanBytes;
117+
use datadog_trace_utils::span::v04::SpanBytes;
118118
use datadog_trace_utils::trace_utils::TracerHeaderTags;
119119
use ddcommon::header::{
120120
APPLICATION_MSGPACK_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR, DATADOG_TRACE_COUNT_STR,

datadog-sidecar-ffi/src/span.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use datadog_trace_utils::span::{
4+
use datadog_trace_utils::span::v04::{
55
AttributeAnyValueBytes, AttributeArrayValueBytes, SpanBytes, SpanEventBytes, SpanLinkBytes,
66
};
77
use ddcommon_ffi::slice::{AsBytes, CharSlice};

0 commit comments

Comments
 (0)