Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import io.opentelemetry.contrib.awsxray.AlwaysRecordSampler;
import io.opentelemetry.contrib.awsxray.ResourceHolder;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
import io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
Expand All @@ -39,13 +41,7 @@
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -66,6 +62,8 @@
*/
public class AwsApplicationSignalsCustomizerProvider
implements AutoConfigurationCustomizerProvider {
static final String AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME";

private static final Duration DEFAULT_METRIC_EXPORT_INTERVAL = Duration.ofMinutes(1);
private static final Logger logger =
Logger.getLogger(AwsApplicationSignalsCustomizerProvider.class.getName());
Expand All @@ -85,6 +83,15 @@ public class AwsApplicationSignalsCustomizerProvider
"otel.aws.application.signals.exporter.endpoint";

private static final String OTEL_JMX_TARGET_SYSTEM_CONFIG = "otel.jmx.target.system";
private static final String OTEL_EXPORTER_OTLP_TRACES_ENDPOINT_CONFIG =
"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT";
private static final String AWS_XRAY_DAEMON_ADDRESS_CONFIG = "AWS_XRAY_DAEMON_ADDRESS";
private static final String DEFAULT_UDP_ENDPOINT = "127.0.0.1:2000";

// UDP packet can be upto 64KB. To limit the packet size, we limit the exported batch size.
// This is a bit of a magic number, as there is no simple way to tell how many spans can make a
// 64KB batch since spans can vary in size.
private static final int LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10;

public void customize(AutoConfigurationCustomizer autoConfiguration) {
autoConfiguration.addPropertiesCustomizer(this::customizeProperties);
Expand All @@ -95,6 +102,10 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
autoConfiguration.addSpanExporterCustomizer(this::customizeSpanExporter);
}

static boolean isLambdaEnvironment() {
return System.getenv(AWS_LAMBDA_FUNCTION_NAME_CONFIG) != null;
}

private boolean isApplicationSignalsEnabled(ConfigProperties configProps) {
return configProps.getBoolean(
APPLICATION_SIGNALS_ENABLED_CONFIG,
Expand Down Expand Up @@ -156,6 +167,17 @@ private SdkTracerProviderBuilder customizeTracerProviderBuilder(
// Construct and set local and remote attributes span processor
tracerProviderBuilder.addSpanProcessor(
AttributePropagatingSpanProcessorBuilder.create().build());

// If running on Lambda, we just need to export 100% spans and skip generating any Application
// Signals metrics.
if (isLambdaEnvironment()) {
tracerProviderBuilder.addSpanProcessor(
AwsUnsampledOnlySpanProcessorBuilder.create()
.setMaxExportBatchSize(LAMBDA_SPAN_EXPORT_BATCH_SIZE)
.build());
return tracerProviderBuilder;
}

// Construct meterProvider
MetricExporter metricsExporter =
ApplicationSignalsExporterProvider.INSTANCE.createExporter(configProps);
Expand Down Expand Up @@ -207,6 +229,21 @@ private SdkMeterProviderBuilder customizeMeterProvider(

private SpanExporter customizeSpanExporter(
SpanExporter spanExporter, ConfigProperties configProps) {
// When running in Lambda, override the default OTLP exporter with UDP exporter
if (isLambdaEnvironment()) {
if (isOtlpSpanExporter(spanExporter)
&& System.getenv(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT_CONFIG) == null) {
String tracesEndpoint =
Optional.ofNullable(System.getenv(AWS_XRAY_DAEMON_ADDRESS_CONFIG))
.orElse(DEFAULT_UDP_ENDPOINT);
spanExporter =
new OtlpUdpSpanExporterBuilder()
.setPayloadSampleDecision(TracePayloadSampleDecision.SAMPLED)
.setEndpoint(tracesEndpoint)
.build();
}
}

if (isApplicationSignalsEnabled(configProps)) {
return AwsMetricAttributesSpanExporterBuilder.create(
spanExporter, ResourceHolder.getResource())
Expand All @@ -216,6 +253,11 @@ private SpanExporter customizeSpanExporter(
return spanExporter;
}

private boolean isOtlpSpanExporter(SpanExporter spanExporter) {
return spanExporter instanceof OtlpGrpcSpanExporter
|| spanExporter instanceof OtlpHttpSpanExporter;
}

private enum ApplicationSignalsExporterProvider {
INSTANCE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.SemanticAttributes.MessagingOperationValues.PROCESS;
import static io.opentelemetry.semconv.SemanticAttributes.RPC_SYSTEM;
import static software.amazon.opentelemetry.javaagent.providers.AwsApplicationSignalsCustomizerProvider.AWS_LAMBDA_FUNCTION_NAME_CONFIG;
import static software.amazon.opentelemetry.javaagent.providers.AwsApplicationSignalsCustomizerProvider.isLambdaEnvironment;
import static software.amazon.opentelemetry.javaagent.providers.AwsAttributeKeys.AWS_LOCAL_OPERATION;

import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -82,9 +84,13 @@ static List<String> getDialectKeywords() {
/**
* Ingress operation (i.e. operation for Server and Consumer spans) will be generated from
* "http.method + http.target/with the first API path parameter" if the default span name equals
* null, UnknownOperation or http.method value.
* null, UnknownOperation or http.method value. If running in Lambda, the ingress operation will
* be the function name + /FunctionHandler.
*/
static String getIngressOperation(SpanData span) {
if (isLambdaEnvironment()) {
return System.getenv(AWS_LAMBDA_FUNCTION_NAME_CONFIG) + "/FunctionHandler";
}
String operation = span.getName();
if (shouldUseInternalOperation(span)) {
operation = INTERNAL_OPERATION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ final class AwsUnsampledOnlySpanProcessor implements SpanProcessor {
this.delegate = delegate;
}

public static AwsUnsampledOnlySpanProcessorBuilder builder() {
return new AwsUnsampledOnlySpanProcessorBuilder();
}

@Override
public void onStart(Context parentContext, ReadWriteSpan span) {
if (!span.getSpanContext().isSampled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,40 @@
import io.opentelemetry.sdk.trace.export.SpanExporter;

final class AwsUnsampledOnlySpanProcessorBuilder {
public static AwsUnsampledOnlySpanProcessorBuilder create() {
return new AwsUnsampledOnlySpanProcessorBuilder();
}

// Default exporter is OtlpUdpSpanExporter with unsampled payload prefix
private SpanExporter exporter =
new OtlpUdpSpanExporterBuilder()
.setPayloadSampleDecision(TracePayloadSampleDecision.UNSAMPLED)
.build();

// Default batch size to be same as Otel BSP default
private int maxExportBatchSize = 512;

public AwsUnsampledOnlySpanProcessorBuilder setSpanExporter(SpanExporter exporter) {
requireNonNull(exporter, "exporter cannot be null");
this.exporter = exporter;
return this;
}

public AwsUnsampledOnlySpanProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) {
this.maxExportBatchSize = maxExportBatchSize;
return this;
}

public AwsUnsampledOnlySpanProcessor build() {
BatchSpanProcessor bsp =
BatchSpanProcessor.builder(exporter).setExportUnsampledSpans(true).build();
BatchSpanProcessor.builder(exporter)
.setExportUnsampledSpans(true)
.setMaxExportBatchSize(maxExportBatchSize)
.build();
return new AwsUnsampledOnlySpanProcessor(bsp);
}

// Visible for testing
SpanExporter getSpanExporter() {
return exporter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ public class AwsUnsampledOnlySpanProcessorTest {

@Test
public void testIsStartRequired() {
SpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build();
SpanProcessor processor = AwsUnsampledOnlySpanProcessorBuilder.create().build();
assertThat(processor.isStartRequired()).isTrue();
}

@Test
public void testIsEndRequired() {
SpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build();
SpanProcessor processor = AwsUnsampledOnlySpanProcessorBuilder.create().build();
assertThat(processor.isEndRequired()).isTrue();
}

@Test
public void testDefaultSpanProcessor() {
AwsUnsampledOnlySpanProcessorBuilder builder = AwsUnsampledOnlySpanProcessor.builder();
AwsUnsampledOnlySpanProcessorBuilder builder = AwsUnsampledOnlySpanProcessorBuilder.create();
AwsUnsampledOnlySpanProcessor unsampledSP = builder.build();

assertThat(builder.getSpanExporter()).isInstanceOf(OtlpUdpSpanExporter.class);
Expand All @@ -59,12 +59,14 @@ public void testDefaultSpanProcessor() {
.contains(
"spanExporter=software.amazon.opentelemetry.javaagent.providers.OtlpUdpSpanExporter");
assertThat(delegateBspString).contains("exportUnsampledSpans=true");
assertThat(delegateBspString).contains("maxExportBatchSize=512");
}

@Test
public void testSpanProcessorWithExporter() {
AwsUnsampledOnlySpanProcessorBuilder builder =
AwsUnsampledOnlySpanProcessor.builder().setSpanExporter(InMemorySpanExporter.create());
AwsUnsampledOnlySpanProcessorBuilder.create()
.setSpanExporter(InMemorySpanExporter.create());
AwsUnsampledOnlySpanProcessor unsampledSP = builder.build();

assertThat(builder.getSpanExporter()).isInstanceOf(InMemorySpanExporter.class);
Expand All @@ -77,6 +79,19 @@ public void testSpanProcessorWithExporter() {
assertThat(delegateBspString).contains("exportUnsampledSpans=true");
}

@Test
public void testSpanProcessorWithBatchSize() {
AwsUnsampledOnlySpanProcessorBuilder builder =
AwsUnsampledOnlySpanProcessorBuilder.create().setMaxExportBatchSize(100);
AwsUnsampledOnlySpanProcessor unsampledSP = builder.build();

SpanProcessor delegate = unsampledSP.getDelegate();
assertThat(delegate).isInstanceOf(BatchSpanProcessor.class);
BatchSpanProcessor delegateBsp = (BatchSpanProcessor) delegate;
String delegateBspString = delegateBsp.toString();
assertThat(delegateBspString).contains("maxExportBatchSize=100");
}

@Test
public void testStartAddsAttributeToSampledSpan() {
SpanContext mockSpanContext = mock(SpanContext.class);
Expand All @@ -85,7 +100,7 @@ public void testStartAddsAttributeToSampledSpan() {
ReadWriteSpan spanMock = mock(ReadWriteSpan.class);
when(spanMock.getSpanContext()).thenReturn(mockSpanContext);

AwsUnsampledOnlySpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build();
AwsUnsampledOnlySpanProcessor processor = AwsUnsampledOnlySpanProcessorBuilder.create().build();
processor.onStart(parentContextMock, spanMock);

// verify setAttribute was never called
Expand All @@ -100,7 +115,7 @@ public void testStartAddsAttributeToUnsampledSpan() {
ReadWriteSpan spanMock = mock(ReadWriteSpan.class);
when(spanMock.getSpanContext()).thenReturn(mockSpanContext);

AwsUnsampledOnlySpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build();
AwsUnsampledOnlySpanProcessor processor = AwsUnsampledOnlySpanProcessorBuilder.create().build();
processor.onStart(parentContextMock, spanMock);

// verify setAttribute was called with the correct arguments
Expand Down
Loading