Skip to content

Commit cc7af45

Browse files
committed
adding span processor for exporting only unsampled spans
1 parent ffc6afe commit cc7af45

File tree

4 files changed

+254
-0
lines changed

4 files changed

+254
-0
lines changed

awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsAttributeKeys.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ private AwsAttributeKeys() {}
7070
static final AttributeKey<String> AWS_LAMBDA_RESOURCE_ID =
7171
AttributeKey.stringKey("aws.lambda.resource_mapping.id");
7272

73+
static final AttributeKey<Boolean> AWS_TRACE_FLAG_SAMPLED =
74+
AttributeKey.booleanKey("aws.trace.flag.sampled");
75+
7376
// use the same AWS Resource attribute name defined by OTel java auto-instr for aws_sdk_v_1_1
7477
// TODO: all AWS specific attributes should be defined in semconv package and reused cross all
7578
// otel packages. Related sim -
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.opentelemetry.javaagent.providers;
17+
18+
import io.opentelemetry.context.Context;
19+
import io.opentelemetry.sdk.common.CompletableResultCode;
20+
import io.opentelemetry.sdk.trace.ReadWriteSpan;
21+
import io.opentelemetry.sdk.trace.ReadableSpan;
22+
import io.opentelemetry.sdk.trace.SpanProcessor;
23+
24+
final class AwsUnsampledOnlySpanProcessor implements SpanProcessor {
25+
26+
private final SpanProcessor delegate;
27+
28+
AwsUnsampledOnlySpanProcessor(SpanProcessor delegate) {
29+
this.delegate = delegate;
30+
}
31+
32+
public static AwsUnsampledOnlySpanProcessorBuilder builder() {
33+
return new AwsUnsampledOnlySpanProcessorBuilder();
34+
}
35+
36+
@Override
37+
public void onStart(Context parentContext, ReadWriteSpan span) {
38+
if (span.getSpanContext().isSampled()) {
39+
span.setAttribute(AwsAttributeKeys.AWS_TRACE_FLAG_SAMPLED, true);
40+
} else {
41+
span.setAttribute(AwsAttributeKeys.AWS_TRACE_FLAG_SAMPLED, false);
42+
}
43+
delegate.onStart(parentContext, span);
44+
}
45+
46+
@Override
47+
public void onEnd(ReadableSpan span) {
48+
if (!span.getSpanContext().isSampled()) {
49+
delegate.onEnd(span);
50+
}
51+
}
52+
53+
@Override
54+
public boolean isStartRequired() {
55+
return delegate.isStartRequired();
56+
}
57+
58+
@Override
59+
public boolean isEndRequired() {
60+
return delegate.isEndRequired();
61+
}
62+
63+
@Override
64+
public CompletableResultCode shutdown() {
65+
return delegate.shutdown();
66+
}
67+
68+
@Override
69+
public CompletableResultCode forceFlush() {
70+
return delegate.forceFlush();
71+
}
72+
73+
@Override
74+
public void close() {
75+
delegate.close();
76+
}
77+
78+
// Visible for testing
79+
SpanProcessor getDelegate() {
80+
return delegate;
81+
}
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.opentelemetry.javaagent.providers;
17+
18+
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
19+
import io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder;
20+
import io.opentelemetry.sdk.trace.export.SpanExporter;
21+
22+
import static java.util.Objects.requireNonNull;
23+
24+
final class AwsUnsampledOnlySpanProcessorBuilder {
25+
26+
// Default exporter is OtlpUdpSpanExporter with unsampled payload prefix
27+
private SpanExporter exporter = new OtlpUdpSpanExporterBuilder()
28+
.setPayloadSampleDecision(TracePayloadSampleDecision.UNSAMPLED)
29+
.build();
30+
31+
public AwsUnsampledOnlySpanProcessorBuilder setSpanExporter(SpanExporter exporter) {
32+
requireNonNull(exporter, "exporter cannot be null");
33+
this.exporter = exporter;
34+
return this;
35+
}
36+
37+
public AwsUnsampledOnlySpanProcessorBuilder setMaxQueueSize(int maxQueueSize) {
38+
39+
return this;
40+
}
41+
42+
public AwsUnsampledOnlySpanProcessor build() {
43+
BatchSpanProcessor bsp =
44+
BatchSpanProcessor.builder(exporter).setExportUnsampledSpans(true).build();
45+
return new AwsUnsampledOnlySpanProcessor(bsp);
46+
}
47+
48+
SpanExporter getSpanExporter() {
49+
return exporter;
50+
}
51+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package software.amazon.opentelemetry.javaagent.providers;
2+
3+
import io.opentelemetry.api.trace.SpanContext;
4+
import io.opentelemetry.context.Context;
5+
import io.opentelemetry.sdk.common.CompletableResultCode;
6+
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
7+
import io.opentelemetry.sdk.trace.ReadWriteSpan;
8+
import io.opentelemetry.sdk.trace.ReadableSpan;
9+
import io.opentelemetry.sdk.trace.SpanProcessor;
10+
import io.opentelemetry.sdk.trace.data.SpanData;
11+
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
12+
import io.opentelemetry.sdk.trace.export.SpanExporter;
13+
import org.junit.jupiter.api.Test;
14+
import org.mockito.ArgumentCaptor;
15+
16+
import java.util.Collection;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.mockito.Mockito.*;
20+
21+
public class AwsUnsampledOnlySpanProcessorTest {
22+
23+
@Test
24+
public void testDefaultSpanProcessor() {
25+
AwsUnsampledOnlySpanProcessorBuilder builder = AwsUnsampledOnlySpanProcessor.builder();
26+
AwsUnsampledOnlySpanProcessor unsampledSP = builder.build();
27+
28+
assertThat(builder.getSpanExporter()).isInstanceOf(OtlpUdpSpanExporter.class);
29+
SpanProcessor delegate = unsampledSP.getDelegate();
30+
assertThat(delegate).isInstanceOf(BatchSpanProcessor.class);
31+
BatchSpanProcessor delegateBsp = (BatchSpanProcessor) delegate;
32+
String delegateBspString = delegateBsp.toString();
33+
assertThat(delegateBspString).contains("spanExporter=software.amazon.opentelemetry.javaagent.providers.OtlpUdpSpanExporter");
34+
assertThat(delegateBspString).contains("exportUnsampledSpans=true");
35+
}
36+
37+
@Test
38+
public void testSpanProcessorWithExporter() {
39+
AwsUnsampledOnlySpanProcessorBuilder builder = AwsUnsampledOnlySpanProcessor
40+
.builder()
41+
.setSpanExporter(InMemorySpanExporter.create());
42+
AwsUnsampledOnlySpanProcessor unsampledSP = builder.build();
43+
44+
assertThat(builder.getSpanExporter()).isInstanceOf(InMemorySpanExporter.class);
45+
SpanProcessor delegate = unsampledSP.getDelegate();
46+
assertThat(delegate).isInstanceOf(BatchSpanProcessor.class);
47+
BatchSpanProcessor delegateBsp = (BatchSpanProcessor) delegate;
48+
String delegateBspString = delegateBsp.toString();
49+
assertThat(delegateBspString).contains("spanExporter=io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter");
50+
assertThat(delegateBspString).contains("exportUnsampledSpans=true");
51+
}
52+
53+
@Test
54+
public void testStartAddsAttributeToSampledSpan() {
55+
SpanContext mockSpanContext = mock(SpanContext.class);
56+
when(mockSpanContext.isSampled()).thenReturn(true);
57+
Context parentContextMock = mock(Context.class);
58+
ReadWriteSpan spanMock = mock(ReadWriteSpan.class);
59+
when(spanMock.getSpanContext()).thenReturn(mockSpanContext);
60+
61+
AwsUnsampledOnlySpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build();
62+
processor.onStart(parentContextMock, spanMock);
63+
64+
//verify setAttribute was called with the correct arguments
65+
verify(spanMock, times(1)).setAttribute(AwsAttributeKeys.AWS_TRACE_FLAG_SAMPLED, true);
66+
}
67+
68+
@Test
69+
public void testStartAddsAttributeToUnsampledSpan() {
70+
SpanContext mockSpanContext = mock(SpanContext.class);
71+
when(mockSpanContext.isSampled()).thenReturn(false);
72+
Context parentContextMock = mock(Context.class);
73+
ReadWriteSpan spanMock = mock(ReadWriteSpan.class);
74+
when(spanMock.getSpanContext()).thenReturn(mockSpanContext);
75+
76+
AwsUnsampledOnlySpanProcessor processor = AwsUnsampledOnlySpanProcessor.builder().build();
77+
processor.onStart(parentContextMock, spanMock);
78+
79+
//verify setAttribute was called with the correct arguments
80+
verify(spanMock, times(1)).setAttribute(AwsAttributeKeys.AWS_TRACE_FLAG_SAMPLED, false);
81+
}
82+
83+
@Test
84+
public void testExportsOnlyUnsampledSpans() {
85+
SpanExporter mockExporter = mock(SpanExporter.class);
86+
when(mockExporter.export(anyCollection())).thenReturn(CompletableResultCode.ofSuccess());
87+
88+
BatchSpanProcessor delegate = BatchSpanProcessor.builder(mockExporter)
89+
.setExportUnsampledSpans(true)
90+
.setMaxExportBatchSize(1)
91+
.setMaxQueueSize(1)
92+
.build();
93+
94+
AwsUnsampledOnlySpanProcessor processor = new AwsUnsampledOnlySpanProcessor(delegate);
95+
96+
// unsampled span
97+
SpanContext mockSpanContextUnsampled = mock(SpanContext.class);
98+
when(mockSpanContextUnsampled.isSampled()).thenReturn(false);
99+
ReadableSpan mockSpanUnsampled = mock(ReadableSpan.class);
100+
when(mockSpanUnsampled.getSpanContext()).thenReturn(mockSpanContextUnsampled);
101+
102+
// sampled span
103+
SpanContext mockSpanContextSampled = mock(SpanContext.class);
104+
when(mockSpanContextSampled.isSampled()).thenReturn(true);
105+
ReadableSpan mockSpanSampled = mock(ReadableSpan.class);
106+
when(mockSpanSampled.getSpanContext()).thenReturn(mockSpanContextSampled);
107+
108+
// flush the unsampled span and verify export was called once
109+
processor.onEnd(mockSpanUnsampled);
110+
processor.forceFlush();
111+
verify(mockExporter, times(1)).export(anyCollection());
112+
113+
// flush the sampled span and verify export was not called again
114+
processor.onEnd(mockSpanSampled);
115+
processor.forceFlush();
116+
verify(mockExporter, times(1)).export(anyCollection());
117+
}
118+
}

0 commit comments

Comments
 (0)