Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.IOException;
Expand All @@ -19,15 +18,12 @@ public class LogRecordFromDiskExporter implements FromDiskExporter {

private final FromDiskExporterImpl<LogRecordData> delegate;

public static LogRecordFromDiskExporter create(
LogRecordExporter exporter, StorageConfiguration config) throws IOException {
public static LogRecordFromDiskExporter create(LogRecordExporter exporter, Storage storage)
throws IOException {
FromDiskExporterImpl<LogRecordData> delegate =
FromDiskExporterImpl.<LogRecordData>builder()
.setFolderName(SignalTypes.logs.name())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's been a while since I worked on this, so apologies because I didn't notice it earlier, though I just recalled that each signal's set of files needs to be in its own folder, separated from other signal files, and that's why this property was set, and also why we needed a storage object per signal. I know from a first glance it looks strange, as I also was confused for a moment, and I think it's because the current architecture is not straightforward. I'm planning to work on enhancing it soon to make it cleaner, but for now, I'm afraid that if we merge these changes, it can cause issues, especially around serialization.

In theory, we could share the same storage object across a single signal's ToDiskExporter and FromDiskExporter objects, however, those are instantiated independently by the consumer, so to make it work we would have to create some sort of factory that focuses on providing both ToDiskExporter and FromDiskExporter instances for a single signal where we would internally ensure that the same storage object is used. And then we would have to make these create methods package private to prevent users from calling them, so that they don't provide the wrong storage object. Though I'm not sure if that would add much value compared to the current way of using the lib.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point - I changed the PR so that the storage builder takes the signal type as an argument.
This should make clear that the storage should not be used across signal types.

.setStorageConfiguration(config)
FromDiskExporterImpl.<LogRecordData>builder(storage)
.setDeserializer(SignalDeserializer.ofLogs())
.setExportFunction(exporter::export)
.setDebugEnabled(config.isDebugEnabled())
.build();
return new LogRecordFromDiskExporter(delegate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
Expand All @@ -26,16 +25,12 @@ public class LogRecordToDiskExporter implements LogRecordExporter {
* Creates a new LogRecordToDiskExporter that will buffer LogRecordData telemetry on disk storage.
*
* @param delegate - The LogRecordExporter to delegate to if disk writing fails.
* @param config - The StorageConfiguration that specifies how storage is managed.
* @param storage - The Storage instance that specifies how storage is managed.
* @return A new LogRecordToDiskExporter instance.
* @throws IOException if the delegate ToDiskExporter could not be created.
*/
public static LogRecordToDiskExporter create(
LogRecordExporter delegate, StorageConfiguration config) throws IOException {
public static LogRecordToDiskExporter create(LogRecordExporter delegate, Storage storage) {
ToDiskExporter<LogRecordData> toDisk =
ToDiskExporter.<LogRecordData>builder()
.setFolderName(SignalTypes.logs.name())
.setStorageConfiguration(config)
ToDiskExporter.<LogRecordData>builder(storage)
.setSerializer(SignalSerializer.ofLogs())
.setExportFunction(delegate::export)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
Expand All @@ -19,15 +18,12 @@ public class MetricFromDiskExporter implements FromDiskExporter {

private final FromDiskExporterImpl<MetricData> delegate;

public static MetricFromDiskExporter create(MetricExporter exporter, StorageConfiguration config)
public static MetricFromDiskExporter create(MetricExporter exporter, Storage storage)
throws IOException {
FromDiskExporterImpl<MetricData> delegate =
FromDiskExporterImpl.<MetricData>builder()
.setFolderName(SignalTypes.metrics.name())
.setStorageConfiguration(config)
FromDiskExporterImpl.<MetricData>builder(storage)
.setDeserializer(SignalDeserializer.ofMetrics())
.setExportFunction(exporter::export)
.setDebugEnabled(config.isDebugEnabled())
.build();
return new MetricFromDiskExporter(delegate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
Expand All @@ -31,16 +30,12 @@ public class MetricToDiskExporter implements MetricExporter {
* Creates a new MetricToDiskExporter that will buffer Metric telemetry on disk storage.
*
* @param delegate - The MetricExporter to delegate to if disk writing fails.
* @param config - The StorageConfiguration that specifies how storage is managed.
* @param storage - The Storage instance that specifies how storage is managed.
* @return A new MetricToDiskExporter instance.
* @throws IOException if the delegate ToDiskExporter could not be created.
*/
public static MetricToDiskExporter create(MetricExporter delegate, StorageConfiguration config)
throws IOException {
public static MetricToDiskExporter create(MetricExporter delegate, Storage storage) {
ToDiskExporter<MetricData> toDisk =
ToDiskExporter.<MetricData>builder()
.setFolderName(SignalTypes.metrics.name())
.setStorageConfiguration(config)
ToDiskExporter.<MetricData>builder(storage)
.setSerializer(SignalSerializer.ofMetrics())
.setExportFunction(delegate::export)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.IOException;
Expand All @@ -19,15 +18,12 @@ public class SpanFromDiskExporter implements FromDiskExporter {

private final FromDiskExporterImpl<SpanData> delegate;

public static SpanFromDiskExporter create(SpanExporter exporter, StorageConfiguration config)
public static SpanFromDiskExporter create(SpanExporter exporter, Storage storage)
throws IOException {
FromDiskExporterImpl<SpanData> delegate =
FromDiskExporterImpl.<SpanData>builder()
.setFolderName(SignalTypes.spans.name())
.setStorageConfiguration(config)
FromDiskExporterImpl.<SpanData>builder(storage)
.setDeserializer(SignalDeserializer.ofSpans())
.setExportFunction(exporter::export)
.setDebugEnabled(config.isDebugEnabled())
.build();
return new SpanFromDiskExporter(delegate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

package io.opentelemetry.contrib.disk.buffering;

import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
Expand All @@ -27,16 +26,12 @@ public class SpanToDiskExporter implements SpanExporter {
* Creates a new SpanToDiskExporter that will buffer Span telemetry on disk storage.
*
* @param delegate - The SpanExporter to delegate to if disk writing fails.
* @param config - The StorageConfiguration that specifies how storage is managed.
* @param storage - The Storage instance that specifies how storage is managed.
* @return A new SpanToDiskExporter instance.
* @throws IOException if the delegate ToDiskExporter could not be created.
*/
public static SpanToDiskExporter create(SpanExporter delegate, StorageConfiguration config)
throws IOException {
public static SpanToDiskExporter create(SpanExporter delegate, Storage storage) {
ToDiskExporter<SpanData> toDisk =
ToDiskExporter.<SpanData>builder()
.setFolderName(SignalTypes.spans.name())
.setStorageConfiguration(config)
ToDiskExporter.<SpanData>builder(storage)
.setSerializer(SignalSerializer.ofSpans())
.setExportFunction(delegate::export)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@
import static java.util.Collections.emptyList;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
Expand All @@ -22,36 +19,23 @@
public class FromDiskExporterBuilder<T> {

private SignalDeserializer<T> serializer = noopDeserializer();
private final Storage storage;

private Function<Collection<T>, CompletableResultCode> exportFunction =
x -> CompletableResultCode.ofFailure();

private boolean debugEnabled = false;
public FromDiskExporterBuilder(Storage storage) {
if (storage == null) {
throw new NullPointerException("Storage cannot be null");
}
this.storage = storage;
}

@NotNull
private static <T> SignalDeserializer<T> noopDeserializer() {
return x -> emptyList();
}

private final StorageBuilder storageBuilder = Storage.builder();

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setFolderName(String folderName) {
storageBuilder.setFolderName(folderName);
return this;
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setStorageConfiguration(StorageConfiguration configuration) {
storageBuilder.setStorageConfiguration(configuration);
return this;
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setStorageClock(Clock clock) {
storageBuilder.setStorageClock(clock);
return this;
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setDeserializer(SignalDeserializer<T> serializer) {
this.serializer = serializer;
Expand All @@ -65,19 +49,7 @@ public FromDiskExporterBuilder<T> setExportFunction(
return this;
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> enableDebug() {
return setDebugEnabled(true);
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setDebugEnabled(boolean debugEnabled) {
this.debugEnabled = debugEnabled;
return this;
}

public FromDiskExporterImpl<T> build() throws IOException {
Storage storage = storageBuilder.build();
return new FromDiskExporterImpl<>(serializer, exportFunction, storage, debugEnabled);
return new FromDiskExporterImpl<>(serializer, exportFunction, storage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ public final class FromDiskExporterImpl<EXPORT_DATA> implements FromDiskExporter
FromDiskExporterImpl(
SignalDeserializer<EXPORT_DATA> deserializer,
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
Storage storage,
boolean debugEnabled) {
Storage storage) {
this.deserializer = deserializer;
this.exportFunction = exportFunction;
this.storage = storage;
this.logger =
DebugLogger.wrap(Logger.getLogger(FromDiskExporterImpl.class.getName()), debugEnabled);
DebugLogger.wrap(
Logger.getLogger(FromDiskExporterImpl.class.getName()), storage.isDebugEnabled());
}

public static <T> FromDiskExporterBuilder<T> builder() {
return new FromDiskExporterBuilder<>();
public static <T> FromDiskExporterBuilder<T> builder(Storage storage) {
return new FromDiskExporterBuilder<>(storage);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ public class ToDiskExporter<EXPORT_DATA> {
ToDiskExporter(
SignalSerializer<EXPORT_DATA> serializer,
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
Storage storage,
boolean debugEnabled) {
Storage storage) {
this.serializer = serializer;
this.exportFunction = exportFunction;
this.storage = storage;
this.logger = DebugLogger.wrap(Logger.getLogger(ToDiskExporter.class.getName()), debugEnabled);
this.logger =
DebugLogger.wrap(
Logger.getLogger(ToDiskExporter.class.getName()), storage.isDebugEnabled());
}

public static <T> ToDiskExporterBuilder<T> builder() {
return new ToDiskExporterBuilder<>();
public static <T> ToDiskExporterBuilder<T> builder(Storage storage) {
return new ToDiskExporterBuilder<>(storage);
}

public CompletableResultCode export(Collection<EXPORT_DATA> data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,56 +6,26 @@
package io.opentelemetry.contrib.disk.buffering.internal.exporter;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
import java.util.function.Function;

public final class ToDiskExporterBuilder<T> {

private SignalSerializer<T> serializer = ts -> new byte[0];

private final StorageBuilder storageBuilder = Storage.builder();
private final Storage storage;

private Function<Collection<T>, CompletableResultCode> exportFunction =
x -> CompletableResultCode.ofFailure();
private boolean debugEnabled = false;

ToDiskExporterBuilder() {}

@CanIgnoreReturnValue
public ToDiskExporterBuilder<T> enableDebug() {
return setDebugEnabled(true);
}

@CanIgnoreReturnValue
public ToDiskExporterBuilder<T> setDebugEnabled(boolean debugEnabled) {
this.debugEnabled = debugEnabled;
return this;
}

@CanIgnoreReturnValue
public ToDiskExporterBuilder<T> setFolderName(String folderName) {
storageBuilder.setFolderName(folderName);
return this;
}

@CanIgnoreReturnValue
public ToDiskExporterBuilder<T> setStorageConfiguration(StorageConfiguration configuration) {
validateConfiguration(configuration);
storageBuilder.setStorageConfiguration(configuration);
return this;
}

@CanIgnoreReturnValue
public ToDiskExporterBuilder<T> setStorageClock(Clock clock) {
storageBuilder.setStorageClock(clock);
return this;
ToDiskExporterBuilder(Storage storage) {
if (storage == null) {
throw new NullPointerException("Storage cannot be null");
}
this.storage = storage;
}

@CanIgnoreReturnValue
Expand All @@ -71,15 +41,7 @@ public ToDiskExporterBuilder<T> setExportFunction(
return this;
}

public ToDiskExporter<T> build() throws IOException {
Storage storage = storageBuilder.build();
return new ToDiskExporter<>(serializer, exportFunction, storage, debugEnabled);
}

private static void validateConfiguration(StorageConfiguration configuration) {
if (configuration.getMinFileAgeForReadMillis() <= configuration.getMaxFileAgeForWriteMillis()) {
throw new IllegalArgumentException(
"The configured max file age for writing must be lower than the configured min file age for reading");
}
public ToDiskExporter<T> build() {
return new ToDiskExporter<>(serializer, exportFunction, storage);
}
}
Loading
Loading