Skip to content

Commit 4f895c7

Browse files
committed
DSM optimizations - major refactoring to get rid of LinkedHashMap
1 parent 90a02ce commit 4f895c7

File tree

13 files changed

+448
-339
lines changed

13 files changed

+448
-339
lines changed

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java

Lines changed: 11 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,6 @@
33
import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V01_DATASTREAMS_ENDPOINT;
44
import static datadog.trace.api.datastreams.DataStreamsContext.fromTags;
55
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
6-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
7-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
8-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
9-
import static datadog.trace.core.datastreams.TagsProcessor.MANUAL_TAG;
10-
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
11-
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
126
import static datadog.trace.util.AgentThreadFactory.AgentThread.DATA_STREAMS_MONITORING;
137
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
148
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
@@ -19,12 +13,7 @@
1913
import datadog.trace.api.Config;
2014
import datadog.trace.api.TraceConfig;
2115
import datadog.trace.api.WellKnownTags;
22-
import datadog.trace.api.datastreams.Backlog;
23-
import datadog.trace.api.datastreams.DataStreamsContext;
24-
import datadog.trace.api.datastreams.InboxItem;
25-
import datadog.trace.api.datastreams.NoopPathwayContext;
26-
import datadog.trace.api.datastreams.PathwayContext;
27-
import datadog.trace.api.datastreams.StatsPoint;
16+
import datadog.trace.api.datastreams.*;
2817
import datadog.trace.api.experimental.DataStreamsContextCarrier;
2918
import datadog.trace.api.time.TimeSource;
3019
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
@@ -36,11 +25,9 @@
3625
import datadog.trace.core.DDSpan;
3726
import datadog.trace.core.DDTraceCoreInfo;
3827
import datadog.trace.util.AgentTaskScheduler;
39-
import java.util.ArrayList;
4028
import java.util.Collections;
4129
import java.util.HashMap;
4230
import java.util.Iterator;
43-
import java.util.LinkedHashMap;
4431
import java.util.LinkedList;
4532
import java.util.List;
4633
import java.util.Map;
@@ -57,9 +44,9 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
5744
static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5);
5845

5946
private static final StatsPoint REPORT =
60-
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0, null);
47+
new StatsPoint(DataStreamsTags.EMPTY, 0, 0, 0, 0, 0, 0, 0, null);
6148
private static final StatsPoint POISON_PILL =
62-
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0, 0, null);
49+
new StatsPoint(DataStreamsTags.EMPTY, 0, 0, 0, 0, 0, 0, 0, null);
6350

6451
private final Map<Long, Map<String, StatsBucket>> timeToBucket = new HashMap<>();
6552
private final MpscArrayQueue<InboxItem> inbox = new MpscArrayQueue<>(1024);
@@ -223,15 +210,7 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie
223210
}
224211
}
225212

226-
public void trackBacklog(LinkedHashMap<String, String> sortedTags, long value) {
227-
List<String> tags = new ArrayList<>(sortedTags.size());
228-
for (Map.Entry<String, String> entry : sortedTags.entrySet()) {
229-
String tag = TagsProcessor.createTag(entry.getKey(), entry.getValue());
230-
if (tag == null) {
231-
continue;
232-
}
233-
tags.add(tag);
234-
}
213+
public void trackBacklog(DataStreamsTags tags, long value) {
235214
inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceName()));
236215
}
237216

@@ -256,14 +235,9 @@ public void setConsumeCheckpoint(String type, String source, DataStreamsContextC
256235
return;
257236
}
258237
mergePathwayContextIntoSpan(span, carrier);
259-
260-
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
261-
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
262-
sortedTags.put(MANUAL_TAG, "true");
263-
sortedTags.put(TOPIC_TAG, source);
264-
sortedTags.put(TYPE_TAG, type);
265-
266-
setCheckpoint(span, fromTags(sortedTags));
238+
setCheckpoint(
239+
span,
240+
fromTags(DataStreamsTags.Create(type, DataStreamsTags.Direction.Inbound, source, true)));
267241
}
268242

