Skip to content

Commit 2e10824

Browse files
authored
Replace jctools NonBlockingHashMap with ConcurrentHashMap (#9700)
1 parent 3da04c5 commit 2e10824

File tree

5 files changed

+77
-34
lines changed

5 files changed

+77
-34
lines changed

dd-java-agent/agent-bootstrap/src/main/resources/META-INF/native-image/com.datadoghq/dd-java-agent/reflect-config.json

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,5 @@
181181
"fields": [
182182
{"name": "consumerIndex", "allowUnsafeAccess": true}
183183
]
184-
},
185-
{
186-
"name" : "datadog.jctools.maps.NonBlockingHashMap",
187-
"fields": [
188-
{"name": "_kvs", "allowUnsafeAccess": true}
189-
]
190184
}
191185
]

dd-java-agent/agent-profiling/profiling-controller-jfr/src/main/java/com/datadog/profiling/controller/jfr/parser/MetadataEvent.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.io.IOException;
44
import java.nio.charset.StandardCharsets;
5-
import org.jctools.maps.NonBlockingHashMapLong;
65

76
/**
87
* JFR Chunk metadata
@@ -17,10 +16,6 @@ public final class MetadataEvent {
1716
public final long duration;
1817
public final long metadataId;
1918

20-
private final NonBlockingHashMapLong<String> eventTypeNameMapBacking =
21-
new NonBlockingHashMapLong<>(256);
22-
private final LongMapping<String> eventTypeMap;
23-
2419
MetadataEvent(RecordingStream stream) throws IOException {
2520
size = (int) stream.readVarint();
2621
long typeId = stream.readVarint();
@@ -31,16 +26,6 @@ public final class MetadataEvent {
3126
duration = stream.readVarint();
3227
metadataId = stream.readVarint();
3328
readElements(stream, readStringTable(stream));
34-
eventTypeMap = eventTypeNameMapBacking::get;
35-
}
36-
37-
/**
38-
* Lazily compute and return the mappings of event type ids to event type names
39-
*
40-
* @return mappings of event type ids to event type names
41-
*/
42-
public LongMapping<String> getEventTypeNameMap() {
43-
return eventTypeMap;
4429
}
4530

