Skip to content

Commit 5723072

Browse files
committed
test
1 parent 6575d67 commit 5723072

File tree

11 files changed

+327
-48
lines changed

11 files changed

+327
-48
lines changed

dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ abstract class InstrumentationSpecification extends DDSpecification implements A
361361
long bucketDuration = dataStreamsBucketDuration()
362362
TEST_DATA_STREAMS_MONITORING = new DefaultDataStreamsMonitoring(sink, features, SystemTimeSource.INSTANCE, { MOCK_DSM_TRACE_CONFIG }, TEST_DATA_STREAMS_WRITER, bucketDuration)
363363

364+
System.out.println("new test writer")
364365
TEST_WRITER = new ListWriter()
365366

366367
if (isTestAgentEnabled()) {

dd-java-agent/instrumentation/aws-java/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,6 @@ protected void startNewMessageSpan(Message message) {
113113
State state = State.FACTORY.create();
114114
state.captureAndSetContinuation(span);
115115
messageStateStore.put(message, state);
116-
System.out.println(
117-
"[TracingIterator] Captured state for SQS message: "
118-
+ message.messageId()
119-
+ " with span: "
120-
+ span.getSpanId()
121-
+ " on thread: "
122-
+ Thread.currentThread().getId());
123116
} catch (Exception stateException) {
124117
log.debug("Problem capturing state for SQS message", stateException);
125118
}

dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -70,46 +70,24 @@ public static AgentScope onEnter(
7070
// First try to get context from continuation (preferred method)
7171
State state = InstrumentationContext.get(Message.class, State.class).get(message);
7272
if (null != state) {
73-
System.out.println(
74-
"[Spring] Found state in Spring message, attempting to activate continuation on thread: "
75-
+ Thread.currentThread().getId());
7673
AgentScope.Continuation continuation = state.getAndResetContinuation();
7774
if (null != continuation) {
7875
try (AgentScope scope = continuation.activate()) {
7976
AgentSpan span = startSpan(SPRING_INBOUND);
8077
DECORATE.afterStart(span);
8178
span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod()));
82-
System.out.println(
83-
"[Spring] Successfully activated continuation from Spring Message with span: "
84-
+ span.getSpanId()
85-
+ " on thread: "
86-
+ Thread.currentThread().getId());
8779
return activateSpan(span);
8880
}
89-
} else {
90-
System.out.println(
91-
"[Spring] No continuation found in state on thread: "
92-
+ Thread.currentThread().getId());
9381
}
94-
} else {
95-
System.out.println(
96-
"[Spring] No state found in Spring message 2, falling back to header extraction on thread: "
97-
+ Thread.currentThread().getId());
9882
}
9983

10084
// Fallback to existing context or header extraction
10185
if (null != parent) {
10286
// prefer existing context, assume it was already extracted from this message
10387
parentContext = parent.context();
104-
System.out.println(
105-
"[Spring] Using existing active span context on thread: "
106-
+ Thread.currentThread().getId());
10788
} else {
10889
// otherwise try to re-extract the message context to avoid disconnected trace
10990
parentContext = extractContextAndGetSpanContext(message, GETTER);
110-
System.out.println(
111-
"[Spring] Extracted context from message headers on thread: "
112-
+ Thread.currentThread().getId());
11391
}
11492

11593
AgentSpan span = startSpan(SPRING_INBOUND, parentContext);

dd-java-agent/instrumentation/spring/spring-sqs/build.gradle

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,32 @@ muzzle {
77
}
88
}
99

10-
ext {
11-
minJavaVersionForTests = JavaVersion.VERSION_17
12-
}
13-
1410
apply from: "$rootDir/gradle/java.gradle"
1511

1612
addTestSuiteForDir('latestDepTest', 'test')
1713

18-
[compileTestGroovy, compileLatestDepTestGroovy].each {
19-
it.javaLauncher = getJavaLauncherFor(17)
20-
}
21-
2214
dependencies {
2315
compileOnly group: 'software.amazon.awssdk', name: 'sqs', version: '2.20.162'
2416
compileOnly group: 'org.springframework', name: 'spring-messaging', version: '5.3.23'
17+
18+
testImplementation project(':dd-java-agent:instrumentation:trace-annotation')
19+
testImplementation project(':dd-java-agent:instrumentation:aws-java:aws-java-sqs-2.0')
20+
testImplementation project(':dd-java-agent:instrumentation:spring:spring-messaging-4.0')
21+
22+
testImplementation group: 'org.testcontainers', name: 'localstack', version: libs.versions.testcontainers.get()
23+
testImplementation group: 'org.springframework', name: 'spring-context', version: '5.3.23'
24+
testImplementation group: 'org.springframework', name: 'spring-test', version: '5.3.23'
25+
testImplementation group: 'org.springframework', name: 'spring-core', version: '5.3.23'
26+
testImplementation group: 'software.amazon.awssdk', name: 'sqs', version: '2.20.162'
27+
testImplementation group: 'software.amazon.awssdk', name: 'aws-core', version: '2.20.162'
28+
29+
latestDepTestImplementation group: 'org.springframework', name: 'spring-context', version: '5.+'
30+
latestDepTestImplementation group: 'org.springframework', name: 'spring-test', version: '5.+'
31+
latestDepTestImplementation group: 'org.springframework', name: 'spring-core', version: '5.+'
32+
latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sqs', version: '2.+'
33+
latestDepTestImplementation group: 'software.amazon.awssdk', name: 'aws-core', version: '2.+'
34+
}
35+
36+
tasks.withType(Test).configureEach {
37+
usesService(testcontainersLimit)
2538
}

