Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions awsagentprovider/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ dependencies {
implementation("com.amazonaws:aws-java-sdk-core:1.12.773")
// Export configuration
compileOnly("io.opentelemetry:opentelemetry-exporter-otlp")
// For Udp emitter
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
implementation("io.opentelemetry:opentelemetry-exporter-common")

testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.Immutable;

@Immutable
public class OtlpUdpSpanExporter implements SpanExporter {

private static final Logger logger = Logger.getLogger(OtlpUdpSpanExporter.class.getName());
private static final String PROTOCOL_HEADER = "{\"format\": \"json\", \"version\": 1}";
private static final char PROTOCOL_DELIMITER = '\n';
private static final String FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX = "T1S";
private static final String FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX = "T1U";

private final AtomicBoolean isShutdown = new AtomicBoolean();

private final UdpSender sender;
private final boolean sampled;

OtlpUdpSpanExporter(UdpSender sender, boolean sampled) {
this.sender = sender;
this.sampled = sampled;
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}

TraceRequestMarshaler exportRequest = TraceRequestMarshaler.create(spans);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
exportRequest.writeBinaryTo(baos);
String payload =
PROTOCOL_HEADER
+ PROTOCOL_DELIMITER
+ (sampled
? FORMAT_OTEL_SAMPLED_TRACES_BINARY_PREFIX
: FORMAT_OTEL_UNSAMPLED_TRACES_BINARY_PREFIX)
+ Base64.getEncoder().encodeToString(baos.toByteArray());
sender.send(payload.getBytes(StandardCharsets.UTF_8));
return CompletableResultCode.ofSuccess();
} catch (Exception e) {
logger.log(Level.SEVERE, "Failed to export spans. Error: " + e.getMessage(), e);
return CompletableResultCode.ofFailure();
}
}

@Override
public CompletableResultCode flush() {
// TODO: implement
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
return sender.shutdown();
}

// Visible for testing
UdpSender getSender() {
return sender;
}

// Visible for testing
boolean isSampled() {
return sampled;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import static java.util.Objects.requireNonNull;

import java.net.SocketException;

public final class OtlpUdpSpanExporterBuilder {

private static final String DEFAULT_HOST = "127.0.0.1";
private static final int DEFAULT_PORT = 2000;

private UdpSender sender;
private boolean sampled = true;

public OtlpUdpSpanExporterBuilder setEndpoint(String endpoint) {
requireNonNull(endpoint, "endpoint must not be null");
try {
String[] parts = endpoint.split(":");
String host = parts[0];
int port = Integer.parseInt(parts[1]);
this.sender = new UdpSender(host, port);
} catch (Exception e) {
throw new IllegalArgumentException("Invalid endpoint, must be a valid URL: " + endpoint, e);
}
return this;
}

public OtlpUdpSpanExporterBuilder setSampled(boolean sampled) {
this.sampled = sampled;
return this;
}

public OtlpUdpSpanExporter build() {
if (sender == null) {
try {
this.sender = new UdpSender(DEFAULT_HOST, DEFAULT_PORT);
} catch (SocketException e) {
throw new IllegalStateException("Failed to create OtlpUdpSpanExporter", e);
}
}
return new OtlpUdpSpanExporter(this.sender, this.sampled);
}

// Only for testing
OtlpUdpSpanExporterBuilder setSender(UdpSender sender) {
this.sender = sender;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class UdpSender {
private static final Logger logger = Logger.getLogger(UdpSender.class.getName());

private final DatagramSocket socket;
private final InetSocketAddress endpoint;

public UdpSender(String host, int port) throws SocketException {
this.endpoint = new InetSocketAddress(host, port);
try {
socket = new DatagramSocket();
} catch (SocketException e) {
logger.log(Level.SEVERE, "Exception while instantiating UdpSender socket.", e);
throw e;
}
}

public CompletableResultCode shutdown() {
try {
socket.close();
return CompletableResultCode.ofSuccess();
} catch (Exception e) {
logger.log(Level.SEVERE, "Exception while closing UdpSender socket.", e);
return CompletableResultCode.ofFailure();
}
}

public void send(byte[] data) {
DatagramPacket packet = new DatagramPacket(data, data.length, endpoint);
try {
socket.send(packet);
} catch (IOException e) {
logger.log(Level.SEVERE, "Exception while sending data.", e);
}
}

// Visible for testing
InetSocketAddress getEndpoint() {
return endpoint;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright Amazon.com, Inc. or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.opentelemetry.javaagent.providers;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.*;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.*;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import java.util.Collections;
import org.junit.jupiter.api.Test;

public class UdpExporterTest {

@Test
public void testUdpExporterWithDefaults() {
OtlpUdpSpanExporter exporter = new OtlpUdpSpanExporterBuilder().build();
UdpSender sender = exporter.getSender();
assertThat(sender.getEndpoint().getHostName())
.isEqualTo("localhost"); // getHostName implicitly converts 127.0.0.1 to localhost
assertThat(sender.getEndpoint().getPort()).isEqualTo(2000);
assertThat(exporter.isSampled()).isTrue();
}

@Test
public void testUdpExporterWithCustomEndpointAndSample() {
OtlpUdpSpanExporter exporter =
new OtlpUdpSpanExporterBuilder().setEndpoint("somehost:1000").setSampled(false).build();
UdpSender sender = exporter.getSender();
assertThat(sender.getEndpoint().getHostName()).isEqualTo("somehost");
assertThat(sender.getEndpoint().getPort()).isEqualTo(1000);
assertThat(exporter.isSampled()).isFalse();
}

@Test
public void testUdpExporterWithInvalidEndpoint() {
assertThatThrownBy(
() -> {
new OtlpUdpSpanExporterBuilder().setEndpoint("invalidhost");
})
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid endpoint, must be a valid URL: invalidhost");
}

@Test
public void testExport() {
UdpSender senderMock = mock(UdpSender.class);

// mock SpanData
SpanData spanData = buildSpanDataMock();

OtlpUdpSpanExporter exporter = new OtlpUdpSpanExporterBuilder().setSender(senderMock).build();
exporter.export(Collections.singletonList(spanData));

// assert that the senderMock.send is called once
verify(senderMock, times(1)).send(any(byte[].class));
}

private static SpanData buildSpanDataMock() {
SpanData mockSpanData = mock(SpanData.class);

Attributes spanAttributes =
Attributes.of(AttributeKey.stringKey("original key"), "original value");
when(mockSpanData.getAttributes()).thenReturn(spanAttributes);
when(mockSpanData.getTotalAttributeCount()).thenReturn(spanAttributes.size());
when(mockSpanData.getKind()).thenReturn(SpanKind.SERVER);

SpanContext parentSpanContextMock = mock(SpanContext.class);
when(mockSpanData.getParentSpanContext()).thenReturn(parentSpanContextMock);

SpanContext spanContextMock = mock(SpanContext.class);
when(spanContextMock.isValid()).thenReturn(true);
when(mockSpanData.getSpanContext()).thenReturn(spanContextMock);

TraceState traceState = TraceState.builder().build();
when(spanContextMock.getTraceState()).thenReturn(traceState);

when(mockSpanData.getStatus()).thenReturn(StatusData.unset());
when(mockSpanData.getInstrumentationScopeInfo())
.thenReturn(InstrumentationScopeInfo.create("Dummy Scope"));

Resource testResource = Resource.empty();
when(mockSpanData.getResource()).thenReturn(testResource);

return mockSpanData;
}
}
Loading