Skip to content

Commit 0a778c1

Browse files
committed
Adding UdpExporter for Otlp spans
1 parent c81b3f8 commit 0a778c1

File tree

5 files changed

+316
-0
lines changed

5 files changed

+316
-0
lines changed

awsagentprovider/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ dependencies {
4040
implementation("com.amazonaws:aws-java-sdk-core:1.12.773")
4141
// Export configuration
4242
compileOnly("io.opentelemetry:opentelemetry-exporter-otlp")
43+
// For Udp emitter
44+
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
45+
implementation("io.opentelemetry:opentelemetry-exporter-common")
4346

4447
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
4548
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.opentelemetry.javaagent.providers;
17+
18+
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
19+
import io.opentelemetry.sdk.common.CompletableResultCode;
20+
import io.opentelemetry.sdk.trace.data.SpanData;
21+
import io.opentelemetry.sdk.trace.export.SpanExporter;
22+
import java.io.ByteArrayOutputStream;
23+
import java.nio.charset.StandardCharsets;
24+
import java.util.Base64;
25+
import java.util.Collection;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.logging.Level;
28+
import java.util.logging.Logger;
29+
import javax.annotation.concurrent.Immutable;
30+
31+
@Immutable
32+
public class OtlpUdpSpanExporter implements SpanExporter {
33+
34+
private static final Logger logger = Logger.getLogger(OtlpUdpSpanExporter.class.getName());
35+
private static final String PROTOCOL_HEADER = "{\"format\": \"json\", \"version\": 1}";
36+
private static final char PROTOCOL_DELIMITER = '\n';
37+
private static final String FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1S";
38+
private static final String FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1U";
39+
40+
private final AtomicBoolean isShutdown = new AtomicBoolean();
41+
42+
private final UdpSender sender;
43+
private final boolean sampled;
44+
45+
OtlpUdpSpanExporter(UdpSender sender, boolean sampled) {
46+
this.sender = sender;
47+
this.sampled = sampled;
48+
}
49+
50+
@Override
51+
public CompletableResultCode export(Collection<SpanData> spans) {
52+
if (isShutdown.get()) {
53+
return CompletableResultCode.ofFailure();
54+
}
55+
56+
TraceRequestMarshaler exportRequest = TraceRequestMarshaler.create(spans);
57+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
58+
try {
59+
exportRequest.writeBinaryTo(baos);
60+
String payload =
61+
PROTOCOL_HEADER
62+
+ PROTOCOL_DELIMITER
63+
+ (sampled
64+
? FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
65+
: FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX)
66+
+ Base64.getEncoder().encodeToString(baos.toByteArray());
67+
sender.send(payload.getBytes(StandardCharsets.UTF_8));
68+
System.out.println("Sending::: " + payload); // TODO: remove
69+
return CompletableResultCode.ofSuccess();
70+
} catch (Exception e) {
71+
logger.log(Level.SEVERE, "Failed to export spans. Error: " + e.getMessage(), e);
72+
return CompletableResultCode.ofFailure();
73+
}
74+
}
75+
76+
@Override
77+
public CompletableResultCode flush() {
78+
return CompletableResultCode.ofSuccess();
79+
}
80+
81+
@Override
82+
public CompletableResultCode shutdown() {
83+
if (!isShutdown.compareAndSet(false, true)) {
84+
logger.log(Level.INFO, "Calling shutdown() multiple times.");
85+
return CompletableResultCode.ofSuccess();
86+
}
87+
return sender.shutdown();
88+
}
89+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.opentelemetry.javaagent.providers;
17+
18+
import static java.util.Objects.requireNonNull;
19+
20+
import java.net.SocketException;
21+
22+
public final class OtlpUdpSpanExporterBuilder {
23+
24+
private static final String DEFAULT_HOST = "127.0.0.1";
25+
private static final int DEFAULT_PORT = 2000;
26+
27+
private UdpSender sender;
28+
private boolean sampled = true;
29+
30+
public OtlpUdpSpanExporterBuilder setEndpoint(String endpoint) {
31+
requireNonNull(endpoint, "endpoint must not be null");
32+
try {
33+
String[] parts = endpoint.split(":");
34+
String host = parts[0];
35+
int port = Integer.parseInt(parts[1]);
36+
this.sender = new UdpSender(host, port);
37+
} catch (Exception e) {
38+
throw new IllegalArgumentException("Invalid endpoint, must be a valid URL: " + endpoint, e);
39+
}
40+
return this;
41+
}
42+
43+
public OtlpUdpSpanExporterBuilder setSampled(boolean sampled) {
44+
this.sampled = sampled;
45+
return this;
46+
}
47+
48+
public OtlpUdpSpanExporter build() {
49+
if (sender == null) {
50+
try {
51+
this.sender = new UdpSender(DEFAULT_HOST, DEFAULT_PORT);
52+
} catch (SocketException e) {
53+
throw new IllegalStateException("Failed to create OtlpUdpSpanExporter", e);
54+
}
55+
}
56+
return new OtlpUdpSpanExporter(this.sender, this.sampled);
57+
}
58+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.opentelemetry.javaagent.providers;
17+
18+
import io.opentelemetry.sdk.common.CompletableResultCode;
19+
import java.io.IOException;
20+
import java.net.DatagramPacket;
21+
import java.net.DatagramSocket;
22+
import java.net.InetSocketAddress;
23+
import java.net.SocketException;
24+
import java.util.logging.Level;
25+
import java.util.logging.Logger;
26+
27+
public class UdpSender {
28+
private static final Logger logger = Logger.getLogger(UdpSender.class.getName());
29+
30+
private final DatagramSocket socket;
31+
private final InetSocketAddress endpoint;
32+
33+
public UdpSender(String host, int port) throws SocketException {
34+
this.endpoint = new InetSocketAddress(host, port);
35+
try {
36+
socket = new DatagramSocket();
37+
} catch (SocketException e) {
38+
logger.log(Level.SEVERE, "Exception while instantiating UdpSender socket.", e);
39+
throw e;
40+
}
41+
}
42+
43+
public CompletableResultCode shutdown() {
44+
try {
45+
socket.close();
46+
return CompletableResultCode.ofSuccess();
47+
} catch (Exception e) {
48+
logger.log(Level.SEVERE, "Exception while closing UdpSender socket.", e);
49+
return CompletableResultCode.ofFailure();
50+
}
51+
}
52+
53+
public void send(byte[] data) {
54+
DatagramPacket packet = new DatagramPacket(data, data.length, endpoint);
55+
try {
56+
socket.send(packet);
57+
} catch (IOException e) {
58+
logger.log(Level.SEVERE, "Exception while sending data.", e);
59+
}
60+
}
61+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.opentelemetry.javaagent.providers;
17+
18+
import static org.mockito.Mockito.mock;
19+
import static org.mockito.Mockito.when;
20+
21+
import io.opentelemetry.api.common.AttributeKey;
22+
import io.opentelemetry.api.common.Attributes;
23+
import io.opentelemetry.api.trace.*;
24+
import io.opentelemetry.sdk.OpenTelemetrySdk;
25+
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
26+
import io.opentelemetry.sdk.resources.Resource;
27+
import io.opentelemetry.sdk.trace.ReadableSpan;
28+
import io.opentelemetry.sdk.trace.data.SpanData;
29+
import io.opentelemetry.sdk.trace.data.StatusData;
30+
import java.util.Arrays;
31+
import java.util.Collections;
32+
import org.junit.jupiter.api.Test;
33+
34+
public class UdpExporterTest {
35+
private static final boolean CONTAINS_ATTRIBUTES = true;
36+
37+
@Test
38+
public void testExporter() { // TODO: only for testing. remove.
39+
// Tracer tracer = GlobalOpenTelemetry.getTracer("My application");
40+
Tracer tracer = OpenTelemetrySdk.builder().build().getTracer("hello");
41+
42+
Span mySpan = tracer.spanBuilder("DoTheLoop_3").startSpan();
43+
int numIterations = 5;
44+
mySpan.setAttribute("NumIterations", numIterations);
45+
mySpan.setAttribute(AttributeKey.stringArrayKey("foo"), Arrays.asList("bar1", "bar2"));
46+
try (var scope = mySpan.makeCurrent()) {
47+
for (int i = 1; i <= numIterations; i++) {
48+
System.out.println("i = " + i);
49+
}
50+
} finally {
51+
mySpan.end();
52+
}
53+
54+
OtlpUdpSpanExporter exporter = new OtlpUdpSpanExporterBuilder().build();
55+
56+
ReadableSpan readableSpan = (ReadableSpan) mySpan;
57+
SpanData spanData = readableSpan.toSpanData();
58+
59+
exporter.export(Collections.singletonList(spanData));
60+
}
61+
62+
@Test
63+
public void testUdpExporter() {
64+
// Add your test logic here
65+
OtlpUdpSpanExporter exporter = new OtlpUdpSpanExporterBuilder().build();
66+
67+
// build test span
68+
SpanContext parentSpanContextMock = mock(SpanContext.class);
69+
Attributes spanAttributes = buildSpanAttributes(CONTAINS_ATTRIBUTES);
70+
SpanData spanDataMock = buildSpanDataMock(spanAttributes);
71+
when(spanDataMock.getParentSpanContext()).thenReturn(parentSpanContextMock);
72+
SpanContext spanContextMock = mock(SpanContext.class);
73+
when(spanContextMock.isValid()).thenReturn(true);
74+
when(spanDataMock.getSpanContext()).thenReturn(spanContextMock);
75+
TraceState traceState = TraceState.builder().build();
76+
when(spanContextMock.getTraceState()).thenReturn(traceState);
77+
StatusData statusData = StatusData.unset();
78+
when(spanDataMock.getStatus()).thenReturn(statusData);
79+
when(spanDataMock.getInstrumentationScopeInfo())
80+
.thenReturn(InstrumentationScopeInfo.create("Dummy Scope"));
81+
82+
Resource testResource = Resource.empty();
83+
when(spanDataMock.getResource()).thenReturn(testResource);
84+
85+
exporter.export(Collections.singletonList(spanDataMock));
86+
}
87+
88+
private static Attributes buildSpanAttributes(boolean containsAttribute) {
89+
if (containsAttribute) {
90+
return Attributes.of(AttributeKey.stringKey("original key"), "original value");
91+
} else {
92+
return Attributes.empty();
93+
}
94+
}
95+
96+
private static SpanData buildSpanDataMock(Attributes spanAttributes) {
97+
// Configure spanData
98+
SpanData mockSpanData = mock(SpanData.class);
99+
when(mockSpanData.getAttributes()).thenReturn(spanAttributes);
100+
when(mockSpanData.getTotalAttributeCount()).thenReturn(spanAttributes.size());
101+
when(mockSpanData.getKind()).thenReturn(SpanKind.SERVER);
102+
when(mockSpanData.getParentSpanContext()).thenReturn(null);
103+
return mockSpanData;
104+
}
105+
}

0 commit comments

Comments
 (0)