Skip to content

Commit 1a1e77e

Browse files
committed
Refactored DSM tags for all integrations
1 parent 4f895c7 commit 1a1e77e

File tree

34 files changed

+520
-537
lines changed

34 files changed

+520
-537
lines changed

dd-java-agent/instrumentation/armeria/armeria-grpc-0.84/src/main/java/datadog/trace/instrumentation/armeria/grpc/client/GrpcClientDecorator.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
import static datadog.context.propagation.Propagators.defaultPropagator;
44
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
55
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
6-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
7-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
8-
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
96

107
import datadog.context.Context;
118
import datadog.context.propagation.CarrierSetter;
@@ -14,6 +11,8 @@
1411
import datadog.trace.api.cache.DDCache;
1512
import datadog.trace.api.cache.DDCaches;
1613
import datadog.trace.api.datastreams.DataStreamsContext;
14+
import datadog.trace.api.datastreams.DataStreamsTags;
15+
import datadog.trace.api.datastreams.DataStreamsTagsBuilder;
1716
import datadog.trace.api.naming.SpanNaming;
1817
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1918
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
@@ -23,7 +22,6 @@
2322
import io.grpc.MethodDescriptor;
2423
import io.grpc.Status;
2524
import java.util.BitSet;
26-
import java.util.LinkedHashMap;
2725
import java.util.Set;
2826
import java.util.function.Function;
2927

