-
Notifications
You must be signed in to change notification settings - Fork 324
Inject trace context into AWS Step Functions input #7585
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
7f8721d
a974597
7ecfa8c
fa36832
22c652b
eb642b0
0778f3d
f9d0eea
421856b
895341a
9d76dec
aba1119
e56f2d3
3039780
370df6d
a644a8b
fc6e570
ee83940
973cd79
96f7f39
fdb01a2
8417f6e
0aac239
2992508
da7af0b
ca13195
2d4507d
29a651c
a434846
1197c1d
0f304c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| muzzle { | ||
| pass { | ||
| group = "software.amazon.awssdk" | ||
| module = "sfn" | ||
| versions = "[2.15.35,)" | ||
| assertInverse = true | ||
| } | ||
| } | ||
|
|
||
| apply from: "$rootDir/gradle/java.gradle" | ||
|
|
||
| addTestSuiteForDir('latestDepTest', 'test') | ||
| addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test') | ||
|
|
||
| dependencies { | ||
| compileOnly group: 'software.amazon.awssdk', name: 'sfn', version: '2.15.35' | ||
|
|
||
| // Include httpclient instrumentation for testing because it is a dependency for aws-sdk. | ||
| testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4') | ||
| testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-2.2') | ||
| testImplementation 'software.amazon.awssdk:sfn:2.15.35' | ||
| testImplementation libs.testcontainers | ||
|
|
||
| latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sfn', version: '+' | ||
| } | ||
|
|
||
| tasks.withType(Test).configureEach { | ||
| usesService(testcontainersLimit) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| package datadog.trace.instrumentation.aws.v2.sfn; | ||
|
|
||
| import datadog.trace.bootstrap.instrumentation.api.AgentSpan; | ||
|
|
||
| public class InputAttributeInjector { | ||
| public static String buildTraceContext(AgentSpan span) { | ||
| // Extract span tags | ||
| StringBuilder spanTagsJSON = new StringBuilder(); | ||
| spanTagsJSON.append('{'); | ||
| span.getTags() | ||
| .forEach( | ||
| (tagKey, tagValue) -> | ||
| spanTagsJSON | ||
| .append('"') | ||
| .append(tagKey) | ||
| .append("\":\"") | ||
| .append(tagValue) | ||
| .append("\",")); | ||
| spanTagsJSON.setLength(spanTagsJSON.length() - 1); // remove trailing comma | ||
| spanTagsJSON.append('}'); | ||
|
|
||
| // Build DD trace context object | ||
| String ddTraceContextJSON = | ||
| String.format( | ||
| "\"_datadog\": { \"x-datadog-trace-id\": \"%s\",\"x-datadog-parent-id\":\"%s\", \"x-datadog-tags\": %s }", | ||
| span.getTraceId().toString(), span.getSpanId(), spanTagsJSON); | ||
|
|
||
| return ddTraceContextJSON; | ||
| } | ||
|
|
||
| public static String getModifiedInput(String request, String ddTraceContextJSON) { | ||
| StringBuilder modifiedInput = new StringBuilder(request); | ||
| int startPos = modifiedInput.indexOf("{"); | ||
nhulston marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| int endPos = modifiedInput.lastIndexOf("}"); | ||
| String inputContent = modifiedInput.substring(startPos + 1, endPos); | ||
| if (inputContent.isEmpty()) { | ||
| modifiedInput.insert(endPos, ddTraceContextJSON); | ||
| } else { | ||
| modifiedInput.insert( | ||
| endPos, ",".concat(ddTraceContextJSON)); // prepend comma to existing input | ||
| } | ||
| return modifiedInput.toString(); | ||
|
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| package datadog.trace.instrumentation.aws.v2.sfn; | ||
|
|
||
| import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
|
|
||
| import com.google.auto.service.AutoService; | ||
| import datadog.trace.agent.tooling.Instrumenter; | ||
| import datadog.trace.agent.tooling.InstrumenterModule; | ||
| import java.util.List; | ||
| import net.bytebuddy.asm.Advice; | ||
| import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; | ||
|
|
||
| /** AWS SDK v2 Step Function instrumentation */ | ||
| @AutoService(InstrumenterModule.class) | ||
| public final class SfnClientInstrumentation extends InstrumenterModule.Tracing | ||
| implements Instrumenter.ForSingleType { | ||
|
|
||
| public SfnClientInstrumentation() { | ||
| super("sfn", "aws-sdk"); | ||
| } | ||
|
|
||
| @Override | ||
| public String instrumentedType() { | ||
| return "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder"; | ||
| } | ||
|
|
||
| @Override | ||
| public void methodAdvice(MethodTransformer transformer) { | ||
| transformer.applyAdvice( | ||
| isMethod().and(named("resolveExecutionInterceptors")), | ||
| SfnClientInstrumentation.class.getName() + "$AwsSfnBuilderAdvice"); | ||
| } | ||
|
|
||
| @Override | ||
| public String[] helperClassNames() { | ||
| return new String[] {packageName + ".SfnInterceptor", packageName + ".InputAttributeInjector"}; | ||
| } | ||
|
|
||
| public static class AwsSfnBuilderAdvice { | ||
| @Advice.OnMethodExit(suppress = Throwable.class) | ||
| public static void addHandler(@Advice.Return final List<ExecutionInterceptor> interceptors) { | ||
| for (ExecutionInterceptor interceptor : interceptors) { | ||
| if (interceptor instanceof SfnInterceptor) { | ||
| return; | ||
| } | ||
| } | ||
| interceptors.add(new SfnInterceptor()); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| package datadog.trace.instrumentation.aws.v2.sfn; | ||
|
|
||
| import datadog.trace.bootstrap.InstanceStore; | ||
| import datadog.trace.bootstrap.instrumentation.api.AgentSpan; | ||
| import software.amazon.awssdk.core.SdkRequest; | ||
| import software.amazon.awssdk.core.interceptor.Context; | ||
| import software.amazon.awssdk.core.interceptor.ExecutionAttribute; | ||
| import software.amazon.awssdk.core.interceptor.ExecutionAttributes; | ||
| import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; | ||
| import software.amazon.awssdk.services.sfn.model.StartExecutionRequest; | ||
| import software.amazon.awssdk.services.sfn.model.StartSyncExecutionRequest; | ||
|
|
||
| public class SfnInterceptor implements ExecutionInterceptor { | ||
|
|
||
| public static final ExecutionAttribute<AgentSpan> SPAN_ATTRIBUTE = | ||
| InstanceStore.of(ExecutionAttribute.class) | ||
| .putIfAbsent("DatadogSpan", () -> new ExecutionAttribute<>("DatadogSpan")); | ||
|
|
||
| public SfnInterceptor() {} | ||
|
|
||
| @Override | ||
| public SdkRequest modifyRequest( | ||
nhulston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Context.ModifyRequest context, ExecutionAttributes executionAttributes) { | ||
| final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); | ||
| // StartExecutionRequest | ||
| if (context.request() instanceof StartExecutionRequest) { | ||
| StartExecutionRequest request = (StartExecutionRequest) context.request(); | ||
| if (request.input() == null) { | ||
| return request; | ||
| } | ||
| return injectTraceContext(span, request); | ||
| } | ||
|
|
||
| // StartSyncExecutionRequest | ||
| if (context.request() instanceof StartSyncExecutionRequest) { | ||
| StartSyncExecutionRequest request = (StartSyncExecutionRequest) context.request(); | ||
| if (request.input() == null) { | ||
| return request; | ||
| } | ||
| return injectTraceContext(span, request); | ||
| } | ||
|
|
||
| return context.request(); | ||
| } | ||
|
|
||
| private SdkRequest injectTraceContext(AgentSpan span, StartExecutionRequest request) { | ||
| String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span); | ||
| // Inject the trace context into the Step Function input | ||
| String modifiedInput = | ||
| InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON); | ||
|
|
||
| return request.toBuilder().input(modifiedInput).build(); | ||
| } | ||
|
|
||
| private SdkRequest injectTraceContext(AgentSpan span, StartSyncExecutionRequest request) { | ||
| String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span); | ||
| // Inject the trace context into the Step Function input | ||
| String modifiedInput = | ||
| InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON); | ||
|
|
||
| return request.toBuilder().input(modifiedInput).build(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| import datadog.trace.agent.test.naming.VersionedNamingTestBase | ||
| import datadog.trace.agent.test.utils.TraceUtils | ||
| import datadog.trace.api.DDSpanTypes | ||
| import datadog.trace.bootstrap.instrumentation.api.Tags | ||
| import groovy.json.JsonSlurper | ||
| import org.testcontainers.containers.GenericContainer | ||
| import org.testcontainers.utility.DockerImageName | ||
| import software.amazon.awssdk.services.sfn.SfnClient | ||
| import software.amazon.awssdk.services.sfn.model.StartExecutionResponse | ||
| import software.amazon.awssdk.regions.Region | ||
| import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider | ||
| import software.amazon.awssdk.auth.credentials.AwsBasicCredentials | ||
| import spock.lang.Shared | ||
|
|
||
| import java.time.Duration | ||
|
|
||
| import static datadog.trace.agent.test.utils.TraceUtils.basicSpan | ||
|
|
||
|
|
||
| abstract class SfnClientTest extends VersionedNamingTestBase { | ||
| @Shared GenericContainer localStack | ||
| @Shared SfnClient sfnClient | ||
DylanLovesCoffee marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| @Shared String testStateMachineARN | ||
| @Shared Object endPoint | ||
|
|
||
| def setupSpec() { | ||
| localStack = new GenericContainer(DockerImageName.parse("localstack/localstack")) | ||
| .withExposedPorts(4566) | ||
| .withEnv("SERVICES", "stepfunctions") | ||
| .withReuse(true) | ||
| .withStartupTimeout(Duration.ofSeconds(120)) | ||
| localStack.start() | ||
| endPoint = "http://" + localStack.getHost() + ":" + localStack.getMappedPort(4566) | ||
| sfnClient = SfnClient.builder() | ||
| .endpointOverride(URI.create(endPoint)) | ||
| .region(Region.US_EAST_1) | ||
| .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test"))) | ||
| .build() | ||
|
|
||
| def response = sfnClient.createStateMachine { builder -> | ||
| builder.name("testStateMachine") | ||
| .definition("{\"StartAt\": \"HelloWorld\", \"States\": {\"HelloWorld\": {\"Type\": \"Pass\", \"End\": true}}}") | ||
| .build() | ||
| } | ||
| testStateMachineARN = response.stateMachineArn() | ||
| } | ||
|
|
||
| def cleanupSpec() { | ||
| sfnClient.close() | ||
| localStack.stop() | ||
| } | ||
|
|
||
| def "Step Functions span is created"() { | ||
| when: | ||
| StartExecutionResponse response | ||
| TraceUtils.runUnderTrace('parent', { | ||
| response = sfnClient.startExecution { builder -> | ||
| builder.stateMachineArn(testStateMachineARN) | ||
| .input("{\"key\": \"value\"}") | ||
| .build() | ||
| } | ||
| }) | ||
|
|
||
| then: | ||
| assertTraces(1) { | ||
| trace(2) { | ||
| basicSpan(it, "parent") | ||
| span { | ||
| serviceName service() | ||
| operationName operation() | ||
| resourceName "Sfn.StartExecution" | ||
| spanType DDSpanTypes.HTTP_CLIENT | ||
| errored false | ||
| measured true | ||
| childOf(span(0)) | ||
| tags { | ||
| "$Tags.COMPONENT" "java-aws-sdk" | ||
| "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT | ||
| "$Tags.HTTP_URL" endPoint+'/' | ||
| "$Tags.HTTP_METHOD" "POST" | ||
| "$Tags.HTTP_STATUS" 200 | ||
| "$Tags.PEER_PORT" localStack.getMappedPort(4566) | ||
| "$Tags.PEER_HOSTNAME" localStack.getHost() | ||
| "aws.service" "Sfn" | ||
| "aws.operation" "StartExecution" | ||
| "aws.agent" "java-aws-sdk" | ||
| "aws.requestId" response.responseMetadata().requestId() | ||
| "aws_service" "Sfn" | ||
| defaultTags() | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| def "Trace context is injected to Step Functions input"() { | ||
| when: | ||
| StartExecutionResponse response | ||
| TraceUtils.runUnderTrace('parent', { | ||
| response = sfnClient.startExecution { builder -> | ||
| builder.stateMachineArn(testStateMachineARN) | ||
| .input("{\"key\": \"value\"}") | ||
| .build() | ||
| } | ||
| }) | ||
|
|
||
| then: | ||
| def execution = sfnClient.describeExecution { builder -> | ||
| builder.executionArn(response.executionArn()) | ||
| .build() | ||
| } | ||
| def input = new JsonSlurper().parseText(execution.input()) | ||
| input["key"] == "value" | ||
| input["_datadog"]["x-datadog-trace-id"] != null | ||
| input["_datadog"]["x-datadog-parent-id"] != null | ||
| input["_datadog"]["x-datadog-tags"] != null | ||
| } | ||
| } | ||
|
|
||
| class SfnClientV0Test extends SfnClientTest { | ||
| @Override | ||
| int version() { | ||
| 0 | ||
| } | ||
|
|
||
| @Override | ||
| String service() { | ||
| return "java-aws-sdk" | ||
| } | ||
|
|
||
| @Override | ||
| String operation() { | ||
| return "aws.http" | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.