269243
public void setProduceCheckpoint(
@@ -279,15 +253,10 @@ public void setProduceCheckpoint(
279253
return;
280254
}
281255

282-
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
283-
sortedTags.put(DIRECTION_TAG, DIRECTION_OUT);
284-
if (manualCheckpoint) {
285-
sortedTags.put(MANUAL_TAG, "true");
286-
}
287-
sortedTags.put(TOPIC_TAG, target);
288-
sortedTags.put(TYPE_TAG, type);
289-
290-
DataStreamsContext dsmContext = fromTags(sortedTags);
256+
DataStreamsContext dsmContext =
257+
fromTags(
258+
DataStreamsTags.Create(
259+
type, DataStreamsTags.Direction.Outbound, target, manualCheckpoint));
291260
this.propagator.inject(
292261
span.with(dsmContext), carrier, DataStreamsContextCarrierAdapter.INSTANCE);
293262
}

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java

Lines changed: 25 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,13 @@
1212
import datadog.trace.api.ProcessTags;
1313
import datadog.trace.api.WellKnownTags;
1414
import datadog.trace.api.datastreams.DataStreamsContext;
15+
import datadog.trace.api.datastreams.DataStreamsTags;
1516
import datadog.trace.api.datastreams.PathwayContext;
1617
import datadog.trace.api.datastreams.StatsPoint;
1718
import datadog.trace.api.time.TimeSource;
1819
import datadog.trace.util.FNV64Hash;
1920
import java.io.IOException;
20-
import java.util.ArrayList;
21-
import java.util.Arrays;
2221
import java.util.Base64;
23-
import java.util.HashSet;
24-
import java.util.LinkedHashMap;
25-
import java.util.List;
26-
import java.util.Map;
27-
import java.util.Set;
2822
import java.util.concurrent.TimeUnit;
2923
import java.util.function.BiConsumer;
3024
import java.util.function.Consumer;
@@ -51,23 +45,7 @@ public class DefaultPathwayContext implements PathwayContext {
5145
// state variables used to memoize the pathway hash with
5246
// direction != current direction
5347
private long closestOppositeDirectionHash;
54-
private String previousDirection;
55-
56-
private static final Set<String> hashableTagKeys =
57-
new HashSet<String>(
58-
Arrays.asList(
59-
TagsProcessor.GROUP_TAG,
60-
TagsProcessor.TYPE_TAG,
61-
TagsProcessor.DIRECTION_TAG,
62-
TagsProcessor.TOPIC_TAG,
63-
TagsProcessor.EXCHANGE_TAG));
64-
65-
private static final Set<String> extraAggregationTagKeys =
66-
new HashSet<String>(
67-
Arrays.asList(
68-
TagsProcessor.DATASET_NAME_TAG,
69-
TagsProcessor.DATASET_NAMESPACE_TAG,
70-
TagsProcessor.MANUAL_TAG));
48+
private DataStreamsTags.Direction previousDirection;
7149

7250
public DefaultPathwayContext(
7351
TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride) {
@@ -109,13 +87,8 @@ public synchronized void setCheckpoint(
10987
long startNanos = timeSource.getCurrentTimeNanos();
11088
long nanoTicks = timeSource.getNanoTicks();
11189

112-
// So far, each tag key has only one tag value, so we're initializing the capacity to match
113-
// the number of tag keys for now. We should revisit this later if it's no longer the case.
114-
LinkedHashMap<String, String> sortedTags = context.sortedTags();
115-
List<String> allTags = new ArrayList<>(sortedTags.size());
11690
PathwayHashBuilder pathwayHashBuilder =
11791
new PathwayHashBuilder(hashOfKnownTags, serviceNameOverride);
118-
DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder();
11992

12093
if (!started) {
12194
long defaultTimestamp = context.defaultTimestamp();
@@ -135,43 +108,39 @@ public synchronized void setCheckpoint(
135108
log.debug("Started {}", this);
136109
}
137110

138-
for (Map.Entry<String, String> entry : sortedTags.entrySet()) {
139-
String tag = TagsProcessor.createTag(entry.getKey(), entry.getValue());
140-
if (tag == null) {
141-
continue;
142-
}
143-
if (hashableTagKeys.contains(entry.getKey())) {
144-
pathwayHashBuilder.addTag(tag);
145-
}
146-
if (extraAggregationTagKeys.contains(entry.getKey())) {
147-
aggregationHashBuilder.addValue(tag);
148-
}
149-
allTags.add(tag);
111+
// generate node hash
112+
long nodeHash = hashOfKnownTags;
113+
if (serviceNameOverride != null) {
114+
nodeHash = FNV64Hash.continueHash(nodeHash, serviceNameOverride, FNV64Hash.Version.v1);
150115
}
116+
nodeHash =
117+
FNV64Hash.continueHash(
118+
nodeHash, DataStreamsTags.longToBytes(context.tags().getHash()), FNV64Hash.Version.v1);
151119

152-
long nodeHash = generateNodeHash(pathwayHashBuilder);
153120
// loop protection - a node should not be chosen as parent
154121
// for a sequential node with the same direction, as this
155122
// will cause a `cardinality explosion` for hash / parentHash tag values
156-
if (sortedTags.containsKey(TagsProcessor.DIRECTION_TAG)) {
157-
String direction = sortedTags.get(TagsProcessor.DIRECTION_TAG);
158-
if (direction.equals(previousDirection)) {
159-
hash = closestOppositeDirectionHash;
160-
} else {
161-
previousDirection = direction;
162-
closestOppositeDirectionHash = hash;
163-
}
123+
DataStreamsTags.Direction direction = context.tags().getDirection();
124+
if (direction == previousDirection) {
125+
hash = closestOppositeDirectionHash;
126+
} else {
127+
previousDirection = direction;
128+
closestOppositeDirectionHash = hash;
164129
}
165130

166131
long newHash = generatePathwayHash(nodeHash, hash);
167-
long aggregationHash = aggregationHashBuilder.addValue(newHash);
132+
long aggregationHash =
133+
FNV64Hash.continueHash(
134+
context.tags().getAggregationHash(),
135+
DataStreamsTags.longToBytes(newHash),
136+
FNV64Hash.Version.v1);
168137

169138
long pathwayLatencyNano = nanoTicks - pathwayStartNanoTicks;
170139
long edgeLatencyNano = nanoTicks - edgeStartNanoTicks;
171140

172141
StatsPoint point =
173142
new StatsPoint(
174-
allTags,
143+
context.tags(),
175144
newHash,
176145
hash,
177146
aggregationHash,
@@ -310,32 +279,6 @@ private static DefaultPathwayContext decode(
310279
serviceNameOverride);
311280
}
312281

313-
static class DataSetHashBuilder {
314-
private long currentHash = 0L;
315-
316-
public long addValue(String val) {
317-
currentHash = FNV64Hash.generateHash(currentHash + val, FNV64Hash.Version.v1);
318-
return currentHash;
319-
}
320-
321-
public long addValue(long val) {
322-
byte[] b =
323-
new byte[] {
324-
(byte) val,
325-
(byte) (val >> 8),
326-
(byte) (val >> 16),
327-
(byte) (val >> 24),
328-
(byte) (val >> 32),
329-
(byte) (val >> 40),
330-
(byte) (val >> 48),
331-
(byte) (val >> 56)
332-
};
333-
334-
currentHash = FNV64Hash.continueHash(currentHash, b, FNV64Hash.Version.v1);
335-
return currentHash;
336-
}
337-
}
338-
339282
private static class PathwayHashBuilder {
340283
private long hash;
341284

@@ -350,6 +293,10 @@ public void addTag(String tag) {
350293
hash = FNV64Hash.continueHash(hash, tag, FNV64Hash.Version.v1);
351294
}
352295

296+
public void addValue(long val) {
297+
hash = FNV64Hash.continueHash(hash, DataStreamsTags.longToBytes(val), FNV64Hash.Version.v1);
298+
}
299+
353300
public long getHash() {
354301
return hash;
355302
}

dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import datadog.trace.api.Config;
1010
import datadog.trace.api.ProcessTags;
1111
import datadog.trace.api.WellKnownTags;
12+
import datadog.trace.api.datastreams.DataStreamsTags;
1213
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
1314
import datadog.trace.common.metrics.Sink;
1415
import java.util.Collection;
@@ -160,9 +161,7 @@ private void writeBucket(StatsBucket bucket, Writable packer) {
160161
Collection<StatsGroup> groups = bucket.getGroups();
161162
packer.startArray(groups.size());
162163
for (StatsGroup group : groups) {
163-
boolean firstNode = group.getEdgeTags().isEmpty();
164-
165-
packer.startMap(firstNode ? 5 : 6);
164+
packer.startMap(6);
166165

167166
/* 1 */
168167
packer.writeUTF8(PATHWAY_LATENCY);
@@ -184,29 +183,42 @@ private void writeBucket(StatsBucket bucket, Writable packer) {
184183
packer.writeUTF8(PARENT_HASH);
185184
packer.writeUnsignedLong(group.getParentHash());
186185

187-
if (!firstNode) {
188-
/* 6 */
189-
packer.writeUTF8(EDGE_TAGS);
190-
packer.startArray(group.getEdgeTags().size());
191-
for (String tag : group.getEdgeTags()) {
192-
packer.writeString(tag, null);
193-
}
194-
}
186+
/* 6 */
187+
packer.writeUTF8(EDGE_TAGS);
188+
writeDataStreamsTags(group.getTags(), packer);
195189
}
196190
}
197191

198-
private void writeBacklogs(Collection<Map.Entry<List<String>, Long>> backlogs, Writable packer) {
192+
private void writeBacklogs(
193+
Collection<Map.Entry<DataStreamsTags, Long>> backlogs, Writable packer) {
199194
packer.writeUTF8(BACKLOGS);
200195
packer.startArray(backlogs.size());
201-
for (Map.Entry<List<String>, Long> entry : backlogs) {
196+
for (Map.Entry<DataStreamsTags, Long> entry : backlogs) {
202197
packer.startMap(2);
198+
203199
packer.writeUTF8(BACKLOG_TAGS);
204-
packer.startArray(entry.getKey().size());
205-
for (String tag : entry.getKey()) {
206-
packer.writeString(tag, null);
207-
}
200+
writeDataStreamsTags(entry.getKey(), packer);
201+
208202
packer.writeUTF8(BACKLOG_VALUE);
209203
packer.writeLong(entry.getValue());
210204
}
211205
}
206+
207+
private void writeStringIfNotEmpty(String name, String value, Writable packer) {
208+
if (value == null || value.isEmpty()) {
209+
return;
210+
}
211+
212+
packer.writeString(name + ":" + value, null);
213+
}
214+
215+
private void writeDataStreamsTags(DataStreamsTags tags, Writable packer) {
216+
packer.startArray(tags.getSize());
217+
218+
tags.forEachTag(
219+
(name, value) -> {
220+
packer.writeString(name + ":" + value, null);
221+
},
222+
DataStreamsTags.TagTraverseMode.All);
223+
}
212224
}

dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
package datadog.trace.core.datastreams;
22

33
import datadog.trace.api.datastreams.Backlog;
4+
import datadog.trace.api.datastreams.DataStreamsTags;
45
import datadog.trace.api.datastreams.StatsPoint;
56
import java.util.Collection;
67
import java.util.HashMap;
7-
import java.util.List;
88
import java.util.Map;
99

1010
public class StatsBucket {
1111
private final long startTimeNanos;
1212
private final long bucketDurationNanos;
1313
private final Map<Long, StatsGroup> hashToGroup = new HashMap<>();
14-
private final Map<List<String>, Long> backlogs = new HashMap<>();
14+
private final Map<DataStreamsTags, Long> backlogs = new HashMap<>();
1515

1616
public StatsBucket(long startTimeNanos, long bucketDurationNanos) {
1717
this.startTimeNanos = startTimeNanos;
@@ -27,7 +27,7 @@ public void addPoint(StatsPoint statsPoint) {
2727
statsPoint.getAggregationHash(),
2828
hash ->
2929
new StatsGroup(
30-
statsPoint.getEdgeTags(), statsPoint.getHash(), statsPoint.getParentHash()))
30+
statsPoint.getTags(), statsPoint.getHash(), statsPoint.getParentHash()))
3131
.add(
3232
statsPoint.getPathwayLatencyNano(),
3333
statsPoint.getEdgeLatencyNano(),
@@ -36,7 +36,7 @@ public void addPoint(StatsPoint statsPoint) {
3636

3737
public void addBacklog(Backlog backlog) {
3838
backlogs.compute(
39-
backlog.getSortedTags(),
39+
backlog.getTags(),
4040
(k, v) -> (v == null) ? backlog.getValue() : Math.max(v, backlog.getValue()));
4141
}
4242

@@ -52,7 +52,7 @@ public Collection<StatsGroup> getGroups() {
5252
return hashToGroup.values();
5353
}
5454

55-
public Collection<Map.Entry<List<String>, Long>> getBacklogs() {
55+
public Collection<Map.Entry<DataStreamsTags, Long>> getBacklogs() {
5656
return backlogs.entrySet();
5757
}
5858
}

0 commit comments

Comments
 (0)