dd-java-agent/instrumentation/spring/spring-sqs/src/main/java/datadog/trace/instrumentation/springsqs/AbstractMessagingMessageConverterToMessagingInstrumentation.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,18 +71,7 @@ public static void onExit(
7171
// Transfer state from SQS Message to Spring Message
7272
ContextStore<Message, State> to = InstrumentationContext.get(Message.class, State.class);
7373
to.put(springMessage, state);
74-
System.out.println(
75-
"[ToMessaging] Transferred state from SQS message to Spring message on thread: "
76-
+ Thread.currentThread().getId());
77-
} else {
78-
System.out.println(
79-
"[ToMessaging] No state found in SQS message during conversion on thread: "
80-
+ Thread.currentThread().getId());
8174
}
82-
} else {
83-
System.out.println(
84-
"[ToMessaging] Skipping transfer - not an SQS message or null message on thread: "
85-
+ Thread.currentThread().getId());
8675
}
8776
}
8877
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import datadog.trace.agent.test.InstrumentationSpecification
2+
import datadog.trace.agent.test.utils.PortUtils
3+
import org.testcontainers.containers.localstack.LocalStackContainer
4+
import org.testcontainers.utility.DockerImageName
5+
import sqs.MessagingSqsApplication
6+
import sqs.Receiver
7+
import sqs.Sender
8+
import sqs.SqsMessageListener
9+
import spock.lang.Shared
10+
import software.amazon.awssdk.services.sqs.SqsAsyncClient
11+
12+
import java.util.concurrent.TimeUnit
13+
14+
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
15+
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS
16+
17+
class SpringSqsTest extends InstrumentationSpecification {
18+
19+
@Shared
20+
LocalStackContainer localstack
21+
22+
@Override
23+
def setupSpec() {
24+
localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest"))
25+
.withServices(SQS)
26+
localstack.start()
27+
28+
def endpoint = localstack.getEndpointOverride(SQS).toString()
29+
MessagingSqsApplication.endpoint = endpoint
30+
31+
// Wait for LocalStack to be ready
32+
PortUtils.waitForPortToOpen(
33+
localstack.getHost(),
34+
localstack.getMappedPort(4566),
35+
10,
36+
TimeUnit.SECONDS)
37+
38+
// Create the test queue with retry logic
39+
def sqsClient = SqsAsyncClient.builder()
40+
.endpointOverride(new URI(endpoint))
41+
.region(software.amazon.awssdk.regions.Region.US_EAST_1)
42+
.credentialsProvider(software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create(
43+
software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create("test", "test")))
44+
.build()
45+
46+
// Retry queue creation
47+
def maxRetries = 5
48+
def retryCount = 0
49+
while (retryCount < maxRetries) {
50+
try {
51+
MessagingSqsApplication.createQueue(sqsClient, MessagingSqsApplication.queueName)
52+
break
53+
} catch (Exception e) {
54+
retryCount++
55+
if (retryCount >= maxRetries) {
56+
throw e
57+
}
58+
Thread.sleep(2000) // Wait 2 seconds before retry
59+
}
60+
}
61+
}
62+
63+
@Override
64+
def cleanupSpec() {
65+
if (null != localstack) {
66+
localstack.close()
67+
}
68+
}
69+
70+
def "test basic SQS message send and receive"() {
71+
setup:
72+
println "=== TEST STARTING ==="
73+
def application = MessagingSqsApplication.run()
74+
def sender = application.getBean(Sender)
75+
def receiver = application.getBean(Receiver)
76+
def messageListener = application.getBean(SqsMessageListener)
77+
println "=== APPLICATION STARTED ==="
78+
79+
when:
80+
runUnderTrace("parent") {
81+
println "Sending message..."
82+
sender.send("hello")
83+
println "Message sent"
84+
85+
// Wait a bit for the message to be available
86+
Thread.sleep(1000)
87+
88+
// Poll for messages
89+
println "Polling for messages..."
90+
messageListener.receiveMessage()
91+
println "Message polling completed"
92+
}
93+
94+
then: "message was received and processed"
95+
assert receiver.latch.await(5, TimeUnit.SECONDS)
96+
97+
// Debug: Check writer state before waiting
98+
println "=== WRITER DEBUG: Before waitForTraces ==="
99+
println "TEST_WRITER type: ${TEST_WRITER.getClass()}"
100+
println "TEST_WRITER traces size: ${TEST_WRITER.traces.size()}"
101+
println "TEST_WRITER traces: ${TEST_WRITER.traces}"
102+
103+
// Wait for traces to be captured (with shorter timeout for debugging)
104+
try {
105+
TEST_WRITER.waitForTraces(1, 5, java.util.concurrent.TimeUnit.SECONDS)
106+
} catch (Exception e) {
107+
println "Timeout waiting for traces: ${e.message}"
108+
println "Current traces: ${TEST_WRITER.traces}"
109+
}
110+
111+
// Print all generated spans for debugging
112+
println "=== Generated Spans ==="
113+
if (TEST_WRITER.traces == null) {
114+
println "TEST_WRITER.traces is null"
115+
} else if (TEST_WRITER.traces.isEmpty()) {
116+
println "TEST_WRITER.traces is empty"
117+
} else {
118+
TEST_WRITER.traces.eachWithIndex { trace, traceIndex ->
119+
println "Trace $traceIndex:"
120+
if (trace == null) {
121+
println " Trace is null"
122+
} else {
123+
trace.eachWithIndex { span, spanIndex ->
124+
println " Span $spanIndex: ${span.operationName} (${span.resourceName}) - Type: ${span.spanType} - Parent: ${span.parentId}"
125+
}
126+
}
127+
}
128+
println "Total traces: ${TEST_WRITER.traces.size()}"
129+
}
130+
println "======================="
131+
132+
cleanup:
133+
application.close()
134+
}
135+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package sqs;
2+
3+
import org.springframework.context.ConfigurableApplicationContext;
4+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
5+
import org.springframework.context.annotation.Bean;
6+
import org.springframework.context.annotation.Configuration;
7+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
8+
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
9+
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
10+
11+
@Configuration
12+
public class MessagingSqsApplication {
13+
14+
public static String endpoint = "http://localhost:4566";
15+
public static String region = "us-east-1";
16+
public static String queueName = "test-queue";
17+
18+
@Bean
19+
SqsAsyncClient sqsAsyncClient() {
20+
return SqsAsyncClient.builder()
21+
.endpointOverride(java.net.URI.create(endpoint))
22+
.region(software.amazon.awssdk.regions.Region.of(region))
23+
.credentialsProvider(software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create(
24+
software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create("test", "test")))
25+
.build();
26+
}
27+
28+
public static ConfigurableApplicationContext run() {
29+
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
30+
context.register(MessagingSqsApplication.class);
31+
context.register(Receiver.class);
32+
context.register(Sender.class);
33+
context.register(SqsMessageListener.class);
34+
context.refresh();
35+
return context;
36+
}
37+
38+
public static String getQueueUrl(SqsAsyncClient sqsClient, String queueName) {
39+
try {
40+
return sqsClient.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()).get()
41+
.queueUrl();
42+
} catch (Exception e) {
43+
throw new RuntimeException("Failed to get queue URL", e);
44+
}
45+
}
46+
47+
public static void createQueue(SqsAsyncClient sqsClient, String queueName) {
48+
try {
49+
sqsClient.createQueue(CreateQueueRequest.builder().queueName(queueName).build()).get();
50+
} catch (Exception e) {
51+
throw new RuntimeException("Failed to create queue", e);
52+
}
53+
}
54+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package sqs;
2+
3+
import datadog.trace.api.Trace;
4+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
5+
import java.util.concurrent.CountDownLatch;
6+
import org.springframework.stereotype.Component;
7+
8+
@Component
9+
public class Receiver {
10+
11+
public final CountDownLatch latch = new CountDownLatch(1);
12+
13+
public void receiveMessage(String message) {
14+
assert null != AgentTracer.activeSpan() : "no active span during message receipt";
15+
System.out.println("Processing message: " + message);
16+
latch.countDown();
17+
}
18+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package sqs;
2+
3+
import org.springframework.stereotype.Component;
4+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
5+
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
6+
7+
@Component
8+
public class Sender {
9+
10+
private final SqsAsyncClient sqsClient;
11+
private final String queueUrl;
12+
13+
public Sender(SqsAsyncClient sqsClient) {
14+
this.sqsClient = sqsClient;
15+
this.queueUrl = MessagingSqsApplication.getQueueUrl(sqsClient, MessagingSqsApplication.queueName);
16+
}
17+
18+
public void send(String message) {
19+
try {
20+
sqsClient
21+
.sendMessage(
22+
SendMessageRequest.builder()
23+
.queueUrl(queueUrl)
24+
.messageBody(message)
25+
.build())
26+
.get();
27+
} catch (Exception e) {
28+
throw new RuntimeException("Failed to send message", e);
29+
}
30+
}
31+
}

0 commit comments

Comments
 (0)