From 979e47b95d364e610c7af52651f30f7d4147f025 Mon Sep 17 00:00:00 2001 From: Luke Zhang Date: Wed, 5 Mar 2025 09:56:42 -0800 Subject: [PATCH] feat: Add instrumentation for Lambda Java interface HandleStreamRequest Problem: All Lambda Java users using HandleStreamRequest experience broken application signal traces. This issue affects all Spring Boot 3 users. Cause: AWS Lambda for Java provides two handler interfaces: com.amazonaws.services.lambda.runtime.RequestHandler com.amazonaws.services.lambda.runtime.RequestStreamHandler However, instrumentation for RequestStreamHandler is missing in the OpenTelemetry Java agent. Fix: Added instrumentation support for RequestStreamHandler. Test: Unit tests pass (./gradlew spotlessCheck assemble instrumentation:test). Manual end-to-end tests pass: Deployed Lambda functions with Spring Boot 3 and Amazon Serverless Java Container. Enabled application signals, observed broken traces. Disabled application signals and added a private build of the Java layer for Lambda with this change. Verified traces and spans are now correct. Backward Compatibility: No risk of breaking existing functionality. The change only adds instrumentation for RequestStreamHandler without modifying existing behavior for RequestHandler. Existing users not using RequestStreamHandler remain unaffected. --- lambda-layer/build-layer.sh | 2 + .../StreamHandlerInstrumentation.patch | 505 ++++++++++++++++++ 2 files changed, 507 insertions(+) create mode 100644 lambda-layer/patches/StreamHandlerInstrumentation.patch diff --git a/lambda-layer/build-layer.sh b/lambda-layer/build-layer.sh index 7a09ff48e7..7d15d962a0 100755 --- a/lambda-layer/build-layer.sh +++ b/lambda-layer/build-layer.sh @@ -28,6 +28,8 @@ patch -p1 < "$SOURCEDIR"/../.github/patches/opentelemetry-java-instrumentation.p # This patch is for Lambda related context propagation patch -p1 < "$SOURCEDIR"/patches/opentelemetry-java-instrumentation.patch +patch -p1 < "$SOURCEDIR"/patches/StreamHandlerInstrumentation.patch + ./gradlew publishToMavenLocal popd rm -rf opentelemetry-java-instrumentation diff --git a/lambda-layer/patches/StreamHandlerInstrumentation.patch b/lambda-layer/patches/StreamHandlerInstrumentation.patch new file mode 100644 index 0000000000..48a1deb634 --- /dev/null +++ b/lambda-layer/patches/StreamHandlerInstrumentation.patch @@ -0,0 +1,505 @@ +diff --git a/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaInstrumentationModule.java b/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaInstrumentationModule.java +index 35d6b70ed6..b6a305178e 100644 +--- a/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaInstrumentationModule.java ++++ b/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaInstrumentationModule.java +@@ -6,17 +6,18 @@ + package io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0; + + import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +-import static java.util.Collections.singletonList; + import static net.bytebuddy.matcher.ElementMatchers.not; + + import com.google.auto.service.AutoService; + import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; + import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; ++import java.util.Arrays; + import java.util.List; + import net.bytebuddy.matcher.ElementMatcher; + + @AutoService(InstrumentationModule.class) + public class AwsLambdaInstrumentationModule extends InstrumentationModule { ++ + public AwsLambdaInstrumentationModule() { + super("aws-lambda-core", "aws-lambda-core-1.0", "aws-lambda"); + } +@@ -34,6 +35,8 @@ public class AwsLambdaInstrumentationModule extends InstrumentationModule { + + @Override + public List typeInstrumentations() { +- return singletonList(new AwsLambdaRequestHandlerInstrumentation()); ++ return Arrays.asList( ++ new AwsLambdaRequestHandlerInstrumentation(), ++ new AwsLambdaRequestStreamHandlerInstrumentation()); + } + } +diff --git a/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaRequestStreamHandlerInstrumentation.java b/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaRequestStreamHandlerInstrumentation.java +new file mode 100644 +index 0000000000..73b82a62a2 +--- /dev/null ++++ b/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaRequestStreamHandlerInstrumentation.java +@@ -0,0 +1,90 @@ ++/* ++ * Copyright The OpenTelemetry Authors ++ * SPDX-License-Identifier: Apache-2.0 ++ */ ++ ++package io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0; ++ ++import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; ++import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; ++import static io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0.AwsLambdaInstrumentationHelper.functionInstrumenter; ++import static net.bytebuddy.matcher.ElementMatchers.isMethod; ++import static net.bytebuddy.matcher.ElementMatchers.isPublic; ++import static net.bytebuddy.matcher.ElementMatchers.named; ++import static net.bytebuddy.matcher.ElementMatchers.takesArgument; ++ ++import com.amazonaws.services.lambda.runtime.Context; ++import io.opentelemetry.context.Scope; ++import io.opentelemetry.instrumentation.awslambdacore.v1_0.AwsLambdaRequest; ++import io.opentelemetry.javaagent.bootstrap.OpenTelemetrySdkAccess; ++import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; ++import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; ++import java.io.InputStream; ++import java.util.Collections; ++import java.util.concurrent.TimeUnit; ++import net.bytebuddy.asm.Advice; ++import net.bytebuddy.description.type.TypeDescription; ++import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing; ++import net.bytebuddy.matcher.ElementMatcher; ++ ++public class AwsLambdaRequestStreamHandlerInstrumentation implements TypeInstrumentation { ++ ++ @Override ++ public ElementMatcher classLoaderOptimization() { ++ return hasClassesNamed("com.amazonaws.services.lambda.runtime.RequestStreamHandler"); ++ } ++ ++ @Override ++ public ElementMatcher typeMatcher() { ++ return implementsInterface(named("com.amazonaws.services.lambda.runtime.RequestStreamHandler")); ++ } ++ ++ @Override ++ public void transform(TypeTransformer transformer) { ++ transformer.applyAdviceToMethod( ++ isMethod() ++ .and(isPublic()) ++ .and(named("handleRequest")) ++ .and(takesArgument(2, named("com.amazonaws.services.lambda.runtime.Context"))), ++ AwsLambdaRequestStreamHandlerInstrumentation.class.getName() + "$HandleRequestAdvice"); ++ } ++ ++ @SuppressWarnings("unused") ++ public static class HandleRequestAdvice { ++ ++ @Advice.OnMethodEnter(suppress = Throwable.class) ++ public static void onEnter( ++ @Advice.Argument(0) InputStream input, ++ @Advice.Argument(2) Context context, ++ @Advice.Local("otelInput") AwsLambdaRequest otelInput, ++ @Advice.Local("otelContext") io.opentelemetry.context.Context otelContext, ++ @Advice.Local("otelScope") Scope otelScope) { ++ ++ otelInput = AwsLambdaRequest.create(context, input, Collections.emptyMap()); ++ io.opentelemetry.context.Context parentContext = functionInstrumenter().extract(otelInput); ++ ++ if (!functionInstrumenter().shouldStart(parentContext, otelInput)) { ++ return; ++ } ++ ++ otelContext = functionInstrumenter().start(parentContext, otelInput); ++ otelScope = otelContext.makeCurrent(); ++ } ++ ++ @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) ++ public static void stopSpan( ++ @Advice.Argument(value = 0, typing = Typing.DYNAMIC) Object arg, ++ @Advice.Thrown Throwable throwable, ++ @Advice.Local("otelInput") AwsLambdaRequest input, ++ @Advice.Local("otelContext") io.opentelemetry.context.Context functionContext, ++ @Advice.Local("otelScope") Scope functionScope) { ++ ++ if (functionScope != null) { ++ functionScope.close(); ++ functionInstrumenter().end(functionContext, input, null, throwable); ++ } ++ ++ OpenTelemetrySdkAccess.forceFlush(1, TimeUnit.SECONDS); ++ } ++ } ++} +diff --git a/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaStreamHandlerTest.java b/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaStreamHandlerTest.java +new file mode 100644 +index 0000000000..7bed968d77 +--- /dev/null ++++ b/instrumentation/aws-lambda/aws-lambda-core-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdacore/v1_0/AwsLambdaStreamHandlerTest.java +@@ -0,0 +1,113 @@ ++/* ++ * Copyright The OpenTelemetry Authors ++ * SPDX-License-Identifier: Apache-2.0 ++ */ ++ ++package io.opentelemetry.javaagent.instrumentation.awslambdacore.v1_0; ++ ++import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; ++import static org.assertj.core.api.Assertions.assertThat; ++import static org.assertj.core.api.Assertions.catchThrowable; ++import static org.mockito.Mockito.when; ++ ++import com.amazonaws.services.lambda.runtime.Context; ++import com.amazonaws.services.lambda.runtime.RequestStreamHandler; ++import io.opentelemetry.api.trace.SpanKind; ++import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; ++import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; ++import io.opentelemetry.sdk.trace.data.StatusData; ++import io.opentelemetry.semconv.SemanticAttributes; ++import java.io.BufferedReader; ++import java.io.BufferedWriter; ++import java.io.ByteArrayInputStream; ++import java.io.ByteArrayOutputStream; ++import java.io.IOException; ++import java.io.InputStream; ++import java.io.InputStreamReader; ++import java.io.OutputStream; ++import java.io.OutputStreamWriter; ++import java.nio.charset.StandardCharsets; ++import org.junit.jupiter.api.AfterEach; ++import org.junit.jupiter.api.BeforeEach; ++import org.junit.jupiter.api.Test; ++import org.junit.jupiter.api.extension.ExtendWith; ++import org.junit.jupiter.api.extension.RegisterExtension; ++import org.mockito.Mock; ++import org.mockito.junit.jupiter.MockitoExtension; ++ ++@ExtendWith(MockitoExtension.class) ++public class AwsLambdaStreamHandlerTest { ++ ++ @RegisterExtension ++ public static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); ++ ++ @Mock private Context context; ++ ++ @BeforeEach ++ void setUp() { ++ when(context.getFunctionName()).thenReturn("my_function"); ++ when(context.getAwsRequestId()).thenReturn("1-22-333"); ++ } ++ ++ @AfterEach ++ void tearDown() { ++ assertThat(testing.forceFlushCalled()).isTrue(); ++ } ++ ++ @Test ++ void handlerTraced() throws Exception { ++ InputStream input = new ByteArrayInputStream("hello\n".getBytes(StandardCharsets.UTF_8)); ++ OutputStream output = new ByteArrayOutputStream(); ++ RequestStreamHandlerTestImpl handler = new RequestStreamHandlerTestImpl(); ++ handler.handleRequest(input, output, context); ++ ++ testing.waitAndAssertTraces( ++ trace -> ++ trace.hasSpansSatisfyingExactly( ++ span -> ++ span.hasName("my_function") ++ .hasKind(SpanKind.SERVER) ++ .hasAttributesSatisfyingExactly( ++ equalTo(SemanticAttributes.FAAS_INVOCATION_ID, "1-22-333")))); ++ } ++ ++ @Test ++ void handlerTracedWithException() { ++ InputStream input = new ByteArrayInputStream("bye\n".getBytes(StandardCharsets.UTF_8)); ++ OutputStream output = new ByteArrayOutputStream(); ++ RequestStreamHandlerTestImpl handler = new RequestStreamHandlerTestImpl(); ++ ++ Throwable thrown = catchThrowable(() -> handler.handleRequest(input, output, context)); ++ assertThat(thrown).isInstanceOf(IllegalArgumentException.class); ++ ++ testing.waitAndAssertTraces( ++ trace -> ++ trace.hasSpansSatisfyingExactly( ++ span -> ++ span.hasName("my_function") ++ .hasKind(SpanKind.SERVER) ++ .hasStatus(StatusData.error()) ++ .hasException(thrown) ++ .hasAttributesSatisfyingExactly( ++ equalTo(SemanticAttributes.FAAS_INVOCATION_ID, "1-22-333")))); ++ } ++ ++ static final class RequestStreamHandlerTestImpl implements RequestStreamHandler { ++ @Override ++ public void handleRequest(InputStream input, OutputStream output, Context context) ++ throws IOException { ++ BufferedReader reader = ++ new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)); ++ BufferedWriter writer = ++ new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)); ++ String line = reader.readLine(); ++ if (line.equals("hello")) { ++ writer.write("world"); ++ writer.flush(); ++ writer.close(); ++ } else { ++ throw new IllegalArgumentException("bad argument"); ++ } ++ } ++ } ++} +diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaInstrumentationModule.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaInstrumentationModule.java +index 9e0e372241..2dd6051c23 100644 +--- a/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaInstrumentationModule.java ++++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaInstrumentationModule.java +@@ -6,11 +6,11 @@ + package io.opentelemetry.javaagent.instrumentation.awslambdaevents.v2_2; + + import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +-import static java.util.Collections.singletonList; + + import com.google.auto.service.AutoService; + import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; + import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; ++import java.util.Arrays; + import java.util.List; + import net.bytebuddy.matcher.ElementMatcher; + +@@ -32,6 +32,8 @@ public class AwsLambdaInstrumentationModule extends InstrumentationModule { + + @Override + public List typeInstrumentations() { +- return singletonList(new AwsLambdaRequestHandlerInstrumentation()); ++ return Arrays.asList( ++ new AwsLambdaRequestHandlerInstrumentation(), ++ new AwsLambdaRequestStreamHandlerInstrumentation()); + } + } +diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaRequestStreamHandlerInstrumentation.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaRequestStreamHandlerInstrumentation.java +new file mode 100644 +index 0000000000..f21a4a5526 +--- /dev/null ++++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaRequestStreamHandlerInstrumentation.java +@@ -0,0 +1,104 @@ ++/* ++ * Copyright The OpenTelemetry Authors ++ * SPDX-License-Identifier: Apache-2.0 ++ */ ++ ++package io.opentelemetry.javaagent.instrumentation.awslambdaevents.v2_2; ++ ++import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; ++import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; ++import static net.bytebuddy.matcher.ElementMatchers.isMethod; ++import static net.bytebuddy.matcher.ElementMatchers.isPublic; ++import static net.bytebuddy.matcher.ElementMatchers.named; ++import static net.bytebuddy.matcher.ElementMatchers.takesArgument; ++ ++import com.amazonaws.services.lambda.runtime.Context; ++import com.amazonaws.services.lambda.runtime.events.SQSEvent; ++import io.opentelemetry.context.Scope; ++import io.opentelemetry.instrumentation.awslambdacore.v1_0.AwsLambdaRequest; ++import io.opentelemetry.javaagent.bootstrap.OpenTelemetrySdkAccess; ++import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; ++import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; ++import java.io.InputStream; ++import java.util.Collections; ++import java.util.concurrent.TimeUnit; ++import net.bytebuddy.asm.Advice; ++import net.bytebuddy.description.type.TypeDescription; ++import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing; ++import net.bytebuddy.matcher.ElementMatcher; ++ ++public class AwsLambdaRequestStreamHandlerInstrumentation implements TypeInstrumentation { ++ ++ @Override ++ public ElementMatcher classLoaderOptimization() { ++ return hasClassesNamed("com.amazonaws.services.lambda.runtime.RequestStreamHandler"); ++ } ++ ++ @Override ++ public ElementMatcher typeMatcher() { ++ return implementsInterface(named("com.amazonaws.services.lambda.runtime.RequestStreamHandler")); ++ } ++ ++ @Override ++ public void transform(TypeTransformer transformer) { ++ transformer.applyAdviceToMethod( ++ isMethod() ++ .and(isPublic()) ++ .and(named("handleRequest")) ++ .and(takesArgument(2, named("com.amazonaws.services.lambda.runtime.Context"))), ++ AwsLambdaRequestStreamHandlerInstrumentation.class.getName() + "$HandleRequestAdvice"); ++ } ++ ++ @SuppressWarnings("unused") ++ public static class HandleRequestAdvice { ++ ++ @Advice.OnMethodEnter(suppress = Throwable.class) ++ public static void onEnter( ++ @Advice.Argument(0) InputStream input, ++ @Advice.Argument(2) Context context, ++ @Advice.Local("otelInput") AwsLambdaRequest otelInput, ++ @Advice.Local("otelFunctionContext") io.opentelemetry.context.Context functionContext, ++ @Advice.Local("otelFunctionScope") Scope functionScope, ++ @Advice.Local("otelMessageContext") io.opentelemetry.context.Context messageContext, ++ @Advice.Local("otelMessageScope") Scope messageScope) { ++ otelInput = AwsLambdaRequest.create(context, input, Collections.emptyMap()); ++ io.opentelemetry.context.Context parentContext = ++ AwsLambdaInstrumentationHelper.functionInstrumenter().extract(otelInput); ++ ++ if (!AwsLambdaInstrumentationHelper.functionInstrumenter() ++ .shouldStart(parentContext, otelInput)) { ++ return; ++ } ++ ++ functionContext = ++ AwsLambdaInstrumentationHelper.functionInstrumenter().start(parentContext, otelInput); ++ ++ functionScope = functionContext.makeCurrent(); ++ } ++ ++ @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) ++ public static void stopSpan( ++ @Advice.Argument(value = 0, typing = Typing.DYNAMIC) Object arg, ++ @Advice.Thrown Throwable throwable, ++ @Advice.Local("otelInput") AwsLambdaRequest input, ++ @Advice.Local("otelFunctionContext") io.opentelemetry.context.Context functionContext, ++ @Advice.Local("otelFunctionScope") Scope functionScope, ++ @Advice.Local("otelMessageContext") io.opentelemetry.context.Context messageContext, ++ @Advice.Local("otelMessageScope") Scope messageScope) { ++ ++ if (messageScope != null) { ++ messageScope.close(); ++ AwsLambdaInstrumentationHelper.messageInstrumenter() ++ .end(messageContext, (SQSEvent) arg, null, throwable); ++ } ++ ++ if (functionScope != null) { ++ functionScope.close(); ++ AwsLambdaInstrumentationHelper.functionInstrumenter() ++ .end(functionContext, input, null, throwable); ++ } ++ ++ OpenTelemetrySdkAccess.forceFlush(1, TimeUnit.SECONDS); ++ } ++ } ++} +diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaStreamHandlerTest.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaStreamHandlerTest.java +new file mode 100644 +index 0000000000..e30690418d +--- /dev/null ++++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awslambdaevents/v2_2/AwsLambdaStreamHandlerTest.java +@@ -0,0 +1,113 @@ ++/* ++ * Copyright The OpenTelemetry Authors ++ * SPDX-License-Identifier: Apache-2.0 ++ */ ++ ++package io.opentelemetry.javaagent.instrumentation.awslambdaevents.v2_2; ++ ++import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; ++import static org.assertj.core.api.Assertions.assertThat; ++import static org.assertj.core.api.Assertions.catchThrowable; ++import static org.mockito.Mockito.when; ++ ++import com.amazonaws.services.lambda.runtime.Context; ++import com.amazonaws.services.lambda.runtime.RequestStreamHandler; ++import io.opentelemetry.api.trace.SpanKind; ++import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; ++import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; ++import io.opentelemetry.sdk.trace.data.StatusData; ++import io.opentelemetry.semconv.SemanticAttributes; ++import java.io.BufferedReader; ++import java.io.BufferedWriter; ++import java.io.ByteArrayInputStream; ++import java.io.ByteArrayOutputStream; ++import java.io.IOException; ++import java.io.InputStream; ++import java.io.InputStreamReader; ++import java.io.OutputStream; ++import java.io.OutputStreamWriter; ++import java.nio.charset.StandardCharsets; ++import org.junit.jupiter.api.AfterEach; ++import org.junit.jupiter.api.BeforeEach; ++import org.junit.jupiter.api.Test; ++import org.junit.jupiter.api.extension.ExtendWith; ++import org.junit.jupiter.api.extension.RegisterExtension; ++import org.mockito.Mock; ++import org.mockito.junit.jupiter.MockitoExtension; ++ ++@ExtendWith(MockitoExtension.class) ++public class AwsLambdaStreamHandlerTest { ++ ++ @RegisterExtension ++ public static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); ++ ++ @Mock private Context context; ++ ++ @BeforeEach ++ void setUp() { ++ when(context.getFunctionName()).thenReturn("my_function"); ++ when(context.getAwsRequestId()).thenReturn("1-22-333"); ++ } ++ ++ @AfterEach ++ void tearDown() { ++ assertThat(testing.forceFlushCalled()).isTrue(); ++ } ++ ++ @Test ++ void handlerTraced() throws Exception { ++ InputStream input = new ByteArrayInputStream("hello\n".getBytes(StandardCharsets.UTF_8)); ++ OutputStream output = new ByteArrayOutputStream(); ++ RequestStreamHandlerTestImpl handler = new RequestStreamHandlerTestImpl(); ++ handler.handleRequest(input, output, context); ++ ++ testing.waitAndAssertTraces( ++ trace -> ++ trace.hasSpansSatisfyingExactly( ++ span -> ++ span.hasName("my_function") ++ .hasKind(SpanKind.SERVER) ++ .hasAttributesSatisfyingExactly( ++ equalTo(SemanticAttributes.FAAS_INVOCATION_ID, "1-22-333")))); ++ } ++ ++ @Test ++ void handlerTracedWithException() { ++ InputStream input = new ByteArrayInputStream("bye\n".getBytes(StandardCharsets.UTF_8)); ++ OutputStream output = new ByteArrayOutputStream(); ++ RequestStreamHandlerTestImpl handler = new RequestStreamHandlerTestImpl(); ++ ++ Throwable thrown = catchThrowable(() -> handler.handleRequest(input, output, context)); ++ assertThat(thrown).isInstanceOf(IllegalArgumentException.class); ++ ++ testing.waitAndAssertTraces( ++ trace -> ++ trace.hasSpansSatisfyingExactly( ++ span -> ++ span.hasName("my_function") ++ .hasKind(SpanKind.SERVER) ++ .hasStatus(StatusData.error()) ++ .hasException(thrown) ++ .hasAttributesSatisfyingExactly( ++ equalTo(SemanticAttributes.FAAS_INVOCATION_ID, "1-22-333")))); ++ } ++ ++ static final class RequestStreamHandlerTestImpl implements RequestStreamHandler { ++ @Override ++ public void handleRequest(InputStream input, OutputStream output, Context context) ++ throws IOException { ++ BufferedReader reader = ++ new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)); ++ BufferedWriter writer = ++ new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)); ++ String line = reader.readLine(); ++ if (line.equals("hello")) { ++ writer.write("world"); ++ writer.flush(); ++ writer.close(); ++ } else { ++ throw new IllegalArgumentException("bad argument"); ++ } ++ } ++ } ++}