4631
private String[] readStringTable(RecordingStream stream) throws IOException {
@@ -76,10 +61,6 @@ private void readElements(RecordingStream stream, String[] stringConstants) thro
7661
}
7762
}
7863
}
79-
// only event types are currently collected
80-
if (name != null && id != null && "jdk.jfr.Event".equals(superType)) {
81-
eventTypeNameMapBacking.put(Long.parseLong(id), name);
82-
}
8364
// now inspect all the enclosed elements
8465
int elemCount = (int) stream.readVarint();
8566
for (int i = 0; i < elemCount; i++) {
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package datadog.trace.common;
2+
3+
import static java.util.concurrent.TimeUnit.MICROSECONDS;
4+
import static java.util.concurrent.TimeUnit.SECONDS;
5+
6+
import de.thetaphi.forbiddenapis.SuppressForbidden;
7+
import java.util.concurrent.ConcurrentHashMap;
8+
import org.jctools.maps.NonBlockingHashMap;
9+
import org.openjdk.jmh.annotations.Benchmark;
10+
import org.openjdk.jmh.annotations.BenchmarkMode;
11+
import org.openjdk.jmh.annotations.Fork;
12+
import org.openjdk.jmh.annotations.Level;
13+
import org.openjdk.jmh.annotations.Measurement;
14+
import org.openjdk.jmh.annotations.Mode;
15+
import org.openjdk.jmh.annotations.OutputTimeUnit;
16+
import org.openjdk.jmh.annotations.Scope;
17+
import org.openjdk.jmh.annotations.Setup;
18+
import org.openjdk.jmh.annotations.State;
19+
import org.openjdk.jmh.annotations.Threads;
20+
import org.openjdk.jmh.annotations.Warmup;
21+
import org.openjdk.jmh.infra.Blackhole;
22+
23+
/*
24+
JDK 1.8
25+
Benchmark Mode Cnt Score Error Units
26+
NonBlockingHashMapBenchmark.benchConcurrentHashMap avgt 1.153 us/op
27+
NonBlockingHashMapBenchmark.benchNonBlockingHashMap avgt 1.457 us/op
28+
29+
JDK 21
30+
Benchmark Mode Cnt Score Error Units
31+
NonBlockingHashMapBenchmark.benchConcurrentHashMap avgt 1.088 us/op
32+
NonBlockingHashMapBenchmark.benchNonBlockingHashMap avgt 1.278 us/op
33+
*/
34+
@State(Scope.Benchmark)
35+
@Warmup(iterations = 1, time = 30, timeUnit = SECONDS)
36+
@Measurement(iterations = 1, time = 30, timeUnit = SECONDS)
37+
@BenchmarkMode(Mode.AverageTime)
38+
@OutputTimeUnit(MICROSECONDS)
39+
@Fork(value = 1)
40+
@SuppressForbidden
41+
public class NonBlockingHashMapBenchmark {
42+
private NonBlockingHashMap nonBlockingHashMap;
43+
private ConcurrentHashMap concurrentHashMap;
44+
45+
@Setup(Level.Iteration)
46+
public void setup() {
47+
nonBlockingHashMap = new NonBlockingHashMap(512);
48+
concurrentHashMap = new ConcurrentHashMap(512);
49+
for (int i = 0; i < 256; i++) {
50+
nonBlockingHashMap.put("test" + i, "test");
51+
concurrentHashMap.put("test" + i, "test");
52+
}
53+
}
54+
55+
@Benchmark
56+
@Threads(Threads.MAX)
57+
public void benchNonBlockingHashMap(Blackhole blackhole) {
58+
nonBlockingHashMap.put("test", "test");
59+
blackhole.consume(nonBlockingHashMap.remove("test"));
60+
}
61+
62+
@Benchmark
63+
@Threads(Threads.MAX)
64+
public void benchConcurrentHashMap(Blackhole blackhole) {
65+
concurrentHashMap.put("test", "test");
66+
blackhole.consume(concurrentHashMap.remove("test"));
67+
}
68+
}

dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import java.util.Map;
99
import java.util.Queue;
1010
import java.util.Set;
11+
import java.util.concurrent.ConcurrentMap;
1112
import java.util.concurrent.TimeUnit;
12-
import org.jctools.maps.NonBlockingHashMap;
1313
import org.jctools.queues.MessagePassingQueue;
1414
import org.jctools.queues.MpscCompoundQueue;
1515
import org.slf4j.Logger;
@@ -24,7 +24,7 @@ final class Aggregator implements Runnable {
2424
private final Queue<Batch> batchPool;
2525
private final MpscCompoundQueue<InboxItem> inbox;
2626
private final LRUCache<MetricKey, AggregateMetric> aggregates;
27-
private final NonBlockingHashMap<MetricKey, Batch> pending;
27+
private final ConcurrentMap<MetricKey, Batch> pending;
2828
private final Set<MetricKey> commonKeys;
2929
private final MetricWriter writer;
3030
// the reporting interval controls how much history will be buffered
@@ -40,7 +40,7 @@ final class Aggregator implements Runnable {
4040
MetricWriter writer,
4141
Queue<Batch> batchPool,
4242
MpscCompoundQueue<InboxItem> inbox,
43-
NonBlockingHashMap<MetricKey, Batch> pending,
43+
ConcurrentMap<MetricKey, Batch> pending,
4444
final Set<MetricKey> commonKeys,
4545
int maxAggregates,
4646
long reportingInterval,
@@ -61,7 +61,7 @@ final class Aggregator implements Runnable {
6161
MetricWriter writer,
6262
Queue<Batch> batchPool,
6363
MpscCompoundQueue<InboxItem> inbox,
64-
NonBlockingHashMap<MetricKey, Batch> pending,
64+
ConcurrentMap<MetricKey, Batch> pending,
6565
final Set<MetricKey> commonKeys,
6666
int maxAggregates,
6767
long reportingInterval,

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@
4242
import java.util.Queue;
4343
import java.util.Set;
4444
import java.util.concurrent.CompletableFuture;
45+
import java.util.concurrent.ConcurrentHashMap;
4546
import java.util.concurrent.Future;
4647
import java.util.concurrent.TimeUnit;
4748
import java.util.function.Function;
48-
import org.jctools.maps.NonBlockingHashMap;
4949
import org.jctools.queues.MpscCompoundQueue;
5050
import org.jctools.queues.SpmcArrayQueue;
5151
import org.slf4j.Logger;
@@ -90,8 +90,8 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
9090

9191
private final Set<String> ignoredResources;
9292
private final Queue<Batch> batchPool;
93-
private final NonBlockingHashMap<MetricKey, Batch> pending;
94-
private final NonBlockingHashMap<MetricKey, MetricKey> keys;
93+
private final ConcurrentHashMap<MetricKey, Batch> pending;
94+
private final ConcurrentHashMap<MetricKey, MetricKey> keys;
9595
private final Thread thread;
9696
private final MpscCompoundQueue<InboxItem> inbox;
9797
private final Sink sink;
@@ -178,8 +178,8 @@ public ConflatingMetricsAggregator(
178178
this.ignoredResources = ignoredResources;
179179
this.inbox = new MpscCompoundQueue<>(queueSize);
180180
this.batchPool = new SpmcArrayQueue<>(maxAggregates);
181-
this.pending = new NonBlockingHashMap<>(maxAggregates * 4 / 3);
182-
this.keys = new NonBlockingHashMap<>();
181+
this.pending = new ConcurrentHashMap<>(maxAggregates * 4 / 3);
182+
this.keys = new ConcurrentHashMap<>();
183183
this.features = features;
184184
this.healthMetrics = healthMetric;
185185
this.sink = sink;

0 commit comments

Comments
 (0)