diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SignalType.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SignalType.java new file mode 100644 index 000000000..c66ed940e --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SignalType.java @@ -0,0 +1,12 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +public enum SignalType { + SPAN, + LOG, + METRIC +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/ExporterCallback.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/ExporterCallback.java new file mode 100644 index 000000000..f877c7b7d --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/ExporterCallback.java @@ -0,0 +1,38 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.exporters; + +import io.opentelemetry.contrib.disk.buffering.SignalType; +import javax.annotation.Nullable; + +/** Notifies about exporter and storage-related operations from within a signal to disk exporter. */ +public interface ExporterCallback { + /** + * Called when an export to disk operation succeeded. + * + * @param type The type of signal associated to the exporter. + */ + void onExportSuccess(SignalType type); + + /** + * Called when an export to disk operation failed. + * + * @param type The type of signal associated to the exporter. + * @param error Optional - provides more information of why the operation failed. + */ + void onExportError(SignalType type, @Nullable Throwable error); + + /** + * Called when the exporter is closed. + * + * @param type The type of signal associated to the exporter. + */ + void onShutdown(SignalType type); + + static ExporterCallback noop() { + return NoopExporterCallback.INSTANCE; + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/NoopExporterCallback.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/NoopExporterCallback.java new file mode 100644 index 000000000..2dd4f2f70 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/NoopExporterCallback.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.exporters; + +import io.opentelemetry.contrib.disk.buffering.SignalType; +import javax.annotation.Nullable; + +final class NoopExporterCallback implements ExporterCallback { + static final NoopExporterCallback INSTANCE = new NoopExporterCallback(); + + private NoopExporterCallback() {} + + @Override + public void onExportSuccess(SignalType type) {} + + @Override + public void onExportError(SignalType type, @Nullable Throwable error) {} + + @Override + public void onShutdown(SignalType type) {} +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SignalStorageExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SignalStorageExporter.java new file mode 100644 index 000000000..51d135299 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SignalStorageExporter.java @@ -0,0 +1,54 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.exporters; + +import io.opentelemetry.contrib.disk.buffering.SignalType; +import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; +import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** Internal utility for common export to disk operations across all exporters. */ +final class SignalStorageExporter { + private final SignalStorage storage; + private final ExporterCallback callback; + private final Duration writeTimeout; + private final SignalType type; + + public SignalStorageExporter( + SignalStorage storage, ExporterCallback callback, Duration writeTimeout, SignalType type) { + this.storage = storage; + this.callback = callback; + this.writeTimeout = writeTimeout; + this.type = type; + } + + public CompletableResultCode exportToStorage(Collection items) { + CompletableFuture future = storage.write(items); + try { + WriteResult operation = future.get(writeTimeout.toMillis(), TimeUnit.MILLISECONDS); + if (operation.isSuccessful()) { + callback.onExportSuccess(type); + return CompletableResultCode.ofSuccess(); + } + + Throwable error = operation.getError(); + callback.onExportError(type, error); + if (error != null) { + return CompletableResultCode.ofExceptionalFailure(error); + } + return CompletableResultCode.ofFailure(); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + callback.onExportError(type, e); + return CompletableResultCode.ofExceptionalFailure(e); + } + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SpanToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SpanToDiskExporter.java new file mode 100644 index 000000000..2bda19da9 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SpanToDiskExporter.java @@ -0,0 +1,76 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.exporters; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.contrib.disk.buffering.SignalType; +import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.time.Duration; +import java.util.Collection; + +/** Exporter that stores spans into disk. */ +public final class SpanToDiskExporter implements SpanExporter { + private final SignalStorageExporter storageExporter; + private final ExporterCallback callback; + private static final SignalType TYPE = SignalType.SPAN; + + private SpanToDiskExporter( + SignalStorageExporter storageExporter, ExporterCallback callback) { + this.storageExporter = storageExporter; + this.callback = callback; + } + + public Builder builder(SignalStorage.Span storage) { + return new Builder(storage); + } + + @Override + public CompletableResultCode export(Collection spans) { + return storageExporter.exportToStorage(spans); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + callback.onShutdown(TYPE); + return CompletableResultCode.ofSuccess(); + } + + public static final class Builder { + private final SignalStorage.Span storage; + private ExporterCallback callback = ExporterCallback.noop(); + private Duration writeTimeout = Duration.ofSeconds(10); + + @CanIgnoreReturnValue + public Builder setExporterCallback(ExporterCallback value) { + callback = value; + return this; + } + + @CanIgnoreReturnValue + public Builder setWriteTimeout(Duration value) { + writeTimeout = value; + return this; + } + + public SpanToDiskExporter build() { + SignalStorageExporter storageExporter = + new SignalStorageExporter<>(storage, callback, writeTimeout, TYPE); + return new SpanToDiskExporter(storageExporter, callback); + } + + private Builder(SignalStorage.Span storage) { + this.storage = storage; + } + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/package-info.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/package-info.java new file mode 100644 index 000000000..6a367dca9 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/package-info.java @@ -0,0 +1,9 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +@ParametersAreNonnullByDefault +package io.opentelemetry.contrib.disk.buffering.exporters; + +import javax.annotation.ParametersAreNonnullByDefault; diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSpanStorage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSpanStorage.java new file mode 100644 index 000000000..5ba51790f --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSpanStorage.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.storage; + +import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; +import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; + +/** Default storage implementation where items are stored in multiple protobuf files. */ +public final class FileSpanStorage implements SignalStorage.Span { + + @Override + public CompletableFuture write(Collection items) { + throw new UnsupportedOperationException("For next PR"); + } + + @Override + public CompletableFuture clear() { + throw new UnsupportedOperationException("For next PR"); + } + + @Override + public void close() { + throw new UnsupportedOperationException("For next PR"); + } + + @Nonnull + @Override + public Iterator> iterator() { + throw new UnsupportedOperationException("For next PR"); + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/SignalStorage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/SignalStorage.java new file mode 100644 index 000000000..e8daf391a --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/SignalStorage.java @@ -0,0 +1,55 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.storage; + +import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.Closeable; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * Allows writing and iterating over written signal items. + * + * @param The type of signal data supported. + */ +public interface SignalStorage extends Iterable>, Closeable { + + /** + * Stores signal items. + * + * @param items The items to be stored. + * @return A future with {@link WriteResult}. + */ + CompletableFuture write(Collection items); + + /** + * Removes all the previously stored items. + * + * @return A future with {@link WriteResult}. + */ + CompletableFuture clear(); + + /** + * Abstraction for Spans. Implementations should use this instead of {@link SignalStorage} + * directly. + */ + interface Span extends SignalStorage {} + + /** + * Abstraction for Logs. Implementations should use this instead of {@link SignalStorage} + * directly. + */ + interface LogRecord extends SignalStorage {} + + /** + * Abstraction for Metrics. Implementations should use this instead of {@link SignalStorage} + * directly. + */ + interface Metric extends SignalStorage {} +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/DefaultWriteResult.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/DefaultWriteResult.java new file mode 100644 index 000000000..ff693c74b --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/DefaultWriteResult.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.storage.result; + +import javax.annotation.Nullable; + +final class DefaultWriteResult implements WriteResult { + private final boolean successful; + @Nullable private final Throwable error; + + DefaultWriteResult(boolean successful, @Nullable Throwable error) { + this.successful = successful; + this.error = error; + } + + @Override + public boolean isSuccessful() { + return successful; + } + + @Nullable + @Override + public Throwable getError() { + return error; + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/WriteResult.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/WriteResult.java new file mode 100644 index 000000000..8e4534a55 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/WriteResult.java @@ -0,0 +1,31 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.storage.result; + +import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; +import javax.annotation.Nullable; + +/** The result of a {@link SignalStorage} write operation. */ +public interface WriteResult { + /** + * Whether the operation succeeded or not. + * + * @return `true` if the items have been successfully stored, `false` otherwise. + */ + boolean isSuccessful(); + + /** + * Provides details of why the operation failed. + * + * @return The error (if any) for the failed operation. It must be null for successful operations. + */ + @Nullable + Throwable getError(); + + static WriteResult create(boolean successful, @Nullable Throwable error) { + return new DefaultWriteResult(successful, error); + } +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/exporters/SignalStorageExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/exporters/SignalStorageExporterTest.java new file mode 100644 index 000000000..6f7db99ef --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/exporters/SignalStorageExporterTest.java @@ -0,0 +1,135 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.exporters; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.opentelemetry.contrib.disk.buffering.SignalType; +import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; +import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import javax.annotation.Nonnull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class SignalStorageExporterTest { + @Mock private ExporterCallback callback; + + @Test + void verifyExportToStorage_success() { + SignalStorage.Span storage = new TestSpanStorage(); + SignalType signalType = SignalType.SPAN; + SignalStorageExporter storageExporter = + new SignalStorageExporter<>(storage, callback, Duration.ofSeconds(1), signalType); + SpanData item1 = mock(); + SpanData item2 = mock(); + SpanData item3 = mock(); + + CompletableResultCode resultCode = storageExporter.exportToStorage(Arrays.asList(item1, item2)); + + assertThat(resultCode.isSuccess()).isTrue(); + verify(callback).onExportSuccess(signalType); + verifyNoMoreInteractions(callback); + + // Adding more items + clearInvocations(callback); + resultCode = storageExporter.exportToStorage(Collections.singletonList(item3)); + + assertThat(resultCode.isSuccess()).isTrue(); + verify(callback).onExportSuccess(signalType); + verifyNoMoreInteractions(callback); + + // Checking items + List storedItems = new ArrayList<>(); + for (Collection collection : storage) { + storedItems.addAll(collection); + } + assertThat(storedItems).containsExactly(item1, item2, item3); + } + + @SuppressWarnings("ThrowableNotThrown") + @Test + void verifyExportToStorage_failure() { + SignalStorage.Span storage = mock(); + SignalType signalType = SignalType.SPAN; + SignalStorageExporter storageExporter = + new SignalStorageExporter<>(storage, callback, Duration.ofSeconds(1), signalType); + SpanData item1 = mock(); + + // Without exception + when(storage.write(anyCollection())) + .thenReturn(CompletableFuture.completedFuture(WriteResult.create(false, null))); + + CompletableResultCode resultCode = + storageExporter.exportToStorage(Collections.singletonList(item1)); + + assertThat(resultCode.isSuccess()).isFalse(); + assertThat(resultCode.getFailureThrowable()).isNull(); + verify(callback).onExportError(signalType, null); + verifyNoMoreInteractions(callback); + + // With exception + clearInvocations(callback); + Exception exception = new Exception(); + when(storage.write(anyCollection())) + .thenReturn(CompletableFuture.completedFuture(WriteResult.create(false, exception))); + + resultCode = storageExporter.exportToStorage(Collections.singletonList(item1)); + + assertThat(resultCode.isSuccess()).isFalse(); + assertThat(resultCode.getFailureThrowable()).isEqualTo(exception); + verify(callback).onExportError(signalType, exception); + verifyNoMoreInteractions(callback); + } + + private static class TestSpanStorage implements SignalStorage.Span { + private final List> storedItems = new ArrayList<>(); + + @Override + public CompletableFuture write(Collection items) { + storedItems.add(items); + return getSuccessfulFuture(); + } + + @Override + public CompletableFuture clear() { + storedItems.clear(); + return getSuccessfulFuture(); + } + + @Override + public void close() {} + + @Nonnull + @Override + public Iterator> iterator() { + return storedItems.iterator(); + } + + @Nonnull + private static CompletableFuture getSuccessfulFuture() { + return CompletableFuture.completedFuture(WriteResult.create(true, null)); + } + } +}