|
7 | 7 | import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG; |
8 | 8 |
|
9 | 9 | import datadog.trace.api.Config; |
| 10 | +import datadog.trace.api.ConfigDefaults; |
10 | 11 | import datadog.trace.api.DDTags; |
11 | 12 | import datadog.trace.api.cache.DDCache; |
12 | 13 | import datadog.trace.api.cache.DDCaches; |
|
23 | 24 | import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; |
24 | 25 | import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator; |
25 | 26 | import datadog.trace.core.datastreams.TagsProcessor; |
| 27 | +import datadog.trace.payloadtags.PayloadTagsData; |
26 | 28 | import java.net.URI; |
27 | 29 | import java.time.Instant; |
| 30 | +import java.util.ArrayDeque; |
| 31 | +import java.util.ArrayList; |
| 32 | +import java.util.Collection; |
28 | 33 | import java.util.Collections; |
29 | 34 | import java.util.HashSet; |
30 | 35 | import java.util.LinkedHashMap; |
31 | 36 | import java.util.List; |
| 37 | +import java.util.Map; |
32 | 38 | import java.util.Optional; |
33 | 39 | import java.util.Set; |
34 | 40 | import javax.annotation.Nonnull; |
35 | 41 | import software.amazon.awssdk.awscore.AwsResponse; |
| 42 | +import software.amazon.awssdk.core.SdkBytes; |
36 | 43 | import software.amazon.awssdk.core.SdkField; |
37 | 44 | import software.amazon.awssdk.core.SdkPojo; |
38 | 45 | import software.amazon.awssdk.core.SdkRequest; |
@@ -115,6 +122,12 @@ public AgentSpan onSdkRequest( |
115 | 122 | final String awsOperationName = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME); |
116 | 123 | onOperation(span, awsServiceName, awsOperationName); |
117 | 124 |
|
| 125 | + Config config = Config.get(); |
| 126 | + if (config.isCloudRequestPayloadTaggingEnabled() |
| 127 | + && config.isCloudPayloadTaggingEnabledFor(awsServiceName)) { |
| 128 | + awsPojoToTags(span, ConfigDefaults.DEFAULT_TRACE_CLOUD_PAYLOAD_REQUEST_TAG, request); |
| 129 | + } |
| 130 | + |
118 | 131 | // S3 |
119 | 132 | request.getValueForField("Bucket", String.class).ifPresent(name -> setBucketName(span, name)); |
120 | 133 | if ("s3".equalsIgnoreCase(awsServiceName) && span.traceConfig().isDataStreamsEnabled()) { |
@@ -295,6 +308,14 @@ public AgentSpan onSdkResponse( |
295 | 308 | final SdkResponse response, |
296 | 309 | final SdkHttpResponse httpResponse, |
297 | 310 | final ExecutionAttributes attributes) { |
| 311 | + |
| 312 | + Config config = Config.get(); |
| 313 | + String serviceName = attributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME); |
| 314 | + if (config.isCloudResponsePayloadTaggingEnabled() |
| 315 | + && config.isCloudPayloadTaggingEnabledFor(serviceName)) { |
| 316 | + awsPojoToTags(span, ConfigDefaults.DEFAULT_TRACE_CLOUD_PAYLOAD_RESPONSE_TAG, response); |
| 317 | + } |
| 318 | + |
298 | 319 | if (response instanceof AwsResponse) { |
299 | 320 | span.setTag( |
300 | 321 | InstrumentationTags.AWS_REQUEST_ID, |
@@ -437,4 +458,48 @@ protected String getRequestHeader(SdkHttpRequest request, String headerName) { |
437 | 458 | protected String getResponseHeader(SdkHttpResponse response, String headerName) { |
438 | 459 | return response.firstMatchingHeader(headerName).orElse(null); |
439 | 460 | } |
| 461 | + |
| 462 | + private void awsPojoToTags(AgentSpan span, String tagsPrefix, Object pojo) { |
| 463 | + Collection<PayloadTagsData.PathAndValue> payloadTagsData = new ArrayList<>(); |
| 464 | + ArrayDeque<Object> path = new ArrayDeque<>(); |
| 465 | + collectPayloadTagsData(payloadTagsData, path, pojo); |
| 466 | + span.setTag( |
| 467 | + tagsPrefix, |
| 468 | + new PayloadTagsData(payloadTagsData.toArray(new PayloadTagsData.PathAndValue[0]))); |
| 469 | + } |
| 470 | + |
| 471 | + private void collectPayloadTagsData( |
| 472 | + Collection<PayloadTagsData.PathAndValue> payloadTagsData, |
| 473 | + ArrayDeque<Object> path, |
| 474 | + Object object) { |
| 475 | + if (object instanceof SdkPojo) { |
| 476 | + SdkPojo pojo = (SdkPojo) object; |
| 477 | + for (SdkField<?> field : pojo.sdkFields()) { |
| 478 | + Object val = field.getValueOrDefault(pojo); |
| 479 | + path.push(field.locationName()); |
| 480 | + collectPayloadTagsData(payloadTagsData, path, val); |
| 481 | + path.pop(); |
| 482 | + } |
| 483 | + } else if (object instanceof Collection) { |
| 484 | + int index = 0; |
| 485 | + for (Object value : (Collection<?>) object) { |
| 486 | + path.push(index); |
| 487 | + collectPayloadTagsData(payloadTagsData, path, value); |
| 488 | + path.pop(); |
| 489 | + index++; |
| 490 | + } |
| 491 | + } else if (object instanceof Map) { |
| 492 | + Map<?, ?> map = (Map<?, ?>) object; |
| 493 | + for (Map.Entry<?, ?> entry : map.entrySet()) { |
| 494 | + path.push(entry.getKey().toString()); |
| 495 | + collectPayloadTagsData(payloadTagsData, path, entry.getValue()); |
| 496 | + path.pop(); |
| 497 | + } |
| 498 | + } else if (object instanceof SdkBytes) { |
| 499 | + SdkBytes bytes = (SdkBytes) object; |
| 500 | + payloadTagsData.add(new PayloadTagsData.PathAndValue(path.toArray(), bytes.asInputStream())); |
| 501 | + } else { |
| 502 | + payloadTagsData.add(new PayloadTagsData.PathAndValue(path.toArray(), object)); |
| 503 | + } |
| 504 | + } |
440 | 505 | } |
0 commit comments