Skip to content

Commit f1139d7

Browse files
authored
Improves CSS peer tag aggregation (#9336)
* Add a JMH benchmark for metrics aggregation
* Cache peer tags to avoid repeated String/UTF-8 conversions
* Use a span kind cache
* Fix tests
1 parent b53a739 commit f1139d7

File tree

8 files changed

+169
-49
lines changed

8 files changed

+169
-49
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package datadog.trace.common.metrics;
2+
3+
import static java.util.concurrent.TimeUnit.MICROSECONDS;
4+
import static java.util.concurrent.TimeUnit.SECONDS;
5+
6+
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
7+
import datadog.communication.monitor.Monitoring;
8+
import datadog.trace.api.WellKnownTags;
9+
import datadog.trace.core.CoreSpan;
10+
import datadog.trace.util.Strings;
11+
import java.nio.ByteBuffer;
12+
import java.util.ArrayList;
13+
import java.util.Collections;
14+
import java.util.List;
15+
import java.util.Set;
16+
import org.openjdk.jmh.annotations.Benchmark;
17+
import org.openjdk.jmh.annotations.BenchmarkMode;
18+
import org.openjdk.jmh.annotations.Fork;
19+
import org.openjdk.jmh.annotations.Measurement;
20+
import org.openjdk.jmh.annotations.Mode;
21+
import org.openjdk.jmh.annotations.OutputTimeUnit;
22+
import org.openjdk.jmh.annotations.Scope;
23+
import org.openjdk.jmh.annotations.State;
24+
import org.openjdk.jmh.annotations.Warmup;
25+
import org.openjdk.jmh.infra.Blackhole;
26+
27+
/**
 * JMH benchmark for {@link ConflatingMetricsAggregator#publish}.
 *
 * <p>Publishes a fixed, pre-built trace of 64 spans, each carrying a {@code peer.hostname} tag,
 * against an aggregator wired to a no-op sink and a fixed features discovery. Configured for one
 * 30 s warmup iteration and three 30 s measurement iterations in a single fork, reporting the
 * average time per invocation in microseconds.
 */
@State(Scope.Benchmark)
28+
@Warmup(iterations = 1, time = 30, timeUnit = SECONDS)
29+
@Measurement(iterations = 3, time = 30, timeUnit = SECONDS)
30+
@BenchmarkMode(Mode.AverageTime)
31+
@OutputTimeUnit(MICROSECONDS)
32+
@Fork(value = 1)
33+
public class ConflatingMetricsAggregatorBenchmark {
34+
// Fixed discovery that advertises "peer.hostname" as the only peer tag and no span kinds.
private final DDAgentFeaturesDiscovery featuresDiscovery =
35+
new FixedAgentFeaturesDiscovery(
36+
Collections.singleton("peer.hostname"), Collections.emptySet());
37+
// Aggregator under test: all well-known tag values are empty strings and payloads go to a
// sink that discards them.
// NOTE(review): the empty set is presumably the ignored-resources set and the two 2048
// values presumably size the aggregate map and the queue -- confirm against the constructor.
private final ConflatingMetricsAggregator aggregator =
38+
new ConflatingMetricsAggregator(
39+
new WellKnownTags("", "", "", "", "", ""),
40+
Collections.emptySet(),
41+
featuresDiscovery,
42+
new NullSink(),
43+
2048,
44+
2048);
45+
// Built once and reused by every benchmark invocation.
private final List<CoreSpan<?>> spans = generateTrace(64);
46+
47+
/**
 * Builds a trace of {@code len} spans, each tagged with its own generated
 * {@code peer.hostname} value.
 *
 * @param len number of spans to generate
 * @return a mutable list holding the generated spans in creation order
 */
static List<CoreSpan<?>> generateTrace(int len) {
48+
final List<CoreSpan<?>> trace = new ArrayList<>();
49+
for (int i = 0; i < len; i++) {
50+
// Constructor arguments are fixed placeholder values; see SimpleSpan for their meaning.
SimpleSpan span = new SimpleSpan("", "", "", "", true, true, false, 0, 10, -1);
51+
// presumably a random 10-character string, giving each span a distinct peer tag value
// -- confirm against Strings.random
span.setTag("peer.hostname", Strings.random(10));
52+
trace.add(span);
53+
}
54+
return trace;
55+
}
56+
57+
/** {@link Sink} whose methods are empty: the listener and the buffer are both ignored. */
static class NullSink implements Sink {
58+
59+
@Override
60+
public void register(EventListener listener) {}
61+
62+
@Override
63+
public void accept(int messageCount, ByteBuffer buffer) {}
64+
}
65+
66+
/**
 * {@link DDAgentFeaturesDiscovery} stub that returns the peer tags and span kinds given at
 * construction, always reports metrics as supported, and does nothing on {@link #discover()}.
 */
static class FixedAgentFeaturesDiscovery extends DDAgentFeaturesDiscovery {
67+
private final Set<String> peerTags;
68+
private final Set<String> spanKinds;
69+
70+
/**
 * @param peerTags value returned by {@link #peerTags()}
 * @param spanKinds value returned by {@link #spanKindsToComputedStats()}
 */
public FixedAgentFeaturesDiscovery(Set<String> peerTags, Set<String> spanKinds) {
71+
// create a fixed discovery with metrics enabled
72+
super(null, Monitoring.DISABLED, null, false, true);
73+
this.peerTags = peerTags;
74+
this.spanKinds = spanKinds;
75+
}
76+
77+
@Override
78+
public void discover() {
79+
// do nothing
80+
}
81+
82+
@Override
83+
public boolean supportsMetrics() {
84+
return true;
85+
}
86+
87+
@Override
88+
public Set<String> peerTags() {
89+
return peerTags;
90+
}
91+
92+
@Override
93+
public Set<String> spanKindsToComputedStats() {
94+
return spanKinds;
95+
}
96+
}
97+
98+
/**
 * Publishes the pre-built trace to the aggregator. The return value is handed to the
 * {@link Blackhole} so the call cannot be optimized away as dead code.
 */
@Benchmark
99+
public void benchmark(Blackhole blackhole) {
100+
blackhole.consume(aggregator.publish(spans));
101+
}
102+
}

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
1616
import datadog.communication.ddagent.SharedCommunicationObjects;
1717
import datadog.trace.api.Config;
18+
import datadog.trace.api.Pair;
1819
import datadog.trace.api.WellKnownTags;
1920
import datadog.trace.api.cache.DDCache;
2021
import datadog.trace.api.cache.DDCaches;
@@ -24,15 +25,16 @@
2425
import datadog.trace.core.CoreSpan;
2526
import datadog.trace.core.DDTraceCoreInfo;
2627
import datadog.trace.util.AgentTaskScheduler;
28+
import java.util.ArrayList;
2729
import java.util.Collections;
28-
import java.util.HashMap;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.util.Queue;
3233
import java.util.Set;
3334
import java.util.concurrent.CompletableFuture;
3435
import java.util.concurrent.Future;
3536
import java.util.concurrent.TimeUnit;
37+
import java.util.function.Function;
3638
import org.jctools.maps.NonBlockingHashMap;
3739
import org.jctools.queues.MpscCompoundQueue;
3840
import org.jctools.queues.SpmcArrayQueue;
@@ -49,6 +51,21 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
4951
private static final DDCache<String, UTF8BytesString> SERVICE_NAMES =
5052
DDCaches.newFixedSizeCache(32);
5153

54+
// Fixed-size cache (16 entries) from a span kind value to its UTF8BytesString form, so publish()
// does not have to create a new UTF8BytesString for every span.
private static final DDCache<CharSequence, UTF8BytesString> SPAN_KINDS =
55+
DDCaches.newFixedSizeCache(16);
56+
// Two-level peer tag cache keyed by peer tag name. Each entry pairs a per-tag cache of
// tag value -> encoded string with the function that builds the encoded string for that tag.
private static final DDCache<
57+
String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
58+
PEER_TAGS_CACHE =
59+
DDCaches.newFixedSizeCache(
60+
64); // it can be unbounded since those values are returned by the agent and should be
61+
// under control. 64 entries is enough in this case to contain all the peer tags.
62+
// Creates the PEER_TAGS_CACHE entry for a peer tag name: a 512-entry value cache plus an
// encoder that produces "<name>:<value>" as a UTF8BytesString.
// NOTE(review): the value is concatenated as-is; the serializer code removed in this change
// applied TraceUtils.normalizeTag to the value -- confirm dropping normalization is intended.
private static final Function<
63+
String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
64+
PEER_TAGS_CACHE_ADDER =
65+
key ->
66+
Pair.of(
67+
DDCaches.newFixedSizeCache(512),
68+
value -> UTF8BytesString.create(key + ":" + value));
5269
private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";
5370

5471
private final Set<String> ignoredResources;
@@ -252,7 +269,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
252269
span.getHttpStatusCode(),
253270
isSynthetic(span),
254271
span.isTopLevel(),
255-
span.getTag(SPAN_KIND, ""),
272+
SPAN_KINDS.computeIfAbsent(span.getTag(SPAN_KIND, ""), UTF8BytesString::create),
256273
getPeerTags(span));
257274
boolean isNewKey = false;
258275
MetricKey key = keys.putIfAbsent(newKey, newKey);
@@ -288,12 +305,14 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
288305
return isNewKey || span.getError() > 0;
289306
}
290307

