Skip to content

Commit 5000adf

Browse files
authored
[disk-buffering] Split serializer (#1167)
1 parent aaf0446 commit 5000adf

21 files changed

+176
-87
lines changed

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
99
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
1010
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
11-
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
11+
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
1212
import io.opentelemetry.sdk.logs.data.LogRecordData;
1313
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
1414
import java.io.IOException;
@@ -24,7 +24,7 @@ public static LogRecordFromDiskExporter create(
2424
FromDiskExporterImpl.<LogRecordData>builder()
2525
.setFolderName("logs")
2626
.setStorageConfiguration(config)
27-
.setDeserializer(SignalSerializer.ofLogs())
27+
.setDeserializer(SignalDeserializer.ofLogs())
2828
.setExportFunction(exporter::export)
2929
.build();
3030
return new LogRecordFromDiskExporter(delegate);

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
99
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
1010
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
11-
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
11+
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
1212
import io.opentelemetry.sdk.metrics.data.MetricData;
1313
import io.opentelemetry.sdk.metrics.export.MetricExporter;
1414
import java.io.IOException;
@@ -24,7 +24,7 @@ public static MetricFromDiskExporter create(MetricExporter exporter, StorageConf
2424
FromDiskExporterImpl.<MetricData>builder()
2525
.setFolderName("metrics")
2626
.setStorageConfiguration(config)
27-
.setDeserializer(SignalSerializer.ofMetrics())
27+
.setDeserializer(SignalDeserializer.ofMetrics())
2828
.setExportFunction(exporter::export)
2929
.build();
3030
return new MetricFromDiskExporter(delegate);

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
99
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
1010
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
11-
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
11+
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
1212
import io.opentelemetry.sdk.trace.data.SpanData;
1313
import io.opentelemetry.sdk.trace.export.SpanExporter;
1414
import java.io.IOException;
@@ -24,7 +24,7 @@ public static SpanFromDiskExporter create(SpanExporter exporter, StorageConfigur
2424
FromDiskExporterImpl.<SpanData>builder()
2525
.setFolderName("spans")
2626
.setStorageConfiguration(config)
27-
.setDeserializer(SignalSerializer.ofSpans())
27+
.setDeserializer(SignalDeserializer.ofSpans())
2828
.setExportFunction(exporter::export)
2929
.build();
3030
return new SpanFromDiskExporter(delegate);

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,40 +5,29 @@
55

66
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
77

8+
import static java.util.Collections.emptyList;
9+
810
import com.google.errorprone.annotations.CanIgnoreReturnValue;
911
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
10-
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
12+
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
1113
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
1214
import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder;
1315
import io.opentelemetry.sdk.common.Clock;
1416
import io.opentelemetry.sdk.common.CompletableResultCode;
1517
import java.io.IOException;
1618
import java.util.Collection;
17-
import java.util.Collections;
18-
import java.util.List;
1919
import java.util.function.Function;
2020
import org.jetbrains.annotations.NotNull;
2121

2222
public class FromDiskExporterBuilder<T> {
2323

24-
private SignalSerializer<T> serializer = noopSerializer();
24+
private SignalDeserializer<T> serializer = noopDeserializer();
2525
private Function<Collection<T>, CompletableResultCode> exportFunction =
2626
x -> CompletableResultCode.ofFailure();
2727

2828
@NotNull
29-
private static <T> SignalSerializer<T> noopSerializer() {
30-
return new SignalSerializer<T>() {
31-
32-
@Override
33-
public byte[] serialize(Collection<T> ts) {
34-
return new byte[0];
35-
}
36-
37-
@Override
38-
public List<T> deserialize(byte[] source) {
39-
return Collections.emptyList();
40-
}
41-
};
29+
private static <T> SignalDeserializer<T> noopDeserializer() {
30+
return x -> emptyList();
4231
}
4332

4433
private final StorageBuilder storageBuilder = Storage.builder();
@@ -62,7 +51,7 @@ public FromDiskExporterBuilder<T> setStorageClock(Clock clock) {
6251
}
6352

6453
@CanIgnoreReturnValue
65-
public FromDiskExporterBuilder<T> setDeserializer(SignalSerializer<T> serializer) {
54+
public FromDiskExporterBuilder<T> setDeserializer(SignalDeserializer<T> serializer) {
6655
this.serializer = serializer;
6756
return this;
6857
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
77

8-
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
8+
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
99
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
1010
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
1111
import io.opentelemetry.sdk.common.CompletableResultCode;
@@ -22,12 +22,12 @@
2222
*/
2323
public final class FromDiskExporterImpl<EXPORT_DATA> implements FromDiskExporter {
2424
private final Storage storage;
25-
private final SignalSerializer<EXPORT_DATA> deserializer;
25+
private final SignalDeserializer<EXPORT_DATA> deserializer;
2626
private final Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction;
2727
private static final Logger logger = Logger.getLogger(FromDiskExporterImpl.class.getName());
2828

2929
FromDiskExporterImpl(
30-
SignalSerializer<EXPORT_DATA> deserializer,
30+
SignalDeserializer<EXPORT_DATA> deserializer,
3131
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
3232
Storage storage) {
3333
this.deserializer = deserializer;

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,11 @@
1414
import io.opentelemetry.sdk.common.CompletableResultCode;
1515
import java.io.IOException;
1616
import java.util.Collection;
17-
import java.util.Collections;
18-
import java.util.List;
1917
import java.util.function.Function;
2018

2119
public final class ToDiskExporterBuilder<T> {
2220

23-
private SignalSerializer<T> serializer =
24-
new SignalSerializer<T>() {
25-
26-
@Override
27-
public byte[] serialize(Collection<T> ts) {
28-
return new byte[0];
29-
}
30-
31-
@Override
32-
public List<T> deserialize(byte[] source) {
33-
return Collections.emptyList();
34-
}
35-
};
21+
private SignalSerializer<T> serializer = ts -> new byte[0];
3622

3723
private final StorageBuilder storageBuilder = Storage.builder();
3824

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;
7+
8+
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.ProtoLogsDataMapper;
9+
import io.opentelemetry.proto.logs.v1.LogsData;
10+
import io.opentelemetry.sdk.logs.data.LogRecordData;
11+
import java.io.IOException;
12+
import java.util.List;
13+
14+
public final class LogRecordDataDeserializer implements SignalDeserializer<LogRecordData> {
15+
private static final LogRecordDataDeserializer INSTANCE = new LogRecordDataDeserializer();
16+
17+
private LogRecordDataDeserializer() {}
18+
19+
static LogRecordDataDeserializer getInstance() {
20+
return INSTANCE;
21+
}
22+
23+
@Override
24+
public List<LogRecordData> deserialize(byte[] source) {
25+
try {
26+
return ProtoLogsDataMapper.getInstance().fromProto(LogsData.ADAPTER.decode(source));
27+
} catch (IOException e) {
28+
throw new IllegalArgumentException(e);
29+
}
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;
7+
8+
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics.ProtoMetricsDataMapper;
9+
import io.opentelemetry.proto.metrics.v1.MetricsData;
10+
import io.opentelemetry.sdk.metrics.data.MetricData;
11+
import java.io.IOException;
12+
import java.util.List;
13+
14+
public final class MetricDataDeserializer implements SignalDeserializer<MetricData> {
15+
private static final MetricDataDeserializer INSTANCE = new MetricDataDeserializer();
16+
17+
private MetricDataDeserializer() {}
18+
19+
static MetricDataDeserializer getInstance() {
20+
return INSTANCE;
21+
}
22+
23+
@Override
24+
public List<MetricData> deserialize(byte[] source) {
25+
try {
26+
return ProtoMetricsDataMapper.getInstance().fromProto(MetricsData.ADAPTER.decode(source));
27+
} catch (IOException e) {
28+
throw new IllegalArgumentException(e);
29+
}
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;
7+
8+
import io.opentelemetry.sdk.logs.data.LogRecordData;
9+
import io.opentelemetry.sdk.metrics.data.MetricData;
10+
import io.opentelemetry.sdk.trace.data.SpanData;
11+
import java.util.List;
12+
13+
public interface SignalDeserializer<SDK_ITEM> {
14+
15+
static SignalDeserializer<SpanData> ofSpans() {
16+
return SpanDataDeserializer.getInstance();
17+
}
18+
19+
static SignalDeserializer<MetricData> ofMetrics() {
20+
return MetricDataDeserializer.getInstance();
21+
}
22+
23+
static SignalDeserializer<LogRecordData> ofLogs() {
24+
return LogRecordDataDeserializer.getInstance();
25+
}
26+
27+
List<SDK_ITEM> deserialize(byte[] source);
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;
7+
8+
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.ProtoSpansDataMapper;
9+
import io.opentelemetry.proto.trace.v1.TracesData;
10+
import io.opentelemetry.sdk.trace.data.SpanData;
11+
import java.io.IOException;
12+
import java.util.List;
13+
14+
public final class SpanDataDeserializer implements SignalDeserializer<SpanData> {
15+
private static final SpanDataDeserializer INSTANCE = new SpanDataDeserializer();
16+
17+
private SpanDataDeserializer() {}
18+
19+
static SpanDataDeserializer getInstance() {
20+
return INSTANCE;
21+
}
22+
23+
@Override
24+
public List<SpanData> deserialize(byte[] source) {
25+
try {
26+
return ProtoSpansDataMapper.getInstance().fromProto(TracesData.ADAPTER.decode(source));
27+
} catch (IOException e) {
28+
throw new IllegalArgumentException(e);
29+
}
30+
}
31+
}

0 commit comments

Comments
 (0)