Skip to content

Commit 6575d67

Browse files
committed
working version with data streams propagation
1 parent 76ad3bd commit 6575d67

File tree

7 files changed

+113
-108
lines changed

7 files changed

+113
-108
lines changed

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsMessageInstrumentation.java

Lines changed: 0 additions & 56 deletions
This file was deleted.

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/SqsReceiveResultInstrumentation.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package datadog.trace.instrumentation.aws.v2.sqs;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4-
import static java.util.Collections.singletonMap;
54
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
65

76
import com.google.auto.service.AutoService;
87
import datadog.trace.agent.tooling.Instrumenter;
98
import datadog.trace.agent.tooling.InstrumenterModule;
109
import datadog.trace.api.InstrumenterConfig;
10+
import datadog.trace.bootstrap.ContextStore;
1111
import datadog.trace.bootstrap.InstrumentationContext;
12+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
1213
import java.util.List;
1314
import java.util.Map;
1415
import net.bytebuddy.asm.Advice;
@@ -44,8 +45,13 @@ public String[] helperClassNames() {
4445

4546
@Override
4647
public Map<String, String> contextStore() {
47-
return singletonMap(
48+
Map<String, String> contextStore = new java.util.HashMap<>(2);
49+
contextStore.put(
4850
"software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse", "java.lang.String");
51+
contextStore.put(
52+
"software.amazon.awssdk.services.sqs.model.Message",
53+
"datadog.trace.bootstrap.instrumentation.java.concurrent.State");
54+
return contextStore;
4955
}
5056

5157
@Override
@@ -63,7 +69,11 @@ public static void onExit(
6369
String queueUrl =
6470
InstrumentationContext.get(ReceiveMessageResponse.class, String.class).get(result);
6571
if (queueUrl != null) {
66-
messages = new TracingList(messages, queueUrl, result.responseMetadata().requestId());
72+
ContextStore<Message, State> messageStateStore =
73+
InstrumentationContext.get(Message.class, State.class);
74+
messages =
75+
new TracingList(
76+
messageStateStore, messages, queueUrl, result.responseMetadata().requestId());
6777
}
6878
}
6979
}

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
import datadog.trace.api.Config;
2020
import datadog.trace.api.datastreams.DataStreamsTags;
21+
import datadog.trace.bootstrap.ContextStore;
2122
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2223
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
2324
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
25+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
2426
import java.util.Iterator;
2527
import org.slf4j.Logger;
2628
import org.slf4j.LoggerFactory;
@@ -29,12 +31,18 @@
2931
public class TracingIterator<L extends Iterator<Message>> implements Iterator<Message> {
3032
private static final Logger log = LoggerFactory.getLogger(TracingIterator.class);
3133

34+
private final ContextStore<Message, State> messageStateStore;
3235
protected final L delegate;
3336
private final String queueUrl;
3437
private final String requestId;
3538
private AgentSpanContext batchContext;
3639

37-
public TracingIterator(L delegate, String queueUrl, String requestId) {
40+
public TracingIterator(
41+
ContextStore<Message, State> messageStateStore,
42+
L delegate,
43+
String queueUrl,
44+
String requestId) {
45+
this.messageStateStore = messageStateStore;
3846
this.delegate = delegate;
3947
this.queueUrl = queueUrl;
4048
this.requestId = requestId;
@@ -99,6 +107,22 @@ protected void startNewMessageSpan(Message message) {
99107
BROKER_DECORATE.beforeFinish(queueSpan);
100108
queueSpan.finish();
101109
}
110+
111+
// Capture state after data streams checkpoint is set
112+
try {
113+
State state = State.FACTORY.create();
114+
state.captureAndSetContinuation(span);
115+
messageStateStore.put(message, state);
116+
System.out.println(
117+
"[TracingIterator] Captured state for SQS message: "
118+
+ message.messageId()
119+
+ " with span: "
120+
+ span.getSpanId()
121+
+ " on thread: "
122+
+ Thread.currentThread().getId());
123+
} catch (Exception stateException) {
124+
log.debug("Problem capturing state for SQS message", stateException);
125+
}
102126
}
103127
} catch (Exception e) {
104128
log.debug("Problem tracing new SQS message span", e);

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingList.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
11
package datadog.trace.instrumentation.aws.v2.sqs;
22

3+
import datadog.trace.bootstrap.ContextStore;
4+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
35
import java.util.Collection;
46
import java.util.Iterator;
57
import java.util.List;
68
import java.util.ListIterator;
79
import software.amazon.awssdk.services.sqs.model.Message;
810

911
public class TracingList implements List<Message> {
12+
private final ContextStore<Message, State> messageStateStore;
1013
private final List<Message> delegate;
1114
private final String queueUrl;
1215
private final String requestId;
1316

14-
public TracingList(List<Message> delegate, String queueUrl, String requestId) {
17+
public TracingList(
18+
ContextStore<Message, State> messageStateStore,
19+
List<Message> delegate,
20+
String queueUrl,
21+
String requestId) {
22+
this.messageStateStore = messageStateStore;
1523
this.delegate = delegate;
1624
this.queueUrl = queueUrl;
1725
this.requestId = requestId;
@@ -125,12 +133,14 @@ public ListIterator<Message> listIterator() {
125133
@Override
126134
public ListIterator<Message> listIterator(int index) {
127135
// every iteration will add spans. Not only the very first one
128-
return new TracingListIterator(delegate.listIterator(index), queueUrl, requestId);
136+
return new TracingListIterator(
137+
messageStateStore, delegate.listIterator(index), queueUrl, requestId);
129138
}
130139

131140
@Override
132141
public List<Message> subList(int fromIndex, int toIndex) {
133-
return new TracingList(delegate.subList(fromIndex, toIndex), queueUrl, requestId);
142+
return new TracingList(
143+
messageStateStore, delegate.subList(fromIndex, toIndex), queueUrl, requestId);
134144
}
135145

136146
@Override

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingListIterator.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,20 @@
22

33
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
44

5+
import datadog.trace.bootstrap.ContextStore;
6+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
57
import java.util.ListIterator;
68
import software.amazon.awssdk.services.sqs.model.Message;
79

810
public class TracingListIterator extends TracingIterator<ListIterator<Message>>
911
implements ListIterator<Message> {
1012

11-
public TracingListIterator(ListIterator<Message> delegate, String queueUrl, String requestId) {
12-
super(delegate, queueUrl, requestId);
13+
public TracingListIterator(
14+
ContextStore<Message, State> messageStateStore,
15+
ListIterator<Message> delegate,
16+
String queueUrl,
17+
String requestId) {
18+
super(messageStateStore, delegate, queueUrl, requestId);
1319
}
1420

1521
@Override

dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package datadog.trace.instrumentation.springmessaging;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4-
import static datadog.trace.api.datastreams.DataStreamsContext.create;
5-
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND;
64
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
75
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.*;
86
import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE;
@@ -15,12 +13,10 @@
1513
import com.google.auto.service.AutoService;
1614
import datadog.trace.agent.tooling.Instrumenter;
1715
import datadog.trace.agent.tooling.InstrumenterModule;
18-
import datadog.trace.api.datastreams.DataStreamsTags;
1916
import datadog.trace.bootstrap.InstrumentationContext;
2017
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2118
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2219
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
23-
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
2420
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
2521
import java.util.Map;
2622
import net.bytebuddy.asm.Advice;
@@ -70,77 +66,89 @@ public static AgentScope onEnter(
7066
@Advice.This InvocableHandlerMethod thiz, @Advice.Argument(0) Message<?> message) {
7167
AgentSpanContext parentContext;
7268
AgentSpan parent = activeSpan();
73-
69+
7470
// First try to get context from continuation (preferred method)
7571
State state = InstrumentationContext.get(Message.class, State.class).get(message);
7672
if (null != state) {
77-
System.out.println("[Spring] Found state in Spring message, attempting to activate continuation on thread: " +
78-
Thread.currentThread().getId());
73+
System.out.println(
74+
"[Spring] Found state in Spring message, attempting to activate continuation on thread: "
75+
+ Thread.currentThread().getId());
7976
AgentScope.Continuation continuation = state.getAndResetContinuation();
8077
if (null != continuation) {
8178
try (AgentScope scope = continuation.activate()) {
8279
AgentSpan span = startSpan(SPRING_INBOUND);
8380
DECORATE.afterStart(span);
8481
span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod()));
85-
System.out.println("[Spring] Successfully activated continuation from Spring Message with span: " +
86-
span.getSpanId() + " on thread: " + Thread.currentThread().getId());
82+
System.out.println(
83+
"[Spring] Successfully activated continuation from Spring Message with span: "
84+
+ span.getSpanId()
85+
+ " on thread: "
86+
+ Thread.currentThread().getId());
8787
return activateSpan(span);
8888
}
8989
} else {
90-
System.out.println("[Spring] No continuation found in state on thread: " + Thread.currentThread().getId());
90+
System.out.println(
91+
"[Spring] No continuation found in state on thread: "
92+
+ Thread.currentThread().getId());
9193
}
9294
} else {
93-
System.out.println("[Spring] No state found in Spring message 2, falling back to header extraction on thread: " +
94-
Thread.currentThread().getId());
95+
System.out.println(
96+
"[Spring] No state found in Spring message 2, falling back to header extraction on thread: "
97+
+ Thread.currentThread().getId());
9598
}
96-
99+
97100
// Fallback to existing context or header extraction
98101
if (null != parent) {
99102
// prefer existing context, assume it was already extracted from this message
100103
parentContext = parent.context();
101-
System.out.println("[Spring] Using existing active span context on thread: " + Thread.currentThread().getId());
104+
System.out.println(
105+
"[Spring] Using existing active span context on thread: "
106+
+ Thread.currentThread().getId());
102107
} else {
103108
// otherwise try to re-extract the message context to avoid disconnected trace
104109
parentContext = extractContextAndGetSpanContext(message, GETTER);
105-
System.out.println("[Spring] Extracted context from message headers on thread: " + Thread.currentThread().getId());
110+
System.out.println(
111+
"[Spring] Extracted context from message headers on thread: "
112+
+ Thread.currentThread().getId());
106113
}
107-
114+
108115
AgentSpan span = startSpan(SPRING_INBOUND, parentContext);
109116
DECORATE.afterStart(span);
110117
span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod()));
111-
118+
112119
// Extract SQS queue information - try different header patterns
113120
Object queueUrl = message.getHeaders().get("Sqs_QueueUrl");
114121
Object queueName = message.getHeaders().get("Sqs_QueueName");
115-
122+
116123
// If not found in Sqs_ prefixed headers, try aws. prefixed headers
117124
if (queueUrl == null) {
118125
queueUrl = message.getHeaders().get("aws.queue.url");
119126
}
120127
if (queueName == null) {
121128
queueName = message.getHeaders().get("aws.queue.name");
122129
}
123-
130+
124131
// If still not found, try to extract from QueueAttributes
125132
if (queueUrl == null || queueName == null) {
126133
Object queueAttributes = message.getHeaders().get("Sqs_QueueAttributes");
127134
if (queueAttributes != null) {
128135
String attributesStr = queueAttributes.toString();
129136
// Extract queue name from attributes if available
130137
if (queueName == null && attributesStr.contains("queueName=")) {
131-
queueName = attributesStr.substring(attributesStr.indexOf("queueName=") + 10).split(",")[0];
138+
queueName =
139+
attributesStr.substring(attributesStr.indexOf("queueName=") + 10).split(",")[0];
132140
}
133141
}
134142
}
135-
143+
136144
// Add SQS queue tags to the span
137145
if (queueUrl != null) {
138146
span.setTag("aws.sqs.queue_url", queueUrl.toString());
139147
}
140148
if (queueName != null) {
141149
span.setTag("aws.sqs.queue_name", queueName.toString());
142150
}
143-
151+
144152
return activateSpan(span);
145153
}
146154

0 commit comments

Comments
 (0)