Skip to content

Commit a1f6917

Browse files
Oberon00Mateusz Rzeszutek
andauthored
aws-sdk-2.2.: Support injection into SQS.SendMessageBatch message attributes (open-telemetry#8798)
Co-authored-by: Mateusz Rzeszutek <[email protected]>
1 parent eaf11ab commit a1f6917

File tree

9 files changed

+319
-84
lines changed

9 files changed

+319
-84
lines changed

instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ testing {
4242
tasks {
4343
withType<Test> {
4444
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
45+
46+
// NB: If you'd like to change these, there is some cleanup work to be done, as most tests ignore this and
47+
// set the value directly (the "library" does not normally query it, only library-autoconfigure)
4548
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", true)
4649
systemProperty("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", true)
4750
}

instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java

Lines changed: 19 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,10 @@
77

88
import io.opentelemetry.context.propagation.TextMapPropagator;
99
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
10+
import javax.annotation.Nullable;
1011
import software.amazon.awssdk.core.SdkRequest;
11-
import software.amazon.awssdk.core.SdkResponse;
1212
import software.amazon.awssdk.core.interceptor.Context;
1313
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
14-
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
15-
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
16-
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
1714

1815
// helper class for calling methods that use sqs types in SqsImpl
1916
// if SqsImpl is not present these methods are no op
@@ -23,42 +20,26 @@ private SqsAccess() {}
2320
private static final boolean enabled = PluginImplUtil.isImplPresent("SqsImpl");
2421

2522
@NoMuzzle
26-
static boolean isSendMessageRequest(SdkRequest request) {
27-
return enabled && request instanceof SendMessageRequest;
28-
}
29-
30-
@NoMuzzle
31-
static SdkRequest injectIntoSendMessageRequest(
32-
TextMapPropagator messagingPropagator,
33-
SdkRequest rawRequest,
34-
io.opentelemetry.context.Context otelContext) {
35-
assert enabled; // enabled checked already in instance check.
36-
return SqsImpl.injectIntoSendMessageRequest(messagingPropagator, rawRequest, otelContext);
37-
}
38-
39-
@NoMuzzle
40-
static boolean isReceiveMessageRequest(SdkRequest request) {
41-
return enabled && request instanceof ReceiveMessageRequest;
42-
}
43-
44-
@NoMuzzle
45-
public static SdkRequest modifyReceiveMessageRequest(
46-
SdkRequest request, boolean useXrayPropagator, TextMapPropagator messagingPropagator) {
47-
assert enabled; // enabled checked already in instance check.
48-
return SqsImpl.modifyReceiveMessageRequest(request, useXrayPropagator, messagingPropagator);
49-
}
50-
51-
@NoMuzzle
52-
static boolean isReceiveMessageResponse(SdkResponse response) {
53-
return enabled && response instanceof ReceiveMessageResponse;
23+
static boolean afterReceiveMessageExecution(
24+
Context.AfterExecution context,
25+
ExecutionAttributes executionAttributes,
26+
TracingExecutionInterceptor config) {
27+
return enabled && SqsImpl.afterReceiveMessageExecution(context, executionAttributes, config);
5428
}
5529

30+
/**
31+
* Returns {@code null} (not the unmodified {@code request}!) if nothing matched, so that other
32+
* handling can be tried.
33+
*/
34+
@Nullable
5635
@NoMuzzle
57-
static void afterReceiveMessageExecution(
58-
TracingExecutionInterceptor config,
59-
Context.AfterExecution context,
60-
ExecutionAttributes executionAttributes) {
61-
assert enabled; // enabled checked already in instance check.
62-
SqsImpl.afterReceiveMessageExecution(config, executionAttributes, context);
36+
static SdkRequest modifyRequest(
37+
SdkRequest request,
38+
io.opentelemetry.context.Context otelContext,
39+
boolean useXrayPropagator,
40+
TextMapPropagator messagingPropagator) {
41+
return enabled
42+
? SqsImpl.modifyRequest(request, otelContext, useXrayPropagator, messagingPropagator)
43+
: null;
6344
}
6445
}

instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java

Lines changed: 92 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
import java.util.HashMap;
1212
import java.util.List;
1313
import java.util.Map;
14+
import javax.annotation.Nullable;
1415
import software.amazon.awssdk.core.SdkRequest;
16+
import software.amazon.awssdk.core.SdkResponse;
1517
import software.amazon.awssdk.core.interceptor.Context;
1618
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
1719
import software.amazon.awssdk.http.SdkHttpResponse;
@@ -20,6 +22,8 @@
2022
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
2123
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
2224
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
25+
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
26+
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
2327
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
2428

2529
// this class is only used from SqsAccess from method with @NoMuzzle annotation
@@ -33,44 +37,30 @@ final class SqsImpl {
3337

3438
private SqsImpl() {}
3539

36-
static SdkRequest injectIntoSendMessageRequest(
37-
TextMapPropagator messagingPropagator,
38-
SdkRequest rawRequest,
39-
io.opentelemetry.context.Context otelContext) {
40-
SendMessageRequest request = (SendMessageRequest) rawRequest;
41-
Map<String, MessageAttributeValue> messageAttributes =
42-
new HashMap<>(request.messageAttributes());
43-
44-
messagingPropagator.inject(
45-
otelContext,
46-
messageAttributes,
47-
(carrier, k, v) -> {
48-
carrier.put(k, MessageAttributeValue.builder().stringValue(v).dataType("String").build());
49-
});
40+
static boolean afterReceiveMessageExecution(
41+
Context.AfterExecution context,
42+
ExecutionAttributes executionAttributes,
43+
TracingExecutionInterceptor config) {
5044

51-
if (messageAttributes.size() > 10) { // Too many attributes, we don't want to break the call.
52-
return request;
45+
SdkResponse rawResponse = context.response();
46+
if (!(rawResponse instanceof ReceiveMessageResponse)) {
47+
return false;
5348
}
54-
return request.toBuilder().messageAttributes(messageAttributes).build();
55-
}
5649

57-
/** Create and close CONSUMER span for each message consumed. */
58-
static void afterReceiveMessageExecution(
59-
TracingExecutionInterceptor config,
60-
ExecutionAttributes executionAttributes,
61-
Context.AfterExecution context) {
62-
ReceiveMessageResponse response = (ReceiveMessageResponse) context.response();
50+
ReceiveMessageResponse response = (ReceiveMessageResponse) rawResponse;
6351
SdkHttpResponse httpResponse = context.httpResponse();
6452
for (Message message : response.messages()) {
65-
createConsumerSpan(config, message, executionAttributes, httpResponse);
53+
createConsumerSpan(message, httpResponse, executionAttributes, config);
6654
}
55+
56+
return true;
6757
}
6858

6959
private static void createConsumerSpan(
70-
TracingExecutionInterceptor config,
7160
Message message,
61+
SdkHttpResponse httpResponse,
7262
ExecutionAttributes executionAttributes,
73-
SdkHttpResponse httpResponse) {
63+
TracingExecutionInterceptor config) {
7464

7565
io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.root();
7666

@@ -99,9 +89,81 @@ private static void createConsumerSpan(
9989
}
10090
}
10191

102-
static SdkRequest modifyReceiveMessageRequest(
103-
SdkRequest rawRequest, boolean useXrayPropagator, TextMapPropagator messagingPropagator) {
104-
ReceiveMessageRequest request = (ReceiveMessageRequest) rawRequest;
92+
@Nullable
93+
static SdkRequest modifyRequest(
94+
SdkRequest request,
95+
io.opentelemetry.context.Context otelContext,
96+
boolean useXrayPropagator,
97+
TextMapPropagator messagingPropagator) {
98+
if (request instanceof ReceiveMessageRequest) {
99+
return modifyReceiveMessageRequest(
100+
(ReceiveMessageRequest) request, useXrayPropagator, messagingPropagator);
101+
} else if (messagingPropagator != null) {
102+
if (request instanceof SendMessageRequest) {
103+
return injectIntoSendMessageRequest(
104+
(SendMessageRequest) request, otelContext, messagingPropagator);
105+
} else if (request instanceof SendMessageBatchRequest) {
106+
return injectIntoSendMessageBatchRequest(
107+
(SendMessageBatchRequest) request, otelContext, messagingPropagator);
108+
}
109+
}
110+
return null;
111+
}
112+
113+
private static SdkRequest injectIntoSendMessageBatchRequest(
114+
SendMessageBatchRequest request,
115+
io.opentelemetry.context.Context otelContext,
116+
TextMapPropagator messagingPropagator) {
117+
ArrayList<SendMessageBatchRequestEntry> entries = new ArrayList<>(request.entries());
118+
for (int i = 0; i < entries.size(); ++i) {
119+
SendMessageBatchRequestEntry entry = entries.get(i);
120+
Map<String, MessageAttributeValue> messageAttributes =
121+
new HashMap<>(entry.messageAttributes());
122+
123+
// TODO: Per https://github.com/open-telemetry/oteps/pull/220, each message should get
124+
// a separate context. We don't support this yet, also because it would be inconsistent
125+
// with the header-based X-Ray propagation
126+
// (probably could override it here by setting the X-Ray message system attribute)
127+
if (injectIntoMessageAttributes(messageAttributes, otelContext, messagingPropagator)) {
128+
entries.set(i, entry.toBuilder().messageAttributes(messageAttributes).build());
129+
}
130+
}
131+
return request.toBuilder().entries(entries).build();
132+
}
133+
134+
private static SdkRequest injectIntoSendMessageRequest(
135+
SendMessageRequest request,
136+
io.opentelemetry.context.Context otelContext,
137+
TextMapPropagator messagingPropagator) {
138+
Map<String, MessageAttributeValue> messageAttributes =
139+
new HashMap<>(request.messageAttributes());
140+
if (!injectIntoMessageAttributes(messageAttributes, otelContext, messagingPropagator)) {
141+
return request;
142+
}
143+
return request.toBuilder().messageAttributes(messageAttributes).build();
144+
}
145+
146+
private static boolean injectIntoMessageAttributes(
147+
Map<String, MessageAttributeValue> messageAttributes,
148+
io.opentelemetry.context.Context otelContext,
149+
TextMapPropagator messagingPropagator) {
150+
messagingPropagator.inject(
151+
otelContext,
152+
messageAttributes,
153+
(carrier, k, v) -> {
154+
carrier.put(k, MessageAttributeValue.builder().stringValue(v).dataType("String").build());
155+
});
156+
157+
// Return whether the injection resulted in an attribute count that is still supported.
158+
// See
159+
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes
160+
return messageAttributes.size() <= 10;
161+
}
162+
163+
private static SdkRequest modifyReceiveMessageRequest(
164+
ReceiveMessageRequest request,
165+
boolean useXrayPropagator,
166+
TextMapPropagator messagingPropagator) {
105167
boolean hasXrayAttribute = true;
106168
List<String> existingAttributeNames = null;
107169
if (useXrayPropagator) {

instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,14 @@ public SdkRequest modifyRequest(
121121
throw throwable;
122122
}
123123

124-
if (SqsAccess.isReceiveMessageRequest(request)) {
125-
return SqsAccess.modifyReceiveMessageRequest(request, useXrayPropagator, messagingPropagator);
126-
} else if (messagingPropagator != null) {
127-
if (SqsAccess.isSendMessageRequest(request)) {
128-
return SqsAccess.injectIntoSendMessageRequest(messagingPropagator, request, otelContext);
129-
}
130-
// TODO: Support SendMessageBatchRequest (and thus SendMessageBatchRequestEntry)
124+
SdkRequest sqsModifiedRequest =
125+
SqsAccess.modifyRequest(request, otelContext, useXrayPropagator, messagingPropagator);
126+
if (sqsModifiedRequest != null) {
127+
return sqsModifiedRequest;
131128
}
129+
130+
// Insert other special handling here, following the same pattern as SQS.
131+
132132
return request;
133133
}
134134

@@ -225,9 +225,9 @@ private void populateRequestAttributes(
225225
@Override
226226
public void afterExecution(
227227
Context.AfterExecution context, ExecutionAttributes executionAttributes) {
228-
if (SqsAccess.isReceiveMessageResponse(context.response())) {
229-
SqsAccess.afterReceiveMessageExecution(this, context, executionAttributes);
230-
}
228+
229+
// Other special handling could be shortcut-&&ed after this (false is returned if not handled).
230+
SqsAccess.afterReceiveMessageExecution(context, executionAttributes, this);
231231

232232
io.opentelemetry.context.Context otelContext = getContext(executionAttributes);
233233
if (otelContext != null) {

instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,9 @@ class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements LibraryTe
1818
.build()
1919
.newExecutionInterceptor())
2020
}
21+
22+
@Override
23+
boolean isSqsAttributeInjectionEnabled() {
24+
false
25+
}
2126
}

instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagator.groovy

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,19 @@ class Aws2SqsTracingTestWithW3CPropagator extends AbstractAws2SqsTracingTest imp
1515
.addExecutionInterceptor(
1616
AwsSdkTelemetry.builder(getOpenTelemetry())
1717
.setCaptureExperimentalSpanAttributes(true)
18-
.setUseConfiguredPropagatorForMessaging(true) // Difference to main test
19-
.setUseXrayPropagator(false) // Disable to confirm messaging propagator actually works
18+
.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
19+
.setUseXrayPropagator(isXrayInjectionEnabled()) // Disable to confirm messaging propagator actually works
2020
.build()
2121
.newExecutionInterceptor())
2222
}
23+
24+
@Override
25+
boolean isSqsAttributeInjectionEnabled() {
26+
true
27+
}
28+
29+
@Override
30+
boolean isXrayInjectionEnabled() {
31+
false
32+
}
2333
}

instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator.groovy

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,13 @@ class Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator extends AbstractAws2S
1616
.addExecutionInterceptor(
1717
AwsSdkTelemetry.builder(getOpenTelemetry())
1818
.setCaptureExperimentalSpanAttributes(true)
19-
.setUseConfiguredPropagatorForMessaging(true) // Difference to main test
19+
.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
2020
.build()
2121
.newExecutionInterceptor())
2222
}
23+
24+
@Override
25+
boolean isSqsAttributeInjectionEnabled() {
26+
true
27+
}
2328
}

instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2ClientCoreTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT
3131

3232
@Unroll
3333
abstract class AbstractAws2ClientCoreTest extends InstrumentationSpecification {
34-
def isSqsAttributeInjectionEnabled() {
34+
static boolean isSqsAttributeInjectionEnabled() {
3535
// See io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure.TracingExecutionInterceptor
3636
return ConfigPropertiesUtil.getBoolean("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false)
3737
}

0 commit comments

Comments
 (0)