@@ -35,10 +33,11 @@ public class GrpcClientDecorator extends ClientDecorator {
3533
public static final CharSequence GRPC_MESSAGE = UTF8BytesString.create("grpc.message");
3634

3735
private static DataStreamsContext createDsmContext() {
38-
LinkedHashMap<String, String> result = new LinkedHashMap<>();
39-
result.put(DIRECTION_TAG, DIRECTION_OUT);
40-
result.put(TYPE_TAG, "grpc");
41-
return DataStreamsContext.fromTags(result);
36+
return DataStreamsContext.fromTags(
37+
new DataStreamsTagsBuilder()
38+
.withDirection(DataStreamsTags.Direction.Outbound)
39+
.withType("grpc")
40+
.build());
4241
}
4342

4443
public static final GrpcClientDecorator DECORATE = new GrpcClientDecorator();

dd-java-agent/instrumentation/armeria/armeria-grpc-0.84/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/GrpcServerDecorator.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package datadog.trace.instrumentation.armeria.grpc.server;
22

3-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
4-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
5-
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
6-
73
import datadog.trace.api.Config;
84
import datadog.trace.api.cache.DDCache;
95
import datadog.trace.api.cache.DDCaches;
6+
import datadog.trace.api.datastreams.DataStreamsTags;
7+
import datadog.trace.api.datastreams.DataStreamsTagsBuilder;
108
import datadog.trace.api.naming.SpanNaming;
119
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1210
import datadog.trace.bootstrap.instrumentation.api.ErrorPriorities;
@@ -18,7 +16,6 @@
1816
import io.grpc.StatusException;
1917
import io.grpc.StatusRuntimeException;
2018
import java.util.BitSet;
21-
import java.util.LinkedHashMap;
2219
import java.util.function.Function;
2320

2421
public class GrpcServerDecorator extends ServerDecorator {
@@ -33,15 +30,14 @@ public class GrpcServerDecorator extends ServerDecorator {
3330
public static final CharSequence COMPONENT_NAME = UTF8BytesString.create("armeria-grpc-server");
3431
public static final CharSequence GRPC_MESSAGE = UTF8BytesString.create("grpc.message");
3532

36-
private static final LinkedHashMap<String, String> createServerPathwaySortedTags() {
37-
LinkedHashMap<String, String> result = new LinkedHashMap<>();
38-
result.put(DIRECTION_TAG, DIRECTION_IN);
39-
result.put(TYPE_TAG, "grpc");
40-
return result;
33+
private static final DataStreamsTags createServerPathwaySortedTags() {
34+
return new DataStreamsTagsBuilder()
35+
.withDirection(DataStreamsTags.Direction.Inbound)
36+
.withGroup("grpc")
37+
.build();
4138
}
4239

43-
public static final LinkedHashMap<String, String> SERVER_PATHWAY_EDGE_TAGS =
44-
createServerPathwaySortedTags();
40+
public static final DataStreamsTags SERVER_PATHWAY_EDGE_TAGS = createServerPathwaySortedTags();
4541
public static final GrpcServerDecorator DECORATE = new GrpcServerDecorator();
4642

4743
private static final Function<String, String> NORMALIZE =

dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/EventBridgeInterceptor.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,15 @@
22

33
import static datadog.context.propagation.Propagators.defaultPropagator;
44
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
5-
import static datadog.trace.core.datastreams.TagsProcessor.BUS_TAG;
6-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
7-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
8-
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
95
import static datadog.trace.instrumentation.aws.v2.eventbridge.TextMapInjectAdapter.SETTER;
106

117
import datadog.trace.api.datastreams.DataStreamsContext;
8+
import datadog.trace.api.datastreams.DataStreamsTags;
9+
import datadog.trace.api.datastreams.DataStreamsTagsBuilder;
1210
import datadog.trace.api.datastreams.PathwayContext;
1311
import datadog.trace.bootstrap.InstanceStore;
1412
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1513
import java.util.ArrayList;
16-
import java.util.LinkedHashMap;
1714
import java.util.List;
1815
import org.slf4j.Logger;
1916
import org.slf4j.LoggerFactory;
@@ -89,7 +86,13 @@ private String getTraceContextToInject(
8986
// Inject context
9087
datadog.context.Context context = span;
9188
if (traceConfig().isDataStreamsEnabled()) {
92-
DataStreamsContext dsmContext = DataStreamsContext.fromTags(getTags(eventBusName));
89+
DataStreamsTags tags =
90+
new DataStreamsTagsBuilder()
91+
.withDirection(DataStreamsTags.Direction.Outbound)
92+
.withType("bus")
93+
.withBus(eventBusName)
94+
.build();
95+
DataStreamsContext dsmContext = DataStreamsContext.fromTags(tags);
9396
context = context.with(dsmContext);
9497
}
9598
defaultPropagator().inject(context, jsonBuilder, SETTER);
@@ -111,13 +114,4 @@ private String getTraceContextToInject(
111114
jsonBuilder.append('}');
112115
return jsonBuilder.toString();
113116
}
114-
115-
private LinkedHashMap<String, String> getTags(String eventBusName) {
116-
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
117-
sortedTags.put(DIRECTION_TAG, DIRECTION_OUT);
118-
sortedTags.put(BUS_TAG, eventBusName);
119-
sortedTags.put(TYPE_TAG, "bus");
120-
121-
return sortedTags;
122-
}
123117
}

dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkClientDecorator.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import static datadog.trace.api.datastreams.DataStreamsContext.create;
44
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
55
import static datadog.trace.bootstrap.instrumentation.api.ResourceNamePriorities.RPC_COMMAND_NAME;
6-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
76

87
import com.amazonaws.AmazonWebServiceRequest;
98
import com.amazonaws.AmazonWebServiceResponse;
@@ -15,6 +14,8 @@
1514
import datadog.trace.api.DDTags;
1615
import datadog.trace.api.cache.DDCache;
1716
import datadog.trace.api.cache.DDCaches;
17+
import datadog.trace.api.datastreams.DataStreamsTags;
18+
import datadog.trace.api.datastreams.DataStreamsTagsBuilder;
1819
import datadog.trace.api.naming.SpanNaming;
1920
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2021
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
@@ -23,9 +24,7 @@
2324
import datadog.trace.bootstrap.instrumentation.api.Tags;
2425
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
2526
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
26-
import datadog.trace.core.datastreams.TagsProcessor;
2727
import java.net.URI;
28-
import java.util.LinkedHashMap;
2928
import java.util.List;
3029
import java.util.Locale;
3130
import java.util.regex.Matcher;
@@ -255,17 +254,17 @@ && traceConfig().isDataStreamsEnabled()) {
255254
if (HttpMethodName.GET.name().equals(span.getTag(Tags.HTTP_METHOD))
256255
&& ("GetObjectMetadataRequest".equalsIgnoreCase(awsOperation)
257256
|| "GetObjectRequest".equalsIgnoreCase(awsOperation))) {
258-
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
259-
260-
sortedTags.put(TagsProcessor.DIRECTION_TAG, TagsProcessor.DIRECTION_IN);
261-
sortedTags.put(TagsProcessor.DATASET_NAME_TAG, key);
262-
sortedTags.put(TagsProcessor.DATASET_NAMESPACE_TAG, bucket);
263-
sortedTags.put(TagsProcessor.TOPIC_TAG, bucket);
264-
sortedTags.put(TagsProcessor.TYPE_TAG, "s3");
265-
257+
DataStreamsTags tags =
258+
new DataStreamsTagsBuilder()
259+
.withType("s3")
260+
.withDirection(DataStreamsTags.Direction.Inbound)
261+
.withTopic(bucket)
262+
.withDatasetNamespace(bucket)
263+
.withDatasetName(key)
264+
.build();
266265
AgentTracer.get()
267266
.getDataStreamsMonitoring()
268-
.setCheckpoint(span, create(sortedTags, 0, responseSize));
267+
.setCheckpoint(span, create(tags, 0, responseSize));
269268
}
270269

271270
if ("PutObjectRequest".equalsIgnoreCase(awsOperation)
@@ -276,17 +275,18 @@ && traceConfig().isDataStreamsEnabled()) {
276275
payloadSize = (long) requestSize;
277276
}
278277

279-
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
280-
281-
sortedTags.put(TagsProcessor.DIRECTION_TAG, DIRECTION_OUT);
282-
sortedTags.put(TagsProcessor.DATASET_NAME_TAG, key);
283-
sortedTags.put(TagsProcessor.DATASET_NAMESPACE_TAG, bucket);
284-
sortedTags.put(TagsProcessor.TOPIC_TAG, bucket);
285-
sortedTags.put(TagsProcessor.TYPE_TAG, "s3");
278+
DataStreamsTags tags =
279+
new DataStreamsTagsBuilder()
280+
.withType("s3")
281+
.withDirection(DataStreamsTags.Direction.Outbound)
282+
.withTopic(bucket)
283+
.withDatasetNamespace(bucket)
284+
.withDatasetName(key)
285+
.build();
286286

287287
AgentTracer.get()
288288
.getDataStreamsMonitoring()
289-
.setCheckpoint(span, create(sortedTags, 0, payloadSize));
289+
.setCheckpoint(span, create(tags, 0, payloadSize));
290290
}
291291
}
292292
}

dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/TracingRequestHandler.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,6 @@
66
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.blackholeSpan;
77
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
88
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
9-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
10-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
11-
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
12-
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
139
import static datadog.trace.instrumentation.aws.v0.AwsSdkClientDecorator.AWS_LEGACY_TRACING;
1410
import static datadog.trace.instrumentation.aws.v0.AwsSdkClientDecorator.DECORATE;
1511

@@ -20,14 +16,11 @@
2016
import com.amazonaws.handlers.RequestHandler2;
2117
import datadog.context.propagation.Propagators;
2218
import datadog.trace.api.Config;
23-
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring;
24-
import datadog.trace.api.datastreams.DataStreamsContext;
25-
import datadog.trace.api.datastreams.PathwayContext;
19+
import datadog.trace.api.datastreams.*;
2620
import datadog.trace.bootstrap.ContextStore;
2721
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2822
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
2923
import java.util.Date;
30-
import java.util.LinkedHashMap;
3124
import java.util.List;
3225
import org.slf4j.Logger;
3326
import org.slf4j.LoggerFactory;
@@ -116,16 +109,19 @@ && traceConfig().isDataStreamsEnabled()
116109
List<?> records =
117110
GetterAccess.of(response.getAwsResponse()).getRecords(response.getAwsResponse());
118111
if (null != records) {
119-
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
120-
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
121-
sortedTags.put(TOPIC_TAG, streamArn);
122-
sortedTags.put(TYPE_TAG, "kinesis");
112+
DataStreamsTags tags =
113+
new DataStreamsTagsBuilder()
114+
.withType("kinesis")
115+
.withDirection(DataStreamsTags.Direction.Inbound)
116+
.withTopic(streamArn)
117+
.build();
118+
123119
for (Object record : records) {
124120
Date arrivalTime = GetterAccess.of(record).getApproximateArrivalTimestamp(record);
125121
AgentDataStreamsMonitoring dataStreamsMonitoring =
126122
AgentTracer.get().getDataStreamsMonitoring();
127123
PathwayContext pathwayContext = dataStreamsMonitoring.newPathwayContext();
128-
DataStreamsContext context = create(sortedTags, arrivalTime.getTime(), 0);
124+
DataStreamsContext context = create(tags, arrivalTime.getTime(), 0);
129125
pathwayContext.setCheckpoint(context, dataStreamsMonitoring::add);
130126
if (!span.context().getPathwayContext().isStarted()) {
131127
span.context().mergePathwayContext(pathwayContext);

dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AwsSdkClientDecorator.java

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,6 @@
22

33
import static datadog.trace.api.datastreams.DataStreamsContext.create;
44
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
5-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
6-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
7-
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
8-
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
9-
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
105

116
import datadog.context.propagation.CarrierSetter;
127
import datadog.trace.api.Config;
@@ -15,6 +10,8 @@
1510
import datadog.trace.api.cache.DDCache;
1611
import datadog.trace.api.cache.DDCaches;
1712
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring;
13+
import datadog.trace.api.datastreams.DataStreamsTags;
14+
import datadog.trace.api.datastreams.DataStreamsTagsBuilder;
1815
import datadog.trace.api.datastreams.PathwayContext;
1916
import datadog.trace.api.naming.SpanNaming;
2017
import datadog.trace.bootstrap.InstanceStore;
@@ -25,15 +22,13 @@
2522
import datadog.trace.bootstrap.instrumentation.api.Tags;
2623
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
2724
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
28-
import datadog.trace.core.datastreams.TagsProcessor;
2925
import datadog.trace.payloadtags.PayloadTagsData;
3026
import java.net.URI;
3127
import java.time.Instant;
3228
import java.util.ArrayList;
3329
import java.util.Collection;
3430
import java.util.Collections;
3531
import java.util.HashSet;
36-
import java.util.LinkedHashMap;
3732
import java.util.List;
3833
import java.util.Map;
3934
import java.util.Optional;
@@ -338,10 +333,13 @@ public AgentSpan onSdkResponse(
338333
//noinspection unchecked
339334
List<SdkPojo> records = (List<SdkPojo>) recordsRaw;
340335
if (!records.isEmpty()) {
341-
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
342-
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
343-
sortedTags.put(TOPIC_TAG, streamArn);
344-
sortedTags.put(TYPE_TAG, "kinesis");
336+
DataStreamsTags tags =
337+
new DataStreamsTagsBuilder()
338+
.withType("kinesis")
339+
.withDirection(DataStreamsTags.Direction.Inbound)
340+
.withTopic(streamArn)
341+
.build();
342+
345343
if (null == kinesisApproximateArrivalTimestampField) {
346344
Optional<SdkField<?>> maybeField =
347345
records.get(0).sdkFields().stream()
@@ -363,7 +361,7 @@ public AgentSpan onSdkResponse(
363361
AgentTracer.get().getDataStreamsMonitoring();
364362
PathwayContext pathwayContext = dataStreamsMonitoring.newPathwayContext();
365363
pathwayContext.setCheckpoint(
366-
create(sortedTags, arrivalTime.toEpochMilli(), 0),
364+
create(tags, arrivalTime.toEpochMilli(), 0),
367365
dataStreamsMonitoring::add);
368366
if (!span.context().getPathwayContext().isStarted()) {
369367
span.context().mergePathwayContext(pathwayContext);
@@ -384,17 +382,18 @@ public AgentSpan onSdkResponse(
384382

385383
if (key != null && bucket != null && awsOperation != null) {
386384
if ("GetObject".equalsIgnoreCase(awsOperation)) {
387-
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
388-
389-
sortedTags.put(TagsProcessor.DIRECTION_TAG, TagsProcessor.DIRECTION_IN);
390-
sortedTags.put(TagsProcessor.DATASET_NAME_TAG, key);
391-
sortedTags.put(TagsProcessor.DATASET_NAMESPACE_TAG, bucket);
392-
sortedTags.put(TagsProcessor.TOPIC_TAG, bucket);
393-
sortedTags.put(TagsProcessor.TYPE_TAG, "s3");
385+
DataStreamsTags tags =
386+
new DataStreamsTagsBuilder()
387+
.withDirection(DataStreamsTags.Direction.Inbound)
388+
.withType("s3")
389+
.withDatasetName(key)
390+
.withDatasetNamespace(bucket)
391+
.withTopic(bucket)
392+
.build();
394393

395394
AgentTracer.get()
396395
.getDataStreamsMonitoring()
397-
.setCheckpoint(span, create(sortedTags, 0, responseSize));
396+
.setCheckpoint(span, create(tags, 0, responseSize));
398397
}
399398

400399
if ("PutObject".equalsIgnoreCase(awsOperation)) {
@@ -404,17 +403,18 @@ public AgentSpan onSdkResponse(
404403
payloadSize = (long) requestSize;
405404
}
406405

407-
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
408-
409-
sortedTags.put(TagsProcessor.DIRECTION_TAG, DIRECTION_OUT);
410-
sortedTags.put(TagsProcessor.DATASET_NAME_TAG, key);
411-
sortedTags.put(TagsProcessor.DATASET_NAMESPACE_TAG, bucket);
412-
sortedTags.put(TagsProcessor.TOPIC_TAG, bucket);
413-
sortedTags.put(TagsProcessor.TYPE_TAG, "s3");
406+
DataStreamsTags tags =
407+
new DataStreamsTagsBuilder()
408+
.withType("s3")
409+
.withDirection(DataStreamsTags.Direction.Outbound)
410+
.withDatasetName(key)
411+
.withDatasetNamespace(bucket)
412+
.withTopic(bucket)
413+
.build();
414414

415415
AgentTracer.get()
416416
.getDataStreamsMonitoring()
417-
.setCheckpoint(span, create(sortedTags, 0, payloadSize));
417+
.setCheckpoint(span, create(tags, 0, payloadSize));
418418
}
419419
}
420420
}

0 commit comments

Comments
 (0)