diff --git a/bin/agent-data-plane/src/cli/run.rs b/bin/agent-data-plane/src/cli/run.rs index cc3f87f056..3a735bde76 100644 --- a/bin/agent-data-plane/src/cli/run.rs +++ b/bin/agent-data-plane/src/cli/run.rs @@ -25,7 +25,7 @@ use saluki_components::{ transforms::{ AggregateConfiguration, ApmStatsTransformConfiguration, ChainedConfiguration, DogstatsDMapperConfiguration, DogstatsDPrefixFilterConfiguration, HostEnrichmentConfiguration, HostTagsConfiguration, - TraceObfuscationConfiguration, + TraceObfuscationConfiguration, TraceSamplerConfiguration, }, }; use saluki_config::{ConfigurationLoader, GenericConfiguration}; @@ -330,6 +330,8 @@ async fn add_baseline_traces_pipeline_to_blueprint( let dd_traces_enrich_config = ChainedConfiguration::default() .with_transform_builder("apm_onboarding", ApmOnboardingConfiguration) .with_transform_builder("trace_obfuscation", trace_obfuscation_config); + let trace_sampler_config = TraceSamplerConfiguration::from_configuration(config) + .error_context("Failed to configure Trace Sampler transform.")?; let apm_stats_transform_config = ApmStatsTransformConfiguration::from_configuration(config) .error_context("Failed to configure APM Stats transform.")? .with_environment_provider(env_provider.clone()) @@ -341,11 +343,13 @@ async fn add_baseline_traces_pipeline_to_blueprint( blueprint .add_transform("traces_enrich", dd_traces_enrich_config)? + .add_transform("trace_sampler", trace_sampler_config)? .add_transform("dd_apm_stats", apm_stats_transform_config)? .add_encoder("dd_stats_encode", dd_apm_stats_encoder)? .add_encoder("dd_traces_encode", dd_traces_config)? - .connect_component("dd_apm_stats", ["traces_enrich"])? - .connect_component("dd_traces_encode", ["traces_enrich"])? + .connect_component("trace_sampler", ["traces_enrich"])? + .connect_component("dd_apm_stats", ["trace_sampler"])? + .connect_component("dd_traces_encode", ["trace_sampler"])? .connect_component("dd_stats_encode", ["dd_apm_stats"])? .connect_component("dd_out", ["dd_traces_encode", "dd_stats_encode"])?; diff --git a/lib/saluki-components/src/common/datadog/apm.rs b/lib/saluki-components/src/common/datadog/apm.rs index c1b4c8cd68..08c01c624f 100644 --- a/lib/saluki-components/src/common/datadog/apm.rs +++ b/lib/saluki-components/src/common/datadog/apm.rs @@ -12,6 +12,21 @@ const fn default_target_traces_per_second() -> f64 { const fn default_errors_per_second() -> f64 { 10.0 } +const fn default_sampling_percentage() -> f64 { + 100.0 +} + +const fn default_error_sampling_enabled() -> bool { + true +} + +const fn default_error_tracking_standalone_enabled() -> bool { + false +} + +const fn default_probabilistic_sampling_enabled() -> bool { + false +} const fn default_peer_tags_aggregation() -> bool { true } @@ -33,6 +48,56 @@ struct ApmConfiguration { apm_config: ApmConfig, } +#[derive(Clone, Debug, Deserialize)] +struct ProbabilisticSamplerConfig { + /// Enables probabilistic sampling. + /// + /// When enabled, the trace sampler keeps approximately `sampling_percentage` percent of traces using a + /// deterministic hash of the trace ID. + /// + /// Defaults to `false`. + #[serde(default = "default_probabilistic_sampling_enabled")] + enabled: bool, + + /// Sampling percentage (0-100). + /// + /// Determines the percentage of traces to keep. A value of 100 keeps all traces, + /// while 50 keeps approximately half. Values outside 0-100 are treated as 100. + /// + /// Defaults to 100.0 (keep all traces). 
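+ /// For example, setting this to 25.0 keeps roughly one in four traces, selected deterministically by trace ID.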
+ #[serde(default = "default_sampling_percentage")] + sampling_percentage: f64, +} + +impl Default for ProbabilisticSamplerConfig { + fn default() -> Self { + Self { + enabled: default_probabilistic_sampling_enabled(), + sampling_percentage: default_sampling_percentage(), + } + } +} + +#[derive(Clone, Debug, Deserialize)] +struct ErrorTrackingStandaloneConfig { + /// Enables Error Tracking Standalone mode. + /// + /// When enabled, error tracking standalone mode suppresses single-span sampling and analytics + /// events for dropped traces. + /// + /// Defaults to `false`. + #[serde(default = "default_error_tracking_standalone_enabled")] + enabled: bool, +} + +impl Default for ErrorTrackingStandaloneConfig { + fn default() -> Self { + Self { + enabled: default_error_tracking_standalone_enabled(), + } + } +} + #[derive(Clone, Debug, Deserialize)] pub struct ApmConfig { /// Target traces per second for priority sampling. @@ -47,6 +112,27 @@ pub struct ApmConfig { #[serde(default = "default_errors_per_second")] errors_per_second: f64, + /// Probabilistic sampler configuration. + /// + /// Defaults to disabled, with `sampling_percentage` set to 100.0 (keep all traces). + #[serde(default)] + probabilistic_sampler: ProbabilisticSamplerConfig, + + /// Enables error sampling in the trace sampler. + /// + /// When enabled, traces containing errors will be kept even if they would be dropped by + /// probabilistic sampling. This ensures error visibility at low sampling rates. + /// + /// Defaults to `true`. + #[serde(default = "default_error_sampling_enabled")] + error_sampling_enabled: bool, + + /// Error Tracking Standalone configuration. + /// + /// Defaults to disabled. + #[serde(default)] + error_tracking_standalone: ErrorTrackingStandaloneConfig, + /// Enables an additional stats computation check on spans to see if they have an eligible `span.kind` (server, consumer, client, producer). /// If enabled, a span with an eligible `span.kind` will have stats computed. If disabled, only top-level and measured spans will have stats computed. /// @@ -100,6 +186,26 @@ impl ApmConfig { self.errors_per_second } + /// Returns if probabilistic sampling is enabled. + pub const fn probabilistic_sampler_enabled(&self) -> bool { + self.probabilistic_sampler.enabled + } + + /// Returns the probabilistic sampler sampling percentage. + pub const fn probabilistic_sampler_sampling_percentage(&self) -> f64 { + self.probabilistic_sampler.sampling_percentage + } + + /// Returns if error sampling is enabled. + pub const fn error_sampling_enabled(&self) -> bool { + self.error_sampling_enabled + } + + /// Returns if error tracking standalone mode is enabled. + pub const fn error_tracking_standalone_enabled(&self) -> bool { + self.error_tracking_standalone.enabled + } + /// Returns if stats computation by span kind is enabled. 
pub const fn compute_stats_by_span_kind(&self) -> bool { self.compute_stats_by_span_kind @@ -143,6 +249,9 @@ impl Default for ApmConfig { Self { target_traces_per_second: default_target_traces_per_second(), errors_per_second: default_errors_per_second(), + probabilistic_sampler: ProbabilisticSamplerConfig::default(), + error_sampling_enabled: default_error_sampling_enabled(), + error_tracking_standalone: ErrorTrackingStandaloneConfig::default(), compute_stats_by_span_kind: default_compute_stats_by_span_kind(), peer_tags_aggregation: default_peer_tags_aggregation(), peer_tags: Vec::new(), diff --git a/lib/saluki-components/src/common/datadog/mod.rs b/lib/saluki-components/src/common/datadog/mod.rs index ae35afcbc8..04d6a5d697 100644 --- a/lib/saluki-components/src/common/datadog/mod.rs +++ b/lib/saluki-components/src/common/datadog/mod.rs @@ -10,8 +10,40 @@ mod retry; pub mod telemetry; pub mod transaction; +/// Metric key used to store Datadog sampling priority (`_sampling_priority_v1`). +pub const SAMPLING_PRIORITY_METRIC_KEY: &str = "_sampling_priority_v1"; + /// Default compressed size limit for intake requests. pub const DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT: usize = 3_200_000; // 3 MiB /// Default uncompressed size limit for intake requests. pub const DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT: usize = 62_914_560; // 60 MiB + +/// Metadata tag used to store the sampling decision maker (`_dd.p.dm`). +pub const TAG_DECISION_MAKER: &str = "_dd.p.dm"; + +/// Decision maker value for probabilistic sampling (matches Datadog Agent). +pub const DECISION_MAKER_PROBABILISTIC: &str = "-9"; + +/// Metadata key used to store the OTEL trace id. +pub const OTEL_TRACE_ID_META_KEY: &str = "otel.trace_id"; + +/// Maximum trace id used for deterministic sampling. +pub const MAX_TRACE_ID: u64 = u64::MAX; + +/// Precomputed float form of `MAX_TRACE_ID`. +pub const MAX_TRACE_ID_FLOAT: f64 = MAX_TRACE_ID as f64; + +/// Hasher used for deterministic sampling. +pub const SAMPLER_HASHER: u64 = 1111111111111111111; + +/// Returns whether to keep a trace, based on its ID and a sampling rate. +/// +/// This assumes trace IDs are nearly uniformly distributed. 
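+/// +/// For example, with `rate = 0.25` a trace is kept only when `trace_id.wrapping_mul(SAMPLER_HASHER)` lands in the lowest quarter of the `u64` range, so roughly 25% of uniformly distributed trace IDs are kept, and the decision is deterministic for a given trace ID.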
+pub fn sample_by_rate(trace_id: u64, rate: f64) -> bool { + if rate < 1.0 { + trace_id.wrapping_mul(SAMPLER_HASHER) < (rate * MAX_TRACE_ID_FLOAT) as u64 + } else { + true + } +} diff --git a/lib/saluki-components/src/common/otlp/traces/transform.rs b/lib/saluki-components/src/common/otlp/traces/transform.rs index 2342091da0..887b5a3cb7 100644 --- a/lib/saluki-components/src/common/otlp/traces/transform.rs +++ b/lib/saluki-components/src/common/otlp/traces/transform.rs @@ -20,6 +20,7 @@ use serde_json::{Map as JsonMap, Value as JsonValue}; use stringtheory::MetaString; use tracing::error; +use crate::common::datadog::{OTEL_TRACE_ID_META_KEY, SAMPLING_PRIORITY_METRIC_KEY}; use crate::common::otlp::attributes::{get_int_attribute, HTTP_MAPPINGS}; use crate::common::otlp::traces::normalize::{normalize_service, normalize_tag_value}; use crate::common::otlp::traces::normalize::{truncate_utf8, MAX_RESOURCE_LEN}; @@ -29,7 +30,6 @@ use crate::common::otlp::util::{ DEPLOYMENT_ENVIRONMENT_KEY, KEY_DATADOG_CONTAINER_ID, KEY_DATADOG_ENVIRONMENT, KEY_DATADOG_VERSION, }; -pub(crate) const SAMPLING_PRIORITY_METRIC_KEY: &str = "_sampling_priority_v1"; const EVENT_EXTRACTION_METRIC_KEY: &str = "_dd1.sr.eausr"; const ANALYTICS_EVENT_KEY: &str = "analytics.event"; const HTTP_REQUEST_HEADER_PREFIX: &str = "http.request.header."; @@ -71,7 +71,6 @@ const NETWORK_PROTOCOL_NAME_KEY: &str = "network.protocol.name"; const HTTP_STATUS_CODE_KEY: &str = "http.status_code"; const HTTP_RESPONSE_STATUS_CODE_KEY: &str = "http.response.status_code"; const SPAN_KIND_META_KEY: &str = "span.kind"; -const OTEL_TRACE_ID_META_KEY: &str = "otel.trace_id"; const W3C_TRACESTATE_META_KEY: &str = "w3c.tracestate"; const OTEL_LIBRARY_NAME_META_KEY: &str = "otel.library.name"; const OTEL_LIBRARY_VERSION_META_KEY: &str = "otel.library.version"; diff --git a/lib/saluki-components/src/common/otlp/traces/translator.rs b/lib/saluki-components/src/common/otlp/traces/translator.rs index 445fc71dad..1116c20fce 100644 --- a/lib/saluki-components/src/common/otlp/traces/translator.rs +++ b/lib/saluki-components/src/common/otlp/traces/translator.rs @@ -3,9 +3,10 @@ use otlp_protos::opentelemetry::proto::resource::v1::Resource as OtlpResource; use otlp_protos::opentelemetry::proto::trace::v1::ResourceSpans; use saluki_common::collections::FastHashMap; use saluki_context::tags::TagSet; -use saluki_core::data_model::event::trace::{Span as DdSpan, Trace}; +use saluki_core::data_model::event::trace::{Span as DdSpan, Trace, TraceSampling}; use saluki_core::data_model::event::Event; +use crate::common::datadog::SAMPLING_PRIORITY_METRIC_KEY; use crate::common::otlp::config::TracesConfig; use crate::common::otlp::traces::transform::otel_span_to_dd_span; use crate::common::otlp::traces::transform::otlp_value_to_string; @@ -52,6 +53,7 @@ impl OtlpTracesTranslator { let resource: OtlpResource = resource_spans.resource.unwrap_or_default(); let resource_tags: TagSet = resource_attributes_to_tagset(&resource.attributes); let mut traces_by_id: FastHashMap<u64, Vec<DdSpan>> = FastHashMap::default(); + let mut priorities_by_id: FastHashMap<u64, i32> = FastHashMap::default(); let ignore_missing_fields = self.config.ignore_missing_datadog_fields; for scope_spans in resource_spans.scope_spans { @@ -67,17 +69,30 @@ impl OtlpTracesTranslator { ignore_missing_fields, self.config.enable_otlp_compute_top_level_by_span_kind, ); + + // Track last-seen priority for this trace (overwrites previous values) + if let Some(&priority) = dd_span.metrics().get(SAMPLING_PRIORITY_METRIC_KEY) { + 
priorities_by_id.insert(trace_id, priority as i32); + } + + traces_by_id.entry(trace_id).or_default().push(dd_span); + } + } traces_by_id .into_iter() - .filter_map(|(_, spans)| { + .filter_map(|(trace_id, spans)| { if spans.is_empty() { None } else { - Some(Event::Trace(Trace::new(spans, resource_tags.clone()))) + let mut trace = Trace::new(spans, resource_tags.clone()); + + // Set the trace-level sampling priority if one was found + if let Some(&priority) = priorities_by_id.get(&trace_id) { + trace.set_sampling(Some(TraceSampling::new(false, Some(priority), None, None))); + } + + Some(Event::Trace(trace)) } }) .collect() diff --git a/lib/saluki-components/src/encoders/datadog/traces/mod.rs b/lib/saluki-components/src/encoders/datadog/traces/mod.rs index 3b4665a468..bb94769bf5 100644 --- a/lib/saluki-components/src/encoders/datadog/traces/mod.rs +++ b/lib/saluki-components/src/encoders/datadog/traces/mod.rs @@ -49,7 +49,7 @@ use crate::common::datadog::{ io::RB_BUFFER_CHUNK_SIZE, request_builder::{EndpointEncoder, RequestBuilder}, telemetry::ComponentTelemetry, - DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, + DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, TAG_DECISION_MAKER, }; use crate::common::otlp::config::TracesConfig; use crate::common::otlp::util::{ @@ -62,6 +62,10 @@ const CONTAINER_TAGS_META_KEY: &str = "_dd.tags.container"; const MAX_TRACES_PER_PAYLOAD: usize = 10000; static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf"); +// Sampling metadata keys / values. +const TAG_OTLP_SAMPLING_RATE: &str = "_dd.otlp_sr"; +const DEFAULT_CHUNK_PRIORITY: i32 = 1; // PRIORITY_AUTO_KEEP + fn default_serializer_compressor_kind() -> String { "zstd".to_string() } @@ -547,28 +551,40 @@ impl TraceEndpointEncoder { } fn build_trace_chunk(&self, trace: &Trace) -> TraceChunk { - let mut spans: Vec<_> = trace.spans().iter().map(convert_span).collect(); + let spans: Vec<_> = trace.spans().iter().map(convert_span).collect(); let mut chunk = TraceChunk::new(); - let rate = self.sampling_rate(); let mut tags = std::collections::HashMap::new(); - tags.insert("_dd.otlp_sr".to_string(), format!("{:.2}", rate)); - - // TODO: Remove this once we have sampling. We have to hardcode the priority to 1 for now so that intake does not drop the trace. - const PRIORITY_AUTO_KEEP: i32 = 1; - chunk.set_priority(PRIORITY_AUTO_KEEP); - - // Set _dd.p.dm (decision maker) - // Only set if sampling priority is "keep" (which it is, since we set PRIORITY_AUTO_KEEP) - // Decision maker "-9" indicates probabilistic sampler made the decision - const DECISION_MAKER: &str = "-9"; - if let Some(first_span) = spans.first_mut() { - let mut meta = first_span.take_meta(); - meta.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string()); - first_span.set_meta(meta); + + // Use trace-level sampling metadata if available (set by the trace sampler transform). + // This provides explicit trace-level sampling information without needing to scan spans. + if let Some(sampling) = trace.sampling() { + // Set priority from trace metadata + chunk.set_priority(sampling.priority.unwrap_or(DEFAULT_CHUNK_PRIORITY)); + chunk.set_droppedTrace(sampling.dropped_trace); + + // Set decision maker tag if present. 
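+ // ("decision maker" records which sampler was responsible for keeping the trace, stored under `_dd.p.dm`.)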
+ if let Some(dm) = &sampling.decision_maker { + tags.insert(TAG_DECISION_MAKER.to_string(), dm.to_string()); + } + + // Set OTLP sampling rate tag if present (from sampler) + if let Some(otlp_sr) = &sampling.otlp_sampling_rate { + tags.insert(TAG_OTLP_SAMPLING_RATE.to_string(), otlp_sr.to_string()); + } else { + // Fallback to encoder's computed rate + let rate = self.sampling_rate(); + tags.insert(TAG_OTLP_SAMPLING_RATE.to_string(), format!("{:.2}", rate)); + } + } else { + // Fallback: if trace.sampling is None, use defaults + // (No span scanning per the plan's "no fallback scan" requirement) + chunk.set_priority(DEFAULT_CHUNK_PRIORITY); + chunk.set_droppedTrace(false); + let rate = self.sampling_rate(); + tags.insert(TAG_OTLP_SAMPLING_RATE.to_string(), format!("{:.2}", rate)); + } - tags.insert("_dd.p.dm".to_string(), DECISION_MAKER.to_string()); chunk.set_tags(tags); chunk.set_spans(spans); diff --git a/lib/saluki-components/src/transforms/mod.rs b/lib/saluki-components/src/transforms/mod.rs index 11950bd46d..f3b1713236 100644 --- a/lib/saluki-components/src/transforms/mod.rs +++ b/lib/saluki-components/src/transforms/mod.rs @@ -21,6 +21,9 @@ pub use self::dogstatsd_mapper::DogstatsDMapperConfiguration; mod metric_router; pub use self::metric_router::MetricRouterConfiguration; +mod trace_sampler; +pub use self::trace_sampler::TraceSamplerConfiguration; + mod apm_stats; pub use self::apm_stats::ApmStatsTransformConfiguration; diff --git a/lib/saluki-components/src/transforms/trace_sampler/core_sampler.rs b/lib/saluki-components/src/transforms/trace_sampler/core_sampler.rs new file mode 100644 index 0000000000..26adbeb08c --- /dev/null +++ b/lib/saluki-components/src/transforms/trace_sampler/core_sampler.rs @@ -0,0 +1,244 @@ +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use saluki_common::collections::FastHashMap; +use saluki_common::hash::FastBuildHasher; + +use super::signature::Signature; + +const NUM_BUCKETS: usize = 6; +const BUCKET_DURATION: Duration = Duration::from_secs(5); +const MAX_RATE_INCREASE: f64 = 1.2; + +#[derive(Default)] +pub struct Sampler { + /// maps each Signature to a circular buffer of per-bucket (bucket_id) counts covering the last NUM_BUCKETS * BUCKET_DURATION window. + seen: FastHashMap<Signature, [f32; NUM_BUCKETS]>, + + /// all_sigs_seen counts all signatures in a circular buffer of NUM_BUCKETS buckets of BUCKET_DURATION each + all_sigs_seen: [f32; NUM_BUCKETS], + + /// last_bucket_id is the index of the last bucket on which traces were counted + last_bucket_id: u64, + + /// rates maps each signature to its sampling rate (a fraction in [0, 1]) + rates: FastHashMap<Signature, f64>, + + /// lowest_rate is the lowest rate of all signatures + lowest_rate: f64, + + /// Maximum limit to the total number of traces per second to sample + target_tps: f64, + + /// extra_rate is an extra raw sampling rate to apply on top of the sampler rate + extra_rate: f64, +} + +// zeroAndGetMax zeroes expired buckets and returns the max count +// logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/sampler/coresampler.go#L185 +fn zero_and_get_max(buckets: &mut [f32; NUM_BUCKETS], previous_bucket: u64, new_bucket: u64) -> f32 { + // A bucket is a BUCKET_DURATION slice (5s) that stores the count of traces that fell in the interval. 
+ // An intuitive understanding of the function is that we start just after previous_bucket and iterate for a full window of buckets (NUM_BUCKETS) + // and zero out any buckets older than new_bucket (expired), then we compute the max count among the buckets that are in the current window + let mut max_bucket = 0.0; + for i in (previous_bucket + 1)..=previous_bucket + NUM_BUCKETS as u64 { + let index = i as usize % NUM_BUCKETS; + // if a complete rotation (time between previous_bucket and new_bucket is more than NUM_BUCKETS * BUCKET_DURATION) happened between previous_bucket and new_bucket + // all buckets will be zeroed + if i < new_bucket { + buckets[index] = 0.0; + continue; + } + let value = buckets[index]; + if value > max_bucket { + max_bucket = value; + } + // zero only after taking into account the previous value of the bucket + // overridden by this rotation, so that every bucket in the window is counted + if i == new_bucket { + buckets[index] = 0.0; + } + } + max_bucket +} + +// compute_tps_per_sig distributes TPS looking at the seen_tps of all signatures. +// By default it spreads the TPS uniformly across all signatures. If a signature +// is low volume and does not use all of its TPS, the remainder is spread uniformly +// across all other signatures. The returned sig_target is the final per-signature TPS target +// logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/sampler/coresampler.go#L167 +fn compute_tps_per_sig(target_tps: f64, seen_tps: &[f64]) -> f64 { + // Example: target_tps = 30, seen_tps = [5, 10, 100] → sorted stays [5, 10, 100], Initial sig_target = 30 / 3 = 10 + // Loop: + // 1) c = 5 (< 10), so subtract: remaining_tps = 30 - 5 = 25 + // Recompute sig_target = 25 / 2 = 12.5 + // 2) c = 10 (< 12.5), subtract: remaining_tps = 25 - 10 = 15 + // Recompute sig_target = 15 / 1 = 15 + // 3) Next is last element, break. + // Return sig_target = 15. + // Interpretation: the low-volume signatures "use up" 5 and 10 TPS, and the remaining budget (15) is the per-signature target for the higher-volume signature(s). + + if seen_tps.is_empty() { + return 0.0; + } + let mut sorted: Vec<f64> = seen_tps.to_vec(); + sorted.sort_by(|a, b| a.total_cmp(b)); + // compute the initial per-signature TPS budget by splitting target_tps across all signatures. 
+ let mut remaining_tps = target_tps; + let mut sig_target = remaining_tps / sorted.len() as f64; + + for (i, c) in sorted.iter().enumerate() { + if *c >= sig_target || i == sorted.len() - 1 { + break; + } + remaining_tps -= c; + sig_target = remaining_tps / (sorted.len() - i - 1) as f64; + } + sig_target +} + +impl Sampler { + pub fn new(extra_rate: f64, target_tps: f64) -> Sampler { + Self { + extra_rate, + target_tps, + ..Default::default() + } + } + + pub(super) fn count_weighted_sig(&mut self, now: SystemTime, signature: &Signature, n: f32) -> bool { + // All traces within the same `BUCKET_DURATION` interval share the same bucket_id + let bucket_id = now.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() / BUCKET_DURATION.as_secs(); + let prev_bucket_id = self.last_bucket_id; + self.last_bucket_id = bucket_id; + // If the bucket_id changed then the sliding window advanced and we need to recompute rates + let update_rate = prev_bucket_id != bucket_id; + if update_rate { + self.update_rates(prev_bucket_id, bucket_id); + } + + let buckets = self.seen.entry(*signature).or_insert([0 as f32; NUM_BUCKETS]); + self.all_sigs_seen[(bucket_id % (NUM_BUCKETS as u64)) as usize] += n; + buckets[(bucket_id % (NUM_BUCKETS as u64)) as usize] += n; + update_rate + } + + // update_rates distributes TPS on each signature and applies it to the moving + // max of seen buckets. + // Rate increases are bounded to 20% per evaluation; it takes 13 evaluations (1.2**13 ≈ 10.7) + // to increase a sampling rate 10-fold, in about 1 minute. + fn update_rates(&mut self, previous_bucket: u64, new_bucket: u64) { + let seen_len = self.seen.len(); + if seen_len == 0 { + return; + } + let mut rates: FastHashMap<Signature, f64> = + FastHashMap::with_capacity_and_hasher(seen_len, FastBuildHasher::default()); + // seen_tps is a vector of per-signature peak rates: we take the maximum bucket value (which represents the number/weight of traces in a BUCKET_DURATION interval) + // in the sliding window and convert it to traces per second. Each element is one TPS value per signature. + let mut seen_tps_vec = Vec::with_capacity(seen_len); + let mut sigs = Vec::with_capacity(seen_len); + let mut sigs_to_remove = Vec::new(); + + for (sig, buckets) in self.seen.iter_mut() { + let max_bucket = zero_and_get_max(buckets, previous_bucket, new_bucket); + let seen_tps = max_bucket as f64 / BUCKET_DURATION.as_secs() as f64; + seen_tps_vec.push(seen_tps); + sigs.push(*sig); + } + zero_and_get_max(&mut self.all_sigs_seen, previous_bucket, new_bucket); + let tps_per_sig = compute_tps_per_sig(self.target_tps, &seen_tps_vec); + self.lowest_rate = 1.0; + + for (i, sig) in sigs.iter().enumerate() { + let seen_tps = seen_tps_vec[i]; + let mut rate = 1.0; + if tps_per_sig < seen_tps && seen_tps > 0.0 { + rate = tps_per_sig / seen_tps; + } + + // Cap increase rate to 20% + if let Some(prev_rate) = self.rates.get(sig) { + if *prev_rate != 0.0 && rate / prev_rate > MAX_RATE_INCREASE { + rate = prev_rate * MAX_RATE_INCREASE; + } + } + + // Ensure rate doesn't exceed 1.0 + if rate > 1.0 { + rate = 1.0; + } + + // No traffic on this signature, mark it for cleanup + if rate == 1.0 && seen_tps == 0.0 { + sigs_to_remove.push(*sig); + continue; + } + + // Update lowest rate + if rate < self.lowest_rate { + self.lowest_rate = rate; + } + + rates.insert(*sig, rate); + } + + // Clean up signatures with no traffic + for sig in sigs_to_remove { + self.seen.remove(&sig); + } + + self.rates = rates; + } + + /// Gets the sampling rate for a specific signature. 
+ /// Returns the rate multiplied by the extra rate factor. + pub fn get_signature_sample_rate(&self, sig: &Signature) -> f64 { + self.rates + .get(sig) + .map(|rate| rate * self.extra_rate) + .unwrap_or_else(|| self.default_rate()) + } + + /// Gets all signature sample rates. + /// Returns a tuple of (rates map, default rate). + pub fn get_all_signature_sample_rates(&self) -> (FastHashMap<Signature, f64>, f64) { + let mut rates = FastHashMap::with_capacity_and_hasher(self.rates.len(), FastBuildHasher::default()); + for (sig, rate) in self.rates.iter() { + rates.insert(*sig, rate * self.extra_rate); + } + (rates, self.default_rate()) + } + + /// Computes the default rate for unknown signatures. + /// Based on the moving max of all signatures seen and the lowest stored rate. + fn default_rate(&self) -> f64 { + if self.target_tps == 0.0 { + return 0.0; + } + + let mut max_seen = 0.0_f32; + for &count in self.all_sigs_seen.iter() { + if count > max_seen { + max_seen = count; + } + } + + let seen_tps = max_seen as f64 / BUCKET_DURATION.as_secs() as f64; + let mut rate = 1.0; + + if self.target_tps < seen_tps && seen_tps > 0.0 { + rate = self.target_tps / seen_tps; + } + + if self.lowest_rate < rate && self.lowest_rate != 0.0 { + return self.lowest_rate; + } + + rate + } + + /// Returns the number of signatures being tracked. + pub fn size(&self) -> i64 { + self.seen.len() as i64 + } +} diff --git a/lib/saluki-components/src/transforms/trace_sampler/errors.rs b/lib/saluki-components/src/transforms/trace_sampler/errors.rs new file mode 100644 index 0000000000..805a741d7f --- /dev/null +++ b/lib/saluki-components/src/transforms/trace_sampler/errors.rs @@ -0,0 +1,265 @@ +//! Error sampling. +//! +//! The error sampler catches traces containing spans with errors, ensuring +//! error visibility even at low sampling rates. + +use std::time::SystemTime; + +use saluki_core::data_model::event::trace::Trace; + +use super::score_sampler::{ScoreSampler, ERRORS_RATE_KEY}; + +/// Error sampler for traces. +/// +/// Wraps a ScoreSampler configured specifically for error sampling. +/// This ensures traces with errors are caught even when the main sampler +/// would drop them. +pub(super) struct ErrorsSampler { + score_sampler: ScoreSampler, +} + +impl ErrorsSampler { + /// Create a new ErrorsSampler with the given configuration. + pub(super) fn new(error_tps: f64, extra_sample_rate: f64) -> Self { + let disabled = error_tps == 0.0; + Self { + score_sampler: ScoreSampler::new(ERRORS_RATE_KEY, disabled, error_tps, extra_sample_rate), + } + } + + /// This method should be called when a trace contains errors and needs to be + /// evaluated by the error sampler. + pub(super) fn sample_error(&mut self, now: SystemTime, trace: &mut Trace, root_idx: usize) -> bool { + // Use the score sampler to make the sampling decision + self.score_sampler.sample(now, trace, root_idx) + } +} + +#[cfg(test)] +mod tests { + // logic for these tests is taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/sampler/scoresampler_test.go#L23 + use std::time::{Duration, SystemTime}; + + use saluki_context::tags::TagSet; + use saluki_core::data_model::event::trace::{Span, Trace}; + use stringtheory::MetaString; + + use super::*; + use crate::transforms::trace_sampler::signature::{compute_signature_with_root_and_env, Signature}; + + const BUCKET_DURATION: Duration = Duration::from_secs(5); + + /// Create a test ErrorsSampler with the given TPS configuration. 
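+ /// Passing `tps = 0.0` yields a disabled sampler that never keeps traces.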
+ fn get_test_errors_sampler(tps: f64) -> ErrorsSampler { + // No extra fixed sampling, no maximum TPS + ErrorsSampler::new(tps, 1.0) + } + + /// Create a test trace with deterministic IDs. + fn get_test_trace(trace_id: u64) -> (Trace, usize) { + // Root span + let root = Span::new( + MetaString::from("mcnulty"), + MetaString::from("GET /api"), + MetaString::from("resource"), + MetaString::from("web"), + trace_id, + 1, // span_id + 0, // parent_id + 42, // start + 1000000, // duration + 0, // error + ); + + // Child span + let child = Span::new( + MetaString::from("mcnulty"), + MetaString::from("SELECT * FROM users"), + MetaString::from("resource"), + MetaString::from("sql"), + trace_id, + 2, // span_id + 1, // parent_id + 100, // start + 200000, // duration + 0, // error + ); + + let trace = Trace::new(vec![root, child], TagSet::default()); + (trace, 0) // Root is at index 0 + } + + /// Create a test trace with error in root span. + fn get_test_trace_with_error(trace_id: u64) -> (Trace, usize) { + // Root span with error + let root = Span::new( + MetaString::from("mcnulty"), + MetaString::from("GET /api"), + MetaString::from("resource"), + MetaString::from("web"), + trace_id, + 1, // span_id + 0, // parent_id + 42, // start + 1000000, // duration + 1, // error = 1 + ); + + // Child span + let child = Span::new( + MetaString::from("mcnulty"), + MetaString::from("SELECT * FROM users"), + MetaString::from("resource"), + MetaString::from("sql"), + trace_id, + 2, // span_id + 1, // parent_id + 100, // start + 200000, // duration + 0, // error + ); + + let trace = Trace::new(vec![root, child], TagSet::default()); + (trace, 0) // Root is at index 0 + } + + #[test] + fn test_shrink() { + // Test that shrink preserves first signatures and collapses later ones. + // The shrink logic activates when size() >= SHRINK_CARDINALITY/2 (100). + // When it activates, it builds an allow-list from the current `rates` map. + // Since `shrink` runs before `count_weighted_sig` in the sample flow, we must + // advance time one iteration BEFORE hitting the threshold so that `update_rates` + // runs and populates `rates` with the first batch of signatures. + let mut sampler = get_test_errors_sampler(10.0); + let test_time = SystemTime::now(); + let shrink_cardinality = ScoreSampler::test_shrink_cardinality(); + let threshold = shrink_cardinality / 2; // 100 + + let mut sigs = Vec::new(); + // Generate 3*shrinkCardinality signatures with different services + for i in 0..(3 * shrink_cardinality) { + let (mut trace, root_idx) = get_test_trace(3); + let spans = trace.spans_mut(); + // modify the non root span to create unique signatures + spans[1] = spans[1] + .clone() + .with_service(MetaString::from(format!("service_{}", i + 1000))); + + let signature = compute_signature_with_root_and_env(&trace, root_idx); + sigs.push(signature); + + // Advance time at threshold-1 so update_rates runs BEFORE shrink activates. + // This populates rates with (threshold-1) signatures. 
+ let sample_time = if i >= threshold - 1 { + test_time + BUCKET_DURATION + } else { + test_time + }; + sampler.sample_error(sample_time, &mut trace, root_idx); + } + + // Verify first (threshold-1) signatures are preserved (they're in the allow-list) + let threshold = shrink_cardinality / 2; + for (i, sig) in sigs.iter().enumerate().take(threshold - 1) { + assert_eq!( + *sig, + sampler.score_sampler.test_shrink(*sig), + "Signature at index {} should be preserved", + i + ); + } + + // Verify signatures from 2*shrinkCardinality onwards are shrunk + for (i, sig) in sigs + .iter() + .enumerate() + .skip(2 * shrink_cardinality) + .take(shrink_cardinality - 1) + { + let expected = Signature(sig.0 % (threshold as u64)); + assert_eq!( + expected, + sampler.score_sampler.test_shrink(*sig), + "Signature at index {} should be shrunk", + i + ); + } + + // Final size should be bounded by shrink_cardinality + let size = sampler.score_sampler.test_size(); + assert!( + size <= shrink_cardinality as i64, + "Size {} should be <= {}", + size, + shrink_cardinality + ); + } + + #[test] + fn test_disable() { + // Create a disabled sampler (TPS = 0) + let mut sampler = get_test_errors_sampler(0.0); + + // The sampler should never sample anything + for i in 0..100 { + let (mut trace, root_idx) = get_test_trace_with_error(i); + let sampled = sampler.sample_error(SystemTime::now(), &mut trace, root_idx); + assert!(!sampled, "Disabled sampler should never sample (iteration {})", i); + } + } + + #[test] + fn test_target_tps() { + // Test the effectiveness of the targetTPS option + // logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/sampler/scoresampler_test.go#L102 + let target_tps = 10.0; + let mut sampler = get_test_errors_sampler(target_tps); + + let generated_tps = 200.0; + let init_periods = 2; + let periods = 10; + + let period_seconds = BUCKET_DURATION.as_secs() as f64; + let traces_per_period = (generated_tps * period_seconds) as usize; + + let mut sampled_count = 0; + let mut test_time = SystemTime::now(); + + for period in 0..(init_periods + periods) { + test_time += BUCKET_DURATION; + + for i in 0..traces_per_period { + let (mut trace, root_idx) = get_test_trace_with_error((period * traces_per_period + i) as u64); + let sampled = sampler.sample_error(test_time, &mut trace, root_idx); + + // Once we got into the stable regime, count the samples + if period >= init_periods && sampled { + sampled_count += 1; + } + } + } + + // We should keep approximately the right percentage of traces + let expected_ratio = target_tps / generated_tps; + let actual_ratio = sampled_count as f64 / (traces_per_period as f64 * periods as f64); + + assert!( + (actual_ratio - expected_ratio).abs() / expected_ratio < 0.2, + "Expected ratio {:.4}, got {:.4} (sampled {} out of {})", + expected_ratio, + actual_ratio, + sampled_count, + traces_per_period * periods + ); + + // We should have a throughput of sampled traces around targetTPS + let actual_tps = sampled_count as f64 / (periods as f64 * BUCKET_DURATION.as_secs() as f64); + assert!( + (actual_tps - target_tps).abs() / target_tps < 0.2, + "Expected TPS {:.2}, got {:.2}", + target_tps, + actual_tps + ); + } +} diff --git a/lib/saluki-components/src/transforms/trace_sampler/mod.rs b/lib/saluki-components/src/transforms/trace_sampler/mod.rs new file mode 100644 index 0000000000..4e7d1edc7f --- /dev/null +++ b/lib/saluki-components/src/transforms/trace_sampler/mod.rs @@ -0,0 +1,928 @@ +//! Trace sampling transform. +//! +//! 
This transform implements agent-side head sampling for traces, supporting: +//! - Probabilistic sampling based on trace ID +//! - User-set priority preservation +//! - Error-based sampling as a safety net +//! - OTLP trace ingestion with proper sampling decision handling +//! +//! # Missing +//! +//! - Add trace metrics: datadog-agent/pkg/trace/sampler/metrics.go +//! - Add missing samplers (priority, nopriority, rare) +//! - Add error tracking standalone mode + +use async_trait::async_trait; +use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; +use saluki_common::collections::FastHashMap; +use saluki_config::GenericConfiguration; +use saluki_core::{ + components::{transforms::*, ComponentContext}, + data_model::event::{ + trace::{Span, Trace, TraceSampling}, + Event, EventType, + }, + topology::OutputDefinition, +}; +use saluki_error::GenericError; +use stringtheory::MetaString; +use tokio::select; +use tracing::debug; + +mod core_sampler; +mod errors; +mod probabilistic; +mod score_sampler; +mod signature; + +use self::probabilistic::PROB_RATE_KEY; +use crate::common::datadog::{ + apm::ApmConfig, sample_by_rate, DECISION_MAKER_PROBABILISTIC, OTEL_TRACE_ID_META_KEY, SAMPLING_PRIORITY_METRIC_KEY, + TAG_DECISION_MAKER, +}; +use crate::common::otlp::config::TracesConfig; + +// Sampling priority constants (matching datadog-agent) +const PRIORITY_AUTO_DROP: i32 = 0; +const PRIORITY_AUTO_KEEP: i32 = 1; +const PRIORITY_USER_KEEP: i32 = 2; + +const ERROR_SAMPLE_RATE: f64 = 1.0; // Default extra sample rate (matches agent's ExtraSampleRate) + +// Single Span Sampling and Analytics Events keys +const KEY_SPAN_SAMPLING_MECHANISM: &str = "_dd.span_sampling.mechanism"; +const KEY_ANALYZED_SPANS: &str = "_dd.analyzed"; + +// Decision maker values for `_dd.p.dm` (matching datadog-agent). +const DECISION_MAKER_MANUAL_PRIORITY: &str = "-4"; + +fn normalize_sampling_rate(rate: f64) -> f64 { + if rate <= 0.0 || rate >= 1.0 { + 1.0 + } else { + rate + } +} + +/// Configuration for the trace sampler transform. +#[derive(Debug)] +pub struct TraceSamplerConfiguration { + apm_config: ApmConfig, + otlp_sampling_rate: f64, +} + +impl TraceSamplerConfiguration { + /// Creates a new `TraceSamplerConfiguration` from the given configuration. 
+ pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> { + let apm_config = ApmConfig::from_configuration(config)?; + let otlp_traces: TracesConfig = config.try_get_typed("otlp_config.traces")?.unwrap_or_default(); + let otlp_sampling_rate = normalize_sampling_rate(otlp_traces.probabilistic_sampler.sampling_percentage / 100.0); + Ok(Self { + apm_config, + otlp_sampling_rate, + }) + } +} + +#[async_trait] +impl TransformBuilder for TraceSamplerConfiguration { + fn input_event_type(&self) -> EventType { + EventType::Trace + } + + fn outputs(&self) -> &[OutputDefinition] { + static OUTPUTS: &[OutputDefinition] = &[OutputDefinition::default_output(EventType::Trace)]; + OUTPUTS + } + + async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> { + let sampler = TraceSampler { + sampling_rate: self.apm_config.probabilistic_sampler_sampling_percentage() / 100.0, + error_sampling_enabled: self.apm_config.error_sampling_enabled(), + error_tracking_standalone: self.apm_config.error_tracking_standalone_enabled(), + probabilistic_sampler_enabled: self.apm_config.probabilistic_sampler_enabled(), + otlp_sampling_rate: self.otlp_sampling_rate, + error_sampler: errors::ErrorsSampler::new(self.apm_config.errors_per_second(), ERROR_SAMPLE_RATE), + no_priority_sampler: score_sampler::NoPrioritySampler::new( + self.apm_config.target_traces_per_second(), + ERROR_SAMPLE_RATE, + ), + }; + + Ok(Box::new(sampler)) + } +} + +impl MemoryBounds for TraceSamplerConfiguration { + fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { + builder.minimum().with_single_value::<TraceSampler>("component struct"); + } +} + +pub struct TraceSampler { + sampling_rate: f64, + error_tracking_standalone: bool, + error_sampling_enabled: bool, + probabilistic_sampler_enabled: bool, + otlp_sampling_rate: f64, + error_sampler: errors::ErrorsSampler, + no_priority_sampler: score_sampler::NoPrioritySampler, +} + +impl TraceSampler { + // TODO: merge this with the other duplicate "find root span of trace" functions + /// Find the root span index of a trace. + fn get_root_span_index(&self, trace: &Trace) -> Option<usize> { + // logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/traceutil/trace.go#L36 + let spans = trace.spans(); + if spans.is_empty() { + return None; + } + let length = spans.len(); + // General case: go over all spans and check for one without a matching parent. + // This intentionally mirrors `datadog-agent/pkg/trace/traceutil/trace.go:GetRoot`: + // - Fast-path: return the last span with `parent_id == 0` (some clients report the root last) + // - Otherwise: build a map of `parent_id -> child_span_index`, delete entries whose parent + // exists in the trace, and pick any remaining "orphan" child span. + let mut parent_id_to_child: FastHashMap<u64, usize> = FastHashMap::default(); + + for i in 0..length { + // Common case optimization: check for span with parent_id == 0, starting from the end, + // since some clients report the root last. + let j = length - 1 - i; + if spans[j].parent_id() == 0 { + return Some(j); + } + parent_id_to_child.insert(spans[j].parent_id(), j); + } + + for span in spans.iter() { + parent_id_to_child.remove(&span.span_id()); + } + + // Here, if the trace is valid, we should have `len(parent_id_to_child) == 1`. + if parent_id_to_child.len() != 1 { + debug!( + "Didn't reliably find the root span for traceID:{}", + &spans[0].trace_id() + ); + } + + // Have a safe behavior if that's not the case. + // Pick an arbitrary span whose parent is not in the trace. 
+ if let Some((_, child_idx)) = parent_id_to_child.iter().next() { + return Some(*child_idx); + } + + // Gracefully fail with the last span of the trace. + Some(length - 1) + } + + /// Check for user-set sampling priority in trace + fn get_user_priority(&self, trace: &Trace, root_span_idx: usize) -> Option<i32> { + // First check trace-level sampling priority (last-seen priority from OTLP ingest) + if let Some(sampling) = trace.sampling() { + if let Some(priority) = sampling.priority { + return Some(priority); + } + } + + if trace.spans().is_empty() { + return None; + } + + // Fall back to checking spans (for compatibility with non-OTLP traces) + // Prefer the root span (common case), but fall back to scanning all spans to be robust to ordering. + if let Some(root) = trace.spans().get(root_span_idx) { + if let Some(&p) = root.metrics().get(SAMPLING_PRIORITY_METRIC_KEY) { + return Some(p as i32); + } + } + let spans = trace.spans(); + spans + .iter() + .find_map(|span| span.metrics().get(SAMPLING_PRIORITY_METRIC_KEY).map(|&p| p as i32)) + } + + /// Returns `true` if the given trace ID should be probabilistically sampled. + fn sample_probabilistic(&self, trace_id: u64) -> bool { + probabilistic::ProbabilisticSampler::sample(trace_id, self.sampling_rate) + } + + fn is_otlp_trace(&self, trace: &Trace, root_span_idx: usize) -> bool { + trace + .spans() + .get(root_span_idx) + .map(|span| { + span.meta() + .contains_key(&MetaString::from_static(OTEL_TRACE_ID_META_KEY)) + }) + .unwrap_or(false) + } + + /// Returns `true` if the trace contains a span with an error. + fn trace_contains_error(&self, trace: &Trace, consider_exception_span_events: bool) -> bool { + trace.spans().iter().any(|span| { + span.error() != 0 || (consider_exception_span_events && self.span_contains_exception_span_event(span)) + }) + } + + /// Returns `true` if the span has exception span events. + /// + /// This checks for the `_dd.span_events.has_exception` meta field set to `"true"`. + fn span_contains_exception_span_event(&self, span: &Span) -> bool { + if let Some(has_exception) = span.meta().get("_dd.span_events.has_exception") { + return has_exception == "true"; + } + false + } + + /// Returns all spans from the given trace that have Single Span Sampling tags present. + fn get_single_span_sampled_spans(&self, trace: &Trace) -> Vec<Span> { + let mut sampled_spans = Vec::new(); + for span in trace.spans().iter() { + if span.metrics().contains_key(KEY_SPAN_SAMPLING_MECHANISM) { + sampled_spans.push(span.clone()); + } + } + sampled_spans + } + + /// Returns all spans from the given trace that carry the analytics event tag (`_dd.analyzed`). + fn get_analyzed_spans(&self, trace: &Trace) -> Vec<Span> { + let mut analyzed_spans = Vec::new(); + for span in trace.spans().iter() { + if span.metrics().contains_key(KEY_ANALYZED_SPANS) { + // Keep spans that have the analyzed tag + analyzed_spans.push(span.clone()); + } + } + analyzed_spans + } + + /// Returns `true` if the given trace has any analyzed spans. 
+ fn has_analyzed_spans(&self, trace: &Trace) -> bool { + trace + .spans() + .iter() + .any(|span| span.metrics().contains_key(KEY_ANALYZED_SPANS)) + } + + /// Applies Single Span Sampling to the trace. + /// Returns `true` if the trace was modified. + fn single_span_sampling(&self, trace: &mut Trace) -> bool { + let ss_spans = self.get_single_span_sampled_spans(trace); + if !ss_spans.is_empty() { + // Span sampling has kept some spans -> update the trace + trace.set_spans(ss_spans); + // Set high priority and mark as kept + let sampling = TraceSampling::new( + false, + Some(PRIORITY_USER_KEEP), + None, // No decision maker for SSS + Some(MetaString::from(format!("{:.2}", self.sampling_rate))), + ); + trace.set_sampling(Some(sampling)); + true + } else { + false + } + } + + /// Evaluates the given trace against all configured samplers. + /// + /// Returns a tuple containing whether the trace should be kept, the sampling priority, the decision maker tag (which sampler is responsible), + /// and the index of the root span used for evaluation. + fn run_samplers(&mut self, trace: &mut Trace) -> (bool, i32, &'static str, Option<usize>) { + // logic taken from: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L1066 + let now = std::time::SystemTime::now(); + // Empty trace check + if trace.spans().is_empty() { + return (false, PRIORITY_AUTO_DROP, "", None); + } + let contains_error = self.trace_contains_error(trace, false); + let Some(root_span_idx) = self.get_root_span_index(trace) else { + return (false, PRIORITY_AUTO_DROP, "", None); + }; + + // Modern path: ProbabilisticSamplerEnabled = true + if self.probabilistic_sampler_enabled { + let mut prob_keep = false; + let mut decision_maker = ""; + + // Run probabilistic sampler - use root span's trace ID + let root_trace_id = trace.spans()[root_span_idx].trace_id(); + if self.sample_probabilistic(root_trace_id) { + decision_maker = DECISION_MAKER_PROBABILISTIC; // probabilistic sampling + prob_keep = true; + + if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) { + let metrics = root_span.metrics_mut(); + metrics.insert(MetaString::from(PROB_RATE_KEY), self.sampling_rate); + } + } else if self.error_sampling_enabled && contains_error { + prob_keep = self.error_sampler.sample_error(now, trace, root_span_idx); + } + + let priority = if prob_keep { + PRIORITY_AUTO_KEEP + } else { + PRIORITY_AUTO_DROP + }; + + return (prob_keep, priority, decision_maker, Some(root_span_idx)); + } + + let user_priority = self.get_user_priority(trace, root_span_idx); + if let Some(priority) = user_priority { + if priority > 0 { + // User wants to keep this trace + return (true, priority, DECISION_MAKER_MANUAL_PRIORITY, Some(root_span_idx)); + } + } else if self.is_otlp_trace(trace, root_span_idx) { + // some sampling happens upstream in the otlp receiver in the agent: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/otlp.go#L572 + let root_trace_id = trace.spans()[root_span_idx].trace_id(); + if sample_by_rate(root_trace_id, self.otlp_sampling_rate) { + if let Some(root_span) = trace.spans_mut().get_mut(root_span_idx) { + root_span.metrics_mut().remove(PROB_RATE_KEY); + } + return ( + true, + PRIORITY_AUTO_KEEP, + DECISION_MAKER_PROBABILISTIC, + Some(root_span_idx), + ); + } + } else if self.no_priority_sampler.sample(now, trace, root_span_idx) { + return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx)); + } + + if self.error_sampling_enabled && self.trace_contains_error(trace, false) { + let keep = self.error_sampler.sample_error(now,
trace, root_span_idx); + if keep { + return (true, PRIORITY_AUTO_KEEP, "", Some(root_span_idx)); + } + } + + // Default: drop the trace + (false, PRIORITY_AUTO_DROP, "", Some(root_span_idx)) + } + + /// Apply sampling metadata to the trace in-place. + /// + /// The `root_span_idx` parameter identifies which span should receive the sampling metadata. + /// This avoids recalculating the root span since it was already found in `run_samplers`. + fn apply_sampling_metadata( + &self, trace: &mut Trace, keep: bool, priority: i32, decision_maker: &str, root_span_idx: usize, + ) { + let is_otlp = self.is_otlp_trace(trace, root_span_idx); + let root_span_value = match trace.spans_mut().get_mut(root_span_idx) { + Some(span) => span, + None => return, + }; + + // Add tag for the decision maker + let meta = root_span_value.meta_mut(); + if priority > 0 && !decision_maker.is_empty() { + meta.insert(MetaString::from(TAG_DECISION_MAKER), MetaString::from(decision_maker)); + } + + // The root span borrow has ended, so we can set trace-level sampling metadata + let sampling_rate = if is_otlp { + self.otlp_sampling_rate + } else { + self.sampling_rate + }; + let sampling = TraceSampling::new( + !keep, + Some(priority), + if priority > 0 && !decision_maker.is_empty() { + Some(MetaString::from(decision_maker)) + } else { + None + }, + Some(MetaString::from(format!("{:.2}", sampling_rate))), + ); + trace.set_sampling(Some(sampling)); + } +} + +#[async_trait] +impl Transform for TraceSampler { + async fn run(mut self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> { + let mut health = context.take_health_handle(); + health.mark_ready(); + + debug!("Trace sampler transform started."); + + loop { + select! { + _ = health.live() => continue, + maybe_events = context.events().next() => match maybe_events { + Some(events) => { + for event in events { + match event { + Event::Trace(mut trace) => { + // keep is a boolean that indicates if the trace should be kept or dropped + // priority is the sampling priority + // decision_maker is the tag that indicates the decision maker (probabilistic, error, etc.) 
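+ // (an empty decision_maker means no `_dd.p.dm` tag is written)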
+ // root_span_idx is the index of the root span of the trace + let (keep, priority, decision_maker, root_span_idx) = self.run_samplers(&mut trace); + if keep { + if let Some(root_idx) = root_span_idx { + self.apply_sampling_metadata( + &mut trace, + keep, + priority, + decision_maker, + root_idx, + ); + } + + // Send the trace to the next component + let mut dispatcher = context + .dispatcher() + .buffered() + .expect("default output should always exist"); + + dispatcher.push(Event::Trace(trace)).await?; + dispatcher.flush().await?; + } else if !self.error_tracking_standalone { + // logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/agent/agent.go#L980-L990 + + // try single span sampling (keeps spans marked for sampling when trace would be dropped) + let modified = self.single_span_sampling(&mut trace); + if !modified { + // Fall back to analytics events if no SSS spans + let analyzed_spans = self.get_analyzed_spans(&trace); + if !analyzed_spans.is_empty() { + // Replace trace spans with analyzed events + trace.set_spans(analyzed_spans); + // Mark trace as kept with high priority + let sampling = TraceSampling::new( + false, + Some(PRIORITY_USER_KEEP), + None, + Some(MetaString::from(format!("{:.2}", self.sampling_rate))), + ); + trace.set_sampling(Some(sampling)); + + // Send the modified trace downstream + let mut dispatcher = context + .dispatcher() + .buffered() + .expect("default output should always exist"); + dispatcher.push(Event::Trace(trace)).await?; + dispatcher.flush().await?; + continue; // Skip to next event + } + } else if self.has_analyzed_spans(&trace) { + // Warn about both SSS and analytics events + debug!("Detected both analytics events AND single span sampling in the same trace. Single span sampling wins because App Analytics is deprecated."); + + // Send the SSS-modified trace downstream + let mut dispatcher = context + .dispatcher() + .buffered() + .expect("default output should always exist"); + dispatcher.push(Event::Trace(trace)).await?; + dispatcher.flush().await?; + continue; // Skip to next event + } + + // If we modified the trace with SSS, send it + if modified { + let mut dispatcher = context + .dispatcher() + .buffered() + .expect("default output should always exist"); + dispatcher.push(Event::Trace(trace)).await?; + dispatcher.flush().await?; + } else { + // Neither SSS nor analytics events found, drop the trace + debug!("Dropping trace with priority {}", priority); + } + } + } + other => { + // Pass through non-trace events + let mut dispatcher = context + .dispatcher() + .buffered() + .expect("default output should always exist"); + dispatcher.push(other).await?; + dispatcher.flush().await?; + } + } + } + } + None => { + debug!("Event stream terminated, shutting down trace sampler transform"); + break; + } + } + } + } + + debug!("Trace sampler transform stopped."); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use saluki_context::tags::TagSet; + use saluki_core::data_model::event::trace::{Span as DdSpan, Trace}; + + use super::*; + const PRIORITY_USER_DROP: i32 = -1; + + fn create_test_sampler() -> TraceSampler { + TraceSampler { + sampling_rate: 1.0, + error_sampling_enabled: true, + error_tracking_standalone: false, + probabilistic_sampler_enabled: true, + otlp_sampling_rate: 1.0, + error_sampler: errors::ErrorsSampler::new(10.0, 1.0), + no_priority_sampler: score_sampler::NoPrioritySampler::new(10.0, 1.0), + } + } + + fn create_test_span(trace_id: u64, span_id: u64, error: i32) -> 
DdSpan { + DdSpan::new( + MetaString::from("test-service"), + MetaString::from("test-operation"), + MetaString::from("test-resource"), + MetaString::from("test-type"), + trace_id, + span_id, + 0, // parent_id + 0, // start + 1000, // duration + error, + ) + } + + fn create_test_span_with_metrics(trace_id: u64, span_id: u64, metrics: HashMap<String, f64>) -> DdSpan { + let mut metrics_map = saluki_common::collections::FastHashMap::default(); + for (k, v) in metrics { + metrics_map.insert(MetaString::from(k), v); + } + create_test_span(trace_id, span_id, 0).with_metrics(metrics_map) + } + + #[allow(dead_code)] + fn create_test_span_with_meta(trace_id: u64, span_id: u64, meta: HashMap<String, String>) -> DdSpan { + let mut meta_map = saluki_common::collections::FastHashMap::default(); + for (k, v) in meta { + meta_map.insert(MetaString::from(k), MetaString::from(v)); + } + create_test_span(trace_id, span_id, 0).with_meta(meta_map) + } + + fn create_test_trace(spans: Vec<DdSpan>) -> Trace { + let tags = TagSet::default(); + Trace::new(spans, tags) + } + + #[test] + fn test_user_priority_detection() { + let sampler = create_test_sampler(); + + // Test trace with user-set priority = 2 (UserKeep) + let mut metrics = HashMap::new(); + metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0); + let span = create_test_span_with_metrics(12345, 1, metrics); + let trace = create_test_trace(vec![span]); + let root_idx = sampler.get_root_span_index(&trace).unwrap(); + + assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2)); + + // Test trace with user-set priority = -1 (UserDrop) + let mut metrics = HashMap::new(); + metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), -1.0); + let span = create_test_span_with_metrics(12345, 1, metrics); + let trace = create_test_trace(vec![span]); + let root_idx = sampler.get_root_span_index(&trace).unwrap(); + + assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(-1)); + + // Test trace without user priority + let span = create_test_span(12345, 1, 0); + let trace = create_test_trace(vec![span]); + let root_idx = sampler.get_root_span_index(&trace).unwrap(); + + assert_eq!(sampler.get_user_priority(&trace, root_idx), None); + } + + #[test] + fn test_trace_level_priority_takes_precedence() { + let sampler = create_test_sampler(); + + // Test trace-level priority overrides span priorities (last-seen priority) + // Create spans with different priorities - root has 0, later span has 1 + let mut metrics_root = HashMap::new(); + metrics_root.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 0.0); + let root_span = create_test_span_with_metrics(12345, 1, metrics_root); + + let mut metrics_later = HashMap::new(); + metrics_later.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 1.0); + let later_span = create_test_span_with_metrics(12345, 2, metrics_later).with_parent_id(1); + + let mut trace = create_test_trace(vec![root_span, later_span]); + let root_idx = sampler.get_root_span_index(&trace).unwrap(); + + // Without trace-level priority, should get priority from root (0) + assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(0)); + + // Now set trace-level priority to 2 (simulating last-seen priority from OTLP translator) + trace.set_sampling(Some(TraceSampling::new(false, Some(2), None, None))); + + // Trace-level priority should take precedence + assert_eq!(sampler.get_user_priority(&trace, root_idx), Some(2)); + + // Test that trace-level priority is used even when no span has priority + let span_no_priority = create_test_span(12345, 3, 0); + let mut trace_only_trace_level = 
create_test_trace(vec![span_no_priority]); + trace_only_trace_level.set_sampling(Some(TraceSampling::new(false, Some(1), None, None))); + let root_idx = sampler.get_root_span_index(&trace_only_trace_level).unwrap(); + + assert_eq!(sampler.get_user_priority(&trace_only_trace_level, root_idx), Some(1)); + } + + #[test] + fn test_manual_keep_with_trace_level_priority() { + let mut sampler = create_test_sampler(); + sampler.probabilistic_sampler_enabled = false; // Use legacy path that checks user priority + + // Test that manual keep (priority = 2) works via trace-level priority + let span = create_test_span(12345, 1, 0); + let mut trace = create_test_trace(vec![span]); + trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_USER_KEEP), None, None))); + + let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace); + assert!(keep); + assert_eq!(priority, PRIORITY_USER_KEEP); + assert_eq!(decision_maker, DECISION_MAKER_MANUAL_PRIORITY); + + // Test manual drop (priority = -1) via trace-level priority + let span = create_test_span(12345, 1, 0); + let mut trace = create_test_trace(vec![span]); + trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_USER_DROP), None, None))); + + let (keep, priority, _, _) = sampler.run_samplers(&mut trace); + assert!(!keep); // Should not keep when user drops + assert_eq!(priority, PRIORITY_AUTO_DROP); // Fallthrough to auto-drop + + // Test that priority = 1 (auto keep) via trace-level is also respected + let span = create_test_span(12345, 1, 0); + let mut trace = create_test_trace(vec![span]); + trace.set_sampling(Some(TraceSampling::new(false, Some(PRIORITY_AUTO_KEEP), None, None))); + + let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace); + assert!(keep); + assert_eq!(priority, PRIORITY_AUTO_KEEP); + assert_eq!(decision_maker, DECISION_MAKER_MANUAL_PRIORITY); + } + + #[test] + fn test_probabilistic_sampling_determinism() { + let sampler = create_test_sampler(); + + // Same trace ID should always produce same decision + let trace_id = 0x1234567890ABCDEF_u64; + let result1 = sampler.sample_probabilistic(trace_id); + let result2 = sampler.sample_probabilistic(trace_id); + assert_eq!(result1, result2); + } + + #[test] + fn test_error_detection() { + let sampler = create_test_sampler(); + + // Test trace with error field set + let span_with_error = create_test_span(12345, 1, 1); + let trace = create_test_trace(vec![span_with_error]); + assert!(sampler.trace_contains_error(&trace, false)); + + // Test trace without error + let span_without_error = create_test_span(12345, 1, 0); + let trace = create_test_trace(vec![span_without_error]); + assert!(!sampler.trace_contains_error(&trace, false)); + } + + #[test] + fn test_sampling_priority_order() { + // Test modern path: error sampler overrides probabilistic drop + let mut sampler = create_test_sampler(); + sampler.sampling_rate = 0.5; // 50% sampling rate + sampler.probabilistic_sampler_enabled = true; + + // Create trace with error that would be dropped by probabilistic + // Using a trace ID that we know will be dropped at 50% rate + let span_with_error = create_test_span(u64::MAX - 1, 1, 1); + let mut trace = create_test_trace(vec![span_with_error]); + + let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace); + assert!(keep); + assert_eq!(priority, PRIORITY_AUTO_KEEP); + assert_eq!(decision_maker, ""); // Error sampler doesn't set decision_maker + + // Test legacy path: user priority is respected + let mut sampler = create_test_sampler(); 
+        sampler.probabilistic_sampler_enabled = false; // Use legacy path
+
+        let mut metrics = HashMap::new();
+        metrics.insert(SAMPLING_PRIORITY_METRIC_KEY.to_string(), 2.0);
+        let span = create_test_span_with_metrics(12345, 1, metrics);
+        let mut trace = create_test_trace(vec![span]);
+
+        let (keep, priority, decision_maker, _) = sampler.run_samplers(&mut trace);
+        assert!(keep);
+        assert_eq!(priority, PRIORITY_USER_KEEP); // manual keep
+        assert_eq!(decision_maker, DECISION_MAKER_MANUAL_PRIORITY); // manual decision
+    }
+
+    #[test]
+    fn test_empty_trace_handling() {
+        let mut sampler = create_test_sampler();
+        let mut trace = create_test_trace(vec![]);
+
+        let (keep, priority, _, _) = sampler.run_samplers(&mut trace);
+        assert!(!keep);
+        assert_eq!(priority, PRIORITY_AUTO_DROP);
+    }
+
+    #[test]
+    fn test_root_span_detection() {
+        let sampler = create_test_sampler();
+
+        // Test 1: Root span with parent_id = 0 (common case)
+        let root_span = DdSpan::new(
+            MetaString::from("service"),
+            MetaString::from("operation"),
+            MetaString::from("resource"),
+            MetaString::from("type"),
+            12345,
+            1,
+            0, // parent_id = 0 indicates root
+            0,
+            1000,
+            0,
+        );
+        let child_span = DdSpan::new(
+            MetaString::from("service"),
+            MetaString::from("child_op"),
+            MetaString::from("resource"),
+            MetaString::from("type"),
+            12345,
+            2,
+            1, // parent_id = 1 (points to root)
+            100,
+            500,
+            0,
+        );
+        // Put the root span second to test that we find it even when it is not first
+        let trace = create_test_trace(vec![child_span.clone(), root_span.clone()]);
+        let root_idx = sampler.get_root_span_index(&trace).unwrap();
+        assert_eq!(trace.spans()[root_idx].span_id(), 1);
+
+        // Test 2: Orphaned span (parent not in trace)
+        let orphan_span = DdSpan::new(
+            MetaString::from("service"),
+            MetaString::from("orphan"),
+            MetaString::from("resource"),
+            MetaString::from("type"),
+            12345,
+            3,
+            999, // parent_id = 999 (doesn't exist in trace)
+            200,
+            300,
+            0,
+        );
+        let trace = create_test_trace(vec![orphan_span]);
+        let root_idx = sampler.get_root_span_index(&trace).unwrap();
+        assert_eq!(trace.spans()[root_idx].span_id(), 3);
+
+        // Test 3: Multiple root candidates; both have parent_id = 0, so the last
+        // one found (span_id = 2) should be returned
+        let span1 = create_test_span(12345, 1, 0);
+        let span2 = create_test_span(12345, 2, 0);
+        let trace = create_test_trace(vec![span1, span2]);
+        let root_idx = sampler.get_root_span_index(&trace).unwrap();
+        assert_eq!(trace.spans()[root_idx].span_id(), 2);
+    }
+
+    #[test]
+    fn test_single_span_sampling() {
+        let mut sampler = create_test_sampler();
+
+        // Test 1: A trace with SSS tags should be kept even when probabilistic sampling would drop it
+        sampler.sampling_rate = 0.0; // 0% sampling rate - should drop everything
+        sampler.probabilistic_sampler_enabled = true;
+
+        // Create a span with the single span sampling (SSS) mechanism metric
+        let mut metrics_map = saluki_common::collections::FastHashMap::default();
+        metrics_map.insert(MetaString::from(KEY_SPAN_SAMPLING_MECHANISM), 8.0); // Any value; presence of the key is what matters
+        let sss_span = create_test_span(12345, 1, 0).with_metrics(metrics_map.clone());
+
+        // Create a regular span without SSS
+        let regular_span = create_test_span(12345, 2, 0);
+
+        let mut trace = create_test_trace(vec![sss_span.clone(), regular_span]);
+
+        // Apply SSS
+        let modified = sampler.single_span_sampling(&mut trace);
+        assert!(modified);
+        assert_eq!(trace.spans().len(), 1); // Only the SSS span is kept
+        assert_eq!(trace.spans()[0].span_id(), 1); // It's the SSS span
+
+        // Check that the trace has been marked as kept with high priority
+        assert!(trace.sampling().is_some());
+        assert_eq!(trace.sampling().as_ref().unwrap().priority, Some(PRIORITY_USER_KEEP));
+
+        // Test 2: A trace without SSS tags should not be modified
+        let trace_without_sss = create_test_trace(vec![create_test_span(12345, 3, 0)]);
+        let mut trace_copy = trace_without_sss.clone();
+        let modified = sampler.single_span_sampling(&mut trace_copy);
+        assert!(!modified);
+        assert_eq!(trace_copy.spans().len(), trace_without_sss.spans().len());
+    }
+
+    #[test]
+    fn test_analytics_events() {
+        let sampler = create_test_sampler();
+
+        // Test 1: Trace with analyzed spans
+        let mut metrics_map = saluki_common::collections::FastHashMap::default();
+        metrics_map.insert(MetaString::from(KEY_ANALYZED_SPANS), 1.0);
+        let analyzed_span = create_test_span(12345, 1, 0).with_metrics(metrics_map.clone());
+        let regular_span = create_test_span(12345, 2, 0);
+
+        let trace = create_test_trace(vec![analyzed_span.clone(), regular_span]);
+
+        let analyzed_spans = sampler.get_analyzed_spans(&trace);
+        assert_eq!(analyzed_spans.len(), 1);
+        assert_eq!(analyzed_spans[0].span_id(), 1);
+
+        assert!(sampler.has_analyzed_spans(&trace));
+
+        // Test 2: Trace without analyzed spans
+        let trace_no_analytics = create_test_trace(vec![create_test_span(12345, 3, 0)]);
+        let analyzed_spans = sampler.get_analyzed_spans(&trace_no_analytics);
+        assert!(analyzed_spans.is_empty());
+        assert!(!sampler.has_analyzed_spans(&trace_no_analytics));
+    }
+
+    #[test]
+    fn test_probabilistic_sampling_with_prob_rate_key() {
+        let mut sampler = create_test_sampler();
+        sampler.sampling_rate = 0.75; // 75% sampling rate
+        sampler.probabilistic_sampler_enabled = true;
+
+        // Use a trace ID that we know will be sampled
+        let trace_id = 12345_u64;
+        let root_span = DdSpan::new(
+            MetaString::from("service"),
+            MetaString::from("operation"),
+            MetaString::from("resource"),
+            MetaString::from("type"),
+            trace_id,
+            1,
+            0, // parent_id = 0 indicates root
+            0,
+            1000,
+            0,
+        );
+        let mut trace = create_test_trace(vec![root_span]);
+
+        let (keep, priority, decision_maker, root_span_idx) = sampler.run_samplers(&mut trace);
+
+        if keep && decision_maker == DECISION_MAKER_PROBABILISTIC {
+            // If sampled probabilistically, check that the prob rate key was already added
+            assert_eq!(priority, PRIORITY_AUTO_KEEP);
+            assert_eq!(decision_maker, DECISION_MAKER_PROBABILISTIC); // probabilistic sampling marker
+
+            // Check that the root span already has PROB_RATE_KEY (it should have been added in run_samplers)
+            let root_idx = root_span_idx.unwrap_or(0);
+            let root_span = &trace.spans()[root_idx];
+            assert!(root_span.metrics().contains_key(PROB_RATE_KEY));
+            assert_eq!(*root_span.metrics().get(PROB_RATE_KEY).unwrap(), 0.75);
+
+            // Test that apply_sampling_metadata still works correctly for other metadata
+            let mut trace_with_metadata = trace.clone();
+            sampler.apply_sampling_metadata(&mut trace_with_metadata, keep, priority, decision_maker, root_idx);
+
+            // Check that the decision maker tag was added
+            let modified_root = &trace_with_metadata.spans()[root_idx];
+            assert!(modified_root.meta().contains_key(TAG_DECISION_MAKER));
+            assert_eq!(
+                modified_root.meta().get(TAG_DECISION_MAKER).unwrap(),
+                &MetaString::from(DECISION_MAKER_PROBABILISTIC)
+            );
+        }
+    }
+}
diff --git a/lib/saluki-components/src/transforms/trace_sampler/probabilistic.rs b/lib/saluki-components/src/transforms/trace_sampler/probabilistic.rs
new file mode 100644
index 0000000000..785823e5c3
--- /dev/null
+++ b/lib/saluki-components/src/transforms/trace_sampler/probabilistic.rs
@@ -0,0 +1,44 @@
+//! Probabilistic sampling.
+
+use super::signature::fnv1a_32;
+
+// Probabilistic sampler constants (matching datadog-agent's bucketed sampler).
+// These constants exist to match the behavior of the OTEL probabilistic sampler.
+// See: https://github.com/open-telemetry/opentelemetry-collector-contrib/.../probabilisticsamplerprocessor/tracesprocessor.go#L38-L42
+const NUM_PROBABILISTIC_BUCKETS: u32 = 0x4000;
+const BITMASK_HASH_BUCKETS: u32 = NUM_PROBABILISTIC_BUCKETS - 1;
+
+/// `probRateKey` indicates the percentage sampling rate configured for the probabilistic sampler.
+pub(super) const PROB_RATE_KEY: &str = "_dd.prob_sr";
+
+/// Probabilistic sampler.
+///
+/// # Missing
+///
+/// - Full trace ID mode (off by default).
+/// - Make the hash seed configurable.
+pub(super) struct ProbabilisticSampler;
+
+impl ProbabilisticSampler {
+    /// Deterministically sample a trace based on its trace ID.
+    ///
+    /// This mirrors the behavior of the Datadog Agent's bucketed probabilistic sampler.
+    pub(super) fn sample(trace_id: u64, sampling_rate: f64) -> bool {
+        // Logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/sampler/probabilistic.go#L62
+        // We take in a (randomly generated) trace ID, hash and mask it down to a number between
+        // 0 and 0x3FFF, and compare that bucket against the scaled sampling rate.
+
+        // To match the agent's behavior, the trace ID is widened to a 16-byte array. This is only
+        // needed for full trace ID mode, but we require it now so the hash matches the agent's.
+        let mut tid = [0u8; 16];
+        tid[..8].copy_from_slice(&trace_id.to_be_bytes());
+
+        // Match the datadog-agent bucketed probabilistic sampler behavior.
+        // (Fixed zero hash seed; equivalent to the agent's default when unset.)
+        let hash_seed = [0u8; 4];
+        let hash = fnv1a_32(&hash_seed, &tid);
+        let scaled_sampling_percentage = (sampling_rate * NUM_PROBABILISTIC_BUCKETS as f64) as u32;
+        // BITMASK_HASH_BUCKETS = 0x3FFF (binary: 0011111111111111, i.e. the low 14 bits set),
+        // so we keep only the lower 14 bits of the hash.
+        (hash & BITMASK_HASH_BUCKETS) < scaled_sampling_percentage
+    }
+}
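As a quick illustration (editorial sketch, not part of the patch): for a 50% rate the threshold is 0.5 * 0x4000 = 0x2000 (8192), so a trace is kept exactly when the low 14 bits of the FNV-1a hash of its zero-padded 16-byte trace ID fall below 8192. Reusing `fnv1a_32` from `signature.rs`:

    let trace_id: u64 = 0x1234567890ABCDEF;
    let mut tid = [0u8; 16];
    tid[..8].copy_from_slice(&trace_id.to_be_bytes());
    let hash = fnv1a_32(&[0u8; 4], &tid);
    // threshold = (0.5 * 0x4000 as f64) as u32 = 0x2000
    let keep = (hash & 0x3FFF) < 0x2000;
    assert_eq!(keep, ProbabilisticSampler::sample(trace_id, 0.5));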
diff --git a/lib/saluki-components/src/transforms/trace_sampler/score_sampler.rs b/lib/saluki-components/src/transforms/trace_sampler/score_sampler.rs
new file mode 100644
index 0000000000..f84dcbdd7d
--- /dev/null
+++ b/lib/saluki-components/src/transforms/trace_sampler/score_sampler.rs
@@ -0,0 +1,191 @@
+use std::time::SystemTime;
+
+use saluki_common::collections::FastHashMap;
+use saluki_core::data_model::event::trace::{Span, Trace};
+use stringtheory::MetaString;
+
+use super::signature::{compute_signature_with_root_and_env, Signature};
+use crate::common::datadog::sample_by_rate;
+use crate::transforms::trace_sampler::core_sampler::Sampler;
+
+// Metric keys for sampling rates
+const KEY_SAMPLING_RATE_GLOBAL: &str = "_sample_rate";
+const KEY_SAMPLING_RATE_PRE_SAMPLER: &str = "_dd1.sr.rapre";
+
+// ScoreSampler-specific rate keys
+pub(super) const ERRORS_RATE_KEY: &str = "_dd.errors_sr";
+pub(super) const NO_PRIORITY_RATE_KEY: &str = "_dd.no_p_sr";
+
+// SHRINK_CARDINALITY is the maximum Signature cardinality before shrinking
+const SHRINK_CARDINALITY: usize = 200;
+
+/// ScoreSampler for traces.
+///
+/// ScoreSampler samples trace chunks by computing a signature based on span attributes (service, name,
+/// resource, http.status_code, error.type), scoring it, and applying a rate. The rates are applied based
+/// on the trace ID to maximize the number of chunks with errors caught for the same trace ID.
+/// For a given trace ID: P(chunk1 kept and chunk2 kept) = min(P(chunk1 kept), P(chunk2 kept)).
+///
+/// # Missing
+///
+/// TODO: Add SampleV1 method for legacy trace format support
+pub struct ScoreSampler {
+    sampler: Sampler,
+    sampling_rate_key: &'static str,
+    disabled: bool,
+    // When shrinking, the shrink allowlist holds the currently active signatures while new ones get collapsed
+    shrink_allow_list: Option<FastHashMap<Signature, f64>>,
+}
+
+/// NoPrioritySampler for traces without a sampling priority.
+///
+/// Wraps a ScoreSampler configured specifically for no-priority sampling.
+pub(super) struct NoPrioritySampler {
+    score_sampler: ScoreSampler,
+}
+
+impl NoPrioritySampler {
+    /// Create a new NoPrioritySampler with the given configuration.
+    pub(super) fn new(target_tps: f64, extra_sample_rate: f64) -> Self {
+        Self {
+            score_sampler: ScoreSampler::new(NO_PRIORITY_RATE_KEY, false, target_tps, extra_sample_rate),
+        }
+    }
+
+    /// Evaluate a trace that has no sampling priority.
+    pub(super) fn sample(&mut self, now: SystemTime, trace: &mut Trace, root_idx: usize) -> bool {
+        self.score_sampler.sample(now, trace, root_idx)
+    }
+}
+
+impl ScoreSampler {
+    /// Create a new ScoreSampler with the given sampling rate key and target TPS.
+    pub fn new(sampling_rate_key: &'static str, disabled: bool, target_tps: f64, extra_sample_rate: f64) -> Self {
+        Self {
+            sampler: Sampler::new(extra_sample_rate, target_tps),
+            sampling_rate_key,
+            disabled,
+            shrink_allow_list: None,
+        }
+    }
+
+    /// Counts an incoming trace and returns whether it should be kept.
+    pub fn sample(&mut self, now: SystemTime, trace: &mut Trace, root_idx: usize) -> bool {
+        // Logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/sampler/scoresampler.go#L71
+        if self.disabled {
+            return false;
+        }
+
+        let spans_len = trace.spans().len();
+        if spans_len == 0 || root_idx >= spans_len {
+            return false;
+        }
+
+        // Compute the signature before mutably borrowing the root span.
+        let signature = compute_signature_with_root_and_env(trace, root_idx);
+        let signature = self.shrink(signature);
+
+        // Update the sampler state by counting this trace
+        let weight = {
+            let spans = trace.spans();
+            let root = &spans[root_idx];
+            weight_root(root)
+        };
+        self.sampler.count_weighted_sig(now, &signature, weight);
+
+        // Get the sampling rate for this signature
+        let rate = self.sampler.get_signature_sample_rate(&signature);
+
+        // Apply the sampling decision
+        let root = &mut trace.spans_mut()[root_idx];
+        self.apply_sample_rate(root, rate)
+    }
+
+    /// Apply the sampling rate to determine if the trace should be kept.
+    fn apply_sample_rate(&self, root: &mut Span, rate: f64) -> bool {
+        let initial_rate = get_global_rate(root);
+        let new_rate = initial_rate * rate;
+        let trace_id = root.trace_id();
+        let sampled = sample_by_rate(trace_id, new_rate);
+
+        if sampled {
+            self.set_sampling_rate_metric(root, rate);
+        }
+
+        sampled
+    }
+
+    /// Shrink limits the number of signatures stored in the sampler.
+    /// Once the cardinality rises above SHRINK_CARDINALITY / 2, new signatures are
+    /// spread uniformly over a fixed set of values. This keeps ScoreSamplers memory-capped.
+    /// When the shrink is triggered, previously active signatures stay unaffected;
+    /// new signatures may share the same TPS computation.
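+    /// For example, with SHRINK_CARDINALITY = 200, shrinking engages once 100 or more
+    /// signatures are tracked: a signature not already in the allowlist collapses to
+    /// Signature(sig.0 % 100), bounding the number of new buckets.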
+    fn shrink(&mut self, sig: Signature) -> Signature {
+        // Logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/sampler/scoresampler.go#L151
+        if self.sampler.size() < (SHRINK_CARDINALITY / 2) as i64 {
+            self.shrink_allow_list = None;
+            return sig;
+        }
+
+        if self.shrink_allow_list.is_none() {
+            let (rates, _) = self.sampler.get_all_signature_sample_rates();
+            self.shrink_allow_list = Some(rates);
+        }
+
+        if let Some(ref map) = self.shrink_allow_list {
+            if map.contains_key(&sig) {
+                return sig;
+            }
+        }
+
+        // Map to a limited set of signatures to bound cardinality and force
+        // new signatures to share the same bucket and TPS computation.
+        Signature(sig.0 % (SHRINK_CARDINALITY as u64 / 2))
+    }
+
+    /// Set the sampling rate metric on a span.
+    pub fn set_sampling_rate_metric(&self, span: &mut Span, rate: f64) {
+        span.metrics_mut()
+            .insert(MetaString::from(self.sampling_rate_key), rate);
+    }
+}
+
+#[cfg(test)]
+impl ScoreSampler {
+    pub(crate) fn test_shrink(&mut self, sig: Signature) -> Signature {
+        self.shrink(sig)
+    }
+
+    pub(crate) fn test_size(&self) -> i64 {
+        self.sampler.size()
+    }
+
+    pub(crate) fn test_shrink_cardinality() -> usize {
+        SHRINK_CARDINALITY
+    }
+}
+
+/// Calculate the weight from the span's global rate and presampler rate.
+fn weight_root(span: &Span) -> f32 {
+    let client_rate = span
+        .metrics()
+        .get(KEY_SAMPLING_RATE_GLOBAL)
+        .copied()
+        .filter(|&r| r > 0.0 && r <= 1.0)
+        .unwrap_or(1.0);
+
+    let pre_sampler_rate = span
+        .metrics()
+        .get(KEY_SAMPLING_RATE_PRE_SAMPLER)
+        .copied()
+        .filter(|&r| r > 0.0 && r <= 1.0)
+        .unwrap_or(1.0);
+
+    (1.0 / (pre_sampler_rate * client_rate)) as f32
+}
+
+/// Get the cumulative sample rate of the trace to which this span belongs.
+fn get_global_rate(span: &Span) -> f64 {
+    span.metrics().get(KEY_SAMPLING_RATE_GLOBAL).copied().unwrap_or(1.0)
+}
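For intuition (editorial sketch, not part of the patch): weight_root inverts the combined client-side rates so that pre-sampled traffic is counted at its estimated original volume. A root span carrying `_sample_rate` = 0.5 and `_dd1.sr.rapre` = 0.8 therefore counts as 1 / (0.5 * 0.8) = 2.5 traces in the per-signature TPS accounting:

    // `root` is a hypothetical span whose metrics contain the two rate keys above.
    let w = weight_root(&root);
    assert!((w - 2.5).abs() < 1e-6);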
diff --git a/lib/saluki-components/src/transforms/trace_sampler/signature.rs b/lib/saluki-components/src/transforms/trace_sampler/signature.rs
new file mode 100644
index 0000000000..7e57780503
--- /dev/null
+++ b/lib/saluki-components/src/transforms/trace_sampler/signature.rs
@@ -0,0 +1,93 @@
+//! Trace signature computation utilities.
+//!
+//! This module currently provides:
+//! - a small FNV-1a 32-bit helper (used by probabilistic sampling)
+//! - a signature newtype + compute helper (for score/TPS samplers)
+
+use saluki_core::data_model::event::trace::{Span, Trace};
+use stringtheory::MetaString;
+
+const OFFSET_32: u32 = 2166136261;
+const PRIME_32: u32 = 16777619;
+const KEY_HTTP_STATUS_CODE: &str = "http.status_code";
+const KEY_ERROR_TYPE: &str = "error.type";
+
+fn write_hash(mut hash: u32, bytes: &[u8]) -> u32 {
+    for &b in bytes {
+        hash ^= b as u32;
+        hash = hash.wrapping_mul(PRIME_32);
+    }
+    hash
+}
+
+pub(super) fn fnv1a_32(seed: &[u8], bytes: &[u8]) -> u32 {
+    let hash = write_hash(OFFSET_32, seed);
+    write_hash(hash, bytes)
+}
+
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
+pub(super) struct Signature(pub(super) u64);
+
+fn get_trace_env(trace: &Trace, root_span_idx: usize) -> Option<&MetaString> {
+    // Logic taken from here: https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/traceutil/trace.go#L19-L20
+    // Prefer the root span's `env` tag, falling back to the first span that carries one.
+    let env = trace.spans().get(root_span_idx).and_then(|span| span.meta().get("env"));
+    match env {
+        Some(env) => Some(env),
+        None => {
+            for span in trace.spans().iter() {
+                if let Some(env) = span.meta().get("env") {
+                    return Some(env);
+                }
+            }
+            None
+        }
+    }
+}
+
+pub(super) fn compute_signature_with_root_and_env(trace: &Trace, root_idx: usize) -> Signature {
+    // Mirrors datadog-agent/pkg/trace/sampler/signature.go:computeSignatureWithRootAndEnv.
+    //
+    // Signature based on the hash of (env, service, name, resource, is_error) for the root, plus the set of
+    // (env, service, name, is_error) of each span.
+    let spans = trace.spans();
+    let Some(root) = spans.get(root_idx) else {
+        return Signature(0);
+    };
+
+    let env = get_trace_env(trace, root_idx).map(|v| v.as_ref()).unwrap_or("");
+    let root_hash = compute_span_hash(root, env, true);
+    let mut span_hashes: Vec<u32> = spans.iter().map(|span| compute_span_hash(span, env, false)).collect();
+
+    if span_hashes.is_empty() {
+        return Signature(root_hash as u64);
+    }
+
+    // Sort and dedupe, then merge all the hashes to build the signature.
+    span_hashes.sort_unstable();
+    span_hashes.dedup();
+
+    let mut trace_hash = span_hashes[0] ^ root_hash;
+    for &h in span_hashes.iter().skip(1) {
+        trace_hash ^= h;
+    }
+
+    Signature(trace_hash as u64)
+}
+
+fn compute_span_hash(span: &Span, env: &str, with_resource: bool) -> u32 {
+    let mut h = OFFSET_32;
+    h = write_hash(h, env.as_bytes());
+    h = write_hash(h, span.service().as_bytes());
+    h = write_hash(h, span.name().as_bytes());
+    h = write_hash(h, &[span.error() as u8]);
+    if with_resource {
+        h = write_hash(h, span.resource().as_bytes());
+    }
+    if let Some(code) = span.meta().get(KEY_HTTP_STATUS_CODE) {
+        h = write_hash(h, code.as_ref().as_bytes());
+    }
+    if let Some(typ) = span.meta().get(KEY_ERROR_TYPE) {
+        h = write_hash(h, typ.as_ref().as_bytes());
+    }
+    h
+}
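One property worth noting (editorial sketch, not part of the patch): because the per-span hashes are sorted and deduped before being XOR-merged with the root hash, the signature is insensitive to span ordering, provided the root index points at the same root span in both cases and the spans carry the same `env` tags:

    // `trace_ab` and `trace_ba` are hypothetical traces holding the same spans in different orders.
    let sig_ab = compute_signature_with_root_and_env(&trace_ab, root_idx_ab);
    let sig_ba = compute_signature_with_root_and_env(&trace_ba, root_idx_ba);
    assert_eq!(sig_ab, sig_ba);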
diff --git a/lib/saluki-core/src/data_model/event/trace/mod.rs b/lib/saluki-core/src/data_model/event/trace/mod.rs
index 37ba015986..2e3c42142d 100644
--- a/lib/saluki-core/src/data_model/event/trace/mod.rs
+++ b/lib/saluki-core/src/data_model/event/trace/mod.rs
@@ -3,6 +3,54 @@ use saluki_common::collections::FastHashMap;
 use saluki_context::tags::TagSet;
 use stringtheory::MetaString;
+
+/// Trace-level sampling metadata.
+///
+/// This struct stores sampling-related metadata that applies to the entire trace,
+/// typically set by the trace sampler and consumed by the encoder.
+#[derive(Clone, Debug, PartialEq)]
+pub struct TraceSampling {
+    /// Whether or not the trace was dropped during sampling.
+    pub dropped_trace: bool,
+
+    /// The sampling priority assigned to this trace.
+    ///
+    /// Common values include:
+    /// - `2`: Manual keep (user-requested)
+    /// - `1`: Auto keep (sampled in)
+    /// - `0`: Auto drop (sampled out)
+    /// - `-1`: Manual drop (user-requested drop)
+    pub priority: Option<i32>,
+
+    /// The decision maker identifier indicating which sampler made the sampling decision.
+    ///
+    /// Common values include:
+    /// - `-9`: Probabilistic sampler
+    /// - `-4`: Errors sampler
+    /// - `None`: No decision maker set
+    pub decision_maker: Option<MetaString>,
+
+    /// The OTLP sampling rate applied to this trace, formatted as a string (e.g., "0.25").
+    ///
+    /// This corresponds to the `_dd.otlp_sr` tag and represents the effective sampling rate
+    /// from the OTLP ingest path.
+    pub otlp_sampling_rate: Option<MetaString>,
+}
+
+impl TraceSampling {
+    /// Creates a new `TraceSampling` instance.
+    pub fn new(
+        dropped_trace: bool, priority: Option<i32>, decision_maker: Option<MetaString>,
+        otlp_sampling_rate: Option<MetaString>,
+    ) -> Self {
+        Self {
+            dropped_trace,
+            priority,
+            decision_maker,
+            otlp_sampling_rate,
+        }
+    }
+}
+
 /// A trace event.
 ///
 /// A trace is a collection of spans that represent a distributed trace.
@@ -14,12 +62,22 @@ pub struct Trace {
     ///
     /// This is derived from the resource of the spans and used to construct the tracer payload.
     resource_tags: TagSet,
+    /// Trace-level sampling metadata.
+    ///
+    /// This field contains sampling decision information (priority, decision maker, rates)
+    /// that applies to the entire trace. It is set by the trace sampler component and consumed
+    /// by the encoder to populate trace chunk metadata.
+    sampling: Option<TraceSampling>,
 }

 impl Trace {
     /// Creates a new `Trace` with the given spans.
     pub fn new(spans: Vec<Span>, resource_tags: TagSet) -> Self {
-        Self { spans, resource_tags }
+        Self {
+            spans,
+            resource_tags,
+            sampling: None,
+        }
     }

     /// Returns a reference to the spans in this trace.
@@ -32,10 +90,25 @@
         &mut self.spans
     }

+    /// Replaces the spans in this trace with the given spans.
+    pub fn set_spans(&mut self, spans: Vec<Span>) {
+        self.spans = spans;
+    }
+
     /// Returns the resource-level tags associated with this trace.
     pub fn resource_tags(&self) -> &TagSet {
         &self.resource_tags
     }
+
+    /// Returns a reference to the trace-level sampling metadata, if present.
+    pub fn sampling(&self) -> Option<&TraceSampling> {
+        self.sampling.as_ref()
+    }
+
+    /// Sets the trace-level sampling metadata.
+    pub fn set_sampling(&mut self, sampling: Option<TraceSampling>) {
+        self.sampling = sampling;
+    }
 }

 /// A span event.
@@ -246,7 +319,7 @@ impl Span {
         &self.meta
     }

-    /// Returns a mutable reference to the string-valued tag map.
+    /// Returns a mutable reference to the meta map.
     pub fn meta_mut(&mut self) -> &mut FastHashMap<MetaString, MetaString> {
         &mut self.meta
     }
@@ -256,6 +329,11 @@
         &self.metrics
     }

+    /// Returns a mutable reference to the metrics map.
+    pub fn metrics_mut(&mut self) -> &mut FastHashMap<MetaString, f64> {
+        &mut self.metrics
+    }
+
     /// Returns the structured metadata map.
     pub fn meta_struct(&self) -> &FastHashMap<MetaString, Vec<u8>> {
         &self.meta_struct
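To tie the new API together, a minimal usage sketch (editorial, not part of the patch; it assumes the field types reconstructed above and the conventions used in the tests, e.g. priority 1 = auto keep and decision maker "-9" = probabilistic):

    // Given `spans: Vec<Span>` and `resource_tags: TagSet` from an incoming payload:
    let mut trace = Trace::new(spans, resource_tags);
    // A sampler records its decision at the trace level...
    trace.set_sampling(Some(TraceSampling::new(false, Some(1), Some(MetaString::from("-9")), None)));
    // ...and the encoder later reads it back when building the trace chunk.
    if let Some(sampling) = trace.sampling() {
        assert!(!sampling.dropped_trace);
        assert_eq!(sampling.priority, Some(1));
    }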