Skip to content

Commit e609841

Browse files
committed
instrument sqs spring context propagation
1 parent e992b65 commit e609841

File tree

5 files changed

+347
-3
lines changed

5 files changed

+347
-3
lines changed

dd-java-agent/instrumentation/aws-java/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ protected void startNewMessageSpan(Message message) {
8989

9090
DataStreamsTags tags = create("sqs", INBOUND, urlFileName(queueUrl));
9191
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, create(tags, 0, 0));
92+
System.out.println("Setting a checkpoint in thread" + Thread.currentThread().getId());
9293

9394
CONSUMER_DECORATE.afterStart(span);
9495
CONSUMER_DECORATE.onConsume(span, queueUrl);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package datadog.trace.instrumentation.aws.v2.sqs;
2+
3+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
4+
import static java.util.Collections.singletonMap;
5+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
6+
7+
import com.google.auto.service.AutoService;
8+
import datadog.trace.agent.tooling.Instrumenter;
9+
import datadog.trace.agent.tooling.InstrumenterModule;
10+
import datadog.trace.bootstrap.InstrumentationContext;
11+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
12+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
13+
import java.util.Map;
14+
import net.bytebuddy.asm.Advice;
15+
import software.amazon.awssdk.services.sqs.model.Message;
16+
17+
@AutoService(InstrumenterModule.class)
18+
public class SqsMessageInstrumentation extends InstrumenterModule.Tracing
19+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
20+
21+
public SqsMessageInstrumentation() {
22+
super("aws-java-sqs-2.0");
23+
}
24+
25+
@Override
26+
public String instrumentedType() {
27+
return "software.amazon.awssdk.services.sqs.model.Message";
28+
}
29+
30+
@Override
31+
public void methodAdvice(MethodTransformer transformer) {
32+
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureActiveScope");
33+
}
34+
35+
@Override
36+
public Map<String, String> contextStore() {
37+
return singletonMap("software.amazon.awssdk.services.sqs.model.Message", State.class.getName());
38+
}
39+
40+
public static class CaptureActiveScope {
41+
@Advice.OnMethodExit(suppress = Throwable.class)
42+
public static void captureActiveScope(@Advice.This Message message) {
43+
AgentSpan span = activeSpan();
44+
if (span != null) {
45+
State state = State.FACTORY.create();
46+
state.captureAndSetContinuation(span);
47+
InstrumentationContext.get(Message.class, State.class).put(message, state);
48+
System.out.println("[SQS] Captured state for SQS message: " + message.messageId() +
49+
" with span: " + span.getSpanId() + " on thread: " + Thread.currentThread().getId());
50+
} else {
51+
System.out.println("[SQS] No active span found when creating SQS message: " +
52+
message.messageId() + " on thread: " + Thread.currentThread().getId());
53+
}
54+
}
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package datadog.trace.instrumentation.springmessaging;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
4+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
5+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
6+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
7+
8+
import com.google.auto.service.AutoService;
9+
import datadog.trace.agent.tooling.Instrumenter;
10+
import datadog.trace.agent.tooling.InstrumenterModule;
11+
import datadog.trace.bootstrap.ContextStore;
12+
import datadog.trace.bootstrap.InstrumentationContext;
13+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
14+
import java.util.Map;
15+
import java.util.TreeMap;
16+
import net.bytebuddy.asm.Advice;
17+
import net.bytebuddy.description.type.TypeDescription;
18+
import net.bytebuddy.matcher.ElementMatcher;
19+
import org.springframework.messaging.Message;
20+
21+
// @AutoService(InstrumenterModule.class) // Temporarily disabled to test SqsToSpringMessageTransferInstrumentation
22+
public class AbstractMessageConvertingMessageSourceToMessagingInstrumentation extends InstrumenterModule.Tracing
23+
implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice {
24+
25+
public AbstractMessageConvertingMessageSourceToMessagingInstrumentation() {
26+
super("spring-messaging", "spring-messaging-4");
27+
}
28+
29+
@Override
30+
public String hierarchyMarkerType() {
31+
return "io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter";
32+
}
33+
34+
@Override
35+
public ElementMatcher<TypeDescription> hierarchyMatcher() {
36+
return extendsClass(named(hierarchyMarkerType()));
37+
}
38+
39+
@Override
40+
public void methodAdvice(MethodTransformer transformer) {
41+
// Instrument toMessagingMessage method
42+
transformer.applyAdvice(
43+
named("toMessagingMessage"),
44+
getClass().getName() + "$ToMessagingMessageAdvice");
45+
46+
}
47+
48+
@Override
49+
public Map<String, String> contextStore() {
50+
Map<String, String> contextStore = new TreeMap<>();
51+
// contextStore.put("Object", State.class.getName());
52+
// contextStore.put("org.springframework.messaging.Message", State.class.getName());
53+
return contextStore;
54+
}
55+
56+
public static class ToMessagingMessageAdvice {
57+
@Advice.OnMethodEnter(suppress = Throwable.class)
58+
public static void onEnter(@Advice.Argument(0) Object sqsMessage, @Advice.This Object converter) {
59+
System.out.println("[ToMessaging] toMessagingMessage called with SQS message: " +
60+
sqsMessage + " on thread: " + Thread.currentThread().getId());
61+
62+
// Print the actual child class being used
63+
System.out.println("[ToMessaging] Converter class: " + converter.getClass().getName());
64+
System.out.println("[ToMessaging] Converter class hierarchy:");
65+
Class<?> currentClass = converter.getClass();
66+
int level = 0;
67+
while (currentClass != null && level < 3) {
68+
System.out.println("[ToMessaging] Level " + level + ": " + currentClass.getName());
69+
currentClass = currentClass.getSuperclass();
70+
level++;
71+
}
72+
73+
// Print stack trace to see the call flow
74+
System.out.println("[ToMessaging] Stack trace:");
75+
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
76+
for (int i = 0; i < Math.min(stackTrace.length, 15); i++) {
77+
System.out.println("[ToMessaging] at " + stackTrace[i]);
78+
}
79+
}
80+
81+
@Advice.OnMethodExit(suppress = Throwable.class)
82+
public static void onExit(
83+
@Advice.Argument(0) Object sqsMessage,
84+
@Advice.Return Message springMessage) {
85+
System.out.println("[ToMessaging] toMessagingMessage completed - SQS: " + sqsMessage +
86+
" -> Spring: " + springMessage + " on thread: " + Thread.currentThread().getId());
87+
88+
// Transfer state from SQS message to Spring message
89+
// if (null != sqsMessage && null != springMessage &&
90+
// sqsMessage.getClass().getName().equals("software.amazon.awssdk.services.sqs.model.Message")) {
91+
//
92+
// ContextStore<Object, State> from =
93+
// InstrumentationContext.get(Object.class, State.class);
94+
// State state = from.get(sqsMessage);
95+
// if (null != state) {
96+
// from.put(sqsMessage, null);
97+
// // InstrumentationContext.get(Message.class, State.class).put(springMessage, state);
98+
// System.out.println("[ToMessaging] Transferred state from SQS message to Spring message on thread: " +
99+
// Thread.currentThread().getId());
100+
// } else {
101+
// System.out.println("[ToMessaging] No state found in SQS message during conversion on thread: " +
102+
// Thread.currentThread().getId());
103+
// }
104+
// } else {
105+
// System.out.println("[ToMessaging] Skipping transfer - not an SQS message or null message on thread: " +
106+
// Thread.currentThread().getId());
107+
// }
108+
}
109+
}
110+
}

dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,28 @@
11
package datadog.trace.instrumentation.springmessaging;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static datadog.trace.api.datastreams.DataStreamsContext.create;
5+
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND;
46
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
5-
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
6-
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
7-
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
7+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.*;
88
import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE;
99
import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.SPRING_INBOUND;
1010
import static datadog.trace.instrumentation.springmessaging.SpringMessageExtractAdapter.GETTER;
11+
import static java.util.Collections.singletonMap;
1112
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
1213
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
1314

1415
import com.google.auto.service.AutoService;
1516
import datadog.trace.agent.tooling.Instrumenter;
1617
import datadog.trace.agent.tooling.InstrumenterModule;
18+
import datadog.trace.api.datastreams.DataStreamsTags;
19+
import datadog.trace.bootstrap.InstrumentationContext;
1720
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
1821
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1922
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
23+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
24+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
25+
import java.util.Map;
2026
import net.bytebuddy.asm.Advice;
2127
import org.springframework.messaging.Message;
2228
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
@@ -53,22 +59,88 @@ public String[] helperClassNames() {
5359
};
5460
}
5561

