Skip to content

Commit e33b714

Browse files
[AGNTLOG-468] chore(datadog): add trace sampling (#1110)
1 parent be34af7 commit e33b714

File tree

14 files changed

+2049
-28
lines changed

14 files changed

+2049
-28
lines changed

bin/agent-data-plane/src/cli/run.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use saluki_components::{
2525
transforms::{
2626
AggregateConfiguration, ApmStatsTransformConfiguration, ChainedConfiguration, DogstatsDMapperConfiguration,
2727
DogstatsDPrefixFilterConfiguration, HostEnrichmentConfiguration, HostTagsConfiguration,
28-
TraceObfuscationConfiguration,
28+
TraceObfuscationConfiguration, TraceSamplerConfiguration,
2929
},
3030
};
3131
use saluki_config::{ConfigurationLoader, GenericConfiguration};
@@ -330,6 +330,8 @@ async fn add_baseline_traces_pipeline_to_blueprint(
330330
let dd_traces_enrich_config = ChainedConfiguration::default()
331331
.with_transform_builder("apm_onboarding", ApmOnboardingConfiguration)
332332
.with_transform_builder("trace_obfuscation", trace_obfuscation_config);
333+
let trace_sampler_config = TraceSamplerConfiguration::from_configuration(config)
334+
.error_context("Failed to configure Trace Sampler transform.")?;
333335
let apm_stats_transform_config = ApmStatsTransformConfiguration::from_configuration(config)
334336
.error_context("Failed to configure APM Stats transform.")?
335337
.with_environment_provider(env_provider.clone())
@@ -341,11 +343,13 @@ async fn add_baseline_traces_pipeline_to_blueprint(
341343

342344
blueprint
343345
.add_transform("traces_enrich", dd_traces_enrich_config)?
346+
.add_transform("trace_sampler", trace_sampler_config)?
344347
.add_transform("dd_apm_stats", apm_stats_transform_config)?
345348
.add_encoder("dd_stats_encode", dd_apm_stats_encoder)?
346349
.add_encoder("dd_traces_encode", dd_traces_config)?
347-
.connect_component("dd_apm_stats", ["traces_enrich"])?
348-
.connect_component("dd_traces_encode", ["traces_enrich"])?
350+
.connect_component("trace_sampler", ["traces_enrich"])?
351+
.connect_component("dd_apm_stats", ["trace_sampler"])?
352+
.connect_component("dd_traces_encode", ["trace_sampler"])?
349353
.connect_component("dd_stats_encode", ["dd_apm_stats"])?
350354
.connect_component("dd_out", ["dd_traces_encode", "dd_stats_encode"])?;
351355

lib/saluki-components/src/common/datadog/apm.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,21 @@ const fn default_target_traces_per_second() -> f64 {
1212
const fn default_errors_per_second() -> f64 {
1313
10.0
1414
}
15+
const fn default_sampling_percentage() -> f64 {
16+
100.0
17+
}
18+
19+
const fn default_error_sampling_enabled() -> bool {
20+
true
21+
}
22+
23+
const fn default_error_tracking_standalone_enabled() -> bool {
24+
false
25+
}
26+
27+
const fn default_probabilistic_sampling_enabled() -> bool {
28+
false
29+
}
1530
const fn default_peer_tags_aggregation() -> bool {
1631
true
1732
}
@@ -33,6 +48,56 @@ struct ApmConfiguration {
3348
apm_config: ApmConfig,
3449
}
3550

51+
#[derive(Clone, Debug, Deserialize)]
52+
struct ProbabilisticSamplerConfig {
53+
/// Enables probabilistic sampling.
54+
///
55+
/// When enabled, the trace sampler keeps approximately `sampling_percentage` of traces using a
56+
/// deterministic hash of the trace ID.
57+
///
58+
/// Defaults to `false`.
59+
#[serde(default = "default_probabilistic_sampling_enabled")]
60+
enabled: bool,
61+
62+
/// Sampling percentage (0-100).
63+
///
64+
/// Determines the percentage of traces to keep. A value of 100 keeps all traces,
65+
/// while 50 keeps approximately half. Values outside 0-100 are treated as 100.
66+
///
67+
/// Defaults to 100.0 (keep all traces).
68+
#[serde(default = "default_sampling_percentage")]
69+
sampling_percentage: f64,
70+
}
71+
72+
impl Default for ProbabilisticSamplerConfig {
73+
fn default() -> Self {
74+
Self {
75+
enabled: default_probabilistic_sampling_enabled(),
76+
sampling_percentage: default_sampling_percentage(),
77+
}
78+
}
79+
}
80+
81+
#[derive(Clone, Debug, Deserialize)]
82+
struct ErrorTrackingStandaloneConfig {
83+
/// Enables Error Tracking Standalone mode.
84+
///
85+
/// When enabled, error tracking standalone mode suppresses single-span sampling and analytics
86+
/// events for dropped traces.
87+
///
88+
/// Defaults to `false`.
89+
#[serde(default = "default_error_tracking_standalone_enabled")]
90+
enabled: bool,
91+
}
92+
93+
impl Default for ErrorTrackingStandaloneConfig {
94+
fn default() -> Self {
95+
Self {
96+
enabled: default_error_tracking_standalone_enabled(),
97+
}
98+
}
99+
}
100+
36101
#[derive(Clone, Debug, Deserialize)]
37102
pub struct ApmConfig {
38103
/// Target traces per second for priority sampling.
@@ -47,6 +112,27 @@ pub struct ApmConfig {
47112
#[serde(default = "default_errors_per_second")]
48113
errors_per_second: f64,
49114

115+
/// Probabilistic sampler configuration.
116+
///
117+
/// Defaults to enabled with `sampling_percentage` set to 100.0 (keep all traces).
118+
#[serde(default)]
119+
probabilistic_sampler: ProbabilisticSamplerConfig,
120+
121+
/// Enable error sampling in the trace sampler.
122+
///
123+
/// When enabled, traces containing errors will be kept even if they would be dropped by
124+
/// probabilistic sampling. This ensures error visibility at low sampling rates.
125+
///
126+
/// Defaults to `true`.
127+
#[serde(default = "default_error_sampling_enabled")]
128+
error_sampling_enabled: bool,
129+
130+
/// Error Tracking Standalone configuration.
131+
///
132+
/// Defaults to disabled.
133+
#[serde(default)]
134+
error_tracking_standalone: ErrorTrackingStandaloneConfig,
135+
50136
/// Enables an additional stats computation check on spans to see if they have an eligible `span.kind` (server, consumer, client, producer).
51137
/// If enabled, a span with an eligible `span.kind` will have stats computed. If disabled, only top-level and measured spans will have stats computed.
52138
///
@@ -100,6 +186,26 @@ impl ApmConfig {
100186
self.errors_per_second
101187
}
102188

189+
/// Returns if probabilistic sampling is enabled.
190+
pub const fn probabilistic_sampler_enabled(&self) -> bool {
191+
self.probabilistic_sampler.enabled
192+
}
193+
194+
/// Returns the probabilistic sampler sampling percentage.
195+
pub const fn probabilistic_sampler_sampling_percentage(&self) -> f64 {
196+
self.probabilistic_sampler.sampling_percentage
197+
}
198+
199+
/// Returns if error sampling is enabled.
200+
pub const fn error_sampling_enabled(&self) -> bool {
201+
self.error_sampling_enabled
202+
}
203+
204+
/// Returns if error tracking standalone mode is enabled.
205+
pub const fn error_tracking_standalone_enabled(&self) -> bool {
206+
self.error_tracking_standalone.enabled
207+
}
208+
103209
/// Returns if stats computation by span kind is enabled.
104210
pub const fn compute_stats_by_span_kind(&self) -> bool {
105211
self.compute_stats_by_span_kind
@@ -143,6 +249,9 @@ impl Default for ApmConfig {
143249
Self {
144250
target_traces_per_second: default_target_traces_per_second(),
145251
errors_per_second: default_errors_per_second(),
252+
probabilistic_sampler: ProbabilisticSamplerConfig::default(),
253+
error_sampling_enabled: default_error_sampling_enabled(),
254+
error_tracking_standalone: ErrorTrackingStandaloneConfig::default(),
146255
compute_stats_by_span_kind: default_compute_stats_by_span_kind(),
147256
peer_tags_aggregation: default_peer_tags_aggregation(),
148257
peer_tags: Vec::new(),

lib/saluki-components/src/common/datadog/mod.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,40 @@ mod retry;
1010
pub mod telemetry;
1111
pub mod transaction;
1212

13+
/// Metric key used to store Datadog sampling priority (`_sampling_priority_v1`).
14+
pub const SAMPLING_PRIORITY_METRIC_KEY: &str = "_sampling_priority_v1";
15+
1316
/// Default compressed size limit for intake requests.
1417
pub const DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT: usize = 3_200_000; // 3 MiB
1518

1619
/// Default uncompressed size limit for intake requests.
1720
pub const DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT: usize = 62_914_560; // 60 MiB
21+
22+
/// Metadata tag used to store the sampling decision maker (`_dd.p.dm`).
23+
pub const TAG_DECISION_MAKER: &str = "_dd.p.dm";
24+
25+
/// Decision maker value for probabilistic sampling (matches Datadog Agent).
26+
pub const DECISION_MAKER_PROBABILISTIC: &str = "-9";
27+
28+
/// Metadata key used to store the OTEL trace id.
29+
pub const OTEL_TRACE_ID_META_KEY: &str = "otel.trace_id";
30+
31+
/// Maximum trace id used for deterministic sampling.
32+
pub const MAX_TRACE_ID: u64 = u64::MAX;
33+
34+
/// Precomputed float form of `MAX_TRACE_ID`.
35+
pub const MAX_TRACE_ID_FLOAT: f64 = MAX_TRACE_ID as f64;
36+
37+
/// Hasher used for deterministic sampling.
38+
pub const SAMPLER_HASHER: u64 = 1111111111111111111;
39+
40+
/// Returns whether to keep a trace, based on its ID and a sampling rate.
41+
///
42+
/// This assumes trace IDs are nearly uniformly distributed.
43+
pub fn sample_by_rate(trace_id: u64, rate: f64) -> bool {
44+
if rate < 1.0 {
45+
trace_id.wrapping_mul(SAMPLER_HASHER) < (rate * MAX_TRACE_ID_FLOAT) as u64
46+
} else {
47+
true
48+
}
49+
}

lib/saluki-components/src/common/otlp/traces/transform.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use serde_json::{Map as JsonMap, Value as JsonValue};
2020
use stringtheory::MetaString;
2121
use tracing::error;
2222

23+
use crate::common::datadog::{OTEL_TRACE_ID_META_KEY, SAMPLING_PRIORITY_METRIC_KEY};
2324
use crate::common::otlp::attributes::{get_int_attribute, HTTP_MAPPINGS};
2425
use crate::common::otlp::traces::normalize::{normalize_service, normalize_tag_value};
2526
use crate::common::otlp::traces::normalize::{truncate_utf8, MAX_RESOURCE_LEN};
@@ -29,7 +30,6 @@ use crate::common::otlp::util::{
2930
DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_ENVIRONMENT, KEY_DATADOG_VERSION,
3031
};
3132

32-
pub(crate) const SAMPLING_PRIORITY_METRIC_KEY: &str = "_sampling_priority_v1";
3333
const EVENT_EXTRACTION_METRIC_KEY: &str = "_dd1.sr.eausr";
3434
const ANALYTICS_EVENT_KEY: &str = "analytics.event";
3535
const HTTP_REQUEST_HEADER_PREFIX: &str = "http.request.header.";
@@ -71,7 +71,6 @@ const NETWORK_PROTOCOL_NAME_KEY: &str = "network.protocol.name";
7171
const HTTP_STATUS_CODE_KEY: &str = "http.status_code";
7272
const HTTP_RESPONSE_STATUS_CODE_KEY: &str = "http.response.status_code";
7373
const SPAN_KIND_META_KEY: &str = "span.kind";
74-
const OTEL_TRACE_ID_META_KEY: &str = "otel.trace_id";
7574
const W3C_TRACESTATE_META_KEY: &str = "w3c.tracestate";
7675
const OTEL_LIBRARY_NAME_META_KEY: &str = "otel.library.name";
7776
const OTEL_LIBRARY_VERSION_META_KEY: &str = "otel.library.version";

lib/saluki-components/src/common/otlp/traces/translator.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ use otlp_protos::opentelemetry::proto::resource::v1::Resource as OtlpResource;
33
use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans;
44
use saluki_common::collections::FastHashMap;
55
use saluki_context::tags::TagSet;
6-
use saluki_core::data_model::event::trace::{Span as DdSpan, Trace};
6+
use saluki_core::data_model::event::trace::{Span as DdSpan, Trace, TraceSampling};
77
use saluki_core::data_model::event::Event;
88

9+
use crate::common::datadog::SAMPLING_PRIORITY_METRIC_KEY;
910
use crate::common::otlp::config::TracesConfig;
1011
use crate::common::otlp::traces::transform::otel_span_to_dd_span;
1112
use crate::common::otlp::traces::transform::otlp_value_to_string;
@@ -52,6 +53,7 @@ impl OtlpTracesTranslator {
5253
let resource: OtlpResource = resource_spans.resource.unwrap_or_default();
5354
let resource_tags: TagSet = resource_attributes_to_tagset(&resource.attributes);
5455
let mut traces_by_id: FastHashMap<u64, Vec<DdSpan>> = FastHashMap::default();
56+
let mut priorities_by_id: FastHashMap<u64, i32> = FastHashMap::default();
5557
let ignore_missing_fields = self.config.ignore_missing_datadog_fields;
5658

5759
for scope_spans in resource_spans.scope_spans {
@@ -67,17 +69,30 @@ impl OtlpTracesTranslator {
6769
ignore_missing_fields,
6870
self.config.enable_otlp_compute_top_level_by_span_kind,
6971
);
72+
73+
// Track last-seen priority for this trace (overwrites previous values)
74+
if let Some(&priority) = dd_span.metrics().get(SAMPLING_PRIORITY_METRIC_KEY) {
75+
priorities_by_id.insert(trace_id, priority as i32);
76+
}
77+
7078
traces_by_id.entry(trace_id).or_default().push(dd_span);
7179
}
7280
}
7381

7482
traces_by_id
7583
.into_iter()
76-
.filter_map(|(_, spans)| {
84+
.filter_map(|(trace_id, spans)| {
7785
if spans.is_empty() {
7886
None
7987
} else {
80-
Some(Event::Trace(Trace::new(spans, resource_tags.clone())))
88+
let mut trace = Trace::new(spans, resource_tags.clone());
89+
90+
// Set the trace-level sampling priority if one was found
91+
if let Some(&priority) = priorities_by_id.get(&trace_id) {
92+
trace.set_sampling(Some(TraceSampling::new(false, Some(priority), None, None)));
93+
}
94+
95+
Some(Event::Trace(trace))
8196
}
8297
})
8398
.collect()

lib/saluki-components/src/encoders/datadog/traces/mod.rs

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use crate::common::datadog::{
4949
io::RB_BUFFER_CHUNK_SIZE,
5050
request_builder::{EndpointEncoder, RequestBuilder},
5151
telemetry::ComponentTelemetry,
52-
DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
52+
DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER,
5353
};
5454
use crate::common::otlp::config::TracesConfig;
5555
use crate::common::otlp::util::{
@@ -62,6 +62,10 @@ const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
6262
const MAX_TRACES_PER_PAYLOAD: usize = 10000;
6363
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
6464

65+
// Sampling metadata keys / values.
66+
const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
67+
const DEFAULT_CHUNK_PRIORITY: i32 = 1; // PRIORITY_AUTO_KEEP
68+
6569
fn default_serializer_compressor_kind() -> String {
6670
"zstd".to_string()
6771
}
@@ -547,28 +551,40 @@ impl TraceEndpointEncoder {
547551
}
548552

549553
fn build_trace_chunk(&self, trace: &Trace) -> TraceChunk {
550-
let mut spans: Vec<ProtoSpan> = trace.spans().iter().map(convert_span).collect();
554+
let spans: Vec<ProtoSpan> = trace.spans().iter().map(convert_span).collect();
551555
let mut chunk = TraceChunk::new();
552556

553-
let rate = self.sampling_rate();
554557
let mut tags = std::collections::HashMap::new();
555-
tags.insert("_dd.otlp_sr".to_string(), format!("{:.2}", rate));
556-
557-
// TODO: Remove this once we have sampling. We have to hardcode the priority to 1 for now so that intake does not drop the trace.
558-
const PRIORITY_AUTO_KEEP: i32 = 1;
559-
chunk.set_priority(PRIORITY_AUTO_KEEP);
560-
561-
// Set _dd.p.dm (decision maker)
562-
// Only set if sampling priority is "keep" (which it is, since we set PRIORITY_AUTO_KEEP)
563-
// Decision maker "-9" indicates probabilistic sampler made the decision
564-
const DECISION_MAKER: &str = "-9";
565-
if let Some(first_span) = spans.first_mut() {
566-
let mut meta = first_span.take_meta();
567-
meta.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
568-
first_span.set_meta(meta);
558+
559+
// Use trace-level sampling metadata if available (set by the trace sampler transform).
560+
// This provides explicit trace-level sampling information without needing to scan spans.
561+
if let Some(sampling) = trace.sampling() {
562+
// Set priority from trace metadata
563+
chunk.set_priority(sampling.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY));
564+
chunk.set_droppedTrace(sampling.dropped_trace);
565+
566+
// Set decision maker tag if present.
567+
if let Some(dm) = &sampling.decision_maker {
568+
tags.insert(TAG_DECISION_MAKER.to_string(), dm.to_string());
569+
}
570+
571+
// Set OTLP sampling rate tag if present (from sampler)
572+
if let Some(otlp_sr) = &sampling.otlp_sampling_rate {
573+
tags.insert(TAG_OTLP_SAMPLING_RATE.to_string(), otlp_sr.to_string());
574+
} else {
575+
// Fallback to encoder's computed rate
576+
let rate = self.sampling_rate();
577+
tags.insert(TAG_OTLP_SAMPLING_RATE.to_string(), format!("{:.2}", rate));
578+
}
579+
} else {
580+
// Fallback: if trace.sampling is None, use defaults
581+
// (No span scanning per the plan's "no fallback scan" requirement)
582+
chunk.set_priority(DEFAULT_CHUNK_PRIORITY);
583+
chunk.set_droppedTrace(false);
584+
let rate = self.sampling_rate();
585+
tags.insert(TAG_OTLP_SAMPLING_RATE.to_string(), format!("{:.2}", rate));
569586
}
570587

571-
tags.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
572588
chunk.set_tags(tags);
573589

574590
chunk.set_spans(spans);

lib/saluki-components/src/transforms/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ pub use self::dogstatsd_mapper::DogstatsDMapperConfiguration;
2121
mod metric_router;
2222
pub use self::metric_router::MetricRouterConfiguration;
2323

24+
mod trace_sampler;
25+
pub use self::trace_sampler::TraceSamplerConfiguration;
26+
2427
mod apm_stats;
2528
pub use self::apm_stats::ApmStatsTransformConfiguration;
2629

0 commit comments

Comments
 (0)