Skip to content

Commit d05c0f7

Browse files
feat(trace-registry): optimize access (#93)
# Motivations The trace registry is currently a shared data structure protected by a single lock. This means that 2 threads accessing different traces will create contention. The goal of this PR is to optimize access to the trace registry so creating and finishing spans can be as fast as possible and not block the user critical path. # Changes The optimisations work in two ways: * For single threaded access: The default std Hashmap has been replaced by hasbrown. hasbrown uses foldhash, a non cryptographic hash function which is ~10x faster than the std hash function * For concurrent access: The trace registry now shards the lock based on the trace id. This means that we divide the trace registry in 64 locks. So two different traces now only have 1/64 chances of colliding. Through testing I have figured out that this should allow us to scale to up to 32 threads doing frequent concurrent reads on the trace registry
1 parent cee8240 commit d05c0f7

File tree

4 files changed

+159
-18
lines changed

4 files changed

+159
-18
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,19 @@ opentelemetry_sdk = { version = "0.30.0", features = [
3838
opentelemetry = { version = "0.30.0", features = [
3939
"trace",
4040
], default-features = false }
41-
opentelemetry-semantic-conventions = { version = "0.30.0", features = ["semconv_experimental"] }
41+
opentelemetry-semantic-conventions = { version = "0.30.0", features = [
42+
"semconv_experimental",
43+
] }
4244
tokio = { version = "1.44.1" }
4345
rand = { version = "0.8", features = ["small_rng"] }
4446

4547
serde = { version = "1.0.194" }
4648
serde_json = { version = "1.0.140" }
4749

50+
hashbrown = { version = "0.15.5" }
51+
foldhash = { version = "0.1.5" }
52+
53+
4854
[profile.dev]
4955
debug = 2 # full debug info
5056

datadog-opentelemetry/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ description.workspace = true
1212
# Workspace dependencies
1313
dd-trace = { path = "../dd-trace" }
1414
dd-trace-sampling = { path = "../dd-trace-sampling" }
15-
dd-trace-propagation = { path = "../dd-trace-propagation", features = ["opentelemetry"] }
15+
dd-trace-propagation = { path = "../dd-trace-propagation", features = [
16+
"opentelemetry",
17+
] }
1618
datadog-opentelemetry-mappings = { path = "../datadog-opentelemetry-mappings" }
1719

1820
# External dependencies
@@ -22,6 +24,8 @@ opentelemetry = { workspace = true }
2224
opentelemetry-semantic-conventions = { workspace = true }
2325
rand = { workspace = true }
2426
serde_json = { workspace = true }
27+
foldhash = { workspace = true }
28+
hashbrown = { workspace = true }
2529

2630
# Libdatadog dependencies
2731
data-pipeline = { workspace = true }
@@ -44,5 +48,8 @@ tracing = { version = "0.1.4" }
4448
tracing-opentelemetry = { version = "0.31" }
4549
tracing-subscriber = { version = "0.3" }
4650

51+
# Benchmarking
52+
criterion = "0.5.1"
53+
4754
[features]
4855
test-utils = ["dd-trace/test-utils", "data-pipeline/test-utils"]

datadog-opentelemetry/src/span_processor.rs

Lines changed: 141 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use hashbrown::{hash_map, HashMap as BHashMap};
45
use std::{
5-
collections::{hash_map, HashMap},
6+
collections::HashMap,
7+
fmt::Debug,
68
str::FromStr,
79
sync::{Arc, RwLock},
810
};
@@ -56,7 +58,7 @@ const EMPTY_PROPAGATION_DATA: TracePropagationData = TracePropagationData {
5658

5759
#[derive(Debug)]
5860
struct InnerTraceRegistry {
59-
registry: HashMap<[u8; 16], Trace>,
61+
registry: BHashMap<[u8; 16], Trace>,
6062
}
6163

6264
pub enum RegisterTracePropagationResult {
@@ -211,6 +213,12 @@ impl InnerTraceRegistry {
211213
}
212214
}
213215

216+
const TRACE_REGISTRY_SHARDS: usize = 64;
217+
218+
#[repr(align(128))]
219+
#[derive(Debug, Clone)]
220+
struct CachePadded<T>(T);
221+
214222
#[derive(Clone, Debug)]
215223
/// A registry of traces that are currently running
216224
///
@@ -220,23 +228,32 @@ impl InnerTraceRegistry {
220228
/// - The number of open spans in the trace
221229
/// - The sampling decision of the trace
222230
pub(crate) struct TraceRegistry {
223-
// TODO: The lock should probably sharded based on the hash of the trace id
224-
// so we reduce contention...
225231
// Example:
226232
// inner: Arc<[CacheAligned<RwLock<InnerTraceRegistry>>; N]>;
227233
// to access a trace we do inner[hash(trace_id) % N].read()
228-
inner: Arc<RwLock<InnerTraceRegistry>>,
234+
inner: Arc<[CachePadded<RwLock<InnerTraceRegistry>>; TRACE_REGISTRY_SHARDS]>,
235+
hasher: foldhash::fast::RandomState,
229236
}
230237

231238
impl TraceRegistry {
232239
pub fn new() -> Self {
233240
Self {
234-
inner: Arc::new(RwLock::new(InnerTraceRegistry {
235-
registry: HashMap::new(),
241+
inner: Arc::new(std::array::from_fn(|_| {
242+
CachePadded(RwLock::new(InnerTraceRegistry {
243+
registry: BHashMap::new(),
244+
}))
236245
})),
246+
hasher: foldhash::fast::RandomState::default(),
237247
}
238248
}
239249

250+
fn get_shard(&self, trace_id: [u8; 16]) -> &RwLock<InnerTraceRegistry> {
251+
use std::hash::BuildHasher;
252+
let hash = self.hasher.hash_one(u128::from_ne_bytes(trace_id));
253+
let shard = hash as usize % TRACE_REGISTRY_SHARDS;
254+
&self.inner[shard].0
255+
}
256+
240257
/// Register the trace propagation data for a given trace ID
241258
/// This increases the open span count for the trace by 1, but does not set the root span ID.
242259
/// You will then need to call `register_local_root_span` to set the root span ID
@@ -249,7 +266,7 @@ impl TraceRegistry {
249266
propagation_data: TracePropagationData,
250267
) -> RegisterTracePropagationResult {
251268
let mut inner = self
252-
.inner
269+
.get_shard(trace_id)
253270
.write()
254271
.expect("Failed to acquire lock on trace registry");
255272
inner.register_local_root_trace_propagation_data(trace_id, propagation_data)
@@ -260,7 +277,7 @@ impl TraceRegistry {
260277
/// If the trace is already registered, it will ignore the new root span ID and log a warning.
261278
pub fn register_local_root_span(&self, trace_id: [u8; 16], root_span_id: [u8; 8]) {
262279
let mut inner = self
263-
.inner
280+
.get_shard(trace_id)
264281
.write()
265282
.expect("Failed to acquire lock on trace registry");
266283
inner.register_local_root_span(trace_id, root_span_id);
@@ -274,7 +291,7 @@ impl TraceRegistry {
274291
propagation_data: TracePropagationData,
275292
) {
276293
let mut inner = self
277-
.inner
294+
.get_shard(trace_id)
278295
.write()
279296
.expect("Failed to acquire lock on trace registry");
280297
inner.register_span(trace_id, span_id, propagation_data);
@@ -285,15 +302,15 @@ impl TraceRegistry {
285302
/// flush
286303
fn finish_span(&self, trace_id: [u8; 16], span_data: SpanData) -> Option<Trace> {
287304
let mut inner = self
288-
.inner
305+
.get_shard(trace_id)
289306
.write()
290307
.expect("Failed to acquire lock on trace registry");
291308
inner.finish_span(trace_id, span_data)
292309
}
293310

294311
pub fn get_trace_propagation_data(&self, trace_id: [u8; 16]) -> TracePropagationData {
295312
let inner = self
296-
.inner
313+
.get_shard(trace_id)
297314
.read()
298315
.expect("Failed to acquire lock on trace registry");
299316

@@ -532,13 +549,22 @@ impl opentelemetry_sdk::trace::SpanProcessor for DatadogSpanProcessor {
532549

533550
#[cfg(test)]
534551
mod tests {
535-
use std::sync::{Arc, RwLock};
536-
537-
use dd_trace::Config;
552+
use std::{
553+
collections::HashMap,
554+
hint::black_box,
555+
sync::{Arc, RwLock},
556+
thread,
557+
time::Duration,
558+
};
559+
560+
use dd_trace::{
561+
sampling::{mechanism, priority, SamplingDecision},
562+
Config,
563+
};
538564
use opentelemetry::{Key, KeyValue, Value};
539565
use opentelemetry_sdk::{trace::SpanProcessor, Resource};
540566

541-
use crate::span_processor::{DatadogSpanProcessor, TraceRegistry};
567+
use crate::span_processor::{DatadogSpanProcessor, TracePropagationData, TraceRegistry};
542568

543569
#[test]
544570
fn test_set_resource_from_empty_dd_config() {
@@ -683,4 +709,103 @@ mod tests {
683709
Some(Value::String("otel-service".into()))
684710
);
685711
}
712+
713+
fn bench_trace_registry(c: &mut criterion::Criterion) {
714+
const ITERATIONS: u32 = 10000;
715+
const NUM_TRACES: usize = ITERATIONS as usize / 20;
716+
let mut group = c.benchmark_group("trace_registry_concurrent_access_threads");
717+
group
718+
.warm_up_time(Duration::from_millis(100))
719+
.measurement_time(Duration::from_millis(1000));
720+
721+
for concurrency in [1, 2, 4, 8, 16, 32] {
722+
group
723+
.throughput(criterion::Throughput::Elements(
724+
ITERATIONS as u64 * concurrency,
725+
))
726+
.bench_function(
727+
criterion::BenchmarkId::from_parameter(concurrency),
728+
move |g| {
729+
let trace_ids: Vec<_> = (0..concurrency)
730+
.map(|thread| {
731+
std::array::from_fn::<_, NUM_TRACES, _>(|i| {
732+
((thread << 16 | i as u64) as u128).to_be_bytes()
733+
})
734+
})
735+
.collect();
736+
g.iter_batched_ref(
737+
{
738+
let trace_ids = trace_ids.clone();
739+
move || {
740+
let tr: TraceRegistry = TraceRegistry::new();
741+
for trace_id in trace_ids.iter().flatten() {
742+
tr.register_local_root_trace_propagation_data(
743+
*trace_id,
744+
TracePropagationData {
745+
sampling_decision: SamplingDecision {
746+
priority: Some(priority::AUTO_KEEP),
747+
mechanism: Some(mechanism::DEFAULT),
748+
},
749+
origin: Some("rum".to_string()),
750+
tags: Some(HashMap::from_iter([(
751+
"dd.p.tid".to_string(),
752+
"foobar".to_string(),
753+
)])),
754+
},
755+
);
756+
}
757+
tr
758+
}
759+
},
760+
move |tr| {
761+
let tr = &*tr;
762+
let trace_ids = &trace_ids;
763+
thread::scope(move |s| {
764+
for trace_id in trace_ids {
765+
s.spawn(move || {
766+
for _ in 0..(ITERATIONS as usize / NUM_TRACES) {
767+
for trace_id in trace_id {
768+
black_box(tr.get_trace_propagation_data(
769+
black_box(*trace_id),
770+
));
771+
}
772+
}
773+
});
774+
}
775+
})
776+
},
777+
criterion::BatchSize::LargeInput,
778+
);
779+
},
780+
);
781+
}
782+
}
783+
784+
#[test]
785+
fn bench() {
786+
// Run with
787+
// `cargo test --profile bench -- --nocapture bench -- <benchmark_filter>
788+
// Collect cli arguments
789+
790+
// Interpret sequence of args `[ "...bench", "--", "[filter]" ]` as a trigger and extract
791+
// `filter`
792+
let filter = std::env::args()
793+
.collect::<Vec<_>>()
794+
.windows(3)
795+
.filter(|p| p.len() >= 2 && p[0].ends_with("bench") && p[1] == "--")
796+
.map(|s| s.get(2).unwrap_or(&"".to_string()).clone())
797+
.next();
798+
799+
let filter = match filter {
800+
None => return,
801+
Some(f) => f,
802+
};
803+
804+
let mut criterion = criterion::Criterion::default()
805+
.with_output_color(true)
806+
.with_filter(&filter);
807+
bench_trace_registry(&mut criterion);
808+
809+
criterion.final_summary();
810+
}
686811
}

0 commit comments

Comments
 (0)