Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
47f9942
modify the Trace event to include a TraceSampling field, which stores…
andrewqian2001datadog Jan 21, 2026
704d1b1
add the probabilistic sampler
andrewqian2001datadog Jan 21, 2026
b3979ab
add the error sampler
andrewqian2001datadog Jan 21, 2026
55cdb76
add the core sampler
andrewqian2001datadog Jan 21, 2026
e921c3a
add the score sampler
andrewqian2001datadog Jan 21, 2026
b6dac83
add setter for trace spans
andrewqian2001datadog Jan 21, 2026
de45c6d
add trace_sampler to mod.rs
andrewqian2001datadog Jan 21, 2026
30b0625
checks for user set sampling in the translator
andrewqian2001datadog Jan 21, 2026
8017f4c
add the trace sampler transform component
andrewqian2001datadog Jan 21, 2026
a32499f
add helper class to compute signature
andrewqian2001datadog Jan 21, 2026
df45987
modified the trace encoder to use sampling info from the transform
andrewqian2001datadog Jan 21, 2026
f5b45ed
make check clippy + fmt
andrewqian2001datadog Jan 21, 2026
76ab423
wire up component
andrewqian2001datadog Jan 22, 2026
321b7f7
remove default macro which sets probablistic sampling to false
andrewqian2001datadog Jan 22, 2026
df98d8f
from tag from span which is already added to trace chunk
andrewqian2001datadog Jan 23, 2026
740b8f7
make fmt
andrewqian2001datadog Jan 23, 2026
ecdb386
remove tag added during translation process
andrewqian2001datadog Jan 23, 2026
91254c7
disable tag when probalistic sampler isn't being used
andrewqian2001datadog Jan 23, 2026
43a889d
revert change
andrewqian2001datadog Jan 23, 2026
cd1d216
use apm config
andrewqian2001datadog Jan 23, 2026
67c2985
make check-clippy
andrewqian2001datadog Jan 23, 2026
d17c04f
address comments
andrewqian2001datadog Jan 23, 2026
633b43a
fix missing doc, fix bugs
andrewqian2001datadog Jan 23, 2026
9c20678
address comments part two
andrewqian2001datadog Jan 23, 2026
34f0114
remove dead code, move code only used in test
andrewqian2001datadog Jan 23, 2026
934411a
remove more dead code
andrewqian2001datadog Jan 23, 2026
fdc04fd
formatting
andrewqian2001datadog Jan 23, 2026
731883c
Merge remote-tracking branch 'origin/main' into andrewq/add-trace-sam…
andrewqian2001datadog Jan 23, 2026
2a77b75
move missing functionality to docs of trace_sampler
andrewqian2001datadog Jan 23, 2026
bce63b0
set probilistic sampler to false by default
andrewqian2001datadog Jan 23, 2026
9f9449e
make error_tps configurable
andrewqian2001datadog Jan 26, 2026
97e1701
turn off error sampling by default (the agent has this on by default)
andrewqian2001datadog Jan 26, 2026
6c2b039
make fmt
andrewqian2001datadog Jan 26, 2026
3aa3e31
oops remove print
andrewqian2001datadog Jan 26, 2026
f16ccf8
nvm change it back to true
andrewqian2001datadog Jan 26, 2026
8cde3f8
move constant to shared folder
andrewqian2001datadog Jan 26, 2026
4f07191
Merge remote-tracking branch 'origin/main' into andrewq/add-trace-sam…
andrewqian2001datadog Jan 26, 2026
96f28a9
add unit tests for errors sampler
andrewqian2001datadog Jan 27, 2026
7a8f540
make check clippy and fmt
andrewqian2001datadog Jan 27, 2026
5fce437
change probabilistic sampling to on by default
andrewqian2001datadog Jan 27, 2026
6b3dc7d
make fmt
andrewqian2001datadog Jan 27, 2026
be2ddd8
Merge remote-tracking branch 'origin/main' into andrewq/add-trace-sam…
andrewqian2001datadog Jan 27, 2026
c529497
make fmt
andrewqian2001datadog Jan 27, 2026
e37b5f5
change probabilistic back to false
andrewqian2001datadog Jan 27, 2026
e7ec247
add no priority sampler
andrewqian2001datadog Jan 27, 2026
7d0e8cc
????
andrewqian2001datadog Jan 28, 2026
0e7373a
remove testing code
andrewqian2001datadog Jan 28, 2026
e76278f
make fmt
andrewqian2001datadog Jan 28, 2026
0ee5cf2
move constants to shared file
andrewqian2001datadog Jan 28, 2026
61401fb
remove test covered by correctness test
andrewqian2001datadog Jan 28, 2026
24c0678
Merge remote-tracking branch 'origin/main' into andrewq/add-trace-sam…
andrewqian2001datadog Jan 28, 2026
b3d4856
add comment
andrewqian2001datadog Jan 28, 2026
8b28ef4
move duplicate code to shared folder
andrewqian2001datadog Jan 28, 2026
664c399
make fmt
andrewqian2001datadog Jan 28, 2026
7b9a38f
add comment
andrewqian2001datadog Jan 28, 2026
57c5208
remove unecessary code
andrewqian2001datadog Jan 28, 2026
e2794aa
remove potentially??? unecessary code
andrewqian2001datadog Jan 28, 2026
1186241
Merge remote-tracking branch 'origin/main' into andrewq/add-trace-sam…
andrewqian2001datadog Jan 28, 2026
b0ba6d8
Merge branch 'main' into andrewq/add-trace-sampling
andrewqian2001datadog Jan 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions bin/agent-data-plane/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use saluki_components::{
transforms::{
AggregateConfiguration, ApmStatsTransformConfiguration, ChainedConfiguration, DogstatsDMapperConfiguration,
DogstatsDPrefixFilterConfiguration, HostEnrichmentConfiguration, HostTagsConfiguration,
TraceSamplerConfiguration,
},
};
use saluki_config::{ConfigurationLoader, GenericConfiguration};
Expand Down Expand Up @@ -327,6 +328,7 @@ async fn add_baseline_traces_pipeline_to_blueprint(
.await?;
let dd_traces_enrich_config =
ChainedConfiguration::default().with_transform_builder("apm_onboarding", ApmOnboardingConfiguration);
let trace_sampler_config = TraceSamplerConfiguration::default();
let apm_stats_transform_config = ApmStatsTransformConfiguration::from_configuration(config)
.error_context("Failed to configure APM Stats transform.")?
.with_environment_provider(env_provider.clone())
Expand All @@ -338,11 +340,13 @@ async fn add_baseline_traces_pipeline_to_blueprint(

blueprint
.add_transform("traces_enrich", dd_traces_enrich_config)?
.add_transform("trace_sampler", trace_sampler_config)?
.add_transform("dd_apm_stats", apm_stats_transform_config)?
.add_encoder("dd_stats_encode", dd_apm_stats_encoder)?
.add_encoder("dd_traces_encode", dd_traces_config)?
.connect_component("dd_apm_stats", ["traces_enrich"])?
.connect_component("dd_traces_encode", ["traces_enrich"])?
.connect_component("trace_sampler", ["traces_enrich"])?
.connect_component("dd_apm_stats", ["trace_sampler"])?
.connect_component("dd_traces_encode", ["trace_sampler"])?
.connect_component("dd_stats_encode", ["dd_apm_stats"])?
.connect_component("dd_out", ["dd_traces_encode", "dd_stats_encode"])?;

Expand Down
20 changes: 17 additions & 3 deletions lib/saluki-components/src/common/otlp/traces/translator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use otlp_protos::opentelemetry::proto::resource::v1::Resource as OtlpResource;
use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans;
use saluki_common::collections::FastHashMap;
use saluki_context::tags::TagSet;
use saluki_core::data_model::event::trace::{Span as DdSpan, Trace};
use saluki_core::data_model::event::trace::{Span as DdSpan, Trace, TraceSampling};
use saluki_core::data_model::event::Event;

use crate::common::otlp::config::TracesConfig;
Expand Down Expand Up @@ -52,6 +52,7 @@ impl OtlpTracesTranslator {
let resource: OtlpResource = resource_spans.resource.unwrap_or_default();
let resource_tags: TagSet = resource_attributes_to_tagset(&resource.attributes);
let mut traces_by_id: FastHashMap<u64, Vec<DdSpan>> = FastHashMap::default();
let mut priorities_by_id: FastHashMap<u64, i32> = FastHashMap::default();
let ignore_missing_fields = self.config.ignore_missing_datadog_fields;

for scope_spans in resource_spans.scope_spans {
Expand All @@ -67,17 +68,30 @@ impl OtlpTracesTranslator {
ignore_missing_fields,
self.config.enable_otlp_compute_top_level_by_span_kind,
);

// Track last-seen priority for this trace (overwrites previous values)
if let Some(&priority) = dd_span.metrics().get("_sampling_priority_v1") {
priorities_by_id.insert(trace_id, priority as i32);
}

traces_by_id.entry(trace_id).or_default().push(dd_span);
}
}

traces_by_id
.into_iter()
.filter_map(|(_, spans)| {
.filter_map(|(trace_id, spans)| {
if spans.is_empty() {
None
} else {
Some(Event::Trace(Trace::new(spans, resource_tags.clone())))
let mut trace = Trace::new(spans, resource_tags.clone());

// Set the trace-level sampling priority if one was found
if let Some(&priority) = priorities_by_id.get(&trace_id) {
trace.set_sampling(Some(TraceSampling::new(false, Some(priority), None, None)));
}

Some(Event::Trace(trace))
}
})
.collect()
Expand Down
52 changes: 35 additions & 17 deletions lib/saluki-components/src/encoders/datadog/traces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container";
const MAX_TRACES_PER_PAYLOAD: usize = 10000;
static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");

// Sampling metadata keys / values.
const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr";
const TAG_DECISION_MAKER: &str = "_dd.p.dm";
const SAMPLING_PRIORITY_METRIC_KEY: &str = "_sampling_priority_v1";
const DEFAULT_CHUNK_PRIORITY: i32 = 1; // PRIORITY_AUTO_KEEP

fn default_serializer_compressor_kind() -> String {
"zstd".to_string()
}
Expand Down Expand Up @@ -547,28 +553,40 @@ impl TraceEndpointEncoder {
}

fn build_trace_chunk(&self, trace: &Trace) -> TraceChunk {
let mut spans: Vec<ProtoSpan> = trace.spans().iter().map(convert_span).collect();
let spans: Vec<ProtoSpan> = trace.spans().iter().map(convert_span).collect();
let mut chunk = TraceChunk::new();

let rate = self.sampling_rate();
let mut tags = std::collections::HashMap::new();
tags.insert("_dd.otlp_sr".to_string(), format!("{:.2}", rate));

// TODO: Remove this once we have sampling. We have to hardcode the priority to 1 for now so that intake does not drop the trace.
const PRIORITY_AUTO_KEEP: i32 = 1;
chunk.set_priority(PRIORITY_AUTO_KEEP);

// Set _dd.p.dm (decision maker)
// Only set if sampling priority is "keep" (which it is, since we set PRIORITY_AUTO_KEEP)
// Decision maker "-9" indicates probabilistic sampler made the decision
const DECISION_MAKER: &str = "-9";
if let Some(first_span) = spans.first_mut() {
let mut meta = first_span.take_meta();
meta.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
first_span.set_meta(meta);

// Use trace-level sampling metadata if available (set by the trace sampler transform).
// This provides explicit trace-level sampling information without needing to scan spans.
if let Some(sampling) = trace.sampling() {
// Set priority from trace metadata
chunk.set_priority(sampling.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY));
chunk.set_droppedTrace(sampling.dropped_trace);

// Set decision maker tag if present
if let Some(dm) = &sampling.decision_maker {
tags.insert(TAG_DECISION_MAKER.to_string(), dm.to_string());
}

// Set OTLP sampling rate tag if present (from sampler)
if let Some(otlp_sr) = &sampling.otlp_sampling_rate {
tags.insert(TAG_OTLP_SAMPLING_RATE.to_string(), otlp_sr.to_string());
} else {
// Fallback to encoder's computed rate
let rate = self.sampling_rate();
tags.insert(TAG_OTLP_SAMPLING_RATE.to_string(), format!("{:.2}", rate));
}
} else {
// Fallback: if trace.sampling is None, use defaults
// (No span scanning per the plan's "no fallback scan" requirement)
chunk.set_priority(DEFAULT_CHUNK_PRIORITY);
chunk.set_droppedTrace(false);
let rate = self.sampling_rate();
tags.insert(TAG_OTLP_SAMPLING_RATE.to_string(), format!("{:.2}", rate));
}

tags.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string());
chunk.set_tags(tags);

chunk.set_spans(spans);
Expand Down
3 changes: 3 additions & 0 deletions lib/saluki-components/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,8 @@ pub use self::dogstatsd_mapper::DogstatsDMapperConfiguration;
mod metric_router;
pub use self::metric_router::MetricRouterConfiguration;

mod trace_sampler;
pub use self::trace_sampler::TraceSamplerConfiguration;

mod apm_stats;
pub use self::apm_stats::ApmStatsTransformConfiguration;
Loading