Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions awsagentprovider/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ dependencies {
implementation("com.amazonaws:aws-java-sdk-core:1.12.773")
// Export configuration
compileOnly("io.opentelemetry:opentelemetry-exporter-otlp")
// For Udp emitter
compileOnly("io.opentelemetry:opentelemetry-exporter-otlp-common")

testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
testImplementation("io.opentelemetry:opentelemetry-extension-aws")
testImplementation("io.opentelemetry:opentelemetry-extension-trace-propagators")
testImplementation("com.google.guava:guava")
testRuntimeOnly("io.opentelemetry:opentelemetry-exporter-otlp-common")

compileOnly("com.google.code.findbugs:jsr305:3.0.2")
testImplementation("org.mockito:mockito-core:5.3.1")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.Immutable;

/**
* Exports spans via UDP, using OpenTelemetry's protobuf model. The protobuf modelled spans are
* Base64 encoded and prefixed with AWS X-Ray specific information before being sent over to {@link
* UdpSender}.
*
* <p>This exporter is NOT meant for generic use since the payload is prefixed with AWS X-Ray
* specific information.
*/
@Immutable
class OtlpUdpSpanExporter implements SpanExporter {

private static final Logger logger = Logger.getLogger(OtlpUdpSpanExporter.class.getName());

// The protocol header and delimiter is required for sending data to X-Ray Daemon or when running
// in Lambda.
// https://docs.aws.amazon.com/xray/latest/devguide/xray-api-sendingdata.html#xray-api-daemon
private static final String PROTOCOL_HEADER = "{\"format\": \"json\", \"version\": 1}";
private static final char PROTOCOL_DELIMITER = '\n';

// These prefixes help the backend identify if the spans payload is sampled or not.
private static final String FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1S";
private static final String FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1U";

private final AtomicBoolean isShutdown = new AtomicBoolean();

private final UdpSender sender;
private final boolean sampled;

OtlpUdpSpanExporter(UdpSender sender, boolean sampled) {
this.sender = sender;
this.sampled = sampled;
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}

TraceRequestMarshaler exportRequest = TraceRequestMarshaler.create(spans);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
exportRequest.writeBinaryTo(baos);
String payload =
PROTOCOL_HEADER
+ PROTOCOL_DELIMITER
+ (sampled
? FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
: FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX)
+ Base64.getEncoder().encodeToString(baos.toByteArray());
sender.send(payload.getBytes(StandardCharsets.UTF_8));
return CompletableResultCode.ofSuccess();
} catch (Exception e) {
logger.log(Level.SEVERE, "Failed to export spans. Error: " + e.getMessage(), e);
return CompletableResultCode.ofFailure();
}
}

@Override
public CompletableResultCode flush() {
// TODO: implement
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
return sender.shutdown();
}

// Visible for testing
UdpSender getSender() {
return sender;
}

// Visible for testing
boolean isSampled() {
return sampled;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import static java.util.Objects.requireNonNull;

import java.net.SocketException;

final class OtlpUdpSpanExporterBuilder {

private static final String DEFAULT_HOST = "127.0.0.1";
private static final int DEFAULT_PORT = 2000;

private UdpSender sender;
private boolean sampled = true;

public OtlpUdpSpanExporterBuilder setEndpoint(String endpoint) {
requireNonNull(endpoint, "endpoint must not be null");
try {
String[] parts = endpoint.split(":");
String host = parts[0];
int port = Integer.parseInt(parts[1]);
this.sender = new UdpSender(host, port);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid endpoint, must be a valid URL: " + endpoint, e);
}
return this;
}

public OtlpUdpSpanExporterBuilder setSampled(boolean sampled) {
this.sampled = sampled;
return this;
}

public OtlpUdpSpanExporter build() {
if (sender == null) {
try {
this.sender = new UdpSender(DEFAULT_HOST, DEFAULT_PORT);
} catch (SocketException e) {
throw new IllegalStateException("Failed to create OtlpUdpSpanExporter", e);
}
}
return new OtlpUdpSpanExporter(this.sender, this.sampled);
}

// Only for testing
OtlpUdpSpanExporterBuilder setSender(UdpSender sender) {
this.sender = sender;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* This class represents a UDP sender that sends data to a specified endpoint. It is used to send
* data to a remote host and port using UDP protocol.
*/
class UdpSender {
private static final Logger logger = Logger.getLogger(UdpSender.class.getName());

private DatagramSocket socket;
private final InetSocketAddress endpoint;

public UdpSender(String host, int port) throws SocketException {
this.endpoint = new InetSocketAddress(host, port);
try {
this.socket = new DatagramSocket();
} catch (SocketException e) {
logger.log(Level.SEVERE, "Exception while instantiating UdpSender socket.", e);
}
}

public CompletableResultCode shutdown() {
try {
socket.close();
return CompletableResultCode.ofSuccess();
} catch (Exception e) {
logger.log(Level.SEVERE, "Exception while closing UdpSender socket.", e);
return CompletableResultCode.ofFailure();
}
}

public void send(byte[] data) {
DatagramPacket packet = new DatagramPacket(data, data.length, endpoint);
try {
socket.send(packet);
} catch (IOException e) {
logger.log(Level.SEVERE, "Exception while sending data.", e);
}
}

// Visible for testing
InetSocketAddress getEndpoint() {
return endpoint;
}
}
Loading
Loading