Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
47f9942
modify the Trace event to include a TraceSampling field, which stores…
andrewqian2001datadog Jan 21, 2026
704d1b1
add the probabilistic sampler
andrewqian2001datadog Jan 21, 2026
b3979ab
add the error sampler
andrewqian2001datadog Jan 21, 2026
55cdb76
add the core sampler
andrewqian2001datadog Jan 21, 2026
e921c3a
add the score sampler
andrewqian2001datadog Jan 21, 2026
b6dac83
add setter for trace spans
andrewqian2001datadog Jan 21, 2026
de45c6d
add trace_sampler to mod.rs
andrewqian2001datadog Jan 21, 2026
30b0625
checks for user set sampling in the translator
andrewqian2001datadog Jan 21, 2026
8017f4c
add the trace sampler transform component
andrewqian2001datadog Jan 21, 2026
a32499f
add helper class to compute signature
andrewqian2001datadog Jan 21, 2026
df45987
modified the trace encoder to use sampling info from the transform
andrewqian2001datadog Jan 21, 2026
f5b45ed
make check clippy + fmt
andrewqian2001datadog Jan 21, 2026
76ab423
wire up component
andrewqian2001datadog Jan 22, 2026
321b7f7
remove default macro which sets probablistic sampling to false
andrewqian2001datadog Jan 22, 2026
df98d8f
from tag from span which is already added to trace chunk
andrewqian2001datadog Jan 23, 2026
740b8f7
make fmt
andrewqian2001datadog Jan 23, 2026
ecdb386
remove tag added during translation process
andrewqian2001datadog Jan 23, 2026
91254c7
disable tag when probalistic sampler isn't being used
andrewqian2001datadog Jan 23, 2026
43a889d
revert change
andrewqian2001datadog Jan 23, 2026
cd1d216
use apm config
andrewqian2001datadog Jan 23, 2026
67c2985
make check-clippy
andrewqian2001datadog Jan 23, 2026
d17c04f
address comments
andrewqian2001datadog Jan 23, 2026
633b43a
fix missing doc, fix bugs
andrewqian2001datadog Jan 23, 2026
9c20678
address comments part two
andrewqian2001datadog Jan 23, 2026
34f0114
remove dead code, move code only used in test
andrewqian2001datadog Jan 23, 2026
934411a
remove more dead code
andrewqian2001datadog Jan 23, 2026
fdc04fd
formatting
andrewqian2001datadog Jan 23, 2026
731883c
Merge remote-tracking branch 'origin/main' into andrewq/add-trace-sam…
andrewqian2001datadog Jan 23, 2026
2a77b75
move missing functionality to docs of trace_sampler
andrewqian2001datadog Jan 23, 2026
bce63b0
set probilistic sampler to false by default
andrewqian2001datadog Jan 23, 2026
9f9449e
make error_tps configurable
andrewqian2001datadog Jan 26, 2026
97e1701
turn off error sampling by default (the agent has this on by default)
andrewqian2001datadog Jan 26, 2026
6c2b039
make fmt
andrewqian2001datadog Jan 26, 2026
3aa3e31
oops remove print
andrewqian2001datadog Jan 26, 2026
f16ccf8
nvm change it back to true
andrewqian2001datadog Jan 26, 2026
8cde3f8
move constant to shared folder
andrewqian2001datadog Jan 26, 2026
4f07191
Merge remote-tracking branch 'origin/main' into andrewq/add-trace-sam…
andrewqian2001datadog Jan 26, 2026
96f28a9
add unit tests for errors sampler
andrewqian2001datadog Jan 27, 2026
7a8f540
make check clippy and fmt
andrewqian2001datadog Jan 27, 2026
5fce437
change probabilistic sampling to on by default
andrewqian2001datadog Jan 27, 2026
6b3dc7d
make fmt
andrewqian2001datadog Jan 27, 2026
be2ddd8
Merge remote-tracking branch 'origin/main' into andrewq/add-trace-sam…
andrewqian2001datadog Jan 27, 2026
c529497
make fmt
andrewqian2001datadog Jan 27, 2026
e37b5f5
change probabilistic back to false
andrewqian2001datadog Jan 27, 2026
e7ec247
add no priority sampler
andrewqian2001datadog Jan 27, 2026
7d0e8cc
????
andrewqian2001datadog Jan 28, 2026
0e7373a
remove testing code
andrewqian2001datadog Jan 28, 2026
e76278f
make fmt
andrewqian2001datadog Jan 28, 2026
0ee5cf2
move constants to shared file
andrewqian2001datadog Jan 28, 2026
61401fb
remove test covered by correctness test
andrewqian2001datadog Jan 28, 2026
24c0678
Merge remote-tracking branch 'origin/main' into andrewq/add-trace-sam…
andrewqian2001datadog Jan 28, 2026
b3d4856
add comment
andrewqian2001datadog Jan 28, 2026
8b28ef4
move duplicate code to shared folder
andrewqian2001datadog Jan 28, 2026
664c399
make fmt
andrewqian2001datadog Jan 28, 2026
7b9a38f
add comment
andrewqian2001datadog Jan 28, 2026
57c5208
remove unecessary code
andrewqian2001datadog Jan 28, 2026
e2794aa
remove potentially??? unecessary code
andrewqian2001datadog Jan 28, 2026
1186241
Merge remote-tracking branch 'origin/main' into andrewq/add-trace-sam…
andrewqian2001datadog Jan 28, 2026
b0ba6d8
Merge branch 'main' into andrewq/add-trace-sampling
andrewqian2001datadog Jan 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions bin/agent-data-plane/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use saluki_components::{
transforms::{
AggregateConfiguration, ApmStatsTransformConfiguration, ChainedConfiguration, DogstatsDMapperConfiguration,
DogstatsDPrefixFilterConfiguration, HostEnrichmentConfiguration, HostTagsConfiguration,
TraceObfuscationConfiguration,
TraceObfuscationConfiguration, TraceSamplerConfiguration,
},
};
use saluki_config::{ConfigurationLoader, GenericConfiguration};
Expand Down Expand Up @@ -330,6 +330,8 @@ async fn add_baseline_traces_pipeline_to_blueprint(
let dd_traces_enrich_config = ChainedConfiguration::default()
.with_transform_builder("apm_onboarding", ApmOnboardingConfiguration)
.with_transform_builder("trace_obfuscation", trace_obfuscation_config);
let trace_sampler_config = TraceSamplerConfiguration::from_configuration(config)
.error_context("Failed to configure Trace Sampler transform.")?;
let apm_stats_transform_config = ApmStatsTransformConfiguration::from_configuration(config)
.error_context("Failed to configure APM Stats transform.")?
.with_environment_provider(env_provider.clone())
Expand All @@ -341,11 +343,13 @@ async fn add_baseline_traces_pipeline_to_blueprint(

blueprint
.add_transform("traces_enrich", dd_traces_enrich_config)?
.add_transform("trace_sampler", trace_sampler_config)?
.add_transform("dd_apm_stats", apm_stats_transform_config)?
.add_encoder("dd_stats_encode", dd_apm_stats_encoder)?
.add_encoder("dd_traces_encode", dd_traces_config)?
.connect_component("dd_apm_stats", ["traces_enrich"])?
.connect_component("dd_traces_encode", ["traces_enrich"])?
.connect_component("trace_sampler", ["traces_enrich"])?
.connect_component("dd_apm_stats", ["trace_sampler"])?
.connect_component("dd_traces_encode", ["trace_sampler"])?
.connect_component("dd_stats_encode", ["dd_apm_stats"])?
.connect_component("dd_out", ["dd_traces_encode", "dd_stats_encode"])?;

Expand Down
109 changes: 109 additions & 0 deletions lib/saluki-components/src/common/datadog/apm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@ const fn default_target_traces_per_second() -> f64 {
const fn default_errors_per_second() -> f64 {
10.0
}
const fn default_sampling_percentage() -> f64 {
100.0
}

const fn default_error_sampling_enabled() -> bool {
true
}

const fn default_error_tracking_standalone_enabled() -> bool {
false
}

const fn default_probabilistic_sampling_enabled() -> bool {
false
}
const fn default_peer_tags_aggregation() -> bool {
true
}
Expand All @@ -33,6 +48,56 @@ struct ApmConfiguration {
apm_config: ApmConfig,
}

#[derive(Clone, Debug, Deserialize)]
struct ProbabilisticSamplerConfig {
/// Enables probabilistic sampling.
///
/// When enabled, the trace sampler keeps approximately `sampling_percentage` of traces using a
/// deterministic hash of the trace ID.
///
/// Defaults to `false`.
#[serde(default = "default_probabilistic_sampling_enabled")]
enabled: bool,

/// Sampling percentage (0-100).
///
/// Determines the percentage of traces to keep. A value of 100 keeps all traces,
/// while 50 keeps approximately half. Values outside 0-100 are treated as 100.
///
/// Defaults to 100.0 (keep all traces).
#[serde(default = "default_sampling_percentage")]
sampling_percentage: f64,
}

impl Default for ProbabilisticSamplerConfig {
fn default() -> Self {
Self {
enabled: default_probabilistic_sampling_enabled(),
sampling_percentage: default_sampling_percentage(),
}
}
}

#[derive(Clone, Debug, Deserialize)]
struct ErrorTrackingStandaloneConfig {
/// Enables Error Tracking Standalone mode.
///
/// When enabled, error tracking standalone mode suppresses single-span sampling and analytics
/// events for dropped traces.
///
/// Defaults to `false`.
#[serde(default = "default_error_tracking_standalone_enabled")]
enabled: bool,
}

impl Default for ErrorTrackingStandaloneConfig {
fn default() -> Self {
Self {
enabled: default_error_tracking_standalone_enabled(),
}
}
}

#[derive(Clone, Debug, Deserialize)]
pub struct ApmConfig {
/// Target traces per second for priority sampling.
Expand All @@ -47,6 +112,27 @@ pub struct ApmConfig {
#[serde(default = "default_errors_per_second")]
errors_per_second: f64,

/// Probabilistic sampler configuration.
///
/// Defaults to enabled with `sampling_percentage` set to 100.0 (keep all traces).
#[serde(default)]
probabilistic_sampler: ProbabilisticSamplerConfig,

/// Enable error sampling in the trace sampler.
///
/// When enabled, traces containing errors will be kept even if they would be dropped by
/// probabilistic sampling. This ensures error visibility at low sampling rates.
///
/// Defaults to `true`.
#[serde(default = "default_error_sampling_enabled")]
error_sampling_enabled: bool,

/// Error Tracking Standalone configuration.
///
/// Defaults to disabled.
#[serde(default)]
error_tracking_standalone: ErrorTrackingStandaloneConfig,

/// Enables an additional stats computation check on spans to see if they have an eligible `span.kind` (server, consumer, client, producer).
/// If enabled, a span with an eligible `span.kind` will have stats computed. If disabled, only top-level and measured spans will have stats computed.
///
Expand Down Expand Up @@ -100,6 +186,26 @@ impl ApmConfig {
self.errors_per_second
}

/// Returns if probabilistic sampling is enabled.
pub const fn probabilistic_sampler_enabled(&self) -> bool {
self.probabilistic_sampler.enabled
}

/// Returns the probabilistic sampler sampling percentage.
pub const fn probabilistic_sampler_sampling_percentage(&self) -> f64 {
self.probabilistic_sampler.sampling_percentage
}

/// Returns if error sampling is enabled.
pub const fn error_sampling_enabled(&self) -> bool {
self.error_sampling_enabled
}

/// Returns if error tracking standalone mode is enabled.
pub const fn error_tracking_standalone_enabled(&self) -> bool {
self.error_tracking_standalone.enabled
}

/// Returns if stats computation by span kind is enabled.
pub const fn compute_stats_by_span_kind(&self) -> bool {
self.compute_stats_by_span_kind
Expand Down Expand Up @@ -143,6 +249,9 @@ impl Default for ApmConfig {
Self {
target_traces_per_second: default_target_traces_per_second(),
errors_per_second: default_errors_per_second(),
probabilistic_sampler: ProbabilisticSamplerConfig::default(),
error_sampling_enabled: default_error_sampling_enabled(),
error_tracking_standalone: ErrorTrackingStandaloneConfig::default(),
compute_stats_by_span_kind: default_compute_stats_by_span_kind(),
peer_tags_aggregation: default_peer_tags_aggregation(),
peer_tags: Vec::new(),
Expand Down
32 changes: 32 additions & 0 deletions lib/saluki-components/src/common/datadog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,40 @@ mod retry;
pub mod telemetry;
pub mod transaction;

/// Metric key used to store Datadog sampling priority (`_sampling_priority_v1`).
pub const SAMPLING_PRIORITY_METRIC_KEY: &str = "_sampling_priority_v1";

/// Default compressed size limit for intake requests.
pub const DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT: usize = 3_200_000; // 3 MiB

/// Default uncompressed size limit for intake requests.
pub const DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT: usize = 62_914_560; // 60 MiB

/// Metadata tag used to store the sampling decision maker (`_dd.p.dm`).
pub const TAG_DECISION_MAKER: &str = "_dd.p.dm";

/// Decision maker value for probabilistic sampling (matches Datadog Agent).
pub const DECISION_MAKER_PROBABILISTIC: &str = "-9";

/// Metadata key used to store the OTEL trace id.
pub const OTEL_TRACE_ID_META_KEY: &str = "otel.trace_id";

/// Maximum trace id used for deterministic sampling.
pub const MAX_TRACE_ID: u64 = u64::MAX;

/// Precomputed float form of `MAX_TRACE_ID`.
pub const MAX_TRACE_ID_FLOAT: f64 = MAX_TRACE_ID as f64;

/// Hasher used for deterministic sampling.
pub const SAMPLER_HASHER: u64 = 1111111111111111111;

/// Returns whether to keep a trace, based on its ID and a sampling rate.
///
/// This assumes trace IDs are nearly uniformly distributed.
pub fn sample_by_rate(trace_id: u64, rate: f64) -> bool {
if rate < 1.0 {
trace_id.wrapping_mul(SAMPLER_HASHER) < (rate * MAX_TRACE_ID_FLOAT) as u64
} else {
true
}
}
3 changes: 1 addition & 2 deletions lib/saluki-components/src/common/otlp/traces/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use serde_json::{Map as JsonMap, Value as JsonValue};
use stringtheory::MetaString;
use tracing::error;

use crate::common::datadog::{OTEL_TRACE_ID_META_KEY, SAMPLING_PRIORITY_METRIC_KEY};
use crate::common::otlp::attributes::{get_int_attribute, HTTP_MAPPINGS};
use crate::common::otlp::traces::normalize::{normalize_service, normalize_tag_value};
use crate::common::otlp::traces::normalize::{truncate_utf8, MAX_RESOURCE_LEN};
Expand All @@ -29,7 +30,6 @@ use crate::common::otlp::util::{
DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_ENVIRONMENT, KEY_DATADOG_VERSION,
};

pub(crate) const SAMPLING_PRIORITY_METRIC_KEY: &str = "_sampling_priority_v1";
const EVENT_EXTRACTION_METRIC_KEY: &str = "_dd1.sr.eausr";
const ANALYTICS_EVENT_KEY: &str = "analytics.event";
const HTTP_REQUEST_HEADER_PREFIX: &str = "http.request.header.";
Expand Down Expand Up @@ -71,7 +71,6 @@ const NETWORK_PROTOCOL_NAME_KEY: &str = "network.protocol.name";
const HTTP_STATUS_CODE_KEY: &str = "http.status_code";
const HTTP_RESPONSE_STATUS_CODE_KEY: &str = "http.response.status_code";
const SPAN_KIND_META_KEY: &str = "span.kind";
const OTEL_TRACE_ID_META_KEY: &str = "otel.trace_id";
const W3C_TRACESTATE_META_KEY: &str = "w3c.tracestate";
const OTEL_LIBRARY_NAME_META_KEY: &str = "otel.library.name";
const OTEL_LIBRARY_VERSION_META_KEY: &str = "otel.library.version";
Expand Down
21 changes: 18 additions & 3 deletions lib/saluki-components/src/common/otlp/traces/translator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use otlp_protos::opentelemetry::proto::resource::v1::Resource as OtlpResource;
use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans;
use saluki_common::collections::FastHashMap;
use saluki_context::tags::TagSet;
use saluki_core::data_model::event::trace::{Span as DdSpan, Trace};
use saluki_core::data_model::event::trace::{Span as DdSpan, Trace, TraceSampling};
use saluki_core::data_model::event::Event;

use crate::common::datadog::SAMPLING_PRIORITY_METRIC_KEY;
use crate::common::otlp::config::TracesConfig;
use crate::common::otlp::traces::transform::otel_span_to_dd_span;
use crate::common::otlp::traces::transform::otlp_value_to_string;
Expand Down Expand Up @@ -52,6 +53,7 @@ impl OtlpTracesTranslator {
let resource: OtlpResource = resource_spans.resource.unwrap_or_default();
let resource_tags: TagSet = resource_attributes_to_tagset(&resource.attributes);
let mut traces_by_id: FastHashMap<u64, Vec<DdSpan>> = FastHashMap::default();
let mut priorities_by_id: FastHashMap<u64, i32> = FastHashMap::default();
let ignore_missing_fields = self.config.ignore_missing_datadog_fields;

for scope_spans in resource_spans.scope_spans {
Expand All @@ -67,17 +69,30 @@ impl OtlpTracesTranslator {
ignore_missing_fields,
self.config.enable_otlp_compute_top_level_by_span_kind,
);

// Track last-seen priority for this trace (overwrites previous values)
if let Some(&priority) = dd_span.metrics().get(SAMPLING_PRIORITY_METRIC_KEY) {
priorities_by_id.insert(trace_id, priority as i32);
}

traces_by_id.entry(trace_id).or_default().push(dd_span);
}
}

traces_by_id
.into_iter()
.filter_map(|(_, spans)| {
.filter_map(|(trace_id, spans)| {
if spans.is_empty() {
None
} else {
Some(Event::Trace(Trace::new(spans, resource_tags.clone())))
let mut trace = Trace::new(spans, resource_tags.clone());

// Set the trace-level sampling priority if one was found
if let Some(&priority) = priorities_by_id.get(&trace_id) {
trace.set_sampling(Some(TraceSampling::new(false, Some(priority), None, None)));
}

Some(Event::Trace(trace))
}
})
.collect()
Expand Down
52 changes: 34 additions & 18 deletions lib/saluki-components/src/encoders/datadog/traces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::common::datadog::{
io::RB_BUFFER_CHUNK_SIZE,
request_builder::{EndpointEncoder, RequestBuilder},
telemetry::ComponentTelemetry,
DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT,
DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER,
};
use crate::common::otlp::config::TracesConfig;
use crate::common::otlp::util::{
Expand All @@ -62,6 +62,10 @@ const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
const MAX_TRACES_PER_PAYLOAD: usize = 10000;
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");

// Sampling metadata keys / values.
const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
const DEFAULT_CHUNK_PRIORITY: i32 = 1; // PRIORITY_AUTO_KEEP

fn default_serializer_compressor_kind() -> String {
"zstd".to_string()
}
Expand Down Expand Up @@ -547,28 +551,40 @@ impl TraceEndpointEncoder {
}

fn build_trace_chunk(&self, trace: &Trace) -> TraceChunk {
let mut spans: Vec<ProtoSpan> = trace.spans().iter().map(convert_span).collect();
let spans: Vec<ProtoSpan> = trace.spans().iter().map(convert_span).collect();
let mut chunk = TraceChunk::new();

let rate = self.sampling_rate();
let mut tags = std::collections::HashMap::new();
tags.insert("_dd.otlp_sr".to_string(), format!("{:.2}", rate));

// TODO: Remove this once we have sampling. We have to hardcode the priority to 1 for now so that intake does not drop the trace.
const PRIORITY_AUTO_KEEP: i32 = 1;
chunk.set_priority(PRIORITY_AUTO_KEEP);

// Set _dd.p.dm (decision maker)
// Only set if sampling priority is "keep" (which it is, since we set PRIORITY_AUTO_KEEP)
// Decision maker "-9" indicates probabilistic sampler made the decision
const DECISION_MAKER: &str = "-9";
if let Some(first_span) = spans.first_mut() {
let mut meta = first_span.take_meta();
meta.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
first_span.set_meta(meta);

// Use trace-level sampling metadata if available (set by the trace sampler transform).
// This provides explicit trace-level sampling information without needing to scan spans.
if let Some(sampling) = trace.sampling() {
// Set priority from trace metadata
chunk.set_priority(sampling.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY));
chunk.set_droppedTrace(sampling.dropped_trace);

// Set decision maker tag if present.
if let Some(dm) = &sampling.decision_maker {
tags.insert(TAG_DECISION_MAKER.to_string(), dm.to_string());
}

// Set OTLP sampling rate tag if present (from sampler)
if let Some(otlp_sr) = &sampling.otlp_sampling_rate {
tags.insert(TAG_OTLP_SAMPLING_RATE.to_string(), otlp_sr.to_string());
} else {
// Fallback to encoder's computed rate
let rate = self.sampling_rate();
tags.insert(TAG_OTLP_SAMPLING_RATE.to_string(), format!("{:.2}", rate));
}
} else {
// Fallback: if trace.sampling is None, use defaults
// (No span scanning per the plan's "no fallback scan" requirement)
chunk.set_priority(DEFAULT_CHUNK_PRIORITY);
chunk.set_droppedTrace(false);
let rate = self.sampling_rate();
tags.insert(TAG_OTLP_SAMPLING_RATE.to_string(), format!("{:.2}", rate));
}

tags.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
chunk.set_tags(tags);

chunk.set_spans(spans);
Expand Down
3 changes: 3 additions & 0 deletions lib/saluki-components/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ pub use self::dogstatsd_mapper::DogstatsDMapperConfiguration;
mod metric_router;
pub use self::metric_router::MetricRouterConfiguration;

mod trace_sampler;
pub use self::trace_sampler::TraceSamplerConfiguration;

mod apm_stats;
pub use self::apm_stats::ApmStatsTransformConfiguration;

Expand Down
Loading