291-
private Map<String, String> getPeerTags(CoreSpan<?> span) {
292-
Map<String, String> peerTags = new HashMap<>();
308+
private List<UTF8BytesString> getPeerTags(CoreSpan<?> span) {
309+
List<UTF8BytesString> peerTags = new ArrayList<>();
293310
for (String peerTag : features.peerTags()) {
294311
Object value = span.getTag(peerTag);
295312
if (value != null) {
296-
peerTags.put(peerTag, value.toString());
313+
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>> pair =
314+
PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
315+
peerTags.add(pair.getLeft().computeIfAbsent(value.toString(), pair.getRight()));
297316
}
298317
}
299318
return peerTags;

dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
66
import java.util.Collections;
7-
import java.util.Map;
7+
import java.util.List;
88

99
/** The aggregation key for tracked metrics. */
1010
public final class MetricKey {
@@ -17,7 +17,7 @@ public final class MetricKey {
1717
private final int hash;
1818
private final boolean isTraceRoot;
1919
private final UTF8BytesString spanKind;
20-
private final Map<String, String> peerTags;
20+
private final List<UTF8BytesString> peerTags;
2121

2222
public MetricKey(
2323
CharSequence resource,
@@ -28,7 +28,7 @@ public MetricKey(
2828
boolean synthetics,
2929
boolean isTraceRoot,
3030
CharSequence spanKind,
31-
Map<String, String> peerTags) {
31+
List<UTF8BytesString> peerTags) {
3232
this.resource = null == resource ? EMPTY : UTF8BytesString.create(resource);
3333
this.service = null == service ? EMPTY : UTF8BytesString.create(service);
3434
this.operationName = null == operationName ? EMPTY : UTF8BytesString.create(operationName);
@@ -37,7 +37,7 @@ public MetricKey(
3737
this.synthetics = synthetics;
3838
this.isTraceRoot = isTraceRoot;
3939
this.spanKind = null == spanKind ? EMPTY : UTF8BytesString.create(spanKind);
40-
this.peerTags = peerTags == null ? Collections.emptyMap() : peerTags;
40+
this.peerTags = peerTags == null ? Collections.emptyList() : peerTags;
4141

4242
// Unrolled polynomial hashcode to avoid varargs allocation
4343
// and eliminate data dependency between iterations as in Arrays.hashCode.
@@ -90,7 +90,7 @@ public UTF8BytesString getSpanKind() {
9090
return spanKind;
9191
}
9292

93-
public Map<String, String> getPeerTags() {
93+
public List<UTF8BytesString> getPeerTags() {
9494
return peerTags;
9595
}
9696

dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
package datadog.trace.common.metrics;
22

33
import static java.nio.charset.StandardCharsets.ISO_8859_1;
4-
import static java.nio.charset.StandardCharsets.UTF_8;
54

65
import datadog.communication.serialization.GrowableBuffer;
76
import datadog.communication.serialization.WritableFormatter;
87
import datadog.communication.serialization.msgpack.MsgPackWriter;
98
import datadog.trace.api.ProcessTags;
109
import datadog.trace.api.WellKnownTags;
1110
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
12-
import datadog.trace.util.TraceUtils;
13-
import java.util.Map;
11+
import java.util.List;
1412

1513
public final class SerializingMetricWriter implements MetricWriter {
1614

@@ -126,19 +124,11 @@ public void add(MetricKey key, AggregateMetric aggregate) {
126124
writer.writeUTF8(key.getSpanKind());
127125

128126
writer.writeUTF8(PEER_TAGS);
129-
Map<String, String> peerTags = key.getPeerTags();
127+
final List<UTF8BytesString> peerTags = key.getPeerTags();
130128
writer.startArray(peerTags.size());
131129

132-
StringBuilder peerTagBuilder = new StringBuilder();
133-
for (Map.Entry<String, String> peerTag : peerTags.entrySet()) {
134-
peerTagBuilder.setLength(0);
135-
String toWrite =
136-
peerTagBuilder
137-
.append(peerTag.getKey())
138-
.append(':')
139-
.append(TraceUtils.normalizeTag(peerTag.getValue()))
140-
.toString();
141-
writer.writeUTF8(toWrite.getBytes(UTF_8));
130+
for (UTF8BytesString peerTag : peerTags) {
131+
writer.writeUTF8(peerTag);
142132
}
143133

144134
writer.writeUTF8(HITS);

dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.common.metrics
22

3+
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString
34
import datadog.trace.test.util.DDSpecification
45

56
import java.util.concurrent.BlockingDeque
@@ -51,7 +52,7 @@ class AggregateMetricTest extends DDSpecification {
5152
given:
5253
AggregateMetric aggregate = new AggregateMetric().recordDurations(3, new AtomicLongArray(0L, 0L, 0L | ERROR_TAG | TOP_LEVEL_TAG))
5354

54-
Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", ["grault":"quux"]))
55+
Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")]))
5556
batch.add(0L, 10)
5657
batch.add(0L, 10)
5758
batch.add(0L, 10)
@@ -126,7 +127,7 @@ class AggregateMetricTest extends DDSpecification {
126127
def "consistent under concurrent attempts to read and write"() {
127128
given:
128129
AggregateMetric aggregate = new AggregateMetric()
129-
MetricKey key = new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", ["grault":"quux"])
130+
MetricKey key = new MetricKey("foo", "bar", "qux", "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")])
130131
BlockingDeque<Batch> queue = new LinkedBlockingDeque<>(1000)
131132
ExecutorService reader = Executors.newSingleThreadExecutor()
132133
int writerCount = 10

0 commit comments

Comments
 (0)