Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions instrumentation/aws-sdk/aws-sdk-2.2/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ testing {
}
}

val s3CrtTest by registering(JvmTestSuite::class) {
dependencies {
if (latestDepTest) {
implementation("software.amazon.awssdk:s3:latest.release")
implementation("software.amazon.awssdk.crt:aws-crt:latest.release")
} else {
implementation("software.amazon.awssdk:s3:2.27.21")
implementation("software.amazon.awssdk.crt:aws-crt:0.30.11")
}
implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:library"))
implementation("org.testcontainers:localstack")
}
}

val testBedrockRuntime by registering(JvmTestSuite::class) {
dependencies {
implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:testing"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class AwsAsyncClientHandlerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// This class is used internally by the aws async clients to execute requests. Alternatively
// we could instrument all methods that return a CompletableFuture in classes whose name ends
// with "AsyncClient" that extend software.amazon.awssdk.core.SdkClient
return named("software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("execute").and(returns(CompletableFuture.class)),
this.getClass().getName() + "$WrapFutureAdvice");
}

@SuppressWarnings("unused")
public static class WrapFutureAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit(@Advice.Return(readOnly = false) CompletableFuture<?> future) {
// propagate context into CompletableFuture returned from aws async client methods
future = CompletableFutureWrapper.wrap(future);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@

package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;

import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.HelperResourceBuilder;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.extension.instrumentation.internal.injection.ClassInjector;
import io.opentelemetry.javaagent.extension.instrumentation.internal.injection.InjectionMode;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class AwsSdkInstrumentationModule extends AbstractAwsSdkInstrumentationModule {
Expand All @@ -35,6 +39,12 @@ public void injectClasses(ClassInjector injector) {
.inject(InjectionMode.CLASS_ONLY);
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new ResourceInjectingTypeInstrumentation(), new AwsAsyncClientHandlerInstrumentation());
}

@Override
void doTransform(TypeTransformer transformer) {
// Nothing to transform, this type instrumentation is only used for injecting resources.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.CompletableFuture;

public final class CompletableFutureWrapper {

private CompletableFutureWrapper() {}

public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
return wrap(future, Context.current());
}

public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future, Context context) {
CompletableFuture<T> result = new CompletableFuture<>();
future.whenComplete(
(T value, Throwable throwable) -> {
try (Scope ignored = context.makeCurrent()) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(value);
}
}
});

return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;

class S3CrtClientTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

static LocalStackContainer localStack;
static S3AsyncClient s3Client;

@BeforeAll
static void setUp() {
localStack =
new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.0.2"))
.withServices(LocalStackContainer.Service.S3)
.withEnv("DEBUG", "1")
.withStartupTimeout(Duration.ofMinutes(2));
localStack.start();
localStack.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("test")));

AwsCredentialsProvider credentialsProvider =
StaticCredentialsProvider.create(
AwsBasicCredentials.create(localStack.getAccessKey(), localStack.getSecretKey()));

s3Client =
S3AsyncClient.crtBuilder()
.endpointOverride(localStack.getEndpointOverride(LocalStackContainer.Service.S3))
.credentialsProvider(credentialsProvider)
.region(Region.of(localStack.getRegion()))
.build();
}

@AfterAll
static void cleanUp() {
localStack.stop();
}

@Test
void testCopyObject() {
s3Client.createBucket(request -> request.bucket("bucket")).join();
s3Client
.putObject(
request -> request.bucket("bucket").key("file1.txt"),
AsyncRequestBody.fromString("file content"))
.join();
testing.waitForTraces(2);
testing.clearData();

testing.runWithSpan(
"parent",
() ->
s3Client
.copyObject(
request ->
request
.sourceBucket("bucket")
.sourceKey("file1.txt")
.destinationBucket("bucket")
.destinationKey("file2.txt"))
.join());

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasNoParent().hasKind(SpanKind.INTERNAL),
span ->
span.hasName("S3.HeadObject")
.hasParent(trace.getSpan(0))
.hasKind(SpanKind.CLIENT),
span ->
span.hasName("S3.CopyObject")
.hasParent(trace.getSpan(0))
.hasKind(SpanKind.CLIENT)));
}
}
Loading