62+
@Override
63+
public Map<String, String> contextStore() {
64+
return singletonMap("org.springframework.messaging.Message", State.class.getName());
65+
}
66+
5667
public static class HandleMessageAdvice {
5768
@Advice.OnMethodEnter(suppress = Throwable.class)
5869
public static AgentScope onEnter(
5970
@Advice.This InvocableHandlerMethod thiz, @Advice.Argument(0) Message<?> message) {
6071
AgentSpanContext parentContext;
6172
AgentSpan parent = activeSpan();
73+
74+
// First try to get context from continuation (preferred method)
75+
State state = InstrumentationContext.get(Message.class, State.class).get(message);
76+
if (null != state) {
77+
System.out.println("[Spring] Found state in Spring message, attempting to activate continuation on thread: " +
78+
Thread.currentThread().getId());
79+
AgentScope.Continuation continuation = state.getAndResetContinuation();
80+
if (null != continuation) {
81+
try (AgentScope scope = continuation.activate()) {
82+
AgentSpan span = startSpan(SPRING_INBOUND);
83+
DECORATE.afterStart(span);
84+
span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod()));
85+
System.out.println("[Spring] Successfully activated continuation from Spring Message with span: " +
86+
span.getSpanId() + " on thread: " + Thread.currentThread().getId());
87+
return activateSpan(span);
88+
}
89+
} else {
90+
System.out.println("[Spring] No continuation found in state on thread: " + Thread.currentThread().getId());
91+
}
92+
} else {
93+
System.out.println("[Spring] No state found in Spring message 2, falling back to header extraction on thread: " +
94+
Thread.currentThread().getId());
95+
}
96+
97+
// Fallback to existing context or header extraction
6298
if (null != parent) {
6399
// prefer existing context, assume it was already extracted from this message
64100
parentContext = parent.context();
101+
System.out.println("[Spring] Using existing active span context on thread: " + Thread.currentThread().getId());
65102
} else {
66103
// otherwise try to re-extract the message context to avoid disconnected trace
67104
parentContext = extractContextAndGetSpanContext(message, GETTER);
105+
System.out.println("[Spring] Extracted context from message headers on thread: " + Thread.currentThread().getId());
68106
}
107+
69108
AgentSpan span = startSpan(SPRING_INBOUND, parentContext);
70109
DECORATE.afterStart(span);
71110
span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod()));
111+
112+
// Extract SQS queue information - try different header patterns
113+
Object queueUrl = message.getHeaders().get("Sqs_QueueUrl");
114+
Object queueName = message.getHeaders().get("Sqs_QueueName");
115+
116+
// If not found in Sqs_ prefixed headers, try aws. prefixed headers
117+
if (queueUrl == null) {
118+
queueUrl = message.getHeaders().get("aws.queue.url");
119+
}
120+
if (queueName == null) {
121+
queueName = message.getHeaders().get("aws.queue.name");
122+
}
123+
124+
// If still not found, try to extract from QueueAttributes
125+
if (queueUrl == null || queueName == null) {
126+
Object queueAttributes = message.getHeaders().get("Sqs_QueueAttributes");
127+
if (queueAttributes != null) {
128+
String attributesStr = queueAttributes.toString();
129+
// Extract queue name from attributes if available
130+
if (queueName == null && attributesStr.contains("queueName=")) {
131+
queueName = attributesStr.substring(attributesStr.indexOf("queueName=") + 10).split(",")[0];
132+
}
133+
}
134+
}
135+
136+
// Add SQS queue tags to the span
137+
if (queueUrl != null) {
138+
span.setTag("aws.sqs.queue_url", queueUrl.toString());
139+
}
140+
if (queueName != null) {
141+
span.setTag("aws.sqs.queue_name", queueName.toString());
142+
}
143+
72144
return activateSpan(span);
73145
}
74146

0 commit comments

Comments
 (0)