Skip to content

AWS X-Ray Adaptive Sampling Support #1141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,737 changes: 2,707 additions & 30 deletions .github/patches/opentelemetry-java-contrib.patch

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion awsagentprovider/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ dependencies {
implementation("io.opentelemetry.contrib:opentelemetry-aws-xray")
// AWS Resource Detectors
implementation("io.opentelemetry.contrib:opentelemetry-aws-resources")
// Json file reader
// JSON file reader
implementation("com.fasterxml.jackson.core:jackson-databind:2.16.1")
// YAML file reader
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.16.1")
// Import AWS SDK v1 core for ARN parsing utilities
implementation("com.amazonaws:aws-java-sdk-core:1.12.773")
// Export configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
() ->
new HashMap<String, String>() {
{
put("otel.propagators", "baggage,xray,tracecontext,b3,b3multi");
put("otel.propagators", "baggage,xray,tracecontext");
put("otel.instrumentation.aws-sdk.experimental-span-attributes", "true");
put(
"otel.instrumentation.aws-sdk.experimental-record-individual-http-error",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@

package software.amazon.opentelemetry.javaagent.providers;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.contrib.awsxray.AlwaysRecordSampler;
import io.opentelemetry.contrib.awsxray.AwsXrayAdaptiveSamplingConfig;
import io.opentelemetry.contrib.awsxray.AwsXrayRemoteSampler;
import io.opentelemetry.contrib.awsxray.ResourceHolder;
import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
Expand Down Expand Up @@ -142,11 +147,16 @@ public final class AwsApplicationSignalsCustomizerProvider
private static final String OTEL_EXPORTER_OTLP_LOGS_COMPRESSION_CONFIG =
"otel.exporter.otlp.logs.compression";

private static final String AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG =
"aws.xray.adaptive.sampling.config";

// A UDP packet can be up to 64KB. To limit the packet size, we limit the exported batch size.
// This is a bit of a magic number, as there is no simple way to tell how many spans can make a
// 64KB batch since spans can vary in size.
private static final int LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10;

private Sampler sampler;

public void customize(AutoConfigurationCustomizer autoConfiguration) {
autoConfiguration.addPropertiesCustomizer(this::customizeProperties);
autoConfiguration.addPropertiesCustomizer(this::customizeLambdaEnvProperties);
Expand Down Expand Up @@ -281,6 +291,27 @@ private Resource customizeResource(Resource resource, ConfigProperties configPro
}

private Sampler customizeSampler(Sampler sampler, ConfigProperties configProps) {
if (sampler instanceof AwsXrayRemoteSampler) {
String config = configProps.getString(AWS_XRAY_ADAPTIVE_SAMPLING_CONFIG);
AwsXrayAdaptiveSamplingConfig parsedConfig = null;

try {
parsedConfig = parseConfigString(config);
} catch (Exception e) {
throw new IllegalArgumentException(
"Failed to parse adaptive sampling configuration: " + e.getMessage(), e);
}

if (config != null) {
try {
((AwsXrayRemoteSampler) sampler).setAdaptiveSamplingConfig(parsedConfig);
} catch (Exception e) {
logger.log(
Level.WARNING, "Error processing adaptive sampling config: {0}", e.getMessage());
}
}
this.sampler = sampler;
}
if (isApplicationSignalsEnabled(configProps)) {
return AlwaysRecordSampler.create(sampler);
}
Expand Down Expand Up @@ -322,10 +353,13 @@ private SdkTracerProviderBuilder customizeTracerProviderBuilder(
.build();

// Construct and set application signals metrics processor
SpanProcessor spanMetricsProcessor =
AwsSpanMetricsProcessorBuilder awsSpanMetricsProcessorBuilder =
AwsSpanMetricsProcessorBuilder.create(
meterProvider, ResourceHolder.getResource(), meterProvider::forceFlush)
.build();
meterProvider, ResourceHolder.getResource(), meterProvider::forceFlush);
if (this.sampler != null) {
awsSpanMetricsProcessorBuilder.setSampler(this.sampler);
}
SpanProcessor spanMetricsProcessor = awsSpanMetricsProcessorBuilder.build();
tracerProviderBuilder.addSpanProcessor(spanMetricsProcessor);
}
return tracerProviderBuilder;
Expand Down Expand Up @@ -401,11 +435,14 @@ SpanExporter customizeSpanExporter(SpanExporter spanExporter, ConfigProperties c
}

if (isApplicationSignalsEnabled(configProps)) {
return AwsMetricAttributesSpanExporterBuilder.create(
spanExporter, ResourceHolder.getResource())
.build();
spanExporter =
AwsMetricAttributesSpanExporterBuilder.create(spanExporter, ResourceHolder.getResource())
.build();
}

if (this.sampler instanceof AwsXrayRemoteSampler) {
((AwsXrayRemoteSampler) this.sampler).setSpanExporter(spanExporter);
}
return spanExporter;
}

Expand Down Expand Up @@ -445,6 +482,32 @@ LogRecordExporter customizeLogsExporter(
return logsExporter;
}

/**
 * Parses the adaptive sampling configuration supplied as a YAML string.
 *
 * <p>The text is first read into a generic map so that the {@code version} field can be validated,
 * and is then bound to {@link AwsXrayAdaptiveSamplingConfig}.
 *
 * @param config raw configuration text; may be {@code null}
 * @return the parsed configuration, or {@code null} when {@code config} is {@code null}
 * @throws JsonProcessingException if the text cannot be parsed or bound to the config type
 * @throws IllegalArgumentException if {@code version} is missing, is not a number, or is 2 or
 *     greater
 */
static AwsXrayAdaptiveSamplingConfig parseConfigString(String config)
    throws JsonProcessingException {
  if (config == null) {
    return null;
  }
  ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
  Map<String, Object> configMap =
      yamlMapper.readValue(config, new TypeReference<Map<String, Object>>() {});

  // Guard against a null map (e.g. a document that is just a YAML null) so the caller gets the
  // same "missing version" error instead of a NullPointerException.
  Object versionObj = configMap == null ? null : configMap.get("version");
  if (versionObj == null) {
    throw new IllegalArgumentException(
        "Missing required 'version' field in adaptive sampling configuration");
  }

  // A non-numeric version (e.g. "version: abc") would otherwise fail with a ClassCastException.
  if (!(versionObj instanceof Number)) {
    throw new IllegalArgumentException(
        "Invalid 'version' field in adaptive sampling configuration: expected a number but got '"
            + versionObj
            + "'");
  }

  double version = ((Number) versionObj).doubleValue();
  if (version >= 2.0) {
    throw new IllegalArgumentException(
        "Incompatible adaptive sampling config version: "
            + version
            + ". This version of the AWS X-Ray remote sampler only supports versions strictly below 2.0.");
  }

  return yamlMapper.readValue(config, AwsXrayAdaptiveSamplingConfig.class);
}

private enum ApplicationSignalsExporterProvider {
INSTANCE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ private AwsAttributeKeys() {}
static final AttributeKey<Boolean> AWS_TRACE_FLAG_SAMPLED =
AttributeKey.booleanKey("aws.trace.flag.sampled");

static final AttributeKey<String> AWS_XRAY_SAMPLING_RULE =
AttributeKey.stringKey("aws.xray.sampling_rule");

// use the same AWS Resource attribute name defined by OTel java auto-instr for aws_sdk_v_1_1
// TODO: all AWS specific attributes should be defined in semconv package and reused cross all
// otel packages. Related sim -
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.contrib.awsxray.AwsXrayRemoteSampler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.util.Map;
import java.util.function.Supplier;
import javax.annotation.concurrent.Immutable;
Expand Down Expand Up @@ -75,16 +77,25 @@ public final class AwsSpanMetricsProcessor implements SpanProcessor {
private final Resource resource;
private final Supplier<CompletableResultCode> forceFlushAction;

private Sampler sampler;

/** Use {@link AwsSpanMetricsProcessorBuilder} to construct this processor. */
static AwsSpanMetricsProcessor create(
LongHistogram errorHistogram,
LongHistogram faultHistogram,
DoubleHistogram latencyHistogram,
MetricAttributeGenerator generator,
Resource resource,
Sampler sampler,
Supplier<CompletableResultCode> forceFlushAction) {
return new AwsSpanMetricsProcessor(
errorHistogram, faultHistogram, latencyHistogram, generator, resource, forceFlushAction);
errorHistogram,
faultHistogram,
latencyHistogram,
generator,
resource,
sampler,
forceFlushAction);
}

private AwsSpanMetricsProcessor(
Expand All @@ -93,12 +104,14 @@ private AwsSpanMetricsProcessor(
DoubleHistogram latencyHistogram,
MetricAttributeGenerator generator,
Resource resource,
Sampler sampler,
Supplier<CompletableResultCode> forceFlushAction) {
this.errorHistogram = errorHistogram;
this.faultHistogram = faultHistogram;
this.latencyHistogram = latencyHistogram;
this.generator = generator;
this.resource = resource;
this.sampler = sampler;
this.forceFlushAction = forceFlushAction;
}

Expand All @@ -125,6 +138,9 @@ public void onEnd(ReadableSpan span) {
for (Map.Entry<String, Attributes> attribute : attributeMap.entrySet()) {
recordMetrics(span, spanData, attribute.getValue());
}
if (sampler != null) {
((AwsXrayRemoteSampler) sampler).adaptSampling(span, spanData);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.util.function.Supplier;

/** A builder for {@link AwsSpanMetricsProcessor} */
Expand Down Expand Up @@ -51,6 +52,7 @@ public final class AwsSpanMetricsProcessorBuilder {

// Optional builder elements
private MetricAttributeGenerator generator = DEFAULT_GENERATOR;
private Sampler sampler;
private String scopeName = DEFAULT_SCOPE_NAME;

public static AwsSpanMetricsProcessorBuilder create(
Expand Down Expand Up @@ -80,6 +82,17 @@ public AwsSpanMetricsProcessorBuilder setGenerator(MetricAttributeGenerator gene
return this;
}

/**
 * Sets the sampler that is handed to the span metrics processor. It is intended to be used to
 * increase the sampling rate in the case of errors. Must not be null.
 */
@CanIgnoreReturnValue
public AwsSpanMetricsProcessorBuilder setSampler(Sampler sampler) {
  this.sampler = requireNonNull(sampler, "sampler");
  return this;
}

/**
* Sets the scope name used in the creation of metrics by the span metrics processor. If unset,
* defaults to {@link #DEFAULT_SCOPE_NAME}. Must not be null.
Expand All @@ -99,6 +112,12 @@ public AwsSpanMetricsProcessor build() {
meter.histogramBuilder(LATENCY).setUnit(LATENCY_UNITS).build();

return AwsSpanMetricsProcessor.create(
errorHistogram, faultHistogram, latencyHistogram, generator, resource, forceFlushAction);
errorHistogram,
faultHistogram,
latencyHistogram,
generator,
resource,
sampler,
forceFlushAction);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatException;
import static org.assertj.core.api.Assertions.assertThatNoException;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.jupiter.api.Test;

/** Unit tests for {@code AwsApplicationSignalsCustomizerProvider.parseConfigString}. */
class AwsApplicationSignalsCustomizerProviderTest {

  @Test
  void setAdaptiveSamplingConfigFromString_validConfig() throws JsonProcessingException {
    assertThat(AwsApplicationSignalsCustomizerProvider.parseConfigString("version: 1").getVersion())
        .isEqualTo(1);
  }

  @Test
  void setAdaptiveSamplingConfigFromString_nullConfig() throws JsonProcessingException {
    assertThatNoException()
        .isThrownBy(() -> AwsApplicationSignalsCustomizerProvider.parseConfigString(null));
    // A null config string is passed through as a null parsed config.
    assertThat(AwsApplicationSignalsCustomizerProvider.parseConfigString(null)).isNull();
  }

  @Test
  void setAdaptiveSamplingConfigFromString_emptyConfig() {
    // Empty input is rejected; kept separate from the missing-version case below.
    assertThatException()
        .isThrownBy(() -> AwsApplicationSignalsCustomizerProvider.parseConfigString(""));
  }

  @Test
  void setAdaptiveSamplingConfigFromString_missingVersion() {
    // Use a well-formed mapping without 'version' so the version check itself is exercised.
    assertThatException()
        .isThrownBy(() -> AwsApplicationSignalsCustomizerProvider.parseConfigString("{other: 1}"))
        .isInstanceOf(IllegalArgumentException.class);
  }

  @Test
  void setAdaptiveSamplingConfigFromString_unsupportedVersion() {
    assertThatException()
        .isThrownBy(
            () -> AwsApplicationSignalsCustomizerProvider.parseConfigString("{version: 5000.1}"))
        .isInstanceOf(IllegalArgumentException.class);
  }

  @Test
  void setAdaptiveSamplingConfigFromString_invalidYaml() {
    assertThatException()
        .isThrownBy(
            () ->
                AwsApplicationSignalsCustomizerProvider.parseConfigString(
                    "{version: 1, invalid: yaml: structure}"));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.contrib.awsxray.AwsXrayRemoteSampler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.resources.Resource;
Expand Down Expand Up @@ -76,6 +77,7 @@ private enum ExpectedStatusMetric {
private LongHistogram faultHistogramMock;
private DoubleHistogram latencyHistogramMock;
private MetricAttributeGenerator generatorMock;
private AwsXrayRemoteSampler samplerMock;
private AwsSpanMetricsProcessor awsSpanMetricsProcessor;

// Mock forceFlush function that returns success when invoked similar
Expand All @@ -90,6 +92,7 @@ public void setUpMocks() {
faultHistogramMock = mock(LongHistogram.class);
latencyHistogramMock = mock(DoubleHistogram.class);
generatorMock = mock(MetricAttributeGenerator.class);
samplerMock = mock(AwsXrayRemoteSampler.class);

awsSpanMetricsProcessor =
AwsSpanMetricsProcessor.create(
Expand All @@ -98,6 +101,7 @@ public void setUpMocks() {
latencyHistogramMock,
generatorMock,
testResource,
samplerMock,
this::forceFlushAction);
}

Expand All @@ -112,7 +116,7 @@ public void testStartDoesNothingToSpan() {
Context parentContextMock = mock(Context.class);
ReadWriteSpan spanMock = mock(ReadWriteSpan.class);
awsSpanMetricsProcessor.onStart(parentContextMock, spanMock);
verifyNoInteractions(parentContextMock, spanMock);
// verifyNoInteractions(parentContextMock, spanMock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo?

}

@Test
Expand Down
Loading