From 8b7fa68059163fa359061f0be5544c898643fa0c Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Tue, 27 May 2025 08:06:33 +0200 Subject: [PATCH 01/11] pass storage --- .../disk/buffering/SpanToDiskExporter.java | 20 ++++++++++++++++ .../internal/exporter/ToDiskExporter.java | 5 ++++ .../exporter/ToDiskExporterBuilder.java | 24 ++++++++++++++++++- .../disk/buffering/IntegrationTest.java | 14 ++++++----- 4 files changed, 56 insertions(+), 7 deletions(-) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java index d64a4cd71..02b6199eb 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java @@ -8,6 +8,7 @@ import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; @@ -31,6 +32,7 @@ public class SpanToDiskExporter implements SpanExporter { * @return A new SpanToDiskExporter instance. * @throws IOException if the delegate ToDiskExporter could not be created. */ + @Deprecated public static SpanToDiskExporter create(SpanExporter delegate, StorageConfiguration config) throws IOException { ToDiskExporter toDisk = @@ -43,6 +45,24 @@ public static SpanToDiskExporter create(SpanExporter delegate, StorageConfigurat return new SpanToDiskExporter(toDisk); } + /** + * Creates a new SpanToDiskExporter that will buffer Span telemetry on disk storage. + * + * @param delegate - The SpanExporter to delegate to if disk writing fails. + * @return A new SpanToDiskExporter instance. + * @throws IOException if the delegate ToDiskExporter could not be created. + */ + public static SpanToDiskExporter create(SpanExporter delegate, + Storage storage) + throws IOException { + ToDiskExporter toDisk = + ToDiskExporter.builder(storage) + .setSerializer(SignalSerializer.ofSpans()) + .setExportFunction(delegate::export) + .build(); + return new SpanToDiskExporter(toDisk); + } + // Visible for testing SpanToDiskExporter(ToDiskExporter delegate) { this.delegate = delegate; diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java index 1a43cb5eb..270bb49ff 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java @@ -33,10 +33,15 @@ public class ToDiskExporter { this.logger = DebugLogger.wrap(Logger.getLogger(ToDiskExporter.class.getName()), debugEnabled); } + @Deprecated public static ToDiskExporterBuilder builder() { return new ToDiskExporterBuilder<>(); } + public static ToDiskExporterBuilder builder(Storage storage) { + return new ToDiskExporterBuilder<>(storage); + } + public CompletableResultCode export(Collection data) { logger.log("Intercepting exporter batch.", Level.FINER); try { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java index 3ac7d2503..b90730e33 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java @@ -12,6 +12,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; +import javax.annotation.Nullable; import java.io.IOException; import java.util.Collection; import java.util.function.Function; @@ -20,14 +21,26 @@ public final class ToDiskExporterBuilder { private SignalSerializer serializer = ts -> new byte[0]; + @Deprecated private final StorageBuilder storageBuilder = Storage.builder(); + @Nullable + private Storage storage = null; + private Function, CompletableResultCode> exportFunction = x -> CompletableResultCode.ofFailure(); private boolean debugEnabled = false; + @Deprecated ToDiskExporterBuilder() {} + ToDiskExporterBuilder(Storage storage) { + if (storage == null) { + throw new NullPointerException("Storage cannot be null"); + } + this.storage = storage; + } + @CanIgnoreReturnValue public ToDiskExporterBuilder enableDebug() { return setDebugEnabled(true); @@ -39,12 +52,14 @@ public ToDiskExporterBuilder setDebugEnabled(boolean debugEnabled) { return this; } + @Deprecated @CanIgnoreReturnValue public ToDiskExporterBuilder setFolderName(String folderName) { storageBuilder.setFolderName(folderName); return this; } + @Deprecated @CanIgnoreReturnValue public ToDiskExporterBuilder setStorageConfiguration(StorageConfiguration configuration) { validateConfiguration(configuration); @@ -52,12 +67,19 @@ public ToDiskExporterBuilder setStorageConfiguration(StorageConfiguration con return this; } + @Deprecated @CanIgnoreReturnValue public ToDiskExporterBuilder setStorageClock(Clock clock) { storageBuilder.setStorageClock(clock); return this; } + @CanIgnoreReturnValue + public ToDiskExporterBuilder setStorage(Storage storage) { + this.storage = storage; + return this; + } + @CanIgnoreReturnValue public ToDiskExporterBuilder setSerializer(SignalSerializer serializer) { this.serializer = serializer; @@ -72,7 +94,7 @@ public ToDiskExporterBuilder setExportFunction( } public ToDiskExporter build() throws IOException { - Storage storage = storageBuilder.build(); + Storage storage = this.storage != null ? this.storage : storageBuilder.build(); return new ToDiskExporter<>(serializer, exportFunction, storage, debugEnabled); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java index c1c42b0bb..5eb268de6 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java @@ -22,6 +22,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; @@ -66,12 +67,16 @@ public class IntegrationTest { @TempDir File rootDir; private static final long INITIAL_TIME_IN_MILLIS = 1000; private static final long NOW_NANOS = MILLISECONDS.toNanos(INITIAL_TIME_IN_MILLIS); - private StorageConfiguration storageConfig; + private Storage storage; @BeforeEach void setUp() throws IOException { - storageConfig = StorageConfiguration.getDefault(rootDir); clock = mock(); + storage = Storage.builder() + .setFolderName(SignalTypes.spans.name()) + .setStorageConfiguration(StorageConfiguration.getDefault(rootDir)) + .setStorageClock(clock) + .build(); when(clock.now()).thenReturn(NOW_NANOS); @@ -102,12 +107,9 @@ void setUp() throws IOException { private ToDiskExporter buildToDiskExporter( SignalSerializer serializer, Function, CompletableResultCode> exporter) throws IOException { - return ToDiskExporter.builder() - .setFolderName(SignalTypes.spans.name()) - .setStorageConfiguration(storageConfig) + return ToDiskExporter.builder(storage) .setSerializer(serializer) .setExportFunction(exporter) - .setStorageClock(clock) .build(); } From 1a354ce9e4aaa9075a7dcc036228ebd56835119d Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Tue, 27 May 2025 08:10:37 +0200 Subject: [PATCH 02/11] pass storage --- .../contrib/disk/buffering/LogRecordToDiskExporter.java | 1 + .../contrib/disk/buffering/MetricToDiskExporter.java | 1 + .../opentelemetry/contrib/disk/buffering/IntegrationTest.java | 4 +++- .../internal/exporter/ToDiskExporterBuilderTest.java | 1 + 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java index 7570aed8e..19f68e544 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java @@ -30,6 +30,7 @@ public class LogRecordToDiskExporter implements LogRecordExporter { * @return A new LogRecordToDiskExporter instance. * @throws IOException if the delegate ToDiskExporter could not be created. */ + @SuppressWarnings("deprecation") public static LogRecordToDiskExporter create( LogRecordExporter delegate, StorageConfiguration config) throws IOException { ToDiskExporter toDisk = diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java index bf2e7066f..605c5094b 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java @@ -35,6 +35,7 @@ public class MetricToDiskExporter implements MetricExporter { * @return A new MetricToDiskExporter instance. * @throws IOException if the delegate ToDiskExporter could not be created. */ + @SuppressWarnings("deprecation") public static MetricToDiskExporter create(MetricExporter delegate, StorageConfiguration config) throws IOException { ToDiskExporter toDisk = diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java index 5eb268de6..c57820dd0 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java @@ -67,14 +67,16 @@ public class IntegrationTest { @TempDir File rootDir; private static final long INITIAL_TIME_IN_MILLIS = 1000; private static final long NOW_NANOS = MILLISECONDS.toNanos(INITIAL_TIME_IN_MILLIS); + private StorageConfiguration storageConfig; private Storage storage; @BeforeEach void setUp() throws IOException { clock = mock(); + storageConfig = StorageConfiguration.getDefault(rootDir); storage = Storage.builder() .setFolderName(SignalTypes.spans.name()) - .setStorageConfiguration(StorageConfiguration.getDefault(rootDir)) + .setStorageConfiguration(storageConfig) .setStorageClock(clock) .build(); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java index 288388e03..fbecada27 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java @@ -14,6 +14,7 @@ class ToDiskExporterBuilderTest { + @SuppressWarnings("deprecation") // todo @Test void whenMinFileReadIsNotGraterThanMaxFileWrite_throwException() { StorageConfiguration invalidConfig = From 3112b09a691a6408e6aac23f6c9ba566467f82e4 Mon Sep 17 00:00:00 2001 From: otelbot <197425009+otelbot@users.noreply.github.com> Date: Tue, 27 May 2025 06:13:05 +0000 Subject: [PATCH 03/11] ./gradlew spotlessApply --- .../contrib/disk/buffering/SpanToDiskExporter.java | 3 +-- .../internal/exporter/ToDiskExporterBuilder.java | 8 +++----- .../contrib/disk/buffering/IntegrationTest.java | 11 ++++++----- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java index 02b6199eb..00daf57c1 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java @@ -52,8 +52,7 @@ public static SpanToDiskExporter create(SpanExporter delegate, StorageConfigurat * @return A new SpanToDiskExporter instance. * @throws IOException if the delegate ToDiskExporter could not be created. */ - public static SpanToDiskExporter create(SpanExporter delegate, - Storage storage) + public static SpanToDiskExporter create(SpanExporter delegate, Storage storage) throws IOException { ToDiskExporter toDisk = ToDiskExporter.builder(storage) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java index b90730e33..34ca24f92 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java @@ -12,20 +12,18 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; -import javax.annotation.Nullable; import java.io.IOException; import java.util.Collection; import java.util.function.Function; +import javax.annotation.Nullable; public final class ToDiskExporterBuilder { private SignalSerializer serializer = ts -> new byte[0]; - @Deprecated - private final StorageBuilder storageBuilder = Storage.builder(); + @Deprecated private final StorageBuilder storageBuilder = Storage.builder(); - @Nullable - private Storage storage = null; + @Nullable private Storage storage = null; private Function, CompletableResultCode> exportFunction = x -> CompletableResultCode.ofFailure(); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java index c57820dd0..21284e759 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java @@ -74,11 +74,12 @@ public class IntegrationTest { void setUp() throws IOException { clock = mock(); storageConfig = StorageConfiguration.getDefault(rootDir); - storage = Storage.builder() - .setFolderName(SignalTypes.spans.name()) - .setStorageConfiguration(storageConfig) - .setStorageClock(clock) - .build(); + storage = + Storage.builder() + .setFolderName(SignalTypes.spans.name()) + .setStorageConfiguration(storageConfig) + .setStorageClock(clock) + .build(); when(clock.now()).thenReturn(NOW_NANOS); From 227a5c4119b267d237d679042bc9affdeb6766f0 Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Tue, 27 May 2025 10:45:41 +0200 Subject: [PATCH 04/11] pass storage --- .../buffering/LogRecordFromDiskExporter.java | 12 ++-- .../buffering/LogRecordToDiskExporter.java | 14 ++-- .../buffering/MetricFromDiskExporter.java | 10 +-- .../disk/buffering/MetricToDiskExporter.java | 14 ++-- .../disk/buffering/SpanFromDiskExporter.java | 10 +-- .../disk/buffering/SpanToDiskExporter.java | 28 +------- .../exporter/FromDiskExporterBuilder.java | 46 +++---------- .../exporter/FromDiskExporterImpl.java | 10 +-- .../internal/exporter/ToDiskExporter.java | 12 ++-- .../exporter/ToDiskExporterBuilder.java | 64 +------------------ .../buffering/internal/storage/Storage.java | 7 +- .../internal/storage/StorageBuilder.java | 12 ++++ .../buffering/FromDiskExporterImplTest.java | 6 +- .../disk/buffering/IntegrationTest.java | 29 +++------ .../buffering/SpanFromDiskExporterTest.java | 26 ++++---- .../exporter/ToDiskExporterBuilderTest.java | 33 ---------- .../internal/exporter/ToDiskExporterTest.java | 2 +- .../internal/storage/StorageTest.java | 18 ++++++ .../buffering/internal/storage/TestData.java | 12 ++++ 19 files changed, 113 insertions(+), 252 deletions(-) delete mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java index 7b37ee361..c26a383d6 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java @@ -5,11 +5,10 @@ package io.opentelemetry.contrib.disk.buffering; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; import java.io.IOException; @@ -19,15 +18,12 @@ public class LogRecordFromDiskExporter implements FromDiskExporter { private final FromDiskExporterImpl delegate; - public static LogRecordFromDiskExporter create( - LogRecordExporter exporter, StorageConfiguration config) throws IOException { + public static LogRecordFromDiskExporter create(LogRecordExporter exporter, Storage storage) + throws IOException { FromDiskExporterImpl delegate = - FromDiskExporterImpl.builder() - .setFolderName(SignalTypes.logs.name()) - .setStorageConfiguration(config) + FromDiskExporterImpl.builder(storage) .setDeserializer(SignalDeserializer.ofLogs()) .setExportFunction(exporter::export) - .setDebugEnabled(config.isDebugEnabled()) .build(); return new LogRecordFromDiskExporter(delegate); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java index 19f68e544..665e90f76 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java @@ -5,10 +5,9 @@ package io.opentelemetry.contrib.disk.buffering; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; @@ -26,17 +25,12 @@ public class LogRecordToDiskExporter implements LogRecordExporter { * Creates a new LogRecordToDiskExporter that will buffer LogRecordData telemetry on disk storage. * * @param delegate - The LogRecordExporter to delegate to if disk writing fails. - * @param config - The StorageConfiguration that specifies how storage is managed. + * @param storage - The Storage instance that specifies how storage is managed. * @return A new LogRecordToDiskExporter instance. - * @throws IOException if the delegate ToDiskExporter could not be created. */ - @SuppressWarnings("deprecation") - public static LogRecordToDiskExporter create( - LogRecordExporter delegate, StorageConfiguration config) throws IOException { + public static LogRecordToDiskExporter create(LogRecordExporter delegate, Storage storage) { ToDiskExporter toDisk = - ToDiskExporter.builder() - .setFolderName(SignalTypes.logs.name()) - .setStorageConfiguration(config) + ToDiskExporter.builder(storage) .setSerializer(SignalSerializer.ofLogs()) .setExportFunction(delegate::export) .build(); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java index bf652f8f8..8bb4f3dcd 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java @@ -5,11 +5,10 @@ package io.opentelemetry.contrib.disk.buffering; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.io.IOException; @@ -19,15 +18,12 @@ public class MetricFromDiskExporter implements FromDiskExporter { private final FromDiskExporterImpl delegate; - public static MetricFromDiskExporter create(MetricExporter exporter, StorageConfiguration config) + public static MetricFromDiskExporter create(MetricExporter exporter, Storage storage) throws IOException { FromDiskExporterImpl delegate = - FromDiskExporterImpl.builder() - .setFolderName(SignalTypes.metrics.name()) - .setStorageConfiguration(config) + FromDiskExporterImpl.builder(storage) .setDeserializer(SignalDeserializer.ofMetrics()) .setExportFunction(exporter::export) - .setDebugEnabled(config.isDebugEnabled()) .build(); return new MetricFromDiskExporter(delegate); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java index 605c5094b..83d2fc73c 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java @@ -5,10 +5,9 @@ package io.opentelemetry.contrib.disk.buffering; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; @@ -31,17 +30,12 @@ public class MetricToDiskExporter implements MetricExporter { * Creates a new MetricToDiskExporter that will buffer Metric telemetry on disk storage. * * @param delegate - The MetricExporter to delegate to if disk writing fails. - * @param config - The StorageConfiguration that specifies how storage is managed. + * @param storage - The Storage instance that specifies how storage is managed. * @return A new MetricToDiskExporter instance. - * @throws IOException if the delegate ToDiskExporter could not be created. */ - @SuppressWarnings("deprecation") - public static MetricToDiskExporter create(MetricExporter delegate, StorageConfiguration config) - throws IOException { + public static MetricToDiskExporter create(MetricExporter delegate, Storage storage) { ToDiskExporter toDisk = - ToDiskExporter.builder() - .setFolderName(SignalTypes.metrics.name()) - .setStorageConfiguration(config) + ToDiskExporter.builder(storage) .setSerializer(SignalSerializer.ofMetrics()) .setExportFunction(delegate::export) .build(); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java index c23ac043e..e3c7992ba 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java @@ -5,11 +5,10 @@ package io.opentelemetry.contrib.disk.buffering; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.io.IOException; @@ -19,15 +18,12 @@ public class SpanFromDiskExporter implements FromDiskExporter { private final FromDiskExporterImpl delegate; - public static SpanFromDiskExporter create(SpanExporter exporter, StorageConfiguration config) + public static SpanFromDiskExporter create(SpanExporter exporter, Storage storage) throws IOException { FromDiskExporterImpl delegate = - FromDiskExporterImpl.builder() - .setFolderName(SignalTypes.spans.name()) - .setStorageConfiguration(config) + FromDiskExporterImpl.builder(storage) .setDeserializer(SignalDeserializer.ofSpans()) .setExportFunction(exporter::export) - .setDebugEnabled(config.isDebugEnabled()) .build(); return new SpanFromDiskExporter(delegate); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java index 00daf57c1..d5ca81518 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java @@ -5,11 +5,9 @@ package io.opentelemetry.contrib.disk.buffering; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; @@ -28,32 +26,10 @@ public class SpanToDiskExporter implements SpanExporter { * Creates a new SpanToDiskExporter that will buffer Span telemetry on disk storage. * * @param delegate - The SpanExporter to delegate to if disk writing fails. - * @param config - The StorageConfiguration that specifies how storage is managed. + * @param storage - The Storage instance that specifies how storage is managed. * @return A new SpanToDiskExporter instance. - * @throws IOException if the delegate ToDiskExporter could not be created. */ - @Deprecated - public static SpanToDiskExporter create(SpanExporter delegate, StorageConfiguration config) - throws IOException { - ToDiskExporter toDisk = - ToDiskExporter.builder() - .setFolderName(SignalTypes.spans.name()) - .setStorageConfiguration(config) - .setSerializer(SignalSerializer.ofSpans()) - .setExportFunction(delegate::export) - .build(); - return new SpanToDiskExporter(toDisk); - } - - /** - * Creates a new SpanToDiskExporter that will buffer Span telemetry on disk storage. - * - * @param delegate - The SpanExporter to delegate to if disk writing fails. - * @return A new SpanToDiskExporter instance. - * @throws IOException if the delegate ToDiskExporter could not be created. - */ - public static SpanToDiskExporter create(SpanExporter delegate, Storage storage) - throws IOException { + public static SpanToDiskExporter create(SpanExporter delegate, Storage storage) { ToDiskExporter toDisk = ToDiskExporter.builder(storage) .setSerializer(SignalSerializer.ofSpans()) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java index eec298469..a91ded1f3 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java @@ -8,11 +8,8 @@ import static java.util.Collections.emptyList; import com.google.errorprone.annotations.CanIgnoreReturnValue; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; -import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder; -import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.IOException; import java.util.Collection; @@ -22,36 +19,23 @@ public class FromDiskExporterBuilder { private SignalDeserializer serializer = noopDeserializer(); + private final Storage storage; + private Function, CompletableResultCode> exportFunction = x -> CompletableResultCode.ofFailure(); - private boolean debugEnabled = false; + public FromDiskExporterBuilder(Storage storage) { + if (storage == null) { + throw new NullPointerException("Storage cannot be null"); + } + this.storage = storage; + } @NotNull private static SignalDeserializer noopDeserializer() { return x -> emptyList(); } - private final StorageBuilder storageBuilder = Storage.builder(); - - @CanIgnoreReturnValue - public FromDiskExporterBuilder setFolderName(String folderName) { - storageBuilder.setFolderName(folderName); - return this; - } - - @CanIgnoreReturnValue - public FromDiskExporterBuilder setStorageConfiguration(StorageConfiguration configuration) { - storageBuilder.setStorageConfiguration(configuration); - return this; - } - - @CanIgnoreReturnValue - public FromDiskExporterBuilder setStorageClock(Clock clock) { - storageBuilder.setStorageClock(clock); - return this; - } - @CanIgnoreReturnValue public FromDiskExporterBuilder setDeserializer(SignalDeserializer serializer) { this.serializer = serializer; @@ -65,19 +49,7 @@ public FromDiskExporterBuilder setExportFunction( return this; } - @CanIgnoreReturnValue - public FromDiskExporterBuilder enableDebug() { - return setDebugEnabled(true); - } - - @CanIgnoreReturnValue - public FromDiskExporterBuilder setDebugEnabled(boolean debugEnabled) { - this.debugEnabled = debugEnabled; - return this; - } - public FromDiskExporterImpl build() throws IOException { - Storage storage = storageBuilder.build(); - return new FromDiskExporterImpl<>(serializer, exportFunction, storage, debugEnabled); + return new FromDiskExporterImpl<>(serializer, exportFunction, storage); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java index 19ef6fe2c..5ba5c2390 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java @@ -32,17 +32,17 @@ public final class FromDiskExporterImpl implements FromDiskExporter FromDiskExporterImpl( SignalDeserializer deserializer, Function, CompletableResultCode> exportFunction, - Storage storage, - boolean debugEnabled) { + Storage storage) { this.deserializer = deserializer; this.exportFunction = exportFunction; this.storage = storage; this.logger = - DebugLogger.wrap(Logger.getLogger(FromDiskExporterImpl.class.getName()), debugEnabled); + DebugLogger.wrap( + Logger.getLogger(FromDiskExporterImpl.class.getName()), storage.isDebugEnabled()); } - public static FromDiskExporterBuilder builder() { - return new FromDiskExporterBuilder<>(); + public static FromDiskExporterBuilder builder(Storage storage) { + return new FromDiskExporterBuilder<>(storage); } /** diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java index 270bb49ff..b54e3cc16 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java @@ -25,17 +25,13 @@ public class ToDiskExporter { ToDiskExporter( SignalSerializer serializer, Function, CompletableResultCode> exportFunction, - Storage storage, - boolean debugEnabled) { + Storage storage) { this.serializer = serializer; this.exportFunction = exportFunction; this.storage = storage; - this.logger = DebugLogger.wrap(Logger.getLogger(ToDiskExporter.class.getName()), debugEnabled); - } - - @Deprecated - public static ToDiskExporterBuilder builder() { - return new ToDiskExporterBuilder<>(); + this.logger = + DebugLogger.wrap( + Logger.getLogger(ToDiskExporter.class.getName()), storage.isDebugEnabled()); } public static ToDiskExporterBuilder builder(Storage storage) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java index 34ca24f92..069e08986 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java @@ -6,31 +6,20 @@ package io.opentelemetry.contrib.disk.buffering.internal.exporter; import com.google.errorprone.annotations.CanIgnoreReturnValue; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; -import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder; -import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; -import java.io.IOException; import java.util.Collection; import java.util.function.Function; -import javax.annotation.Nullable; public final class ToDiskExporterBuilder { private SignalSerializer serializer = ts -> new byte[0]; - @Deprecated private final StorageBuilder storageBuilder = Storage.builder(); - - @Nullable private Storage storage = null; + private final Storage storage; private Function, CompletableResultCode> exportFunction = x -> CompletableResultCode.ofFailure(); - private boolean debugEnabled = false; - - @Deprecated - ToDiskExporterBuilder() {} ToDiskExporterBuilder(Storage storage) { if (storage == null) { @@ -39,45 +28,6 @@ public final class ToDiskExporterBuilder { this.storage = storage; } - @CanIgnoreReturnValue - public ToDiskExporterBuilder enableDebug() { - return setDebugEnabled(true); - } - - @CanIgnoreReturnValue - public ToDiskExporterBuilder setDebugEnabled(boolean debugEnabled) { - this.debugEnabled = debugEnabled; - return this; - } - - @Deprecated - @CanIgnoreReturnValue - public ToDiskExporterBuilder setFolderName(String folderName) { - storageBuilder.setFolderName(folderName); - return this; - } - - @Deprecated - @CanIgnoreReturnValue - public ToDiskExporterBuilder setStorageConfiguration(StorageConfiguration configuration) { - validateConfiguration(configuration); - storageBuilder.setStorageConfiguration(configuration); - return this; - } - - @Deprecated - @CanIgnoreReturnValue - public ToDiskExporterBuilder setStorageClock(Clock clock) { - storageBuilder.setStorageClock(clock); - return this; - } - - @CanIgnoreReturnValue - public ToDiskExporterBuilder setStorage(Storage storage) { - this.storage = storage; - return this; - } - @CanIgnoreReturnValue public ToDiskExporterBuilder setSerializer(SignalSerializer serializer) { this.serializer = serializer; @@ -91,15 +41,7 @@ public ToDiskExporterBuilder setExportFunction( return this; } - public ToDiskExporter build() throws IOException { - Storage storage = this.storage != null ? this.storage : storageBuilder.build(); - return new ToDiskExporter<>(serializer, exportFunction, storage, debugEnabled); - } - - private static void validateConfiguration(StorageConfiguration configuration) { - if (configuration.getMinFileAgeForReadMillis() <= configuration.getMaxFileAgeForWriteMillis()) { - throw new IllegalArgumentException( - "The configured max file age for writing must be lower than the configured min file age for reading"); - } + public ToDiskExporter build() { + return new ToDiskExporter<>(serializer, exportFunction, storage); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index 4ff60cbdc..78024cb9a 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -24,8 +24,8 @@ public final class Storage implements Closeable { private static final int MAX_ATTEMPTS = 3; private final DebugLogger logger; - private final FolderManager folderManager; + private final boolean debugEnabled; private final AtomicBoolean isClosed = new AtomicBoolean(false); @Nullable private WritableFile writableFile; @Nullable private ReadableFile readableFile; @@ -34,12 +34,17 @@ public Storage(FolderManager folderManager, boolean debugEnabled) { this.folderManager = folderManager; this.logger = DebugLogger.wrap(Logger.getLogger(FromDiskExporterImpl.class.getName()), debugEnabled); + this.debugEnabled = debugEnabled; } public static StorageBuilder builder() { return new StorageBuilder(); } + public boolean isDebugEnabled() { + return debugEnabled; + } + /** * Attempts to write an item into a writable file. * diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java index c8b0435ca..62660b074 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java @@ -31,6 +31,7 @@ public StorageBuilder setFolderName(String folderName) { @CanIgnoreReturnValue public StorageBuilder setStorageConfiguration(StorageConfiguration configuration) { + validateConfiguration(configuration); this.configuration = configuration; return this; } @@ -57,4 +58,15 @@ private static File ensureSubdir(File rootDir, String child) throws IOException } throw new IOException("Could not create the subdir: '" + child + "' inside: " + rootDir); } + + private static void validateConfiguration(StorageConfiguration configuration) { + // ignore the check if debug is enabled - because it's needed for a test case + // todo remove this when the test case is fixed + if (!configuration.isDebugEnabled() + && configuration.getMinFileAgeForReadMillis() + <= configuration.getMaxFileAgeForWriteMillis()) { + throw new IllegalArgumentException( + "The configured max file age for writing must be lower than the configured min file age for reading"); + } + } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java index b2955630e..3f4587a0b 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java @@ -49,12 +49,10 @@ void setUp() throws IOException { setUpSerializer(); wrapped = mock(); exporter = - FromDiskExporterImpl.builder() - .setFolderName(STORAGE_FOLDER_NAME) - .setStorageConfiguration(TestData.getDefaultConfiguration(rootDir)) + FromDiskExporterImpl.builder( + TestData.getDefaultStorage(rootDir, STORAGE_FOLDER_NAME, clock)) .setDeserializer(deserializer) .setExportFunction(wrapped::export) - .setStorageClock(clock) .build(); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java index 21284e759..631aa04fa 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java @@ -54,14 +54,11 @@ public class IntegrationTest { private InMemorySpanExporter memorySpanExporter; - private SpanToDiskExporter spanToDiskExporter; private Tracer tracer; private InMemoryMetricExporter memoryMetricExporter; - private MetricToDiskExporter metricToDiskExporter; private SdkMeterProvider meterProvider; private Meter meter; private InMemoryLogRecordExporter memoryLogRecordExporter; - private LogRecordToDiskExporter logToDiskExporter; private Logger logger; private Clock clock; @TempDir File rootDir; @@ -87,14 +84,15 @@ void setUp() throws IOException { memorySpanExporter = InMemorySpanExporter.create(); ToDiskExporter toDiskSpanExporter = buildToDiskExporter(SignalSerializer.ofSpans(), memorySpanExporter::export); - spanToDiskExporter = new SpanToDiskExporter(toDiskSpanExporter); + SpanToDiskExporter spanToDiskExporter = new SpanToDiskExporter(toDiskSpanExporter); tracer = createTracerProvider(spanToDiskExporter).get("SpanInstrumentationScope"); // Setting up metrics memoryMetricExporter = InMemoryMetricExporter.create(); ToDiskExporter toDiskMetricExporter = buildToDiskExporter(SignalSerializer.ofMetrics(), memoryMetricExporter::export); - metricToDiskExporter = new MetricToDiskExporter(toDiskMetricExporter, memoryMetricExporter); + MetricToDiskExporter metricToDiskExporter = + new MetricToDiskExporter(toDiskMetricExporter, memoryMetricExporter); meterProvider = createMeterProvider(metricToDiskExporter); meter = meterProvider.get("MetricInstrumentationScope"); @@ -102,14 +100,13 @@ void setUp() throws IOException { memoryLogRecordExporter = InMemoryLogRecordExporter.create(); ToDiskExporter toDiskLogExporter = buildToDiskExporter(SignalSerializer.ofLogs(), memoryLogRecordExporter::export); - logToDiskExporter = new LogRecordToDiskExporter(toDiskLogExporter); + LogRecordToDiskExporter logToDiskExporter = new LogRecordToDiskExporter(toDiskLogExporter); logger = createLoggerProvider(logToDiskExporter).get("LogInstrumentationScope"); } @NotNull private ToDiskExporter buildToDiskExporter( - SignalSerializer serializer, Function, CompletableResultCode> exporter) - throws IOException { + SignalSerializer serializer, Function, CompletableResultCode> exporter) { return ToDiskExporter.builder(storage) .setSerializer(serializer) .setExportFunction(exporter) @@ -117,18 +114,12 @@ private ToDiskExporter buildToDiskExporter( } @NotNull - private FromDiskExporterImpl buildFromDiskExporter( + private static FromDiskExporterImpl buildFromDiskExporter( FromDiskExporterBuilder builder, Function, CompletableResultCode> exportFunction, SignalDeserializer deserializer) throws IOException { - return builder - .setExportFunction(exportFunction) - .setFolderName(SignalTypes.spans.name()) - .setStorageConfiguration(storageConfig) - .setDeserializer(deserializer) - .setStorageClock(clock) - .build(); + return builder.setExportFunction(exportFunction).setDeserializer(deserializer).build(); } @Test @@ -137,7 +128,7 @@ void verifySpansIntegration() throws IOException { span.end(); FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(), + FromDiskExporterImpl.builder(storage), memorySpanExporter::export, SignalDeserializer.ofSpans()); assertExporter(fromDiskExporter, () -> memorySpanExporter.getFinishedSpanItems().size()); @@ -150,7 +141,7 @@ void verifyMetricsIntegration() throws IOException { FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(), + FromDiskExporterImpl.builder(storage), memoryMetricExporter::export, SignalDeserializer.ofMetrics()); assertExporter(fromDiskExporter, () -> memoryMetricExporter.getFinishedMetricItems().size()); @@ -162,7 +153,7 @@ void verifyLogRecordsIntegration() throws IOException { FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(), + FromDiskExporterImpl.builder(storage), memoryLogRecordExporter::export, SignalDeserializer.ofLogs()); assertExporter( diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java index 05eef5a03..dd039d87e 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java @@ -6,7 +6,6 @@ package io.opentelemetry.contrib.disk.buffering; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -51,19 +50,26 @@ void fromDisk() throws Exception { StorageConfiguration config = StorageConfiguration.builder() .setRootDir(tempDir) + .setDebugEnabled(true) .setMaxFileAgeForWriteMillis(TimeUnit.HOURS.toMillis(24)) .setMinFileAgeForReadMillis(0) .setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(24)) .setTemporaryFileProvider(DefaultTemporaryFileProvider.getInstance()) .build(); - List spans = writeSomeSpans(config); + Storage storage = + Storage.builder() + .setStorageConfiguration(config) + .setFolderName(SignalTypes.spans.name()) + .build(); + + List spans = writeSomeSpans(storage); SpanExporter exporter = mock(); ArgumentCaptor> capture = ArgumentCaptor.forClass(Collection.class); when(exporter.export(capture.capture())).thenReturn(CompletableResultCode.ofSuccess()); - SpanFromDiskExporter testClass = SpanFromDiskExporter.create(exporter, config); + SpanFromDiskExporter testClass = SpanFromDiskExporter.create(exporter, storage); boolean result = testClass.exportStoredBatch(30, TimeUnit.SECONDS); assertThat(result).isTrue(); List exportedSpans = (List) capture.getValue(); @@ -79,23 +85,13 @@ void fromDisk() throws Exception { verify(exporter).export(eq(Arrays.asList(expected1, expected2))); } - private static List writeSomeSpans(StorageConfiguration config) throws Exception { + private static List writeSomeSpans(Storage storage) throws Exception { long now = System.currentTimeMillis() * 1_000_000; SpanData span1 = makeSpan1(TraceFlags.getDefault(), now); SpanData span2 = makeSpan2(TraceFlags.getSampled(), now); List spans = Arrays.asList(span1, span2); - SignalSerializer serializer = SignalSerializer.ofSpans(); - File subdir = new File(config.getRootDir(), SignalTypes.spans.name()); - assertTrue(subdir.mkdir()); - - Storage storage = - Storage.builder() - .setStorageConfiguration(config) - .setFolderName(SignalTypes.spans.name()) - .build(); - storage.write(serializer.serialize(spans)); - storage.close(); + storage.write(SignalSerializer.ofSpans().serialize(spans)); return spans; } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java deleted file mode 100644 index fbecada27..000000000 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering.internal.exporter; - -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; -import io.opentelemetry.sdk.trace.data.SpanData; -import java.io.File; -import org.junit.jupiter.api.Test; - -class ToDiskExporterBuilderTest { - - @SuppressWarnings("deprecation") // todo - @Test - void whenMinFileReadIsNotGraterThanMaxFileWrite_throwException() { - StorageConfiguration invalidConfig = - StorageConfiguration.builder() - .setMaxFileAgeForWriteMillis(2) - .setMinFileAgeForReadMillis(1) - .setRootDir(new File(".")) - .build(); - - assertThatThrownBy( - () -> ToDiskExporter.builder().setStorageConfiguration(invalidConfig)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage( - "The configured max file age for writing must be lower than the configured min file age for reading"); - } -} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java index 865aa6298..f7b6e3ff6 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java @@ -49,7 +49,7 @@ void setup() { exportedFnSeen = x; return exportFnResultToReturn.get(); }; - toDiskExporter = new ToDiskExporter<>(serializer, exportFn, storage, true); + toDiskExporter = new ToDiskExporter<>(serializer, exportFn, storage); when(serializer.serialize(records)).thenReturn(serialized); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java index d59c3464f..e9da16e5e 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java @@ -6,6 +6,7 @@ package io.opentelemetry.contrib.disk.buffering.internal.storage; import static io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult.TRY_LATER; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; @@ -15,11 +16,13 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; +import java.io.File; import java.io.IOException; import java.util.function.Function; import org.junit.jupiter.api.BeforeEach; @@ -222,6 +225,21 @@ void whenClosing_closeWriterAndReaderIfNotNull() throws IOException { verify(readableFile).close(); } + @Test + void whenMinFileReadIsNotGraterThanMaxFileWrite_throwException() { + StorageConfiguration invalidConfig = + StorageConfiguration.builder() + .setMaxFileAgeForWriteMillis(2) + .setMinFileAgeForReadMillis(1) + .setRootDir(new File(".")) + .build(); + + assertThatThrownBy(() -> Storage.builder().setStorageConfiguration(invalidConfig)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "The configured max file age for writing must be lower than the configured min file age for reading"); + } + private static WritableFile createWritableFile() throws IOException { WritableFile mock = mock(); when(mock.append(any())).thenReturn(WritableResult.SUCCEEDED); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java index a9a2003ae..5cfa21e24 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java @@ -8,7 +8,9 @@ import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.config.TemporaryFileProvider; import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider; +import io.opentelemetry.sdk.common.Clock; import java.io.File; +import java.io.IOException; public final class TestData { @@ -23,6 +25,16 @@ public static StorageConfiguration getDefaultConfiguration(File rootDir) { return getConfiguration(fileProvider, rootDir); } + public static Storage getDefaultStorage(File rootDir, String storageFolderName, Clock clock) + throws IOException { + TemporaryFileProvider fileProvider = DefaultTemporaryFileProvider.getInstance(); + return Storage.builder() + .setFolderName(storageFolderName) + .setStorageConfiguration(getConfiguration(fileProvider, rootDir)) + .setStorageClock(clock) + .build(); + } + public static StorageConfiguration getConfiguration( TemporaryFileProvider fileProvider, File rootDir) { return StorageConfiguration.builder() From 203b0080f7f0c5e2e609f375e13e280a7b199802 Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Tue, 27 May 2025 11:38:00 +0200 Subject: [PATCH 05/11] pass storage --- .../buffering/internal/storage/Storage.java | 8 ++++++ .../internal/storage/StorageBuilder.java | 6 +---- .../internal/storage/files/WritableFile.java | 4 +++ .../buffering/SpanFromDiskExporterTest.java | 25 +++++++------------ 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index 78024cb9a..92205d845 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -77,6 +77,14 @@ private boolean write(byte[] item, int attemptNumber) throws IOException { return true; } + public void flush() throws IOException { + if (writableFile != null) { + writableFile.flush(); + } else { + logger.log("No writable file to flush."); + } + } + /** * Attempts to read an item from a ready-to-read file. * diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java index 62660b074..a4b1366dd 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java @@ -60,11 +60,7 @@ private static File ensureSubdir(File rootDir, String child) throws IOException } private static void validateConfiguration(StorageConfiguration configuration) { - // ignore the check if debug is enabled - because it's needed for a test case - // todo remove this when the test case is fixed - if (!configuration.isDebugEnabled() - && configuration.getMinFileAgeForReadMillis() - <= configuration.getMaxFileAgeForWriteMillis()) { + if (configuration.getMinFileAgeForReadMillis() <= configuration.getMaxFileAgeForWriteMillis()) { throw new IllegalArgumentException( "The configured max file age for writing must be lower than the configured min file age for reading"); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java index 519e9da66..6bf082ca5 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java @@ -94,4 +94,8 @@ public synchronized void close() throws IOException { public String toString() { return "WritableFile{" + "file=" + file + '}'; } + + public void flush() throws IOException { + out.flush(); + } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java index dd039d87e..bfb057d6b 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java @@ -18,13 +18,12 @@ import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceState; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; -import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.models.SpanDataImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; +import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.data.SpanData; @@ -47,24 +46,17 @@ class SpanFromDiskExporterTest { @SuppressWarnings("unchecked") @Test void fromDisk() throws Exception { - StorageConfiguration config = - StorageConfiguration.builder() - .setRootDir(tempDir) - .setDebugEnabled(true) - .setMaxFileAgeForWriteMillis(TimeUnit.HOURS.toMillis(24)) - .setMinFileAgeForReadMillis(0) - .setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(24)) - .setTemporaryFileProvider(DefaultTemporaryFileProvider.getInstance()) - .build(); - + Clock clock = mock(Clock.class); + long start = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); + when(clock.now()).thenReturn(start); Storage storage = - Storage.builder() - .setStorageConfiguration(config) - .setFolderName(SignalTypes.spans.name()) - .build(); + io.opentelemetry.contrib.disk.buffering.internal.storage.TestData.getDefaultStorage( + tempDir, SignalTypes.spans.name(), clock); List spans = writeSomeSpans(storage); + when(clock.now()).thenReturn(start + TimeUnit.MILLISECONDS.toNanos(2000)); + SpanExporter exporter = mock(); ArgumentCaptor> capture = ArgumentCaptor.forClass(Collection.class); when(exporter.export(capture.capture())).thenReturn(CompletableResultCode.ofSuccess()); @@ -92,6 +84,7 @@ private static List writeSomeSpans(Storage storage) throws Exception { List spans = Arrays.asList(span1, span2); storage.write(SignalSerializer.ofSpans().serialize(spans)); + storage.flush(); return spans; } From 5b7993f965a4472b48af450344f2044b9b0033e2 Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Tue, 27 May 2025 11:41:30 +0200 Subject: [PATCH 06/11] pass storage --- .../disk/buffering/SpanFromDiskExporterTest.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java index bfb057d6b..5a579387b 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java @@ -18,6 +18,7 @@ import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.models.SpanDataImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; @@ -50,12 +51,15 @@ void fromDisk() throws Exception { long start = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); when(clock.now()).thenReturn(start); Storage storage = - io.opentelemetry.contrib.disk.buffering.internal.storage.TestData.getDefaultStorage( - tempDir, SignalTypes.spans.name(), clock); + Storage.builder() + .setFolderName(SignalTypes.spans.name()) + .setStorageConfiguration(StorageConfiguration.builder().setRootDir(tempDir).build()) + .setStorageClock(clock) + .build(); List spans = writeSomeSpans(storage); - when(clock.now()).thenReturn(start + TimeUnit.MILLISECONDS.toNanos(2000)); + when(clock.now()).thenReturn(start + TimeUnit.SECONDS.toNanos(60)); SpanExporter exporter = mock(); ArgumentCaptor> capture = ArgumentCaptor.forClass(Collection.class); From 638fdd1577bf2ebd812bba54aab7bd1045f13e5c Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Tue, 27 May 2025 11:42:32 +0200 Subject: [PATCH 07/11] remove deprecated methods --- .../logs/models/LogRecordDataImpl.java | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/models/LogRecordDataImpl.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/models/LogRecordDataImpl.java index 9ff0f9410..2cceb627c 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/models/LogRecordDataImpl.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/models/LogRecordDataImpl.java @@ -6,7 +6,6 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.models; import com.google.auto.value.AutoValue; -import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.Value; import io.opentelemetry.api.logs.Severity; @@ -23,14 +22,6 @@ public static Builder builder() { return new AutoValue_LogRecordDataImpl.Builder(); } - @Deprecated - public io.opentelemetry.sdk.logs.data.Body getBody() { - Value valueBody = getBodyValue(); - return valueBody == null - ? io.opentelemetry.sdk.logs.data.Body.empty() - : io.opentelemetry.sdk.logs.data.Body.string(valueBody.asString()); - } - @Override @Nullable public abstract Value getBodyValue(); @@ -51,17 +42,6 @@ public abstract static class Builder { public abstract Builder setSeverityText(String value); - @Deprecated - @CanIgnoreReturnValue - public Builder setBody(io.opentelemetry.sdk.logs.data.Body body) { - if (body.getType() == io.opentelemetry.sdk.logs.data.Body.Type.STRING) { - setBodyValue(Value.of(body.asString())); - } else if (body.getType() == io.opentelemetry.sdk.logs.data.Body.Type.EMPTY) { - setBodyValue(null); - } - return this; - } - public abstract Builder setBodyValue(@Nullable Value value); public abstract Builder setAttributes(Attributes value); From 8e1bf436a8760eef0fc28d4541e266d51daf4236 Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Tue, 27 May 2025 11:43:17 +0200 Subject: [PATCH 08/11] cleanup --- .../disk/buffering/internal/storage/files/WritableFile.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java index 6bf082ca5..1e924a806 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java @@ -11,9 +11,9 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; import io.opentelemetry.sdk.common.Clock; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.file.Files; import java.util.concurrent.atomic.AtomicBoolean; public final class WritableFile implements FileOperations { @@ -35,7 +35,7 @@ public WritableFile( this.clock = clock; expireTimeMillis = createdTimeMillis + configuration.getMaxFileAgeForWriteMillis(); size = (int) file.length(); - out = new FileOutputStream(file); + out = Files.newOutputStream(file.toPath()); } /** From ed522600e88fbaaab3482a54a66b74a10a7acd55 Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Tue, 27 May 2025 11:47:42 +0200 Subject: [PATCH 09/11] remove deprecated methods --- .../mapping/logs/models/LogRecordDataImpl.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/models/LogRecordDataImpl.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/models/LogRecordDataImpl.java index 2cceb627c..51322b41e 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/models/LogRecordDataImpl.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/models/LogRecordDataImpl.java @@ -22,6 +22,14 @@ public static Builder builder() { return new AutoValue_LogRecordDataImpl.Builder(); } + @Deprecated + public io.opentelemetry.sdk.logs.data.Body getBody() { + Value valueBody = getBodyValue(); + return valueBody == null + ? io.opentelemetry.sdk.logs.data.Body.empty() + : io.opentelemetry.sdk.logs.data.Body.string(valueBody.asString()); + } + @Override @Nullable public abstract Value getBodyValue(); From 29536b4734f231a0a8ff2cedecedc4b4b5d64513 Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Tue, 27 May 2025 12:01:45 +0200 Subject: [PATCH 10/11] Revert "cleanup" This reverts commit 8e1bf436a8760eef0fc28d4541e266d51daf4236. --- .../disk/buffering/internal/storage/files/WritableFile.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java index 1e924a806..6bf082ca5 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java @@ -11,9 +11,9 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; import io.opentelemetry.sdk.common.Clock; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.nio.file.Files; import java.util.concurrent.atomic.AtomicBoolean; public final class WritableFile implements FileOperations { @@ -35,7 +35,7 @@ public WritableFile( this.clock = clock; expireTimeMillis = createdTimeMillis + configuration.getMaxFileAgeForWriteMillis(); size = (int) file.length(); - out = Files.newOutputStream(file.toPath()); + out = new FileOutputStream(file); } /** From eede4a7f48cbd8086d3c8a382f02096010b6dbd6 Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Tue, 27 May 2025 13:44:05 +0200 Subject: [PATCH 11/11] each storage is for a single signal type only --- .../disk/buffering/internal/storage/Storage.java | 5 +++-- .../internal/storage/StorageBuilder.java | 11 ++++------- .../disk/buffering/FromDiskExporterImplTest.java | 5 +++-- .../contrib/disk/buffering/IntegrationTest.java | 15 +++++++-------- .../disk/buffering/SpanFromDiskExporterTest.java | 3 +-- .../buffering/internal/storage/StorageTest.java | 4 +++- .../disk/buffering/internal/storage/TestData.java | 6 +++--- 7 files changed, 24 insertions(+), 25 deletions(-) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index 92205d845..73a263490 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -14,6 +14,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,8 +38,8 @@ public Storage(FolderManager folderManager, boolean debugEnabled) { this.debugEnabled = debugEnabled; } - public static StorageBuilder builder() { - return new StorageBuilder(); + public static StorageBuilder builder(SignalTypes types) { + return new StorageBuilder(types); } public boolean isDebugEnabled() { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java index a4b1366dd..d43bc18b2 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java @@ -7,6 +7,7 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.Clock; import java.io.File; import java.io.IOException; @@ -17,16 +18,12 @@ public class StorageBuilder { private static final Logger logger = Logger.getLogger(StorageBuilder.class.getName()); - private String folderName = "data"; + private final String folderName; private StorageConfiguration configuration = StorageConfiguration.getDefault(new File(".")); private Clock clock = Clock.getDefault(); - StorageBuilder() {} - - @CanIgnoreReturnValue - public StorageBuilder setFolderName(String folderName) { - this.folderName = folderName; - return this; + StorageBuilder(SignalTypes types) { + folderName = types.name(); } @CanIgnoreReturnValue diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java index 3f4587a0b..65c81b842 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java @@ -18,6 +18,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; @@ -40,7 +41,7 @@ class FromDiskExporterImplTest { private FromDiskExporterImpl exporter; private final List deserializedData = Collections.emptyList(); @TempDir File rootDir; - private static final String STORAGE_FOLDER_NAME = "testName"; + private static final String STORAGE_FOLDER_NAME = SignalTypes.spans.name(); @BeforeEach void setUp() throws IOException { @@ -50,7 +51,7 @@ void setUp() throws IOException { wrapped = mock(); exporter = FromDiskExporterImpl.builder( - TestData.getDefaultStorage(rootDir, STORAGE_FOLDER_NAME, clock)) + TestData.getDefaultStorage(rootDir, SignalTypes.spans, clock)) .setDeserializer(deserializer) .setExportFunction(wrapped::export) .build(); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java index 631aa04fa..b2da05c24 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java @@ -65,15 +65,14 @@ public class IntegrationTest { private static final long INITIAL_TIME_IN_MILLIS = 1000; private static final long NOW_NANOS = MILLISECONDS.toNanos(INITIAL_TIME_IN_MILLIS); private StorageConfiguration storageConfig; - private Storage storage; + private Storage spanStorage; @BeforeEach void setUp() throws IOException { clock = mock(); storageConfig = StorageConfiguration.getDefault(rootDir); - storage = - Storage.builder() - .setFolderName(SignalTypes.spans.name()) + spanStorage = + Storage.builder(SignalTypes.spans) .setStorageConfiguration(storageConfig) .setStorageClock(clock) .build(); @@ -107,7 +106,7 @@ void setUp() throws IOException { @NotNull private ToDiskExporter buildToDiskExporter( SignalSerializer serializer, Function, CompletableResultCode> exporter) { - return ToDiskExporter.builder(storage) + return ToDiskExporter.builder(spanStorage) .setSerializer(serializer) .setExportFunction(exporter) .build(); @@ -128,7 +127,7 @@ void verifySpansIntegration() throws IOException { span.end(); FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(storage), + FromDiskExporterImpl.builder(spanStorage), memorySpanExporter::export, SignalDeserializer.ofSpans()); assertExporter(fromDiskExporter, () -> memorySpanExporter.getFinishedSpanItems().size()); @@ -141,7 +140,7 @@ void verifyMetricsIntegration() throws IOException { FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(storage), + FromDiskExporterImpl.builder(spanStorage), memoryMetricExporter::export, SignalDeserializer.ofMetrics()); assertExporter(fromDiskExporter, () -> memoryMetricExporter.getFinishedMetricItems().size()); @@ -153,7 +152,7 @@ void verifyLogRecordsIntegration() throws IOException { FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(storage), + FromDiskExporterImpl.builder(spanStorage), memoryLogRecordExporter::export, SignalDeserializer.ofLogs()); assertExporter( diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java index 5a579387b..ae503ecdf 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java @@ -51,8 +51,7 @@ void fromDisk() throws Exception { long start = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); when(clock.now()).thenReturn(start); Storage storage = - Storage.builder() - .setFolderName(SignalTypes.spans.name()) + Storage.builder(SignalTypes.spans) .setStorageConfiguration(StorageConfiguration.builder().setRootDir(tempDir).build()) .setStorageClock(clock) .build(); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java index e9da16e5e..9accaefff 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java @@ -22,6 +22,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import java.io.File; import java.io.IOException; import java.util.function.Function; @@ -234,7 +235,8 @@ void whenMinFileReadIsNotGraterThanMaxFileWrite_throwException() { .setRootDir(new File(".")) .build(); - assertThatThrownBy(() -> Storage.builder().setStorageConfiguration(invalidConfig)) + assertThatThrownBy( + () -> Storage.builder(SignalTypes.logs).setStorageConfiguration(invalidConfig)) .isInstanceOf(IllegalArgumentException.class) .hasMessage( "The configured max file age for writing must be lower than the configured min file age for reading"); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java index 5cfa21e24..8e66dde04 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java @@ -8,6 +8,7 @@ import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.config.TemporaryFileProvider; import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.Clock; import java.io.File; import java.io.IOException; @@ -25,11 +26,10 @@ public static StorageConfiguration getDefaultConfiguration(File rootDir) { return getConfiguration(fileProvider, rootDir); } - public static Storage getDefaultStorage(File rootDir, String storageFolderName, Clock clock) + public static Storage getDefaultStorage(File rootDir, SignalTypes types, Clock clock) throws IOException { TemporaryFileProvider fileProvider = DefaultTemporaryFileProvider.getInstance(); - return Storage.builder() - .setFolderName(storageFolderName) + return Storage.builder(types) .setStorageConfiguration(getConfiguration(fileProvider, rootDir)) .setStorageClock(clock) .build();