diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java index d7940cab5..ebd1417c0 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java @@ -8,6 +8,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; import java.io.IOException; @@ -21,10 +22,11 @@ public static LogRecordFromDiskExporter create( LogRecordExporter exporter, StorageConfiguration config) throws IOException { FromDiskExporterImpl delegate = FromDiskExporterImpl.builder() - .setFolderName("logs") + .setFolderName(SignalTypes.logs.name()) .setStorageConfiguration(config) .setDeserializer(SignalDeserializer.ofLogs()) .setExportFunction(exporter::export) + .setDebugEnabled(config.isDebugEnabled()) .build(); return new LogRecordFromDiskExporter(delegate); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java index 3f5b894b4..64ee4ed20 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java @@ -7,6 +7,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; @@ -32,7 +33,7 @@ public static LogRecordToDiskExporter create( LogRecordExporter delegate, StorageConfiguration config) throws IOException { ToDiskExporter toDisk = ToDiskExporter.builder() - .setFolderName("logs") + .setFolderName(SignalTypes.logs.name()) .setStorageConfiguration(config) .setSerializer(SignalSerializer.ofLogs()) .setExportFunction(delegate::export) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java index bc44c2d18..2f70f27bd 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java @@ -8,6 +8,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.io.IOException; @@ -21,10 +22,11 @@ public static MetricFromDiskExporter create(MetricExporter exporter, StorageConf throws IOException { FromDiskExporterImpl delegate = FromDiskExporterImpl.builder() - .setFolderName("metrics") + .setFolderName(SignalTypes.metrics.name()) .setStorageConfiguration(config) .setDeserializer(SignalDeserializer.ofMetrics()) .setExportFunction(exporter::export) + .setDebugEnabled(config.isDebugEnabled()) .build(); return new MetricFromDiskExporter(delegate); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java index d950b035c..e5a7d7bee 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java @@ -7,6 +7,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; @@ -42,7 +43,7 @@ public static MetricToDiskExporter create( throws IOException { ToDiskExporter toDisk = ToDiskExporter.builder() - .setFolderName("metrics") + .setFolderName(SignalTypes.metrics.name()) .setStorageConfiguration(config) .setSerializer(SignalSerializer.ofMetrics()) .setExportFunction(delegate::export) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java index a15d4fe31..1a24a4316 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java @@ -8,6 +8,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.io.IOException; @@ -21,10 +22,11 @@ public static SpanFromDiskExporter create(SpanExporter exporter, StorageConfigur throws IOException { FromDiskExporterImpl delegate = FromDiskExporterImpl.builder() - .setFolderName("spans") + .setFolderName(SignalTypes.spans.name()) .setStorageConfiguration(config) .setDeserializer(SignalDeserializer.ofSpans()) .setExportFunction(exporter::export) + .setDebugEnabled(config.isDebugEnabled()) .build(); return new SpanFromDiskExporter(delegate); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java index 0d4fac3f1..8310b0cf7 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java @@ -7,6 +7,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; @@ -33,7 +34,7 @@ public static SpanToDiskExporter create(SpanExporter delegate, StorageConfigurat throws IOException { ToDiskExporter toDisk = ToDiskExporter.builder() - .setFolderName("spans") + .setFolderName(SignalTypes.spans.name()) .setStorageConfiguration(config) .setSerializer(SignalSerializer.ofSpans()) .setExportFunction(delegate::export) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/StorageConfiguration.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/StorageConfiguration.java index d47797f9e..efe59fcba 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/StorageConfiguration.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/StorageConfiguration.java @@ -18,6 +18,9 @@ public abstract class StorageConfiguration { /** The root storage location for buffered telemetry. */ public abstract File getRootDir(); + /** Returns true if the storage has been configured with debug verbosity enabled. */ + public abstract boolean isDebugEnabled(); + /** The max amount of time a file can receive new data. */ public abstract long getMaxFileAgeForWriteMillis(); @@ -62,6 +65,7 @@ public static Builder builder() { .setMaxFileAgeForWriteMillis(TimeUnit.SECONDS.toMillis(30)) .setMinFileAgeForReadMillis(TimeUnit.SECONDS.toMillis(33)) .setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(18)) + .setDebugEnabled(false) .setTemporaryFileProvider(fileProvider); } @@ -81,6 +85,8 @@ public abstract static class Builder { public abstract Builder setRootDir(File rootDir); + public abstract Builder setDebugEnabled(boolean debugEnabled); + public abstract StorageConfiguration build(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java index 4008bd1f9..9479ad3a0 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java @@ -25,6 +25,8 @@ public class FromDiskExporterBuilder { private Function, CompletableResultCode> exportFunction = x -> CompletableResultCode.ofFailure(); + private boolean debugEnabled = false; + @NotNull private static SignalDeserializer noopDeserializer() { return x -> emptyList(); @@ -63,8 +65,19 @@ public FromDiskExporterBuilder setExportFunction( return this; } + @CanIgnoreReturnValue + public FromDiskExporterBuilder enableDebug() { + return setDebugEnabled(true); + } + + @CanIgnoreReturnValue + public FromDiskExporterBuilder setDebugEnabled(boolean debugEnabled) { + this.debugEnabled = debugEnabled; + return this; + } + public FromDiskExporterImpl build() throws IOException { Storage storage = storageBuilder.build(); - return new FromDiskExporterImpl<>(serializer, exportFunction, storage); + return new FromDiskExporterImpl<>(serializer, exportFunction, storage, debugEnabled); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java index 968ce1940..873ce20c0 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java @@ -8,12 +8,13 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; +import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.IOException; import java.util.Collection; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import java.util.logging.Level; import java.util.logging.Logger; /** @@ -21,18 +22,21 @@ * another delegated exporter. */ public final class FromDiskExporterImpl implements FromDiskExporter { + private final DebugLogger logger; private final Storage storage; private final SignalDeserializer deserializer; private final Function, CompletableResultCode> exportFunction; - private static final Logger logger = Logger.getLogger(FromDiskExporterImpl.class.getName()); FromDiskExporterImpl( SignalDeserializer deserializer, Function, CompletableResultCode> exportFunction, - Storage storage) { + Storage storage, + boolean debugEnabled) { this.deserializer = deserializer; this.exportFunction = exportFunction; this.storage = storage; + this.logger = + DebugLogger.wrap(Logger.getLogger(FromDiskExporterImpl.class.getName()), debugEnabled); } public static FromDiskExporterBuilder builder() { @@ -44,19 +48,26 @@ public static FromDiskExporterBuilder builder() { * * @param timeout The amount of time to wait for the wrapped exporter to finish. * @param unit The unit of the time provided. - * @return true if there was data available and it was successfully exported within the timeout + * @return true if there was data available, and it was successfully exported within the timeout * provided. false otherwise. * @throws IOException If an unexpected error happens. */ @Override public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException { - logger.log(Level.INFO, "Attempting to export batch from disk."); + logger.log("Attempting to export " + deserializer.signalType() + " batch from disk."); ReadableResult result = storage.readAndProcess( bytes -> { - logger.log(Level.INFO, "About to export stored batch."); - CompletableResultCode join = - exportFunction.apply(deserializer.deserialize(bytes)).join(timeout, unit); + logger.log( + "Read " + + bytes.length + + " " + + deserializer.signalType() + + " bytes from storage."); + List telemetry = deserializer.deserialize(bytes); + logger.log( + "Now exporting batch of " + telemetry.size() + " " + deserializer.signalType()); + CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit); return join.isSuccess(); }); return result == ReadableResult.SUCCEEDED; diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java index ca2c60f67..1a43cb5eb 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java @@ -7,6 +7,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.IOException; import java.util.Collection; @@ -16,7 +17,7 @@ public class ToDiskExporter { - private static final Logger logger = Logger.getLogger(ToDiskExporter.class.getName()); + private final DebugLogger logger; private final Storage storage; private final SignalSerializer serializer; private final Function, CompletableResultCode> exportFunction; @@ -24,10 +25,12 @@ public class ToDiskExporter { ToDiskExporter( SignalSerializer serializer, Function, CompletableResultCode> exportFunction, - Storage storage) { + Storage storage, + boolean debugEnabled) { this.serializer = serializer; this.exportFunction = exportFunction; this.storage = storage; + this.logger = DebugLogger.wrap(Logger.getLogger(ToDiskExporter.class.getName()), debugEnabled); } public static ToDiskExporterBuilder builder() { @@ -35,17 +38,17 @@ public static ToDiskExporterBuilder builder() { } public CompletableResultCode export(Collection data) { - logger.log(Level.FINER, "Intercepting exporter batch."); + logger.log("Intercepting exporter batch.", Level.FINER); try { if (storage.write(serializer.serialize(data))) { return CompletableResultCode.ofSuccess(); } - logger.log(Level.INFO, "Could not store batch in disk. Exporting it right away."); + logger.log("Could not store batch in disk. Exporting it right away."); return exportFunction.apply(data); } catch (IOException e) { logger.log( - Level.WARNING, "An unexpected error happened while attempting to write the data in disk. Exporting it right away.", + Level.WARNING, e); return exportFunction.apply(data); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java index 158eae60b..5baac7709 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java @@ -24,9 +24,21 @@ public final class ToDiskExporterBuilder { private Function, CompletableResultCode> exportFunction = x -> CompletableResultCode.ofFailure(); + private boolean debugEnabled = false; ToDiskExporterBuilder() {} + @CanIgnoreReturnValue + public ToDiskExporterBuilder enableDebug() { + return setDebugEnabled(true); + } + + @CanIgnoreReturnValue + public ToDiskExporterBuilder setDebugEnabled(boolean debugEnabled) { + this.debugEnabled = debugEnabled; + return this; + } + @CanIgnoreReturnValue public ToDiskExporterBuilder setFolderName(String folderName) { storageBuilder.setFolderName(folderName); @@ -61,7 +73,7 @@ public ToDiskExporterBuilder setExportFunction( public ToDiskExporter build() throws IOException { Storage storage = storageBuilder.build(); - return new ToDiskExporter<>(serializer, exportFunction, storage); + return new ToDiskExporter<>(serializer, exportFunction, storage, debugEnabled); } private static void validateConfiguration(StorageConfiguration configuration) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java index 4e39e3308..22e9d12c3 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java @@ -6,6 +6,7 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.ProtoLogsDataMapper; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.proto.logs.v1.LogsData; import io.opentelemetry.sdk.logs.data.LogRecordData; import java.io.IOException; @@ -28,4 +29,9 @@ public List deserialize(byte[] source) { throw new IllegalArgumentException(e); } } + + @Override + public String signalType() { + return SignalTypes.logs.name(); + } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java index 19d686e68..16c672cbc 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java @@ -6,6 +6,7 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics.ProtoMetricsDataMapper; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.proto.metrics.v1.MetricsData; import io.opentelemetry.sdk.metrics.data.MetricData; import java.io.IOException; @@ -28,4 +29,9 @@ public List deserialize(byte[] source) { throw new IllegalArgumentException(e); } } + + @Override + public String signalType() { + return SignalTypes.metrics.name(); + } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SignalDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SignalDeserializer.java index 758eff4b0..435c14088 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SignalDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SignalDeserializer.java @@ -24,5 +24,11 @@ static SignalDeserializer ofLogs() { return LogRecordDataDeserializer.getInstance(); } + /** Deserializes the given byte array into a list of telemetry items. */ List deserialize(byte[] source); + + /** Returns the name of the stored type of signal -- one of "metrics", "spans", or "logs". */ + default String signalType() { + return "unknown"; + } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java index 5c5dba730..03737f8a7 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java @@ -6,6 +6,7 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.ProtoSpansDataMapper; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.proto.trace.v1.TracesData; import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; @@ -28,4 +29,9 @@ public List deserialize(byte[] source) { throw new IllegalArgumentException(e); } } + + @Override + public String signalType() { + return SignalTypes.spans.name(); + } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManager.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManager.java index 23bf52b92..27fc5b094 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManager.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManager.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.Objects; import javax.annotation.Nullable; +import org.jetbrains.annotations.NotNull; public final class FolderManager { private final File folder; @@ -42,6 +43,7 @@ public synchronized ReadableFile getReadableFile() throws IOException { return null; } + @NotNull public synchronized WritableFile createWritableFile() throws IOException { long systemCurrentTimeMillis = nowMillis(clock); File[] existingFiles = folder.listFiles(); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index 384dc720e..f0d074f4e 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -5,25 +5,34 @@ package io.opentelemetry.contrib.disk.buffering.internal.storage; +import static java.util.logging.Level.WARNING; + +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; +import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.logging.Logger; import javax.annotation.Nullable; public final class Storage implements Closeable { private static final int MAX_ATTEMPTS = 3; + private final DebugLogger logger; + private final FolderManager folderManager; private final AtomicBoolean isClosed = new AtomicBoolean(false); @Nullable private WritableFile writableFile; @Nullable private ReadableFile readableFile; - public Storage(FolderManager folderManager) { + public Storage(FolderManager folderManager, boolean debugEnabled) { this.folderManager = folderManager; + this.logger = + DebugLogger.wrap(Logger.getLogger(FromDiskExporterImpl.class.getName()), debugEnabled); } public static StorageBuilder builder() { @@ -42,13 +51,16 @@ public boolean write(byte[] item) throws IOException { private boolean write(byte[] item, int attemptNumber) throws IOException { if (isClosed.get()) { + logger.log("Refusing to write to storage after being closed."); return false; } if (attemptNumber > MAX_ATTEMPTS) { + logger.log("Max number of attempts to write buffered data exceeded.", WARNING); return false; } if (writableFile == null) { writableFile = folderManager.createWritableFile(); + logger.log("Created new writableFile: " + writableFile); } WritableResult result = writableFile.append(item); if (result != WritableResult.SUCCEEDED) { @@ -72,17 +84,22 @@ public ReadableResult readAndProcess(Function processing) throw private ReadableResult readAndProcess(Function processing, int attemptNumber) throws IOException { if (isClosed.get()) { + logger.log("Refusing to read from storage after being closed."); return ReadableResult.FAILED; } if (attemptNumber > MAX_ATTEMPTS) { + logger.log("Maximum number of attempts to read and process buffered data exceeded.", WARNING); return ReadableResult.FAILED; } if (readableFile == null) { + logger.log("Obtaining a new readableFile from the folderManager."); readableFile = folderManager.getReadableFile(); if (readableFile == null) { + logger.log("Unable to get or create readable file."); return ReadableResult.FAILED; } } + logger.log("Attempting to read data from " + readableFile); ReadableResult result = readableFile.readAndProcess(processing); switch (result) { case SUCCEEDED: @@ -97,6 +114,7 @@ private ReadableResult readAndProcess(Function processing, int @Override public void close() throws IOException { + logger.log("Closing disk buffering storage."); if (isClosed.compareAndSet(false, true)) { if (writableFile != null) { writableFile.close(); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java index dd86d67c2..9544eec52 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java @@ -10,9 +10,13 @@ import io.opentelemetry.sdk.common.Clock; import java.io.File; import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; public class StorageBuilder { + private static final Logger logger = Logger.getLogger(StorageBuilder.class.getName()); + private String folderName = "data"; private StorageConfiguration configuration = StorageConfiguration.getDefault(new File(".")); private Clock clock = Clock.getDefault(); @@ -40,7 +44,10 @@ public StorageBuilder setStorageClock(Clock clock) { public Storage build() throws IOException { File folder = ensureSubdir(configuration.getRootDir(), folderName); FolderManager folderManager = new FolderManager(folder, configuration, clock); - return new Storage(folderManager); + if (configuration.isDebugEnabled()) { + logger.log(Level.INFO, "Building storage with configuration => " + configuration); + } + return new Storage(folderManager, configuration.isDebugEnabled()); } private static File ensureSubdir(File rootDir, String child) throws IOException { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java index 876f7377b..e88cb0955 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import javax.annotation.Nullable; +import org.jetbrains.annotations.NotNull; /** * Reads from a file and updates it in parallel in order to avoid re-reading the same items later. @@ -34,7 +35,7 @@ *

More information on the overall storage process in the CONTRIBUTING.md file. */ public final class ReadableFile implements FileOperations { - private final File file; + @NotNull private final File file; private final int originalFileSize; private final StreamReader reader; private final FileTransferUtil fileTransferUtil; @@ -140,6 +141,7 @@ public synchronized boolean isClosed() { return isClosed.get(); } + @NotNull @Override public File getFile() { return file; @@ -170,4 +172,9 @@ private static void copyFile(File from, File to) throws IOException { } } } + + @Override + public String toString() { + return "ReadableFile{" + "file=" + file + '}'; + } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java index ad63d1d71..06f7477ef 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java @@ -89,4 +89,9 @@ public synchronized void close() throws IOException { out.close(); } } + + @Override + public String toString() { + return "WritableFile{" + "file=" + file + '}'; + } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/DebugLogger.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/DebugLogger.java new file mode 100644 index 000000000..46ff72ebf --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/DebugLogger.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.utils; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public class DebugLogger { + private final Logger logger; + private final boolean debugEnabled; + + private DebugLogger(Logger logger, boolean debugEnabled) { + this.logger = logger; + this.debugEnabled = debugEnabled; + } + + public static DebugLogger wrap(Logger logger, boolean debugEnabled) { + return new DebugLogger(logger, debugEnabled); + } + + public void log(String msg) { + log(msg, Level.INFO); + } + + public void log(String msg, Level level) { + if (debugEnabled) { + logger.log(level, msg); + } + } + + public void log(String msg, Level level, Throwable e) { + if (debugEnabled) { + logger.log(level, msg, e); + } + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/SignalTypes.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/SignalTypes.java new file mode 100644 index 000000000..c0a7f5765 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/SignalTypes.java @@ -0,0 +1,12 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.utils; + +public enum SignalTypes { + metrics, + spans, + logs +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java index 7c33a9938..61a6f90d2 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java @@ -21,6 +21,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.SdkLoggerProvider; @@ -103,7 +104,7 @@ private ToDiskExporter buildToDiskExporter( SignalSerializer serializer, Function, CompletableResultCode> exporter) throws IOException { return ToDiskExporter.builder() - .setFolderName("spans") + .setFolderName(SignalTypes.spans.name()) .setStorageConfiguration(storageConfig) .setSerializer(serializer) .setExportFunction(exporter) @@ -119,7 +120,7 @@ private FromDiskExporterImpl buildFromDiskExporter( throws IOException { return builder .setExportFunction(exportFunction) - .setFolderName("spans") + .setFolderName(SignalTypes.spans.name()) .setStorageConfiguration(storageConfig) .setDeserializer(deserializer) .setStorageClock(clock) diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java index 87e796896..578cc3e8e 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java @@ -23,6 +23,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.models.SpanDataImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.resources.Resource; @@ -84,11 +85,14 @@ private static List writeSomeSpans(StorageConfiguration config) throws List spans = Arrays.asList(span1, span2); SignalSerializer serializer = SignalSerializer.ofSpans(); - File subdir = new File(config.getRootDir(), "spans"); + File subdir = new File(config.getRootDir(), SignalTypes.spans.name()); assertTrue(subdir.mkdir()); Storage storage = - Storage.builder().setStorageConfiguration(config).setFolderName("spans").build(); + Storage.builder() + .setStorageConfiguration(config) + .setFolderName(SignalTypes.spans.name()) + .build(); storage.write(serializer.serialize(spans)); storage.close(); return spans; diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java index f7b6e3ff6..865aa6298 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java @@ -49,7 +49,7 @@ void setup() { exportedFnSeen = x; return exportFnResultToReturn.get(); }; - toDiskExporter = new ToDiskExporter<>(serializer, exportFn, storage); + toDiskExporter = new ToDiskExporter<>(serializer, exportFn, storage, true); when(serializer.serialize(records)).thenReturn(serialized); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java index 03dfe9a29..d3ed0667a 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java @@ -39,7 +39,7 @@ void setUp() throws IOException { writableFile = createWritableFile(); processing = mock(); when(readableFile.readAndProcess(processing)).thenReturn(ReadableResult.SUCCEEDED); - storage = new Storage(folderManager); + storage = new Storage(folderManager, true); } @Test