Skip to content

Commit 34b5241

Browse files
authored
Propagate context into CompletableFuture returned from aws2 async client methods (open-telemetry#13810)
1 parent f2ed832 commit 34b5241

File tree

5 files changed

+201
-0
lines changed

5 files changed

+201
-0
lines changed

instrumentation/aws-sdk/aws-sdk-2.2/javaagent/build.gradle.kts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,20 @@ testing {
141141
}
142142
}
143143

144+
val s3CrtTest by registering(JvmTestSuite::class) {
145+
dependencies {
146+
if (latestDepTest) {
147+
implementation("software.amazon.awssdk:s3:latest.release")
148+
implementation("software.amazon.awssdk.crt:aws-crt:latest.release")
149+
} else {
150+
implementation("software.amazon.awssdk:s3:2.27.21")
151+
implementation("software.amazon.awssdk.crt:aws-crt:0.30.11")
152+
}
153+
implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:library"))
154+
implementation("org.testcontainers:localstack")
155+
}
156+
}
157+
144158
val testBedrockRuntime by registering(JvmTestSuite::class) {
145159
dependencies {
146160
implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:testing"))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.named;
9+
import static net.bytebuddy.matcher.ElementMatchers.returns;
10+
11+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
12+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
13+
import java.util.concurrent.CompletableFuture;
14+
import net.bytebuddy.asm.Advice;
15+
import net.bytebuddy.description.type.TypeDescription;
16+
import net.bytebuddy.matcher.ElementMatcher;
17+
18+
public class AwsAsyncClientHandlerInstrumentation implements TypeInstrumentation {
19+
20+
@Override
21+
public ElementMatcher<TypeDescription> typeMatcher() {
22+
// This class is used internally by the aws async clients to execute requests. Alternatively
23+
// we could instrument all methods that return a CompletableFuture in classes whose name ends
24+
// with "AsyncClient" that extend software.amazon.awssdk.core.SdkClient
25+
return named("software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler");
26+
}
27+
28+
@Override
29+
public void transform(TypeTransformer transformer) {
30+
transformer.applyAdviceToMethod(
31+
named("execute").and(returns(CompletableFuture.class)),
32+
this.getClass().getName() + "$WrapFutureAdvice");
33+
}
34+
35+
@SuppressWarnings("unused")
36+
public static class WrapFutureAdvice {
37+
38+
@Advice.OnMethodExit(suppress = Throwable.class)
39+
public static void methodExit(@Advice.Return(readOnly = false) CompletableFuture<?> future) {
40+
// propagate context into CompletableFuture returned from aws async client methods
41+
future = CompletableFutureWrapper.wrap(future);
42+
}
43+
}
44+
}

instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v2_2/AwsSdkInstrumentationModule.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@
55

66
package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;
77

8+
import static java.util.Arrays.asList;
9+
810
import com.google.auto.service.AutoService;
911
import io.opentelemetry.javaagent.extension.instrumentation.HelperResourceBuilder;
1012
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1114
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1215
import io.opentelemetry.javaagent.extension.instrumentation.internal.injection.ClassInjector;
1316
import io.opentelemetry.javaagent.extension.instrumentation.internal.injection.InjectionMode;
17+
import java.util.List;
1418

1519
@AutoService(InstrumentationModule.class)
1620
public class AwsSdkInstrumentationModule extends AbstractAwsSdkInstrumentationModule {
@@ -35,6 +39,12 @@ public void injectClasses(ClassInjector injector) {
3539
.inject(InjectionMode.CLASS_ONLY);
3640
}
3741

42+
@Override
43+
public List<TypeInstrumentation> typeInstrumentations() {
44+
return asList(
45+
new ResourceInjectingTypeInstrumentation(), new AwsAsyncClientHandlerInstrumentation());
46+
}
47+
3848
@Override
3949
void doTransform(TypeTransformer transformer) {
4050
// Nothing to transform, this type instrumentation is only used for injecting resources.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import java.util.concurrent.CompletableFuture;
11+
12+
public final class CompletableFutureWrapper {
13+
14+
private CompletableFutureWrapper() {}
15+
16+
public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
17+
return wrap(future, Context.current());
18+
}
19+
20+
public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future, Context context) {
21+
CompletableFuture<T> result = new CompletableFuture<>();
22+
future.whenComplete(
23+
(T value, Throwable throwable) -> {
24+
try (Scope ignored = context.makeCurrent()) {
25+
if (throwable != null) {
26+
result.completeExceptionally(throwable);
27+
} else {
28+
result.complete(value);
29+
}
30+
}
31+
});
32+
33+
return result;
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;
7+
8+
import io.opentelemetry.api.trace.SpanKind;
9+
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
10+
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
11+
import java.time.Duration;
12+
import org.junit.jupiter.api.AfterAll;
13+
import org.junit.jupiter.api.BeforeAll;
14+
import org.junit.jupiter.api.Test;
15+
import org.junit.jupiter.api.extension.RegisterExtension;
16+
import org.slf4j.LoggerFactory;
17+
import org.testcontainers.containers.localstack.LocalStackContainer;
18+
import org.testcontainers.containers.output.Slf4jLogConsumer;
19+
import org.testcontainers.utility.DockerImageName;
20+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
21+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
22+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
23+
import software.amazon.awssdk.core.async.AsyncRequestBody;
24+
import software.amazon.awssdk.regions.Region;
25+
import software.amazon.awssdk.services.s3.S3AsyncClient;
26+
27+
class S3CrtClientTest {
28+
@RegisterExtension
29+
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
30+
31+
static LocalStackContainer localStack;
32+
static S3AsyncClient s3Client;
33+
34+
@BeforeAll
35+
static void setUp() {
36+
localStack =
37+
new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.0.2"))
38+
.withServices(LocalStackContainer.Service.S3)
39+
.withEnv("DEBUG", "1")
40+
.withStartupTimeout(Duration.ofMinutes(2));
41+
localStack.start();
42+
localStack.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("test")));
43+
44+
AwsCredentialsProvider credentialsProvider =
45+
StaticCredentialsProvider.create(
46+
AwsBasicCredentials.create(localStack.getAccessKey(), localStack.getSecretKey()));
47+
48+
s3Client =
49+
S3AsyncClient.crtBuilder()
50+
.endpointOverride(localStack.getEndpointOverride(LocalStackContainer.Service.S3))
51+
.credentialsProvider(credentialsProvider)
52+
.region(Region.of(localStack.getRegion()))
53+
.build();
54+
}
55+
56+
@AfterAll
57+
static void cleanUp() {
58+
localStack.stop();
59+
}
60+
61+
@Test
62+
void testCopyObject() {
63+
s3Client.createBucket(request -> request.bucket("bucket")).join();
64+
s3Client
65+
.putObject(
66+
request -> request.bucket("bucket").key("file1.txt"),
67+
AsyncRequestBody.fromString("file content"))
68+
.join();
69+
testing.waitForTraces(2);
70+
testing.clearData();
71+
72+
testing.runWithSpan(
73+
"parent",
74+
() ->
75+
s3Client
76+
.copyObject(
77+
request ->
78+
request
79+
.sourceBucket("bucket")
80+
.sourceKey("file1.txt")
81+
.destinationBucket("bucket")
82+
.destinationKey("file2.txt"))
83+
.join());
84+
85+
testing.waitAndAssertTraces(
86+
trace ->
87+
trace.hasSpansSatisfyingExactly(
88+
span -> span.hasName("parent").hasNoParent().hasKind(SpanKind.INTERNAL),
89+
span ->
90+
span.hasName("S3.HeadObject")
91+
.hasParent(trace.getSpan(0))
92+
.hasKind(SpanKind.CLIENT),
93+
span ->
94+
span.hasName("S3.CopyObject")
95+
.hasParent(trace.getSpan(0))
96+
.hasKind(SpanKind.CLIENT)));
97+
}
98+
}

0 commit comments

Comments
 (0)