From cb34923b040150543e2aebf23d449e8cbd39faeb Mon Sep 17 00:00:00 2001 From: Tyler Benson <734411+tylerbenson@users.noreply.github.com> Date: Tue, 19 Aug 2025 11:40:22 -0400 Subject: [PATCH] Use a more efficient serializer and write bytes directly to disk Instead of an intermediary byte array. The OTel Java SDK has a hand-coded and optimized serializer implementation that is well tested and used extensively. This PR switches to using that serializer. It is also more efficient to serialize directly to the file output stream instead of an intermediary byte array, so I used the same marshaling strategy used by the SDK and passed the marshaler down to the WritableFile. --- disk-buffering/build.gradle.kts | 8 ++-- .../internal/exporter/NoopSerializer.java | 30 +++++++++++++ .../internal/exporter/ToDiskExporter.java | 7 +++- .../exporter/ToDiskExporterBuilder.java | 2 +- .../LogRecordDataDeserializer.java | 5 ++- .../deserializers/MetricDataDeserializer.java | 5 ++- .../deserializers/SpanDataDeserializer.java | 5 ++- .../mapping/common/ByteStringMapper.java | 4 +- .../mapping/logs/ProtoLogsDataMapper.java | 10 ++--- .../metrics/ProtoMetricsDataMapper.java | 10 ++--- .../mapping/spans/ProtoSpansDataMapper.java | 11 ++--- .../serializers/LogRecordDataSerializer.java | 42 +++++++++++-------- .../serializers/MetricDataSerializer.java | 42 +++++++++++-------- .../serializers/SignalSerializer.java | 16 +++++-- .../serializers/SpanDataSerializer.java | 42 +++++++++++-------- .../buffering/internal/storage/Storage.java | 13 +++--- .../internal/storage/files/WritableFile.java | 9 ++-- .../buffering/SpanFromDiskExporterTest.java | 2 +- .../internal/exporter/ToDiskExporterTest.java | 13 +++--- .../mapping/logs/ProtoLogsDataMapperTest.java | 16 +++---- .../metrics/ProtoMetricsDataMapperTest.java | 14 +++---- .../spans/ProtoSpansDataMapperTest.java | 14 +++---- .../serializers/ByteArraySerializer.java | 39 +++++++++++++++++ .../internal/storage/FolderManagerTest.java | 3 +- .../internal/storage/StorageTest.java | 17 ++++---- .../storage/files/ReadableFileTest.java | 13 +++--- .../storage/files/WritableFileTest.java | 19 +++++---- .../testutils/BaseSignalSerializerTest.java | 13 +++++- 28 files changed, 274 insertions(+), 150 deletions(-) create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/NoopSerializer.java create mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/ByteArraySerializer.java diff --git a/disk-buffering/build.gradle.kts b/disk-buffering/build.gradle.kts index 8bae11733..251f7a7c3 100644 --- a/disk-buffering/build.gradle.kts +++ b/disk-buffering/build.gradle.kts @@ -18,6 +18,7 @@ val protos by configurations.creating dependencies { api("io.opentelemetry:opentelemetry-sdk") implementation("io.opentelemetry:opentelemetry-api-incubator") + implementation("io.opentelemetry:opentelemetry-exporter-otlp-common") compileOnly("com.google.auto.value:auto-value-annotations") annotationProcessor("com.google.auto.value:auto-value") signature("com.toasttab.android:gummy-bears-api-21:0.12.0:coreLib@signature") @@ -63,9 +64,10 @@ wire { } root( - "opentelemetry.proto.trace.v1.TracesData", - "opentelemetry.proto.metrics.v1.MetricsData", - "opentelemetry.proto.logs.v1.LogsData", + // These are the types used by the Java SDK's OTLP exporters. + "opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest", + "opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest", + "opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest", ) } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/NoopSerializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/NoopSerializer.java new file mode 100644 index 000000000..715f538c4 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/NoopSerializer.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.exporter; + +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; + +class NoopSerializer implements SignalSerializer { + + @Override + public NoopSerializer initialize(Collection data) { + return this; + } + + @Override + public void writeBinaryTo(OutputStream output) throws IOException {} + + @Override + public int getBinarySerializedSize() { + return 0; + } + + @Override + public void reset() {} +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java index b54e3cc16..5b2dcd186 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java @@ -38,10 +38,11 @@ public static ToDiskExporterBuilder builder(Storage storage) { return new ToDiskExporterBuilder<>(storage); } - public CompletableResultCode export(Collection data) { + public synchronized CompletableResultCode export(Collection data) { logger.log("Intercepting exporter batch.", Level.FINER); try { - if (storage.write(serializer.serialize(data))) { + serializer.initialize(data); + if (storage.write(serializer)) { return CompletableResultCode.ofSuccess(); } logger.log("Could not store batch in disk. Exporting it right away."); @@ -52,6 +53,8 @@ public CompletableResultCode export(Collection data) { Level.WARNING, e); return exportFunction.apply(data); + } finally { + serializer.reset(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java index 069e08986..be75a3976 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java @@ -14,7 +14,7 @@ public final class ToDiskExporterBuilder { - private SignalSerializer serializer = ts -> new byte[0]; + private SignalSerializer serializer = new NoopSerializer(); private final Storage storage; diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java index 5ac0007d9..cbbb4a0ad 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java @@ -7,7 +7,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.ProtoLogsDataMapper; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; -import io.opentelemetry.proto.logs.v1.LogsData; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.sdk.logs.data.LogRecordData; import java.io.IOException; import java.util.List; @@ -24,7 +24,8 @@ static LogRecordDataDeserializer getInstance() { @Override public List deserialize(byte[] source) throws DeserializationException { try { - return ProtoLogsDataMapper.getInstance().fromProto(LogsData.ADAPTER.decode(source)); + return ProtoLogsDataMapper.getInstance() + .fromProto(ExportLogsServiceRequest.ADAPTER.decode(source)); } catch (IOException e) { throw new DeserializationException(e); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java index 34e88b3ef..d6410d4e7 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java @@ -7,7 +7,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics.ProtoMetricsDataMapper; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; -import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.sdk.metrics.data.MetricData; import java.io.IOException; import java.util.List; @@ -24,7 +24,8 @@ static MetricDataDeserializer getInstance() { @Override public List deserialize(byte[] source) throws DeserializationException { try { - return ProtoMetricsDataMapper.getInstance().fromProto(MetricsData.ADAPTER.decode(source)); + return ProtoMetricsDataMapper.getInstance() + .fromProto(ExportMetricsServiceRequest.ADAPTER.decode(source)); } catch (IOException e) { throw new DeserializationException(e); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java index 457d5f268..eb4406ff3 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java @@ -7,7 +7,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.ProtoSpansDataMapper; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; -import io.opentelemetry.proto.trace.v1.TracesData; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; import java.util.List; @@ -24,7 +24,8 @@ static SpanDataDeserializer getInstance() { @Override public List deserialize(byte[] source) throws DeserializationException { try { - return ProtoSpansDataMapper.getInstance().fromProto(TracesData.ADAPTER.decode(source)); + return ProtoSpansDataMapper.getInstance() + .fromProto(ExportTraceServiceRequest.ADAPTER.decode(source)); } catch (IOException e) { throw new DeserializationException(e); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/common/ByteStringMapper.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/common/ByteStringMapper.java index ca8366e8a..1234d25de 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/common/ByteStringMapper.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/common/ByteStringMapper.java @@ -16,10 +16,10 @@ public static ByteStringMapper getInstance() { } public ByteString stringToProto(String source) { - return ByteString.encodeUtf8(source); + return ByteString.decodeHex(source); } public String protoToString(ByteString source) { - return source.utf8(); + return source.hex(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapper.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapper.java index 1d11c177f..021935f9a 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapper.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapper.java @@ -6,8 +6,8 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.logs.v1.LogRecord; -import io.opentelemetry.proto.logs.v1.LogsData; import io.opentelemetry.proto.logs.v1.ResourceLogs; import io.opentelemetry.proto.logs.v1.ScopeLogs; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; @@ -19,7 +19,7 @@ public final class ProtoLogsDataMapper extends BaseProtoSignalsDataMapper< - LogRecordData, LogRecord, LogsData, ResourceLogs, ScopeLogs> { + LogRecordData, LogRecord, ExportLogsServiceRequest, ResourceLogs, ScopeLogs> { private static final ProtoLogsDataMapper INSTANCE = new ProtoLogsDataMapper(); @@ -39,12 +39,12 @@ protected LogRecordData protoToSignalItem( } @Override - protected List getProtoResources(LogsData logsData) { + protected List getProtoResources(ExportLogsServiceRequest logsData) { return logsData.resource_logs; } @Override - protected LogsData createProtoData( + protected ExportLogsServiceRequest createProtoData( Map>> itemsByResource) { List items = new ArrayList<>(); itemsByResource.forEach( @@ -58,7 +58,7 @@ protected LogsData createProtoData( } items.add(resourceLogsBuilder.build()); }); - return new LogsData.Builder().resource_logs(items).build(); + return new ExportLogsServiceRequest.Builder().resource_logs(items).build(); } private ScopeLogs.Builder createProtoScopeBuilder(InstrumentationScopeInfo scopeInfo) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapper.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapper.java index a81ab9957..ad67eee1c 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapper.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapper.java @@ -6,8 +6,8 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.metrics.v1.Metric; -import io.opentelemetry.proto.metrics.v1.MetricsData; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import io.opentelemetry.proto.metrics.v1.ScopeMetrics; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; @@ -19,7 +19,7 @@ public final class ProtoMetricsDataMapper extends BaseProtoSignalsDataMapper< - MetricData, Metric, MetricsData, ResourceMetrics, ScopeMetrics> { + MetricData, Metric, ExportMetricsServiceRequest, ResourceMetrics, ScopeMetrics> { private static final ProtoMetricsDataMapper INSTANCE = new ProtoMetricsDataMapper(); @@ -39,12 +39,12 @@ protected MetricData protoToSignalItem( } @Override - protected List getProtoResources(MetricsData protoData) { + protected List getProtoResources(ExportMetricsServiceRequest protoData) { return protoData.resource_metrics; } @Override - protected MetricsData createProtoData( + protected ExportMetricsServiceRequest createProtoData( Map>> itemsByResource) { List items = new ArrayList<>(); itemsByResource.forEach( @@ -58,7 +58,7 @@ protected MetricsData createProtoData( } items.add(resourceMetricsBuilder.build()); }); - return new MetricsData.Builder().resource_metrics(items).build(); + return new ExportMetricsServiceRequest.Builder().resource_metrics(items).build(); } private ScopeMetrics.Builder createProtoScopeBuilder(InstrumentationScopeInfo scopeInfo) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapper.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapper.java index 18acf3a1f..12697c49d 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapper.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapper.java @@ -6,10 +6,10 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.proto.trace.v1.ScopeSpans; import io.opentelemetry.proto.trace.v1.Span; -import io.opentelemetry.proto.trace.v1.TracesData; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.data.SpanData; @@ -18,7 +18,8 @@ import java.util.Map; public final class ProtoSpansDataMapper - extends BaseProtoSignalsDataMapper { + extends BaseProtoSignalsDataMapper< + SpanData, Span, ExportTraceServiceRequest, ResourceSpans, ScopeSpans> { private static final ProtoSpansDataMapper INSTANCE = new ProtoSpansDataMapper(); @@ -32,7 +33,7 @@ protected Span signalItemToProto(SpanData sourceData) { } @Override - protected List getProtoResources(TracesData protoData) { + protected List getProtoResources(ExportTraceServiceRequest protoData) { return protoData.resource_spans; } @@ -43,7 +44,7 @@ protected SpanData protoToSignalItem( } @Override - protected TracesData createProtoData( + protected ExportTraceServiceRequest createProtoData( Map>> itemsByResource) { List items = new ArrayList<>(); itemsByResource.forEach( @@ -57,7 +58,7 @@ protected TracesData createProtoData( } items.add(resourceSpansBuilder.build()); }); - return new TracesData.Builder().resource_spans(items).build(); + return new ExportTraceServiceRequest.Builder().resource_spans(items).build(); } @Override diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/LogRecordDataSerializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/LogRecordDataSerializer.java index 72c654ffe..19bb1cf93 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/LogRecordDataSerializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/LogRecordDataSerializer.java @@ -5,33 +5,41 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers; -import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.ProtoLogsDataMapper; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools; -import io.opentelemetry.proto.logs.v1.LogsData; +import io.opentelemetry.exporter.internal.otlp.logs.LowAllocationLogsRequestMarshaler; import io.opentelemetry.sdk.logs.data.LogRecordData; -import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.Collection; public final class LogRecordDataSerializer implements SignalSerializer { - private static final LogRecordDataSerializer INSTANCE = new LogRecordDataSerializer(); - private LogRecordDataSerializer() {} + private final LowAllocationLogsRequestMarshaler marshaler = + new LowAllocationLogsRequestMarshaler(); - static LogRecordDataSerializer getInstance() { - return INSTANCE; + LogRecordDataSerializer() {} + + @CanIgnoreReturnValue + @Override + public LogRecordDataSerializer initialize(Collection data) { + marshaler.initialize(data); + return this; + } + + @Override + public void writeBinaryTo(OutputStream output) throws IOException { + ProtobufTools.writeRawVarint32(marshaler.getBinarySerializedSize(), output); + marshaler.writeBinaryTo(output); + } + + @Override + public int getBinarySerializedSize() { + return marshaler.getBinarySerializedSize(); } @Override - public byte[] serialize(Collection logRecordData) { - LogsData proto = ProtoLogsDataMapper.getInstance().toProto(logRecordData); - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - int size = LogsData.ADAPTER.encodedSize(proto); - ProtobufTools.writeRawVarint32(size, out); - proto.encode(out); - return out.toByteArray(); - } catch (IOException e) { - throw new IllegalStateException(e); - } + public void reset() { + marshaler.reset(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/MetricDataSerializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/MetricDataSerializer.java index 077d4ade5..726b3185d 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/MetricDataSerializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/MetricDataSerializer.java @@ -5,33 +5,41 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers; -import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics.ProtoMetricsDataMapper; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools; -import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.exporter.internal.otlp.metrics.LowAllocationMetricsRequestMarshaler; import io.opentelemetry.sdk.metrics.data.MetricData; -import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.Collection; public final class MetricDataSerializer implements SignalSerializer { - private static final MetricDataSerializer INSTANCE = new MetricDataSerializer(); - private MetricDataSerializer() {} + private final LowAllocationMetricsRequestMarshaler marshaler = + new LowAllocationMetricsRequestMarshaler(); - static MetricDataSerializer getInstance() { - return INSTANCE; + MetricDataSerializer() {} + + @CanIgnoreReturnValue + @Override + public MetricDataSerializer initialize(Collection data) { + marshaler.initialize(data); + return this; + } + + @Override + public void writeBinaryTo(OutputStream output) throws IOException { + ProtobufTools.writeRawVarint32(marshaler.getBinarySerializedSize(), output); + marshaler.writeBinaryTo(output); + } + + @Override + public int getBinarySerializedSize() { + return marshaler.getBinarySerializedSize(); } @Override - public byte[] serialize(Collection metricData) { - MetricsData proto = ProtoMetricsDataMapper.getInstance().toProto(metricData); - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - int size = MetricsData.ADAPTER.encodedSize(proto); - ProtobufTools.writeRawVarint32(size, out); - proto.encode(out); - return out.toByteArray(); - } catch (IOException e) { - throw new IllegalStateException(e); - } + public void reset() { + marshaler.reset(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java index c7d7e5c8c..4c306ceb7 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java @@ -8,21 +8,29 @@ import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.IOException; +import java.io.OutputStream; import java.util.Collection; public interface SignalSerializer { static SignalSerializer ofSpans() { - return SpanDataSerializer.getInstance(); + return new SpanDataSerializer(); } static SignalSerializer ofMetrics() { - return MetricDataSerializer.getInstance(); + return new MetricDataSerializer(); } static SignalSerializer ofLogs() { - return LogRecordDataSerializer.getInstance(); + return new LogRecordDataSerializer(); } - byte[] serialize(Collection items); + SignalSerializer initialize(Collection data); + + void writeBinaryTo(OutputStream output) throws IOException; + + int getBinarySerializedSize(); + + void reset(); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SpanDataSerializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SpanDataSerializer.java index 5a26426db..6e3276231 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SpanDataSerializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SpanDataSerializer.java @@ -5,33 +5,41 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers; -import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.ProtoSpansDataMapper; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools; -import io.opentelemetry.proto.trace.v1.TracesData; +import io.opentelemetry.exporter.internal.otlp.traces.LowAllocationTraceRequestMarshaler; import io.opentelemetry.sdk.trace.data.SpanData; -import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.Collection; public final class SpanDataSerializer implements SignalSerializer { - private static final SpanDataSerializer INSTANCE = new SpanDataSerializer(); - private SpanDataSerializer() {} + private final LowAllocationTraceRequestMarshaler marshaler = + new LowAllocationTraceRequestMarshaler(); - static SpanDataSerializer getInstance() { - return INSTANCE; + SpanDataSerializer() {} + + @CanIgnoreReturnValue + @Override + public SpanDataSerializer initialize(Collection data) { + marshaler.initialize(data); + return this; + } + + @Override + public void writeBinaryTo(OutputStream output) throws IOException { + ProtobufTools.writeRawVarint32(marshaler.getBinarySerializedSize(), output); + marshaler.writeBinaryTo(output); + } + + @Override + public int getBinarySerializedSize() { + return marshaler.getBinarySerializedSize(); } @Override - public byte[] serialize(Collection spanData) { - TracesData proto = ProtoSpansDataMapper.getInstance().toProto(spanData); - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - int size = TracesData.ADAPTER.encodedSize(proto); - ProtobufTools.writeRawVarint32(size, out); - proto.encode(out); - return out.toByteArray(); - } catch (IOException e) { - throw new IllegalStateException(e); - } + public void reset() { + marshaler.reset(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index 73a263490..86b5284ca 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -8,6 +8,7 @@ import static java.util.logging.Level.WARNING; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; @@ -49,14 +50,14 @@ public boolean isDebugEnabled() { /** * Attempts to write an item into a writable file. * - * @param item - The data that would be appended to the file. + * @param marshaler - The data that would be appended to the file. * @throws IOException If an unexpected error happens. */ - public boolean write(byte[] item) throws IOException { - return write(item, 1); + public boolean write(SignalSerializer marshaler) throws IOException { + return write(marshaler, 1); } - private boolean write(byte[] item, int attemptNumber) throws IOException { + private boolean write(SignalSerializer marshaler, int attemptNumber) throws IOException { if (isClosed.get()) { logger.log("Refusing to write to storage after being closed."); return false; @@ -69,11 +70,11 @@ private boolean write(byte[] item, int attemptNumber) throws IOException { writableFile = folderManager.createWritableFile(); logger.log("Created new writableFile: " + writableFile); } - WritableResult result = writableFile.append(item); + WritableResult result = writableFile.append(marshaler); if (result != WritableResult.SUCCEEDED) { // Retry with new file writableFile = null; - return write(item, ++attemptNumber); + return write(marshaler, ++attemptNumber); } return true; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java index 0f3d1d475..ce4e87ddf 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java @@ -8,6 +8,7 @@ import static io.opentelemetry.contrib.disk.buffering.internal.storage.util.ClockBuddy.nowMillis; import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; import io.opentelemetry.sdk.common.Clock; import java.io.File; @@ -43,9 +44,9 @@ public WritableFile( * reached the configured max size, the file stream is closed with the contents available in the * buffer before attempting to append the new data. * - * @param data - The new data line to add. + * @param marshaler - The new data line to add. */ - public synchronized WritableResult append(byte[] data) throws IOException { + public synchronized WritableResult append(SignalSerializer marshaler) throws IOException { if (isClosed.get()) { return WritableResult.FAILED; } @@ -53,12 +54,12 @@ public synchronized WritableResult append(byte[] data) throws IOException { close(); return WritableResult.FAILED; } - int futureSize = size + data.length; + int futureSize = size + marshaler.getBinarySerializedSize(); if (futureSize > configuration.getMaxFileSize()) { close(); return WritableResult.FAILED; } - out.write(data); + marshaler.writeBinaryTo(out); size = futureSize; return WritableResult.SUCCEEDED; } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java index ae503ecdf..2ea0d2b8a 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java @@ -86,7 +86,7 @@ private static List writeSomeSpans(Storage storage) throws Exception { SpanData span2 = makeSpan2(TraceFlags.getSampled(), now); List spans = Arrays.asList(span1, span2); - storage.write(SignalSerializer.ofSpans().serialize(spans)); + storage.write(SignalSerializer.ofSpans().initialize(spans)); storage.flush(); return spans; } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java index f7b6e3ff6..0a98061ac 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java @@ -5,8 +5,8 @@ package io.opentelemetry.contrib.disk.buffering.internal.exporter; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -30,8 +30,6 @@ class ToDiskExporterTest { private final List records = Arrays.asList("one", "two", "three"); - private final byte[] serialized = "one,two,three".getBytes(UTF_8); - @Mock private SignalSerializer serializer; @Mock private Storage storage; @@ -50,21 +48,20 @@ void setup() { return exportFnResultToReturn.get(); }; toDiskExporter = new ToDiskExporter<>(serializer, exportFn, storage); - when(serializer.serialize(records)).thenReturn(serialized); } @Test void whenWritingSucceedsOnExport_returnSuccessfulResultCode() throws Exception { - when(storage.write(serialized)).thenReturn(true); + when(storage.write(any())).thenReturn(true); CompletableResultCode completableResultCode = toDiskExporter.export(records); assertThat(completableResultCode.isSuccess()).isTrue(); - verify(storage).write(serialized); + verify(storage).write(any()); assertThat(exportedFnSeen).isNull(); } @Test void whenWritingFailsOnExport_doExportRightAway() throws Exception { - when(storage.write(serialized)).thenReturn(false); + when(storage.write(any())).thenReturn(false); exportFnResultToReturn.set(CompletableResultCode.ofSuccess()); CompletableResultCode completableResultCode = toDiskExporter.export(records); @@ -75,7 +72,7 @@ void whenWritingFailsOnExport_doExportRightAway() throws Exception { @Test void whenExceptionInWrite_doExportRightAway() throws Exception { - when(storage.write(serialized)).thenThrow(new IOException("boom")); + when(storage.write(any())).thenThrow(new IOException("boom")); exportFnResultToReturn.set(CompletableResultCode.ofFailure()); CompletableResultCode completableResultCode = toDiskExporter.export(records); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapperTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapperTest.java index 26c73502e..9a5d93cd8 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapperTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapperTest.java @@ -12,8 +12,8 @@ import io.opentelemetry.api.logs.Severity; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.models.LogRecordDataImpl; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.logs.v1.LogRecord; -import io.opentelemetry.proto.logs.v1.LogsData; import io.opentelemetry.proto.logs.v1.ResourceLogs; import io.opentelemetry.proto.logs.v1.ScopeLogs; import io.opentelemetry.sdk.logs.data.LogRecordData; @@ -104,7 +104,7 @@ class ProtoLogsDataMapperTest { void verifyConversionDataStructure() { List signals = Collections.singletonList(LOG_RECORD); - LogsData result = mapToProto(signals); + ExportLogsServiceRequest result = mapToProto(signals); List resourceLogsList = result.resource_logs; assertEquals(1, resourceLogsList.size()); @@ -118,7 +118,7 @@ void verifyConversionDataStructure() { void verifyMultipleLogsWithSameResourceAndScope() { List signals = Arrays.asList(LOG_RECORD, OTHER_LOG_RECORD); - LogsData proto = mapToProto(signals); + ExportLogsServiceRequest proto = mapToProto(signals); List resourceLogsList = proto.resource_logs; assertEquals(1, resourceLogsList.size()); @@ -139,7 +139,7 @@ void verifyMultipleLogsWithSameResourceDifferentScope() { List signals = Arrays.asList(LOG_RECORD, LOG_RECORD_WITH_DIFFERENT_SCOPE_SAME_RESOURCE); - LogsData proto = mapToProto(signals); + ExportLogsServiceRequest proto = mapToProto(signals); List resourceLogsList = proto.resource_logs; assertEquals(1, resourceLogsList.size()); @@ -159,7 +159,7 @@ void verifyMultipleLogsWithSameResourceDifferentScope() { void verifyMultipleLogsWithDifferentResource() { List signals = Arrays.asList(LOG_RECORD, LOG_RECORD_WITH_DIFFERENT_RESOURCE); - LogsData proto = mapToProto(signals); + ExportLogsServiceRequest proto = mapToProto(signals); List resourceLogsList = proto.resource_logs; assertEquals(2, resourceLogsList.size()); @@ -183,7 +183,7 @@ void verifyMultipleLogsWithDifferentResource() { void verifyLogWithEventName() { List signals = Collections.singletonList(LOG_RECORD_WITH_EVENT_NAME); - LogsData result = mapToProto(signals); + ExportLogsServiceRequest result = mapToProto(signals); List resourceLogsList = result.resource_logs; LogRecord firstLog = resourceLogsList.get(0).scope_logs.get(0).log_records.get(0); @@ -192,11 +192,11 @@ void verifyLogWithEventName() { assertThat(mapFromProto(result)).containsExactlyInAnyOrderElementsOf(signals); } - private static LogsData mapToProto(Collection signals) { + private static ExportLogsServiceRequest mapToProto(Collection signals) { return ProtoLogsDataMapper.getInstance().toProto(signals); } - private static List mapFromProto(LogsData protoData) { + private static List mapFromProto(ExportLogsServiceRequest protoData) { return ProtoLogsDataMapper.getInstance().fromProto(protoData); } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapperTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapperTest.java index b45e9c9e7..59d369704 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapperTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapperTest.java @@ -10,8 +10,8 @@ import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.metrics.v1.Metric; -import io.opentelemetry.proto.metrics.v1.MetricsData; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import io.opentelemetry.proto.metrics.v1.ScopeMetrics; import io.opentelemetry.sdk.metrics.data.MetricData; @@ -30,7 +30,7 @@ void verifyConversionDataStructure() { MetricData expectedGauge1 = TestData.makeLongGauge(TraceFlags.getSampled()); List expectedSignals = Collections.singletonList(expectedGauge1); - MetricsData proto = mapToProto(signals); + ExportMetricsServiceRequest proto = mapToProto(signals); List resourceMetrics = proto.resource_metrics; assertEquals(1, resourceMetrics.size()); @@ -49,7 +49,7 @@ void verifyMultipleMetricsWithSameResourceAndScope() { MetricData expectedGauge2 = TestData.makeLongGauge(TraceFlags.getSampled()); List expectedSignals = Arrays.asList(expectedGauge1, expectedGauge2); - MetricsData proto = mapToProto(signals); + ExportMetricsServiceRequest proto = mapToProto(signals); List resourceMetrics = proto.resource_metrics; assertEquals(1, resourceMetrics.size()); @@ -78,7 +78,7 @@ void verifyMultipleMetricsWithSameResourceDifferentScope() { List signals = Arrays.asList(gauge1, gauge2); List expectedSignals = Arrays.asList(expectedGauge1, expectedGauge2); - MetricsData proto = mapToProto(signals); + ExportMetricsServiceRequest proto = mapToProto(signals); List resourceMetrics = proto.resource_metrics; assertEquals(1, resourceMetrics.size()); @@ -113,7 +113,7 @@ void verifyMultipleMetricsWithDifferentResource() { // , LONG_GAUGE_METRIC_WITH_DIFFERENT_RESOURCE); // List expectedSignals = Arrays.asList(expected); - MetricsData proto = mapToProto(signals); + ExportMetricsServiceRequest proto = mapToProto(signals); List resourceMetrics = proto.resource_metrics; assertEquals(2, resourceMetrics.size()); @@ -133,11 +133,11 @@ void verifyMultipleMetricsWithDifferentResource() { assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(expectedSignals); } - private static MetricsData mapToProto(Collection signals) { + private static ExportMetricsServiceRequest mapToProto(Collection signals) { return ProtoMetricsDataMapper.getInstance().toProto(signals); } - private static List mapFromProto(MetricsData protoData) { + private static List mapFromProto(ExportMetricsServiceRequest protoData) { return ProtoMetricsDataMapper.getInstance().fromProto(protoData); } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapperTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapperTest.java index bdd9c053c..ca325496b 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapperTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/ProtoSpansDataMapperTest.java @@ -11,10 +11,10 @@ import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.models.SpanDataImpl; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.proto.trace.v1.ScopeSpans; import io.opentelemetry.proto.trace.v1.Span; -import io.opentelemetry.proto.trace.v1.TracesData; import io.opentelemetry.sdk.trace.data.EventData; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; @@ -116,7 +116,7 @@ class ProtoSpansDataMapperTest { void verifyConversionDataStructure() { List signals = Collections.singletonList(SPAN_DATA); - TracesData proto = mapToProto(signals); + ExportTraceServiceRequest proto = mapToProto(signals); List resourceSpans = proto.resource_spans; assertEquals(1, resourceSpans.size()); @@ -130,7 +130,7 @@ void verifyConversionDataStructure() { void verifyMultipleSpansWithSameResourceAndScope() { List signals = Arrays.asList(SPAN_DATA, OTHER_SPAN_DATA); - TracesData proto = mapToProto(signals); + ExportTraceServiceRequest proto = mapToProto(signals); List resourceSpans = proto.resource_spans; assertEquals(1, resourceSpans.size()); @@ -146,7 +146,7 @@ void verifyMultipleSpansWithSameResourceAndScope() { void verifyMultipleSpansWithSameResourceDifferentScope() { List signals = Arrays.asList(SPAN_DATA, SPAN_DATA_WITH_DIFFERENT_SCOPE_SAME_RESOURCE); - TracesData proto = mapToProto(signals); + ExportTraceServiceRequest proto = mapToProto(signals); List resourceSpans = proto.resource_spans; assertEquals(1, resourceSpans.size()); @@ -166,7 +166,7 @@ void verifyMultipleSpansWithSameResourceDifferentScope() { void verifyMultipleSpansWithDifferentResource() { List signals = Arrays.asList(SPAN_DATA, SPAN_DATA_WITH_DIFFERENT_RESOURCE); - TracesData proto = mapToProto(signals); + ExportTraceServiceRequest proto = mapToProto(signals); List resourceSpans = proto.resource_spans; assertEquals(2, resourceSpans.size()); @@ -186,11 +186,11 @@ void verifyMultipleSpansWithDifferentResource() { assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(signals); } - private static TracesData mapToProto(Collection signals) { + private static ExportTraceServiceRequest mapToProto(Collection signals) { return ProtoSpansDataMapper.getInstance().toProto(signals); } - private static List mapFromProto(TracesData protoData) { + private static List mapFromProto(ExportTraceServiceRequest protoData) { return ProtoSpansDataMapper.getInstance().fromProto(protoData); } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/ByteArraySerializer.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/ByteArraySerializer.java new file mode 100644 index 000000000..7ad446729 --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/ByteArraySerializer.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; + +public final class ByteArraySerializer implements SignalSerializer { + + private final byte[] data; + + public ByteArraySerializer(byte[] data) { + this.data = data; + } + + @CanIgnoreReturnValue + @Override + public SignalSerializer initialize(Collection data) { + return null; + } + + @Override + public void writeBinaryTo(OutputStream output) throws IOException { + output.write(data); + } + + @Override + public int getBinarySerializedSize() { + return data.length; + } + + @Override + public void reset() {} +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java index 5b72c29e8..ad994c38d 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.ByteArraySerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; import io.opentelemetry.sdk.common.Clock; @@ -83,7 +84,7 @@ void closeCurrentlyWritableFile_whenItIsReadyToBeRead_andNoOtherReadableFilesAre when(clock.now()).thenReturn(MILLISECONDS.toNanos(createdFileTime)); WritableFile writableFile = folderManager.createWritableFile(); - writableFile.append(new byte[3]); + writableFile.append(new ByteArraySerializer(new byte[3])); when(clock.now()) .thenReturn(MILLISECONDS.toNanos(createdFileTime + MIN_FILE_AGE_FOR_READ_MILLIS)); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java index 7f134afdc..d96b9a1bc 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.when; import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.ByteArraySerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; @@ -87,7 +88,7 @@ void whenReadingMultipleTimes_reuseReader() throws IOException { @Test void whenWritingMultipleTimes_reuseWriter() throws IOException { - byte[] data = new byte[1]; + ByteArraySerializer data = new ByteArraySerializer(new byte[1]); WritableFile anotherWriter = createWritableFile(); when(folderManager.createWritableFile()).thenReturn(writableFile).thenReturn(anotherWriter); @@ -108,7 +109,7 @@ void whenAttemptingToReadAfterClosed_returnFailed() throws IOException { @Test void whenAttemptingToWriteAfterClosed_returnFalse() throws IOException { storage.close(); - assertFalse(storage.write(new byte[1])); + assertFalse(storage.write(new ByteArraySerializer(new byte[1]))); } @Test @@ -159,7 +160,7 @@ void whenEveryNewFileFoundCannotBeRead_returnContentNotAvailable() throws IOExce @Test void appendDataToFile() throws IOException { when(folderManager.createWritableFile()).thenReturn(writableFile); - byte[] data = new byte[1]; + ByteArraySerializer data = new ByteArraySerializer(new byte[1]); storage.write(data); @@ -168,7 +169,7 @@ void appendDataToFile() throws IOException { @Test void whenWritingTimeoutHappens_retryWithNewFile() throws IOException { - byte[] data = new byte[1]; + ByteArraySerializer data = new ByteArraySerializer(new byte[1]); WritableFile workingWritableFile = createWritableFile(); when(folderManager.createWritableFile()) .thenReturn(writableFile) @@ -182,7 +183,7 @@ void whenWritingTimeoutHappens_retryWithNewFile() throws IOException { @Test void whenThereIsNoSpaceAvailableForWriting_retryWithNewFile() throws IOException { - byte[] data = new byte[1]; + ByteArraySerializer data = new ByteArraySerializer(new byte[1]); WritableFile workingWritableFile = createWritableFile(); when(folderManager.createWritableFile()) .thenReturn(writableFile) @@ -196,7 +197,7 @@ void whenThereIsNoSpaceAvailableForWriting_retryWithNewFile() throws IOException @Test void whenWritingResourceIsClosed_retryWithNewFile() throws IOException { - byte[] data = new byte[1]; + ByteArraySerializer data = new ByteArraySerializer(new byte[1]); WritableFile workingWritableFile = createWritableFile(); when(folderManager.createWritableFile()) .thenReturn(writableFile) @@ -210,7 +211,7 @@ void whenWritingResourceIsClosed_retryWithNewFile() throws IOException { @Test void whenEveryAttemptToWriteFails_returnFalse() throws IOException { - byte[] data = new byte[1]; + ByteArraySerializer data = new ByteArraySerializer(new byte[1]); when(folderManager.createWritableFile()).thenReturn(writableFile); when(writableFile.append(data)).thenReturn(WritableResult.FAILED); @@ -223,7 +224,7 @@ void whenEveryAttemptToWriteFails_returnFalse() throws IOException { void whenClosing_closeWriterAndReaderIfNotNull() throws IOException { when(folderManager.createWritableFile()).thenReturn(writableFile); when(folderManager.getReadableFile()).thenReturn(readableFile); - storage.write(new byte[1]); + storage.write(new ByteArraySerializer(new byte[1])); storage.readAndProcess(processing); storage.close(); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java index a94b5fb3d..a9d0eb5da 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java @@ -30,6 +30,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import org.junit.jupiter.api.AfterEach; @@ -105,14 +106,12 @@ void tearDown() throws IOException { } private static void addFileContents(File source) throws IOException { - List items = new ArrayList<>(); - items.add(SERIALIZER.serialize(Collections.singleton(FIRST_LOG_RECORD))); - items.add(SERIALIZER.serialize(Collections.singleton(SECOND_LOG_RECORD))); - items.add(SERIALIZER.serialize(Collections.singleton(THIRD_LOG_RECORD))); - try (FileOutputStream out = new FileOutputStream(source)) { - for (byte[] item : items) { - out.write(item); + for (LogRecordData item : + Arrays.asList(FIRST_LOG_RECORD, SECOND_LOG_RECORD, THIRD_LOG_RECORD)) { + SERIALIZER.initialize(Collections.singleton(item)); + SERIALIZER.writeBinaryTo(out); + SERIALIZER.reset(); } } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java index 3df37eb4c..cae1e9f64 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java @@ -14,6 +14,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.ByteArraySerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; import io.opentelemetry.sdk.common.Clock; @@ -71,8 +72,8 @@ void hasExpired_whenWriteAgeHasExpired() { void appendDataInNewLines_andIncreaseSize() throws IOException { byte[] line1 = getByteArrayLine("First line"); byte[] line2 = getByteArrayLine("Second line"); - writableFile.append(line1); - writableFile.append(line2); + writableFile.append(new ByteArraySerializer(line1)); + writableFile.append(new ByteArraySerializer(line2)); writableFile.close(); List lines = getWrittenLines(); @@ -85,9 +86,11 @@ void appendDataInNewLines_andIncreaseSize() throws IOException { @Test void whenAppendingData_andNotEnoughSpaceIsAvailable_closeAndReturnFailed() throws IOException { - assertEquals(WritableResult.SUCCEEDED, writableFile.append(new byte[MAX_FILE_SIZE])); + assertEquals( + WritableResult.SUCCEEDED, + writableFile.append(new ByteArraySerializer(new byte[MAX_FILE_SIZE]))); - assertEquals(WritableResult.FAILED, writableFile.append(new byte[1])); + assertEquals(WritableResult.FAILED, writableFile.append(new ByteArraySerializer(new byte[1]))); assertEquals(1, getWrittenLines().size()); assertEquals(MAX_FILE_SIZE, writableFile.getSize()); @@ -95,21 +98,21 @@ void whenAppendingData_andNotEnoughSpaceIsAvailable_closeAndReturnFailed() throw @Test void whenAppendingData_andHasExpired_closeAndReturnExpiredStatus() throws IOException { - writableFile.append(new byte[2]); + writableFile.append(new ByteArraySerializer(new byte[2])); when(clock.now()) .thenReturn(MILLISECONDS.toNanos(CREATED_TIME_MILLIS + MAX_FILE_AGE_FOR_WRITE_MILLIS)); - assertEquals(WritableResult.FAILED, writableFile.append(new byte[1])); + assertEquals(WritableResult.FAILED, writableFile.append(new ByteArraySerializer(new byte[1]))); assertEquals(1, getWrittenLines().size()); } @Test void whenAppendingData_andIsAlreadyClosed_returnFailedStatus() throws IOException { - writableFile.append(new byte[1]); + writableFile.append(new ByteArraySerializer(new byte[1])); writableFile.close(); - assertEquals(WritableResult.FAILED, writableFile.append(new byte[2])); + assertEquals(WritableResult.FAILED, writableFile.append(new ByteArraySerializer(new byte[2]))); } private static byte[] getByteArrayLine(String line) { diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java index adfc8fb2f..69186f812 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java @@ -12,6 +12,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.DelimitedProtoStreamReader; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.StreamReader; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -20,7 +21,17 @@ @SuppressWarnings("unchecked") public abstract class BaseSignalSerializerTest { protected byte[] serialize(SIGNAL_SDK_ITEM... items) { - return getSerializer().serialize(Arrays.asList(items)); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + SignalSerializer serializer = getSerializer(); + try { + serializer.initialize(Arrays.asList(items)); + serializer.writeBinaryTo(byteArrayOutputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + serializer.reset(); + } + return byteArrayOutputStream.toByteArray(); } protected List deserialize(byte[] source) {