Skip to content

Commit 9bae8df

Browse files
author
Jeel Mehta
committed
Contribute OTLP UDP Exporter
1 parent d8ce8c0 commit 9bae8df

File tree

6 files changed

+499
-0
lines changed

6 files changed

+499
-0
lines changed
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
plugins {
2+
id("java")
3+
id("java-library")
4+
id("maven-publish")
5+
}
6+
7+
group = "software.opentelemetry.exporters.otlp.udp"
8+
version = "1.0-SNAPSHOT"
9+
10+
repositories {
11+
mavenLocal()
12+
mavenCentral()
13+
}
14+
15+
dependencies {
16+
implementation(platform("io.opentelemetry:opentelemetry-bom:1.44.1"))
17+
implementation("io.opentelemetry:opentelemetry-api")
18+
implementation("io.opentelemetry:opentelemetry-sdk")
19+
implementation("io.opentelemetry:opentelemetry-exporter-otlp")
20+
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
21+
implementation("io.opentelemetry.proto:opentelemetry-proto:1.0.0-alpha")
22+
implementation("com.google.code.findbugs:jsr305:3.0.2")
23+
implementation("org.apache.logging.log4j:log4j-api:2.24.1")
24+
implementation("org.apache.logging.log4j:log4j-core:2.24.1")
25+
implementation("org.slf4j:slf4j-simple:2.0.16")
26+
testImplementation(platform("org.junit:junit-bom:5.9.2"))
27+
testImplementation("org.junit.jupiter:junit-jupiter-api")
28+
testImplementation("org.junit.jupiter:junit-jupiter-engine")
29+
testImplementation("org.mockito:mockito-core:5.3.1")
30+
testImplementation("org.assertj:assertj-core:3.24.2")
31+
testImplementation("org.mockito:mockito-junit-jupiter:5.3.1")
32+
}
33+
34+
java {
35+
withSourcesJar()
36+
withJavadocJar()
37+
}
38+
39+
tasks.javadoc {
40+
options {
41+
(this as CoreJavadocOptions).addStringOption("Xdoclint:none", "-quiet")
42+
}
43+
isFailOnError = false
44+
}
45+
46+
sourceSets {
47+
main {
48+
java {
49+
srcDirs("src/main/java")
50+
}
51+
resources {
52+
srcDirs("src/main/resources")
53+
}
54+
}
55+
test {
56+
java {
57+
srcDirs("src/test/java")
58+
}
59+
resources {
60+
srcDirs("src/test/resources")
61+
}
62+
}
63+
}
64+
65+
tasks.test {
66+
useJUnitPlatform()
67+
testLogging {
68+
events("passed", "skipped", "failed")
69+
}
70+
}
71+
72+
tasks.jar {
73+
manifest {
74+
attributes(
75+
"Implementation-Title" to project.name,
76+
"Implementation-Version" to project.version
77+
)
78+
}
79+
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
80+
}
81+
82+
tasks.named<Jar>("javadocJar") {
83+
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
84+
}
85+
86+
tasks.named<Jar>("sourcesJar") {
87+
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
88+
}
89+
90+
publishing {
91+
publications {
92+
create<MavenPublication>("mavenJava") {
93+
from(components["java"])
94+
groupId = project.group.toString()
95+
artifactId = "aws-otel-otlp-udp-exporter"
96+
version = project.version.toString()
97+
}
98+
}
99+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.opentelemetry.exporters.otlp.udp;
17+
18+
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
19+
import io.opentelemetry.sdk.common.CompletableResultCode;
20+
import io.opentelemetry.sdk.trace.data.SpanData;
21+
import io.opentelemetry.sdk.trace.export.SpanExporter;
22+
import java.io.ByteArrayOutputStream;
23+
import java.nio.charset.StandardCharsets;
24+
import java.util.Base64;
25+
import java.util.Collection;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.logging.Level;
28+
import java.util.logging.Logger;
29+
import javax.annotation.concurrent.Immutable;
30+
31+
/**
32+
* Exports spans via UDP, using OpenTelemetry's protobuf model. The protobuf modelled spans are
33+
* Base64 encoded and prefixed with AWS X-Ray specific information before being sent over to {@link
34+
* UdpSender}.
35+
*
36+
* <p>This exporter is NOT meant for generic use since the payload is prefixed with AWS X-Ray
37+
* specific information.
38+
*/
39+
@Immutable
40+
public class OtlpUdpSpanExporter implements SpanExporter {
41+
42+
private static final Logger logger = Logger.getLogger(OtlpUdpSpanExporter.class.getName());
43+
44+
private final AtomicBoolean isShutdown = new AtomicBoolean();
45+
46+
private final UdpSender sender;
47+
private final String payloadPrefix;
48+
49+
OtlpUdpSpanExporter(UdpSender sender, String payloadPrefix) {
50+
this.sender = sender;
51+
this.payloadPrefix = payloadPrefix;
52+
}
53+
54+
@Override
55+
public CompletableResultCode export(Collection<SpanData> spans) {
56+
if (isShutdown.get()) {
57+
return CompletableResultCode.ofFailure();
58+
}
59+
60+
TraceRequestMarshaler exportRequest = TraceRequestMarshaler.create(spans);
61+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
62+
try {
63+
exportRequest.writeBinaryTo(baos);
64+
String payload = payloadPrefix + Base64.getEncoder().encodeToString(baos.toByteArray());
65+
sender.send(payload.getBytes(StandardCharsets.UTF_8));
66+
return CompletableResultCode.ofSuccess();
67+
} catch (Exception e) {
68+
logger.log(Level.SEVERE, "Failed to export spans. Error: " + e.getMessage(), e);
69+
return CompletableResultCode.ofFailure();
70+
}
71+
}
72+
73+
@Override
74+
public CompletableResultCode flush() {
75+
// TODO: implement
76+
return CompletableResultCode.ofSuccess();
77+
}
78+
79+
@Override
80+
public CompletableResultCode shutdown() {
81+
if (!isShutdown.compareAndSet(false, true)) {
82+
logger.log(Level.INFO, "Calling shutdown() multiple times.");
83+
return CompletableResultCode.ofSuccess();
84+
}
85+
return sender.shutdown();
86+
}
87+
88+
// Visible for testing
89+
UdpSender getSender() {
90+
return sender;
91+
}
92+
93+
// Visible for testing
94+
String getPayloadPrefix() {
95+
return payloadPrefix;
96+
}
97+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.opentelemetry.exporters.otlp.udp;
17+
18+
import static java.util.Objects.requireNonNull;
19+
20+
public final class OtlpUdpSpanExporterBuilder {
21+
22+
private static final String DEFAULT_HOST = "127.0.0.1";
23+
private static final int DEFAULT_PORT = 2000;
24+
25+
// The protocol header and delimiter is required for sending data to X-Ray Daemon or when running
26+
// in Lambda.
27+
// https://docs.aws.amazon.com/xray/latest/devguide/xray-api-sendingdata.html#xray-api-daemon
28+
private static final String PROTOCOL_HEADER = "{\"format\": \"json\", \"version\": 1}";
29+
private static final char PROTOCOL_DELIMITER = '\n';
30+
31+
// These prefixes help the backend identify if the spans payload is sampled or not.
32+
private static final String FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1S";
33+
private static final String FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1U";
34+
35+
private UdpSender sender;
36+
private String tracePayloadPrefix = FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX;
37+
38+
public OtlpUdpSpanExporterBuilder setEndpoint(String endpoint) {
39+
requireNonNull(endpoint, "endpoint must not be null");
40+
try {
41+
String[] parts = endpoint.split(":");
42+
String host = parts[0];
43+
int port = Integer.parseInt(parts[1]);
44+
this.sender = new UdpSender(host, port);
45+
} catch (Exception e) {
46+
throw new IllegalArgumentException("Invalid endpoint, must be a valid URL: " + endpoint, e);
47+
}
48+
return this;
49+
}
50+
51+
public OtlpUdpSpanExporterBuilder setPayloadSampleDecision(TracePayloadSampleDecision decision) {
52+
this.tracePayloadPrefix =
53+
decision == TracePayloadSampleDecision.SAMPLED
54+
? FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
55+
: FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX;
56+
return this;
57+
}
58+
59+
public OtlpUdpSpanExporter build() {
60+
if (sender == null) {
61+
this.sender = new UdpSender(DEFAULT_HOST, DEFAULT_PORT);
62+
}
63+
return new OtlpUdpSpanExporter(
64+
this.sender, PROTOCOL_HEADER + PROTOCOL_DELIMITER + tracePayloadPrefix);
65+
}
66+
67+
// Only for testing
68+
OtlpUdpSpanExporterBuilder setSender(UdpSender sender) {
69+
this.sender = sender;
70+
return this;
71+
}
72+
}
73+
74+
enum TracePayloadSampleDecision {
75+
SAMPLED,
76+
UNSAMPLED
77+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.opentelemetry.exporters.otlp.udp;
17+
18+
import io.opentelemetry.sdk.common.CompletableResultCode;
19+
import java.io.IOException;
20+
import java.net.DatagramPacket;
21+
import java.net.DatagramSocket;
22+
import java.net.InetSocketAddress;
23+
import java.net.SocketException;
24+
import java.util.logging.Level;
25+
import java.util.logging.Logger;
26+
27+
/**
28+
* This class represents a UDP sender that sends data to a specified endpoint. It is used to send
29+
* data to a remote host and port using UDP protocol.
30+
*/
31+
public class UdpSender {
32+
private static final Logger logger = Logger.getLogger(UdpSender.class.getName());
33+
34+
private DatagramSocket socket;
35+
private final InetSocketAddress endpoint;
36+
37+
public UdpSender(String host, int port) {
38+
this.endpoint = new InetSocketAddress(host, port);
39+
try {
40+
this.socket = new DatagramSocket();
41+
} catch (SocketException e) {
42+
logger.log(Level.SEVERE, "Exception while instantiating UdpSender socket.", e);
43+
}
44+
}
45+
46+
public CompletableResultCode shutdown() {
47+
try {
48+
if (socket == null) {
49+
return CompletableResultCode.ofSuccess();
50+
}
51+
socket.close();
52+
return CompletableResultCode.ofSuccess();
53+
} catch (Exception e) {
54+
logger.log(Level.SEVERE, "Exception while closing UdpSender socket.", e);
55+
return CompletableResultCode.ofFailure();
56+
}
57+
}
58+
59+
public void send(byte[] data) {
60+
if (socket == null) {
61+
logger.log(Level.WARNING, "UdpSender socket is null. Cannot send data.");
62+
return;
63+
}
64+
DatagramPacket packet = new DatagramPacket(data, data.length, endpoint);
65+
try {
66+
socket.send(packet);
67+
} catch (IOException e) {
68+
logger.log(Level.SEVERE, "Exception while sending data.", e);
69+
}
70+
}
71+
72+
// Visible for testing
73+
InetSocketAddress getEndpoint() {
74+
return endpoint;
75+
}
76+
}

0 commit comments

Comments
 (0)