Skip to content

Commit 3d5802a

Browse files
committed
Let AWS Lambda SQS handlers report partial batch failures
1 parent efc7151 commit 3d5802a

File tree

15 files changed

+631
-11
lines changed

15 files changed

+631
-11
lines changed

instrumentation/aws-lambda/aws-lambda-events-2.2/library/build.gradle.kts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ dependencies {
88
compileOnly("io.opentelemetry:opentelemetry-sdk")
99
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
1010

11-
compileOnly("com.google.auto.value:auto-value-annotations")
12-
annotationProcessor("com.google.auto.value:auto-value")
13-
1411
library("com.amazonaws:aws-lambda-java-core:1.0.0")
1512
// First version to includes support for SQSEvent, currently the most popular message queue used
1613
// with lambda.

instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.ApiGatewayProxyRequest;
1313
import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.MapUtils;
1414
import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.WrappedLambda;
15+
import io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal.LambdaParameters;
1516
import io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal.SerializationUtil;
1617
import io.opentelemetry.sdk.OpenTelemetrySdk;
1718
import java.io.ByteArrayInputStream;

instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapperBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.WrappedLambda;
1414
import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.WrapperConfiguration;
1515
import io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal.AwsLambdaEventsInstrumenterFactory;
16+
import io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal.LambdaParameters;
1617
import io.opentelemetry.sdk.OpenTelemetrySdk;
1718
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
1819
import java.lang.reflect.InvocationTargetException;

instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingSqsEventWrapper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
1010
import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.WrappedLambda;
1111
import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.WrapperConfiguration;
12+
import io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal.LambdaParameters;
1213
import io.opentelemetry.sdk.OpenTelemetrySdk;
1314
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
1415
import java.lang.reflect.InvocationTargetException;
Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,20 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package io.opentelemetry.instrumentation.awslambdaevents.v2_2;
6+
package io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal;
77

88
import com.amazonaws.services.lambda.runtime.Context;
99
import java.io.InputStream;
1010
import java.lang.reflect.Method;
1111
import java.util.function.BiFunction;
1212

13-
final class LambdaParameters {
13+
/**
14+
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
15+
* any time.
16+
*/
17+
public final class LambdaParameters {
1418

15-
static <T> Object[] toArray(
19+
public static <T> Object[] toArray(
1620
Method targetMethod, T input, Context context, BiFunction<T, Class<?>, Object> mapper) {
1721
Class<?>[] parameterTypes = targetMethod.getParameterTypes();
1822
Object[] parameters = new Object[parameterTypes.length];
@@ -28,7 +32,7 @@ static <T> Object[] toArray(
2832
return parameters;
2933
}
3034

31-
static <T> Object[] toParameters(Method targetMethod, T input, Context context) {
35+
public static <T> Object[] toParameters(Method targetMethod, T input, Context context) {
3236
Class<?>[] parameterTypes = targetMethod.getParameterTypes();
3337
Object[] parameters = new Object[parameterTypes.length];
3438
for (int i = 0; i < parameterTypes.length; i++) {
@@ -43,7 +47,7 @@ static <T> Object[] toParameters(Method targetMethod, T input, Context context)
4347
return parameters;
4448
}
4549

46-
static Object toInput(
50+
public static Object toInput(
4751
Method targetMethod,
4852
InputStream inputStream,
4953
BiFunction<InputStream, Class<?>, Object> mapper) {
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package io.opentelemetry.instrumentation.awslambdaevents.v2_2;
6+
package io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal;
77

88
import static org.assertj.core.api.Assertions.assertThat;
99
import static org.mockito.Mockito.mock;
1010

1111
import com.amazonaws.services.lambda.runtime.Context;
12-
import io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal.SerializationUtil;
1312
import java.io.ByteArrayInputStream;
1413
import java.lang.reflect.Method;
1514
import org.junit.jupiter.api.Test;

instrumentation/aws-lambda/aws-lambda-events-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/AbstractAwsLambdaSqsEventHandlerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public abstract class AbstractAwsLambdaSqsEventHandlerTest {
3535
private static final String AWS_TRACE_HEADER =
3636
"Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1";
3737

38-
protected abstract RequestHandler<SQSEvent, Void> handler();
38+
protected abstract RequestHandler<SQSEvent, ?> handler();
3939

4040
protected abstract InstrumentationExtension testing();
4141

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
plugins {
2+
id("otel.library-instrumentation")
3+
}
4+
5+
dependencies {
6+
implementation(project(":instrumentation:aws-lambda:aws-lambda-events-2.2:library"))
7+
8+
compileOnly("io.opentelemetry:opentelemetry-sdk")
9+
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
10+
11+
library("com.amazonaws:aws-lambda-java-core:1.0.0")
12+
library("com.amazonaws:aws-lambda-java-events:3.11.0")
13+
14+
// allows to get the function ARN
15+
testLibrary("com.amazonaws:aws-lambda-java-core:1.2.1")
16+
17+
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
18+
testImplementation("io.opentelemetry:opentelemetry-extension-trace-propagators")
19+
testImplementation("com.amazonaws:aws-lambda-java-serialization:1.1.5")
20+
21+
testImplementation(project(":instrumentation:aws-lambda:aws-lambda-events-2.2:testing"))
22+
testImplementation("uk.org.webcompere:system-stubs-jupiter")
23+
}
24+
25+
tasks.withType<Test>().configureEach {
26+
// required on jdk17
27+
jvmArgs("--add-opens=java.base/java.lang=ALL-UNNAMED")
28+
jvmArgs("--add-opens=java.base/java.util=ALL-UNNAMED")
29+
jvmArgs("-XX:+IgnoreUnrecognizedVMOptions")
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.awslambdaevents.v3_11;
7+
8+
import com.amazonaws.services.lambda.runtime.Context;
9+
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
10+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
11+
import io.opentelemetry.context.Scope;
12+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
13+
import io.opentelemetry.instrumentation.awslambdacore.v1_0.TracingRequestHandler;
14+
import io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal.AwsLambdaSqsInstrumenterFactory;
15+
import io.opentelemetry.sdk.OpenTelemetrySdk;
16+
import java.time.Duration;
17+
import javax.annotation.Nullable;
18+
19+
public abstract class TracingSqsEventHandler
20+
extends TracingRequestHandler<SQSEvent, SQSBatchResponse> {
21+
22+
private final Instrumenter<SQSEvent, Void> instrumenter;
23+
24+
/**
25+
* Creates a new {@link TracingSqsEventHandler} which traces using the provided {@link
26+
* OpenTelemetrySdk} and has a timeout of 1s when flushing at the end of an invocation.
27+
*/
28+
protected TracingSqsEventHandler(OpenTelemetrySdk openTelemetrySdk) {
29+
this(openTelemetrySdk, DEFAULT_FLUSH_TIMEOUT);
30+
}
31+
32+
/**
33+
* Creates a new {@link TracingSqsEventHandler} which traces using the provided {@link
34+
* OpenTelemetrySdk} and has a timeout of {@code flushTimeout} when flushing at the end of an
35+
* invocation.
36+
*/
37+
protected TracingSqsEventHandler(OpenTelemetrySdk openTelemetrySdk, Duration flushTimeout) {
38+
this(
39+
openTelemetrySdk, flushTimeout, AwsLambdaSqsInstrumenterFactory.forEvent(openTelemetrySdk));
40+
}
41+
42+
/**
43+
* Creates a new {@link TracingSqsEventHandler} which flushes the provided {@link
44+
* OpenTelemetrySdk}, has a timeout of {@code flushTimeout} when flushing at the end of an
45+
* invocation, and traces using the provided {@link
46+
* io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.AwsLambdaFunctionInstrumenter}.
47+
*/
48+
protected TracingSqsEventHandler(
49+
OpenTelemetrySdk openTelemetrySdk,
50+
Duration flushTimeout,
51+
Instrumenter<SQSEvent, Void> instrumenter) {
52+
super(openTelemetrySdk, flushTimeout);
53+
this.instrumenter = instrumenter;
54+
}
55+
56+
@Nullable
57+
@Override
58+
public SQSBatchResponse doHandleRequest(SQSEvent event, Context context) {
59+
io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.current();
60+
if (instrumenter.shouldStart(parentContext, event)) {
61+
io.opentelemetry.context.Context otelContext = instrumenter.start(parentContext, event);
62+
Throwable error = null;
63+
try (Scope ignored = otelContext.makeCurrent()) {
64+
return handleEvent(event, context);
65+
} catch (Throwable t) {
66+
error = t;
67+
throw t;
68+
} finally {
69+
instrumenter.end(otelContext, event, null, error);
70+
}
71+
} else {
72+
return handleEvent(event, context);
73+
}
74+
}
75+
76+
/**
77+
* Handles a {@linkplain SQSEvent batch of messages}. Implement this class to do the actual
78+
* processing of incoming SQS messages.
79+
*/
80+
@Nullable
81+
protected abstract SQSBatchResponse handleEvent(SQSEvent event, Context context);
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.awslambdaevents.v3_11;
7+
8+
import com.amazonaws.services.lambda.runtime.Context;
9+
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
10+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
11+
import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.WrappedLambda;
12+
import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.WrapperConfiguration;
13+
import io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal.LambdaParameters;
14+
import io.opentelemetry.sdk.OpenTelemetrySdk;
15+
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
16+
import java.lang.reflect.InvocationTargetException;
17+
import java.lang.reflect.Method;
18+
19+
public class TracingSqsEventWrapper extends TracingSqsEventHandler {
20+
21+
private final WrappedLambda wrappedLambda;
22+
private final Method targetMethod;
23+
24+
public TracingSqsEventWrapper() {
25+
this(
26+
AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(),
27+
WrappedLambda.fromConfiguration());
28+
}
29+
30+
// Visible for testing
31+
TracingSqsEventWrapper(OpenTelemetrySdk openTelemetrySdk, WrappedLambda wrappedLambda) {
32+
super(openTelemetrySdk, WrapperConfiguration.flushTimeout());
33+
this.wrappedLambda = wrappedLambda;
34+
this.targetMethod = wrappedLambda.getRequestTargetMethod();
35+
}
36+
37+
@Override
38+
protected SQSBatchResponse handleEvent(SQSEvent sqsEvent, Context context) {
39+
Object[] parameters =
40+
LambdaParameters.toArray(targetMethod, sqsEvent, context, (event, clazz) -> event);
41+
try {
42+
Object result = targetMethod.invoke(wrappedLambda.getTargetObject(), parameters);
43+
return result instanceof SQSBatchResponse ? (SQSBatchResponse) result : null;
44+
} catch (IllegalAccessException e) {
45+
throw new IllegalStateException("Method is inaccessible", e);
46+
} catch (InvocationTargetException e) {
47+
throw (e.getCause() instanceof RuntimeException
48+
? (RuntimeException) e.getCause()
49+
: new IllegalStateException(e.getTargetException()));
50+
}
51+
}
52+
}

0 commit comments

Comments
 (0)