Skip to content

Commit f597cbe

Browse files
Shared storage (#1912)
Co-authored-by: otelbot <[email protected]>
1 parent fc6507e commit f597cbe

21 files changed

+154
-249
lines changed

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@
55

66
package io.opentelemetry.contrib.disk.buffering;
77

8-
import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
98
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
109
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
1110
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
12-
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
11+
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
1312
import io.opentelemetry.sdk.logs.data.LogRecordData;
1413
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
1514
import java.io.IOException;
@@ -19,15 +18,12 @@ public class LogRecordFromDiskExporter implements FromDiskExporter {
1918

2019
private final FromDiskExporterImpl<LogRecordData> delegate;
2120

22-
public static LogRecordFromDiskExporter create(
23-
LogRecordExporter exporter, StorageConfiguration config) throws IOException {
21+
public static LogRecordFromDiskExporter create(LogRecordExporter exporter, Storage storage)
22+
throws IOException {
2423
FromDiskExporterImpl<LogRecordData> delegate =
25-
FromDiskExporterImpl.<LogRecordData>builder()
26-
.setFolderName(SignalTypes.logs.name())
27-
.setStorageConfiguration(config)
24+
FromDiskExporterImpl.<LogRecordData>builder(storage)
2825
.setDeserializer(SignalDeserializer.ofLogs())
2926
.setExportFunction(exporter::export)
30-
.setDebugEnabled(config.isDebugEnabled())
3127
.build();
3228
return new LogRecordFromDiskExporter(delegate);
3329
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@
55

66
package io.opentelemetry.contrib.disk.buffering;
77

8-
import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
98
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
109
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
11-
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
10+
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
1211
import io.opentelemetry.sdk.common.CompletableResultCode;
1312
import io.opentelemetry.sdk.logs.data.LogRecordData;
1413
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
@@ -26,16 +25,12 @@ public class LogRecordToDiskExporter implements LogRecordExporter {
2625
* Creates a new LogRecordToDiskExporter that will buffer LogRecordData telemetry on disk storage.
2726
*
2827
* @param delegate - The LogRecordExporter to delegate to if disk writing fails.
29-
* @param config - The StorageConfiguration that specifies how storage is managed.
28+
* @param storage - The Storage instance that specifies how storage is managed.
3029
* @return A new LogRecordToDiskExporter instance.
31-
* @throws IOException if the delegate ToDiskExporter could not be created.
3230
*/
33-
public static LogRecordToDiskExporter create(
34-
LogRecordExporter delegate, StorageConfiguration config) throws IOException {
31+
public static LogRecordToDiskExporter create(LogRecordExporter delegate, Storage storage) {
3532
ToDiskExporter<LogRecordData> toDisk =
36-
ToDiskExporter.<LogRecordData>builder()
37-
.setFolderName(SignalTypes.logs.name())
38-
.setStorageConfiguration(config)
33+
ToDiskExporter.<LogRecordData>builder(storage)
3934
.setSerializer(SignalSerializer.ofLogs())
4035
.setExportFunction(delegate::export)
4136
.build();

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@
55

66
package io.opentelemetry.contrib.disk.buffering;
77

8-
import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
98
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
109
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
1110
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
12-
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
11+
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
1312
import io.opentelemetry.sdk.metrics.data.MetricData;
1413
import io.opentelemetry.sdk.metrics.export.MetricExporter;
1514
import java.io.IOException;
@@ -19,15 +18,12 @@ public class MetricFromDiskExporter implements FromDiskExporter {
1918

2019
private final FromDiskExporterImpl<MetricData> delegate;
2120

22-
public static MetricFromDiskExporter create(MetricExporter exporter, StorageConfiguration config)
21+
public static MetricFromDiskExporter create(MetricExporter exporter, Storage storage)
2322
throws IOException {
2423
FromDiskExporterImpl<MetricData> delegate =
25-
FromDiskExporterImpl.<MetricData>builder()
26-
.setFolderName(SignalTypes.metrics.name())
27-
.setStorageConfiguration(config)
24+
FromDiskExporterImpl.<MetricData>builder(storage)
2825
.setDeserializer(SignalDeserializer.ofMetrics())
2926
.setExportFunction(exporter::export)
30-
.setDebugEnabled(config.isDebugEnabled())
3127
.build();
3228
return new MetricFromDiskExporter(delegate);
3329
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@
55

66
package io.opentelemetry.contrib.disk.buffering;
77

8-
import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
98
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
109
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
11-
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
10+
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
1211
import io.opentelemetry.sdk.common.CompletableResultCode;
1312
import io.opentelemetry.sdk.metrics.InstrumentType;
1413
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
@@ -31,16 +30,12 @@ public class MetricToDiskExporter implements MetricExporter {
3130
* Creates a new MetricToDiskExporter that will buffer Metric telemetry on disk storage.
3231
*
3332
* @param delegate - The MetricExporter to delegate to if disk writing fails.
34-
* @param config - The StorageConfiguration that specifies how storage is managed.
33+
* @param storage - The Storage instance that specifies how storage is managed.
3534
* @return A new MetricToDiskExporter instance.
36-
* @throws IOException if the delegate ToDiskExporter could not be created.
3735
*/
38-
public static MetricToDiskExporter create(MetricExporter delegate, StorageConfiguration config)
39-
throws IOException {
36+
public static MetricToDiskExporter create(MetricExporter delegate, Storage storage) {
4037
ToDiskExporter<MetricData> toDisk =
41-
ToDiskExporter.<MetricData>builder()
42-
.setFolderName(SignalTypes.metrics.name())
43-
.setStorageConfiguration(config)
38+
ToDiskExporter.<MetricData>builder(storage)
4439
.setSerializer(SignalSerializer.ofMetrics())
4540
.setExportFunction(delegate::export)
4641
.build();

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@
55

66
package io.opentelemetry.contrib.disk.buffering;
77

8-
import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
98
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
109
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
1110
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
12-
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
11+
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
1312
import io.opentelemetry.sdk.trace.data.SpanData;
1413
import io.opentelemetry.sdk.trace.export.SpanExporter;
1514
import java.io.IOException;
@@ -19,15 +18,12 @@ public class SpanFromDiskExporter implements FromDiskExporter {
1918

2019
private final FromDiskExporterImpl<SpanData> delegate;
2120

22-
public static SpanFromDiskExporter create(SpanExporter exporter, StorageConfiguration config)
21+
public static SpanFromDiskExporter create(SpanExporter exporter, Storage storage)
2322
throws IOException {
2423
FromDiskExporterImpl<SpanData> delegate =
25-
FromDiskExporterImpl.<SpanData>builder()
26-
.setFolderName(SignalTypes.spans.name())
27-
.setStorageConfiguration(config)
24+
FromDiskExporterImpl.<SpanData>builder(storage)
2825
.setDeserializer(SignalDeserializer.ofSpans())
2926
.setExportFunction(exporter::export)
30-
.setDebugEnabled(config.isDebugEnabled())
3127
.build();
3228
return new SpanFromDiskExporter(delegate);
3329
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@
55

66
package io.opentelemetry.contrib.disk.buffering;
77

8-
import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
98
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
109
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
11-
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
10+
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
1211
import io.opentelemetry.sdk.common.CompletableResultCode;
1312
import io.opentelemetry.sdk.trace.data.SpanData;
1413
import io.opentelemetry.sdk.trace.export.SpanExporter;
@@ -27,16 +26,12 @@ public class SpanToDiskExporter implements SpanExporter {
2726
* Creates a new SpanToDiskExporter that will buffer Span telemetry on disk storage.
2827
*
2928
* @param delegate - The SpanExporter to delegate to if disk writing fails.
30-
* @param config - The StorageConfiguration that specifies how storage is managed.
29+
* @param storage - The Storage instance that specifies how storage is managed.
3130
* @return A new SpanToDiskExporter instance.
32-
* @throws IOException if the delegate ToDiskExporter could not be created.
3331
*/
34-
public static SpanToDiskExporter create(SpanExporter delegate, StorageConfiguration config)
35-
throws IOException {
32+
public static SpanToDiskExporter create(SpanExporter delegate, Storage storage) {
3633
ToDiskExporter<SpanData> toDisk =
37-
ToDiskExporter.<SpanData>builder()
38-
.setFolderName(SignalTypes.spans.name())
39-
.setStorageConfiguration(config)
34+
ToDiskExporter.<SpanData>builder(storage)
4035
.setSerializer(SignalSerializer.ofSpans())
4136
.setExportFunction(delegate::export)
4237
.build();

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java

Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,8 @@
88
import static java.util.Collections.emptyList;
99

1010
import com.google.errorprone.annotations.CanIgnoreReturnValue;
11-
import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
1211
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
1312
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
14-
import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder;
15-
import io.opentelemetry.sdk.common.Clock;
1613
import io.opentelemetry.sdk.common.CompletableResultCode;
1714
import java.io.IOException;
1815
import java.util.Collection;
@@ -22,36 +19,23 @@
2219
public class FromDiskExporterBuilder<T> {
2320

2421
private SignalDeserializer<T> serializer = noopDeserializer();
22+
private final Storage storage;
23+
2524
private Function<Collection<T>, CompletableResultCode> exportFunction =
2625
x -> CompletableResultCode.ofFailure();
2726

28-
private boolean debugEnabled = false;
27+
public FromDiskExporterBuilder(Storage storage) {
28+
if (storage == null) {
29+
throw new NullPointerException("Storage cannot be null");
30+
}
31+
this.storage = storage;
32+
}
2933

3034
@NotNull
3135
private static <T> SignalDeserializer<T> noopDeserializer() {
3236
return x -> emptyList();
3337
}
3438

35-
private final StorageBuilder storageBuilder = Storage.builder();
36-
37-
@CanIgnoreReturnValue
38-
public FromDiskExporterBuilder<T> setFolderName(String folderName) {
39-
storageBuilder.setFolderName(folderName);
40-
return this;
41-
}
42-
43-
@CanIgnoreReturnValue
44-
public FromDiskExporterBuilder<T> setStorageConfiguration(StorageConfiguration configuration) {
45-
storageBuilder.setStorageConfiguration(configuration);
46-
return this;
47-
}
48-
49-
@CanIgnoreReturnValue
50-
public FromDiskExporterBuilder<T> setStorageClock(Clock clock) {
51-
storageBuilder.setStorageClock(clock);
52-
return this;
53-
}
54-
5539
@CanIgnoreReturnValue
5640
public FromDiskExporterBuilder<T> setDeserializer(SignalDeserializer<T> serializer) {
5741
this.serializer = serializer;
@@ -65,19 +49,7 @@ public FromDiskExporterBuilder<T> setExportFunction(
6549
return this;
6650
}
6751

68-
@CanIgnoreReturnValue
69-
public FromDiskExporterBuilder<T> enableDebug() {
70-
return setDebugEnabled(true);
71-
}
72-
73-
@CanIgnoreReturnValue
74-
public FromDiskExporterBuilder<T> setDebugEnabled(boolean debugEnabled) {
75-
this.debugEnabled = debugEnabled;
76-
return this;
77-
}
78-
7952
public FromDiskExporterImpl<T> build() throws IOException {
80-
Storage storage = storageBuilder.build();
81-
return new FromDiskExporterImpl<>(serializer, exportFunction, storage, debugEnabled);
53+
return new FromDiskExporterImpl<>(serializer, exportFunction, storage);
8254
}
8355
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,17 @@ public final class FromDiskExporterImpl<EXPORT_DATA> implements FromDiskExporter
3232
FromDiskExporterImpl(
3333
SignalDeserializer<EXPORT_DATA> deserializer,
3434
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
35-
Storage storage,
36-
boolean debugEnabled) {
35+
Storage storage) {
3736
this.deserializer = deserializer;
3837
this.exportFunction = exportFunction;
3938
this.storage = storage;
4039
this.logger =
41-
DebugLogger.wrap(Logger.getLogger(FromDiskExporterImpl.class.getName()), debugEnabled);
40+
DebugLogger.wrap(
41+
Logger.getLogger(FromDiskExporterImpl.class.getName()), storage.isDebugEnabled());
4242
}
4343

44-
public static <T> FromDiskExporterBuilder<T> builder() {
45-
return new FromDiskExporterBuilder<>();
44+
public static <T> FromDiskExporterBuilder<T> builder(Storage storage) {
45+
return new FromDiskExporterBuilder<>(storage);
4646
}
4747

4848
/**

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,17 @@ public class ToDiskExporter<EXPORT_DATA> {
2525
ToDiskExporter(
2626
SignalSerializer<EXPORT_DATA> serializer,
2727
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
28-
Storage storage,
29-
boolean debugEnabled) {
28+
Storage storage) {
3029
this.serializer = serializer;
3130
this.exportFunction = exportFunction;
3231
this.storage = storage;
33-
this.logger = DebugLogger.wrap(Logger.getLogger(ToDiskExporter.class.getName()), debugEnabled);
32+
this.logger =
33+
DebugLogger.wrap(
34+
Logger.getLogger(ToDiskExporter.class.getName()), storage.isDebugEnabled());
3435
}
3536

36-
public static <T> ToDiskExporterBuilder<T> builder() {
37-
return new ToDiskExporterBuilder<>();
37+
public static <T> ToDiskExporterBuilder<T> builder(Storage storage) {
38+
return new ToDiskExporterBuilder<>(storage);
3839
}
3940

4041
public CompletableResultCode export(Collection<EXPORT_DATA> data) {

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java

Lines changed: 8 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,56 +6,26 @@
66
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
77

88
import com.google.errorprone.annotations.CanIgnoreReturnValue;
9-
import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration;
109
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
1110
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
12-
import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder;
13-
import io.opentelemetry.sdk.common.Clock;
1411
import io.opentelemetry.sdk.common.CompletableResultCode;
15-
import java.io.IOException;
1612
import java.util.Collection;
1713
import java.util.function.Function;
1814

1915
public final class ToDiskExporterBuilder<T> {
2016

2117
private SignalSerializer<T> serializer = ts -> new byte[0];
2218

23-
private final StorageBuilder storageBuilder = Storage.builder();
19+
private final Storage storage;
2420

2521
private Function<Collection<T>, CompletableResultCode> exportFunction =
2622
x -> CompletableResultCode.ofFailure();
27-
private boolean debugEnabled = false;
2823

29-
ToDiskExporterBuilder() {}
30-
31-
@CanIgnoreReturnValue
32-
public ToDiskExporterBuilder<T> enableDebug() {
33-
return setDebugEnabled(true);
34-
}
35-
36-
@CanIgnoreReturnValue
37-
public ToDiskExporterBuilder<T> setDebugEnabled(boolean debugEnabled) {
38-
this.debugEnabled = debugEnabled;
39-
return this;
40-
}
41-
42-
@CanIgnoreReturnValue
43-
public ToDiskExporterBuilder<T> setFolderName(String folderName) {
44-
storageBuilder.setFolderName(folderName);
45-
return this;
46-
}
47-
48-
@CanIgnoreReturnValue
49-
public ToDiskExporterBuilder<T> setStorageConfiguration(StorageConfiguration configuration) {
50-
validateConfiguration(configuration);
51-
storageBuilder.setStorageConfiguration(configuration);
52-
return this;
53-
}
54-
55-
@CanIgnoreReturnValue
56-
public ToDiskExporterBuilder<T> setStorageClock(Clock clock) {
57-
storageBuilder.setStorageClock(clock);
58-
return this;
24+
ToDiskExporterBuilder(Storage storage) {
25+
if (storage == null) {
26+
throw new NullPointerException("Storage cannot be null");
27+
}
28+
this.storage = storage;
5929
}
6030

6131
@CanIgnoreReturnValue
@@ -71,15 +41,7 @@ public ToDiskExporterBuilder<T> setExportFunction(
7141
return this;
7242
}
7343

74-
public ToDiskExporter<T> build() throws IOException {
75-
Storage storage = storageBuilder.build();
76-
return new ToDiskExporter<>(serializer, exportFunction, storage, debugEnabled);
77-
}
78-
79-
private static void validateConfiguration(StorageConfiguration configuration) {
80-
if (configuration.getMinFileAgeForReadMillis() <= configuration.getMaxFileAgeForWriteMillis()) {
81-
throw new IllegalArgumentException(
82-
"The configured max file age for writing must be lower than the configured min file age for reading");
83-
}
44+
public ToDiskExporter<T> build() {
45+
return new ToDiskExporter<>(serializer, exportFunction, storage);
8446
}
8547
}

0 commit comments

Comments
 (0)