From 587619096707ab7c6c6c7c08ce2750e0db5a9e5b Mon Sep 17 00:00:00 2001 From: Tyler Benson <734411+tylerbenson@users.noreply.github.com> Date: Wed, 27 Aug 2025 11:11:16 -0400 Subject: [PATCH 1/3] Make the disk buffered exporting more efficient The previous implementation would load the protobuf encoded bytes from disk, deserialize it, then send it through the delegate exporter pipeline which would likely re-serialize it into the exact same format. This PR reduces the CPU burden by providing an optional alternate path which bypasses the exporter pipeline. Instead it takes the serialized bytes from disk and sends it straight to an HttpExporter or GrpcExporter, which only takes a `Marshaler` class allowing the serialized bytes to be passed straight through. Since these exporters like to know the number of serialized elements that are being passed in, I added a function (with the help of Claude) that counts the serialized elements within the protobuf encoded byte array. A future optimization would be to allow streaming the bytes directly from disk without reading them back into a byte array. --- .../otel.spotless-conventions.gradle.kts | 1 + disk-buffering/build.gradle.kts | 2 + .../buffering/LogRecordFromDiskExporter.java | 35 ++- .../buffering/MetricFromDiskExporter.java | 35 ++- .../disk/buffering/SpanFromDiskExporter.java | 35 ++- .../exporter/FromDiskExporterBuilder.java | 65 ++-- .../exporter/FromDiskExporterImpl.java | 65 ++-- .../exporter/ProtoByteArrayMarshaler.java | 24 ++ .../DeserializationException.java | 4 + .../internal/utils/ProtobufTools.java | 106 +++++++ .../buffering/FromDiskExporterImplTest.java | 8 +- .../disk/buffering/HttpIntegrationTest.java | 284 ++++++++++++++++++ .../disk/buffering/IntegrationTest.java | 18 +- .../internal/utils/ProtobufToolsTest.java | 215 +++++++++++++ 14 files changed, 823 insertions(+), 74 deletions(-) create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ProtoByteArrayMarshaler.java create mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/HttpIntegrationTest.java create mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufToolsTest.java diff --git a/buildSrc/src/main/kotlin/otel.spotless-conventions.gradle.kts b/buildSrc/src/main/kotlin/otel.spotless-conventions.gradle.kts index b1c39dcd0..ea50e8f59 100644 --- a/buildSrc/src/main/kotlin/otel.spotless-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/otel.spotless-conventions.gradle.kts @@ -5,6 +5,7 @@ plugins { spotless { java { googleJavaFormat() + toggleOffOn() licenseHeaderFile(rootProject.file("buildscripts/spotless.license.java"), "(package|import|public|// Includes work from:)") target("src/**/*.java") } diff --git a/disk-buffering/build.gradle.kts b/disk-buffering/build.gradle.kts index 0a883cf73..89570508c 100644 --- a/disk-buffering/build.gradle.kts +++ b/disk-buffering/build.gradle.kts @@ -22,6 +22,8 @@ dependencies { annotationProcessor("com.google.auto.value:auto-value") testImplementation("org.mockito:mockito-inline") testImplementation("io.opentelemetry:opentelemetry-sdk-testing") + testImplementation("io.opentelemetry:opentelemetry-exporter-otlp") + testImplementation("io.opentelemetry:opentelemetry-exporter-sender-okhttp") protos("io.opentelemetry.proto:opentelemetry-proto:1.7.0-alpha@jar") } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java index c26a383d6..0383b8f98 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java @@ -9,6 +9,10 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.exporter.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.internal.http.HttpExporter; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; import java.io.IOException; @@ -16,19 +20,38 @@ public class LogRecordFromDiskExporter implements FromDiskExporter { - private final FromDiskExporterImpl delegate; + private final FromDiskExporterImpl delegate; public static LogRecordFromDiskExporter create(LogRecordExporter exporter, Storage storage) throws IOException { - FromDiskExporterImpl delegate = - FromDiskExporterImpl.builder(storage) - .setDeserializer(SignalDeserializer.ofLogs()) - .setExportFunction(exporter::export) + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.logs) + .setExportFunction(exporter::export, SignalDeserializer.ofLogs()) .build(); return new LogRecordFromDiskExporter(delegate); } - private LogRecordFromDiskExporter(FromDiskExporterImpl delegate) { + public static LogRecordFromDiskExporter create(HttpExporter exporter, Storage storage) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.logs) + .setExporter(exporter) + .build(); + return new LogRecordFromDiskExporter(delegate); + } + + // Private because untested. + @SuppressWarnings("unused") + private static LogRecordFromDiskExporter create(GrpcExporter exporter, Storage storage) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.logs) + .setExporter(exporter) + .build(); + return new LogRecordFromDiskExporter(delegate); + } + + private LogRecordFromDiskExporter(FromDiskExporterImpl delegate) { this.delegate = delegate; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java index 8bb4f3dcd..369da882e 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java @@ -9,6 +9,10 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.exporter.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.internal.http.HttpExporter; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.io.IOException; @@ -16,19 +20,38 @@ public class MetricFromDiskExporter implements FromDiskExporter { - private final FromDiskExporterImpl delegate; + private final FromDiskExporterImpl delegate; public static MetricFromDiskExporter create(MetricExporter exporter, Storage storage) throws IOException { - FromDiskExporterImpl delegate = - FromDiskExporterImpl.builder(storage) - .setDeserializer(SignalDeserializer.ofMetrics()) - .setExportFunction(exporter::export) + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.metrics) + .setExportFunction(exporter::export, SignalDeserializer.ofMetrics()) .build(); return new MetricFromDiskExporter(delegate); } - private MetricFromDiskExporter(FromDiskExporterImpl delegate) { + public static MetricFromDiskExporter create(HttpExporter exporter, Storage storage) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.metrics) + .setExporter(exporter) + .build(); + return new MetricFromDiskExporter(delegate); + } + + // Private because untested. + @SuppressWarnings("unused") + private static MetricFromDiskExporter create(GrpcExporter exporter, Storage storage) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.metrics) + .setExporter(exporter) + .build(); + return new MetricFromDiskExporter(delegate); + } + + private MetricFromDiskExporter(FromDiskExporterImpl delegate) { this.delegate = delegate; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java index e3c7992ba..72611968c 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java @@ -9,6 +9,10 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.exporter.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.internal.http.HttpExporter; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.io.IOException; @@ -16,19 +20,38 @@ public class SpanFromDiskExporter implements FromDiskExporter { - private final FromDiskExporterImpl delegate; + private final FromDiskExporterImpl delegate; public static SpanFromDiskExporter create(SpanExporter exporter, Storage storage) throws IOException { - FromDiskExporterImpl delegate = - FromDiskExporterImpl.builder(storage) - .setDeserializer(SignalDeserializer.ofSpans()) - .setExportFunction(exporter::export) + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.spans) + .setExportFunction(exporter::export, SignalDeserializer.ofSpans()) .build(); return new SpanFromDiskExporter(delegate); } - private SpanFromDiskExporter(FromDiskExporterImpl delegate) { + public static SpanFromDiskExporter create(HttpExporter exporter, Storage storage) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.spans) + .setExporter(exporter) + .build(); + return new SpanFromDiskExporter(delegate); + } + + // Private because untested. + @SuppressWarnings("unused") + private static SpanFromDiskExporter create(GrpcExporter exporter, Storage storage) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.spans) + .setExporter(exporter) + .build(); + return new SpanFromDiskExporter(delegate); + } + + private SpanFromDiskExporter(FromDiskExporterImpl delegate) { this.delegate = delegate; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java index a91ded1f3..5be99ed03 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java @@ -5,51 +5,80 @@ package io.opentelemetry.contrib.disk.buffering.internal.exporter; -import static java.util.Collections.emptyList; - import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.exporter.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.internal.http.HttpExporter; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.IOException; import java.util.Collection; +import java.util.List; +import java.util.function.BiFunction; import java.util.function.Function; -import org.jetbrains.annotations.NotNull; +import java.util.logging.Logger; -public class FromDiskExporterBuilder { +public class FromDiskExporterBuilder { - private SignalDeserializer serializer = noopDeserializer(); + private final DebugLogger logger; private final Storage storage; + private final SignalTypes signalType; - private Function, CompletableResultCode> exportFunction = - x -> CompletableResultCode.ofFailure(); + private BiFunction exportFunction = + (x, i) -> CompletableResultCode.ofFailure(); - public FromDiskExporterBuilder(Storage storage) { + public FromDiskExporterBuilder(Storage storage, SignalTypes signalType) { if (storage == null) { throw new NullPointerException("Storage cannot be null"); } this.storage = storage; + this.signalType = signalType; + this.logger = + DebugLogger.wrap( + Logger.getLogger(FromDiskExporterImpl.class.getName()), storage.isDebugEnabled()); } - @NotNull - private static SignalDeserializer noopDeserializer() { - return x -> emptyList(); + @CanIgnoreReturnValue + public FromDiskExporterBuilder setExportFunction( + Function, CompletableResultCode> exportFunction, + SignalDeserializer deserializer) { + if (!deserializer.signalType().equals(signalType.name())) { + throw new IllegalArgumentException( + deserializer.signalType() + " does not match " + signalType); + } + this.exportFunction = + (exportRequest, itemCount) -> { + try { + List telemetry = deserializer.deserialize(exportRequest.getBytes()); + return exportFunction.apply(telemetry); + } catch (IOException e) { + return CompletableResultCode.ofExceptionalFailure(e); + } + }; + return this; } + /** + * The provided HttpExporter should _NOT_ be configured to send JSON. The data is serialized to + * disk in protobuf format and sent directly to the provided exporter as a serialized payload. + */ @CanIgnoreReturnValue - public FromDiskExporterBuilder setDeserializer(SignalDeserializer serializer) { - this.serializer = serializer; + public FromDiskExporterBuilder setExporter(HttpExporter exporter) { + // Any way we can assert the exporter is not configured for JSON? + this.exportFunction = exporter::export; return this; } @CanIgnoreReturnValue - public FromDiskExporterBuilder setExportFunction( - Function, CompletableResultCode> exportFunction) { - this.exportFunction = exportFunction; + public FromDiskExporterBuilder setExporter(GrpcExporter exporter) { + this.exportFunction = exporter::export; return this; } - public FromDiskExporterImpl build() throws IOException { - return new FromDiskExporterImpl<>(serializer, exportFunction, storage); + public FromDiskExporterImpl build() throws IOException { + return new FromDiskExporterImpl(exportFunction, storage, signalType); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java index 5ba5c2390..850673340 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java @@ -6,43 +6,46 @@ package io.opentelemetry.contrib.disk.buffering.internal.exporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; -import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; +import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.exporter.internal.marshal.ProtoFieldInfo; +import io.opentelemetry.proto.collector.logs.v1.internal.ExportLogsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.internal.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.trace.v1.internal.ExportTraceServiceRequest; import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.IOException; -import java.util.Collection; -import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.logging.Logger; /** * Signal-type generic class that can read telemetry previously buffered on disk and send it to * another delegated exporter. */ -public final class FromDiskExporterImpl implements FromDiskExporter { +public final class FromDiskExporterImpl implements FromDiskExporter { private final DebugLogger logger; private final Storage storage; - private final SignalDeserializer deserializer; - private final Function, CompletableResultCode> exportFunction; + private final SignalTypes signalType; + private final BiFunction exportFunction; FromDiskExporterImpl( - SignalDeserializer deserializer, - Function, CompletableResultCode> exportFunction, - Storage storage) { - this.deserializer = deserializer; + BiFunction exportFunction, + Storage storage, + SignalTypes signalType) { this.exportFunction = exportFunction; this.storage = storage; + this.signalType = signalType; this.logger = DebugLogger.wrap( Logger.getLogger(FromDiskExporterImpl.class.getName()), storage.isDebugEnabled()); } - public static FromDiskExporterBuilder builder(Storage storage) { - return new FromDiskExporterBuilder<>(storage); + public static FromDiskExporterBuilder builder(Storage storage, SignalTypes signalType) { + return new FromDiskExporterBuilder<>(storage, signalType); } /** @@ -56,25 +59,37 @@ public static FromDiskExporterBuilder builder(Storage storage) { */ @Override public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException { - logger.log("Attempting to export " + deserializer.signalType() + " batch from disk."); + logger.log("Attempting to export " + signalType.name() + " batch from disk."); ReadableResult result = storage.readAndProcess( bytes -> { - logger.log( - "Read " - + bytes.length - + " " - + deserializer.signalType() - + " bytes from storage."); + logger.log("Read " + bytes.length + " " + signalType.name() + " bytes from storage."); + ProtoFieldInfo field; + switch (signalType) { + case metrics: + field = ExportMetricsServiceRequest.RESOURCE_METRICS; + break; + case logs: + field = ExportLogsServiceRequest.RESOURCE_LOGS; + break; + case spans: + field = ExportTraceServiceRequest.RESOURCE_SPANS; + break; + default: + throw new IllegalStateException("Unsupported signal type: " + signalType); + } + int itemCount = 0; try { - List telemetry = deserializer.deserialize(bytes); - logger.log( - "Now exporting batch of " + telemetry.size() + " " + deserializer.signalType()); - CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit); - return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER; + itemCount = ProtobufTools.countRepeatedField(bytes, field.getFieldNumber()); } catch (DeserializationException e) { return ProcessResult.CONTENT_INVALID; } + logger.log("Now exporting batch of " + itemCount + " " + signalType.name()); + CompletableResultCode join = + exportFunction + .apply(new ProtoByteArrayMarshaler(bytes), itemCount) + .join(timeout, unit); + return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER; }); return result == ReadableResult.SUCCEEDED; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ProtoByteArrayMarshaler.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ProtoByteArrayMarshaler.java new file mode 100644 index 000000000..1359afeb1 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ProtoByteArrayMarshaler.java @@ -0,0 +1,24 @@ +package io.opentelemetry.contrib.disk.buffering.internal.exporter; + +import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; +import io.opentelemetry.exporter.internal.marshal.Serializer; +import java.io.IOException; + +class ProtoByteArrayMarshaler extends MarshalerWithSize { + + private final byte[] bytes; + + ProtoByteArrayMarshaler(byte[] bytes) { + super(bytes.length); + this.bytes = bytes; + } + + @Override + protected void writeTo(Serializer output) throws IOException { + output.writeSerializedMessage(bytes, ""); + } + + public byte[] getBytes() { + return bytes; + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java index f8e1d9729..29d30e83f 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java @@ -12,4 +12,8 @@ public class DeserializationException extends IOException { public DeserializationException(Throwable cause) { super(cause); } + + public DeserializationException(String message) { + super(message); + } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufTools.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufTools.java index 2788f1d72..b3d208ee4 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufTools.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufTools.java @@ -6,12 +6,19 @@ package io.opentelemetry.contrib.disk.buffering.internal.utils; import com.squareup.wire.ProtoAdapter; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public final class ProtobufTools { + // Wire types + private static final int WIRETYPE_VARINT = 0; + private static final int WIRETYPE_FIXED64 = 1; + private static final int WIRETYPE_LENGTH_DELIMITED = 2; + private static final int WIRETYPE_FIXED32 = 5; + private ProtobufTools() {} public static void writeRawVarint32(int value, OutputStream out) throws IOException { @@ -56,4 +63,103 @@ public static int readRawVarint32(int firstByte, InputStream input) throws IOExc public static int toUnsignedInt(byte x) { return ((int) x) & 0xff; } + + /** + * Counts repeated field occurrences in a protobuf-encoded byte array. Handles both packed and + * unpacked encodings. + */ + public static int countRepeatedField(byte[] data, int targetField) + throws DeserializationException { + int i = 0; + int count = 0; + + while (i < data.length) { + // Read tag (varint) + long[] tagAndLen = readVarint(data, i); + long tag = tagAndLen[0]; + int n = (int) tagAndLen[1]; + if (tag == 0) { + break; // end marker + } + + i += n; + int fieldNumber = (int) (tag >>> 3); + int wireType = (int) (tag & 0x07); + + if (fieldNumber == targetField) { + switch (wireType) { + case WIRETYPE_VARINT: + long[] v = readVarint(data, i); + i += (int) v[1]; + count++; + break; + case WIRETYPE_FIXED64: + i += 8; + count++; + break; + case WIRETYPE_FIXED32: + i += 4; + count++; + break; + case WIRETYPE_LENGTH_DELIMITED: + long[] lres = readVarint(data, i); + int len = (int) lres[0]; + int ln = (int) lres[1]; + i += ln; + + // Each length-delimited field occurrence counts as one element + // (message, string, bytes, or unpacked repeated field element) + count++; + i += len; + break; + default: + throw new DeserializationException("Unsupported wire type: " + wireType); + } + } else { + // skip unknown field + i = skipField(data, i, wireType); + } + } + return count; + } + + /** Reads a varint starting at offset. Returns [value, lengthInBytes]. */ + private static long[] readVarint(byte[] data, int offset) throws DeserializationException { + long result = 0; + int shift = 0; + int i = offset; + while (i < data.length) { + byte b = data[i++]; + result |= (long) (b & 0x7F) << shift; + if ((b & 0x80) == 0) { + return new long[] {result, i - offset}; + } + shift += 7; + if (shift > 64) { + throw new DeserializationException("Varint too long"); + } + } + throw new DeserializationException("Truncated varint"); + } + + /** Skips a field of given wire type. */ + private static int skipField(byte[] data, int offset, int wireType) + throws DeserializationException { + switch (wireType) { + case WIRETYPE_VARINT: + long[] v = readVarint(data, offset); + return offset + (int) v[1]; + case WIRETYPE_FIXED64: + return offset + 8; + case WIRETYPE_FIXED32: + return offset + 4; + case WIRETYPE_LENGTH_DELIMITED: + long[] lres = readVarint(data, offset); + int len = (int) lres[0]; + int ln = (int) lres[1]; + return offset + ln + len; + default: + throw new DeserializationException("Unsupported wire type: " + wireType); + } + } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java index e7995c675..180ec9728 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java @@ -38,7 +38,7 @@ class FromDiskExporterImplTest { private SpanExporter wrapped; private SignalDeserializer deserializer; private Clock clock; - private FromDiskExporterImpl exporter; + private FromDiskExporterImpl exporter; private final List deserializedData = Collections.emptyList(); @TempDir File rootDir; private static final String STORAGE_FOLDER_NAME = SignalTypes.spans.name(); @@ -51,9 +51,8 @@ void setUp() throws IOException { wrapped = mock(); exporter = FromDiskExporterImpl.builder( - TestData.getStorage(rootDir, SignalTypes.spans, clock)) - .setDeserializer(deserializer) - .setExportFunction(wrapped::export) + TestData.getStorage(rootDir, SignalTypes.spans, clock), SignalTypes.spans) + .setExportFunction(wrapped::export, deserializer) .build(); } @@ -104,6 +103,7 @@ private void createDummyFile() throws IOException { private void setUpSerializer() throws DeserializationException { deserializer = mock(); when(deserializer.deserialize(any())).thenReturn(deserializedData); + when(deserializer.signalType()).thenReturn(SignalTypes.spans.name()); } private static Clock createClockMock() { diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/HttpIntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/HttpIntegrationTest.java new file mode 100644 index 000000000..8415e3045 --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/HttpIntegrationTest.java @@ -0,0 +1,284 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterBuilder; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; +import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter; +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.internal.StandardComponentId; +import io.opentelemetry.sdk.logs.SdkLoggerProvider; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.LogRecordExporter; +import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class HttpIntegrationTest { + private Tracer tracer; + private SdkMeterProvider meterProvider; + private Meter meter; + private Logger logger; + private Clock clock; + @TempDir File rootDir; + private static final long INITIAL_TIME_IN_MILLIS = 1000; + private static final long NOW_NANOS = MILLISECONDS.toNanos(INITIAL_TIME_IN_MILLIS); + private static final String TRACES_PATH = "/v1/traces"; + private static final String METRICS_PATH = "/v1/metrics"; + private static final String LOGS_PATH = "/v1/logs"; + private StorageConfiguration storageConfig; + private Storage spanStorage; + private final List exportedData = new CopyOnWriteArrayList<>(); + private final CountDownLatch latch = new CountDownLatch(1); + + final HttpHandler tracesHandler = + exchange -> { + InputStream requestBody = exchange.getRequestBody(); + ExportTraceServiceRequest otlpData = ExportTraceServiceRequest.ADAPTER.decode(requestBody); + exportedData.addAll(otlpData.resource_spans); + latch.countDown(); + String response = "OK"; + exchange.sendResponseHeaders(200, response.length()); + OutputStream responseBody = exchange.getResponseBody(); + responseBody.write(response.getBytes(UTF_8)); + responseBody.close(); + }; + + final HttpHandler logsHandler = + exchange -> { + InputStream requestBody = exchange.getRequestBody(); + ExportLogsServiceRequest otlpData = ExportLogsServiceRequest.ADAPTER.decode(requestBody); + exportedData.addAll(otlpData.resource_logs); + latch.countDown(); + String response = "OK"; + exchange.sendResponseHeaders(200, response.length()); + OutputStream responseBody = exchange.getResponseBody(); + responseBody.write(response.getBytes(UTF_8)); + responseBody.close(); + }; + + final HttpHandler metricsHandler = + exchange -> { + InputStream requestBody = exchange.getRequestBody(); + ExportMetricsServiceRequest otlpData = + ExportMetricsServiceRequest.ADAPTER.decode(requestBody); + exportedData.addAll(otlpData.resource_metrics); + latch.countDown(); + String response = "OK"; + exchange.sendResponseHeaders(200, response.length()); + OutputStream responseBody = exchange.getResponseBody(); + responseBody.write(response.getBytes(UTF_8)); + responseBody.close(); + }; + private HttpServer mockServer; + private int port; + + @BeforeEach + void setUp() throws IOException { + clock = mock(); + storageConfig = + StorageConfiguration.builder().setRootDir(rootDir).setDebugEnabled(true).build(); + spanStorage = + Storage.builder(SignalTypes.spans) + .setStorageConfiguration(storageConfig) + .setStorageClock(clock) + .build(); + + when(clock.now()).thenReturn(NOW_NANOS); + + mockServer = HttpServer.create(new InetSocketAddress(0), 0); + port = mockServer.getAddress().getPort(); + mockServer.createContext(TRACES_PATH, tracesHandler); + mockServer.createContext(LOGS_PATH, logsHandler); + mockServer.createContext(METRICS_PATH, metricsHandler); + mockServer.start(); + + // Setting up spans + OtlpHttpSpanExporter spanExporter = + OtlpHttpSpanExporter.builder() + .setEndpoint("http://localhost:" + port + TRACES_PATH) + .build(); + ToDiskExporter toDiskSpanExporter = + buildToDiskExporter(SignalSerializer.ofSpans(), spanExporter::export); + SpanToDiskExporter spanToDiskExporter = new SpanToDiskExporter(toDiskSpanExporter); + tracer = createTracerProvider(spanToDiskExporter).get("SpanInstrumentationScope"); + + // Setting up metrics + OtlpHttpMetricExporter metricExporter = + OtlpHttpMetricExporter.builder() + .setEndpoint("http://localhost:" + port + METRICS_PATH) + .build(); + ToDiskExporter toDiskMetricExporter = + buildToDiskExporter(SignalSerializer.ofMetrics(), metricExporter::export); + MetricToDiskExporter metricToDiskExporter = + new MetricToDiskExporter(toDiskMetricExporter, metricExporter); + meterProvider = createMeterProvider(metricToDiskExporter); + meter = meterProvider.get("MetricInstrumentationScope"); + + // Setting up logs + OtlpHttpLogRecordExporter logRecordExporter = + OtlpHttpLogRecordExporter.builder() + .setEndpoint("http://localhost:" + port + LOGS_PATH) + .build(); + ToDiskExporter toDiskLogExporter = + buildToDiskExporter(SignalSerializer.ofLogs(), logRecordExporter::export); + LogRecordToDiskExporter logToDiskExporter = new LogRecordToDiskExporter(toDiskLogExporter); + logger = createLoggerProvider(logToDiskExporter).get("LogInstrumentationScope"); + } + + @AfterEach + void tearDown() throws IOException { + spanStorage.close(); + mockServer.stop(0); + } + + @NotNull + private ToDiskExporter buildToDiskExporter( + SignalSerializer serializer, Function, CompletableResultCode> exporter) { + return ToDiskExporter.builder(spanStorage) + .setSerializer(serializer) + .setExportFunction(exporter) + .build(); + } + + @NotNull + private static FromDiskExporterImpl buildFromDiskExporter( + FromDiskExporterBuilder builder, + StandardComponentId.ExporterType exporterType, + String endpoint) + throws IOException { + return builder.setExporter(new HttpExporterBuilder<>(exporterType, endpoint).build()).build(); + } + + @Test + void verifySpansIntegration() throws IOException, InterruptedException { + Span span = tracer.spanBuilder("Span name").startSpan(); + span.end(); + FromDiskExporterImpl fromDiskExporter = + buildFromDiskExporter( + FromDiskExporterImpl.builder(spanStorage, SignalTypes.spans), + StandardComponentId.ExporterType.OTLP_HTTP_SPAN_EXPORTER, + "http://localhost:" + port + TRACES_PATH); + assertExporter(fromDiskExporter); + } + + @Test + void verifyMetricsIntegration() throws IOException, InterruptedException { + meter.counterBuilder("Counter").build().add(2); + meterProvider.forceFlush(); + + FromDiskExporterImpl fromDiskExporter = + buildFromDiskExporter( + FromDiskExporterImpl.builder(spanStorage, SignalTypes.metrics), + StandardComponentId.ExporterType.OTLP_HTTP_METRIC_EXPORTER, + "http://localhost:" + port + METRICS_PATH); + assertExporter(fromDiskExporter); + } + + @Test + void verifyLogRecordsIntegration() throws IOException, InterruptedException { + logger.logRecordBuilder().setBody("I'm a log!").emit(); + + FromDiskExporterImpl fromDiskExporter = + buildFromDiskExporter( + FromDiskExporterImpl.builder(spanStorage, SignalTypes.logs), + StandardComponentId.ExporterType.OTLP_HTTP_LOG_EXPORTER, + "http://localhost:" + port + LOGS_PATH); + assertExporter(fromDiskExporter); + } + + private void assertExporter(FromDiskExporterImpl exporter) + throws IOException, InterruptedException { + // Verify no data has been received in the original exporter until this point. + assertEquals(0, exportedData.size()); + + // Go to the future when we can read the stored items. + fastForwardTimeByMillis(storageConfig.getMinFileAgeForReadMillis()); + + // Read and send stored data. + assertTrue(exporter.exportStoredBatch(1, TimeUnit.SECONDS)); + assertTrue(latch.await(10, SECONDS)); + + // Now the data must have been delegated to the original exporter. + assertEquals(1, exportedData.size()); + + // Bonus: Try to read again, no more data should be available. + assertFalse(exporter.exportStoredBatch(1, TimeUnit.SECONDS)); + assertEquals(1, exportedData.size()); + } + + @SuppressWarnings("DirectInvocationOnMock") + private void fastForwardTimeByMillis(long milliseconds) { + when(clock.now()).thenReturn(NOW_NANOS + MILLISECONDS.toNanos(milliseconds)); + } + + private static SdkTracerProvider createTracerProvider(SpanExporter exporter) { + return SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(exporter)) + .build(); + } + + private static SdkMeterProvider createMeterProvider(MetricExporter exporter) { + return SdkMeterProvider.builder() + .registerMetricReader(PeriodicMetricReader.create(exporter)) + .build(); + } + + private static SdkLoggerProvider createLoggerProvider(LogRecordExporter exporter) { + return SdkLoggerProvider.builder() + .addLogRecordProcessor(SimpleLogRecordProcessor.create(exporter)) + .build(); + } +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java index b46cba12f..a2a61acdb 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java @@ -119,21 +119,21 @@ private ToDiskExporter buildToDiskExporter( } @NotNull - private static FromDiskExporterImpl buildFromDiskExporter( + private static FromDiskExporterImpl buildFromDiskExporter( FromDiskExporterBuilder builder, Function, CompletableResultCode> exportFunction, SignalDeserializer deserializer) throws IOException { - return builder.setExportFunction(exportFunction).setDeserializer(deserializer).build(); + return builder.setExportFunction(exportFunction, deserializer).build(); } @Test void verifySpansIntegration() throws IOException { Span span = tracer.spanBuilder("Span name").startSpan(); span.end(); - FromDiskExporterImpl fromDiskExporter = + FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(spanStorage), + FromDiskExporterImpl.builder(spanStorage, SignalTypes.spans), memorySpanExporter::export, SignalDeserializer.ofSpans()); assertExporter(fromDiskExporter, () -> memorySpanExporter.getFinishedSpanItems().size()); @@ -144,9 +144,9 @@ void verifyMetricsIntegration() throws IOException { meter.counterBuilder("Counter").build().add(2); meterProvider.forceFlush(); - FromDiskExporterImpl fromDiskExporter = + FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(spanStorage), + FromDiskExporterImpl.builder(spanStorage, SignalTypes.metrics), memoryMetricExporter::export, SignalDeserializer.ofMetrics()); assertExporter(fromDiskExporter, () -> memoryMetricExporter.getFinishedMetricItems().size()); @@ -156,16 +156,16 @@ void verifyMetricsIntegration() throws IOException { void verifyLogRecordsIntegration() throws IOException { logger.logRecordBuilder().setBody("I'm a log!").emit(); - FromDiskExporterImpl fromDiskExporter = + FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(spanStorage), + FromDiskExporterImpl.builder(spanStorage, SignalTypes.logs), memoryLogRecordExporter::export, SignalDeserializer.ofLogs()); assertExporter( fromDiskExporter, () -> memoryLogRecordExporter.getFinishedLogRecordItems().size()); } - private void assertExporter(FromDiskExporterImpl exporter, Supplier finishedItems) + private void assertExporter(FromDiskExporterImpl exporter, Supplier finishedItems) throws IOException { // Verify no data has been received in the original exporter until this point. assertEquals(0, finishedItems.get()); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufToolsTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufToolsTest.java new file mode 100644 index 000000000..bc6477143 --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufToolsTest.java @@ -0,0 +1,215 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.utils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; +import org.junit.jupiter.api.Test; + +class ProtobufToolsTest { + + @Test + void countRepeatedField_emptyData() throws DeserializationException { + byte[] data = {}; + int count = ProtobufTools.countRepeatedField(data, 1); + assertThat(count).isEqualTo(0); + } + + @Test + void countRepeatedField_singleVarintField() throws DeserializationException { + // Field 1, wire type 0 (varint), value 42 + byte[] data = {0x08, 0x2A}; // tag=8 (field 1, wire type 0), value=42 + int count = ProtobufTools.countRepeatedField(data, 1); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_multipleVarintFields() throws DeserializationException { + // Field 1 appears 3 times with different values + byte[] data = { + 0x08, 0x01, // field 1, value 1 + 0x08, 0x02, // field 1, value 2 + 0x08, 0x03 // field 1, value 3 + }; + int count = ProtobufTools.countRepeatedField(data, 1); + assertThat(count).isEqualTo(3); + } + + @Test + void countRepeatedField_singleFixed32Field() throws DeserializationException { + // Field 2, wire type 5 (fixed32), value 0x12345678 + byte[] data = { + 0x15, 0x78, 0x56, 0x34, 0x12 + }; // tag=21 (field 2, wire type 5), little-endian value + int count = ProtobufTools.countRepeatedField(data, 2); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_multipleFixed32Fields() throws DeserializationException { + // Field 2 appears twice + byte[] data = { + // spotless:off + 0x15, 0x78, 0x56, 0x34, 0x12, // field 2, value 0x12345678 + 0x15, (byte) 0x87, 0x65, 0x43, 0x21 // field 2, value 0x21436587 + // spotless:on + }; + int count = ProtobufTools.countRepeatedField(data, 2); + assertThat(count).isEqualTo(2); + } + + @Test + void countRepeatedField_singleFixed64Field() throws DeserializationException { + // Field 3, wire type 1 (fixed64) + byte[] data = { + 0x19, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08 + }; // tag=25 (field 3, wire type 1) + int count = ProtobufTools.countRepeatedField(data, 3); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_multipleLengthDelimitedFields() throws DeserializationException { + // Field 4 appears twice as length-delimited (strings/messages) + byte[] data = { + 0x22, 0x05, 'h', 'e', 'l', 'l', 'o', // field 4, length 5, "hello" + 0x22, 0x05, 'w', 'o', 'r', 'l', 'd' // field 4, length 5, "world" + }; + int count = ProtobufTools.countRepeatedField(data, 4); + assertThat(count).isEqualTo(2); + } + + @Test + void countRepeatedField_targetFieldNotPresent() throws DeserializationException { + // Only field 1 is present, but we're looking for field 2 + byte[] data = {0x08, 0x2A}; // field 1, value 42 + int count = ProtobufTools.countRepeatedField(data, 2); + assertThat(count).isEqualTo(0); + } + + @Test + void countRepeatedField_skipOtherFields() throws DeserializationException { + // Multiple fields present, but we only count field 2 + byte[] data = { + // spotless:off + 0x08, 0x01, // field 1, varint + 0x15, 0x78, 0x56, 0x34, 0x12, // field 2, fixed32 (target) + 0x1A, 0x05, 'h', 'e', 'l', 'l', 'o', // field 3, string + 0x15, (byte) 0x87, 0x65, 0x43, 0x21, // field 2, fixed32 (target) + 0x20, 0x64 // field 4, varint + // spotless:on + }; + int count = ProtobufTools.countRepeatedField(data, 2); + assertThat(count).isEqualTo(2); + } + + @Test + void countRepeatedField_largeVarint() throws DeserializationException { + // Test with a large varint that uses multiple bytes + byte[] data = { + 0x08, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0x0F + }; // field 1, large varint + int count = ProtobufTools.countRepeatedField(data, 1); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_emptyLengthDelimited() throws DeserializationException { + // Field with empty length-delimited content + byte[] data = {0x12, 0x00}; // field 2, length 0 + int count = ProtobufTools.countRepeatedField(data, 2); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_nestedMessage() throws DeserializationException { + // Field containing a nested message with its own fields + byte[] nestedMessage = {0x08, 0x01, 0x10, 0x02}; // inner field 1=1, inner field 2=2 + byte[] data = new byte[2 + nestedMessage.length]; + data[0] = 0x1A; // field 3, wire type 2 (length-delimited) + data[1] = (byte) nestedMessage.length; + System.arraycopy(nestedMessage, 0, data, 2, nestedMessage.length); + + int count = ProtobufTools.countRepeatedField(data, 3); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_truncatedVarint() { + // Incomplete varint at end of data + byte[] data = {0x08, (byte) 0xFF}; // field 1, incomplete varint (missing continuation) + assertThatThrownBy(() -> ProtobufTools.countRepeatedField(data, 1)) + .isInstanceOf(DeserializationException.class) + .hasMessageContaining("Truncated varint"); + } + + @Test + void countRepeatedField_truncatedLengthDelimited() throws DeserializationException { + // Length-delimited field with length > remaining data + byte[] data = {0x12, 0x10, 0x01, 0x02}; // field 2, claims length 16 but only 2 bytes follow + int count = ProtobufTools.countRepeatedField(data, 2); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_invalidWireType() { + // Wire type 6 is not valid + byte[] data = {0x0E, 0x01}; // field 1, wire type 6 (invalid) + assertThatThrownBy(() -> ProtobufTools.countRepeatedField(data, 1)) + .isInstanceOf(DeserializationException.class) + .hasMessageContaining("Unsupported wire type: 6"); + } + + @Test + void countRepeatedField_startGroupWireType() { + // Wire type 3 (START_GROUP) is no longer supported + byte[] data = {0x0B}; // field 1, wire type 3 (START_GROUP) + assertThatThrownBy(() -> ProtobufTools.countRepeatedField(data, 1)) + .isInstanceOf(DeserializationException.class) + .hasMessageContaining("Unsupported wire type: 3"); + } + + @Test + void countRepeatedField_endGroupWireType() { + // Wire type 4 (END_GROUP) is no longer supported + byte[] data = {0x0C}; // field 1, wire type 4 (END_GROUP) + assertThatThrownBy(() -> ProtobufTools.countRepeatedField(data, 1)) + .isInstanceOf(DeserializationException.class) + .hasMessageContaining("Unsupported wire type: 4"); + } + + @Test + void countRepeatedField_realWorldExample() throws DeserializationException { + // Simulate a more realistic protobuf message structure + // message ExportRequest { + // repeated SpanData spans = 1; + // string service_name = 2; + // } + // With 3 spans and a service name + byte[] data = { + // spotless:off + // First span (field 1, length-delimited) + 0x0A, 0x04, 0x08, 0x01, 0x10, 0x02, // span 1: some inner fields + // Second span (field 1, length-delimited) + 0x0A, 0x04, 0x08, 0x03, 0x10, 0x04, // span 2: some inner fields + // Service name (field 2, length-delimited) + 0x12, 0x07, 's', 'e', 'r', 'v', 'i', 'c', 'e', + // Third span (field 1, length-delimited) + 0x0A, 0x04, 0x08, 0x05, 0x10, 0x06 // span 3: some inner fields + // spotless:on + }; + + // Count spans (field 1) + int spanCount = ProtobufTools.countRepeatedField(data, 1); + assertThat(spanCount).isEqualTo(3); + + // Count service names (field 2) + int serviceNameCount = ProtobufTools.countRepeatedField(data, 2); + assertThat(serviceNameCount).isEqualTo(1); + } +} From 6d8d57d42d22b62ab32342d0d3e9c27fa150093a Mon Sep 17 00:00:00 2001 From: otelbot <197425009+otelbot@users.noreply.github.com> Date: Wed, 27 Aug 2025 16:53:08 +0000 Subject: [PATCH 2/3] ./gradlew spotlessApply --- .../exporter/ProtoByteArrayMarshaler.java | 5 +++ .../internal/utils/ProtobufTools.java | 40 +++++++++---------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ProtoByteArrayMarshaler.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ProtoByteArrayMarshaler.java index 1359afeb1..75b9c2276 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ProtoByteArrayMarshaler.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ProtoByteArrayMarshaler.java @@ -1,3 +1,8 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.contrib.disk.buffering.internal.exporter; import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufTools.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufTools.java index b3d208ee4..453b2d6ba 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufTools.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufTools.java @@ -89,29 +89,29 @@ public static int countRepeatedField(byte[] data, int targetField) if (fieldNumber == targetField) { switch (wireType) { case WIRETYPE_VARINT: - long[] v = readVarint(data, i); - i += (int) v[1]; - count++; - break; + long[] v = readVarint(data, i); + i += (int) v[1]; + count++; + break; case WIRETYPE_FIXED64: - i += 8; - count++; - break; + i += 8; + count++; + break; case WIRETYPE_FIXED32: - i += 4; - count++; - break; + i += 4; + count++; + break; case WIRETYPE_LENGTH_DELIMITED: - long[] lres = readVarint(data, i); - int len = (int) lres[0]; - int ln = (int) lres[1]; - i += ln; - - // Each length-delimited field occurrence counts as one element - // (message, string, bytes, or unpacked repeated field element) - count++; - i += len; - break; + long[] lres = readVarint(data, i); + int len = (int) lres[0]; + int ln = (int) lres[1]; + i += ln; + + // Each length-delimited field occurrence counts as one element + // (message, string, bytes, or unpacked repeated field element) + count++; + i += len; + break; default: throw new DeserializationException("Unsupported wire type: " + wireType); } From f31eca1b0dba9df3adc7c02bb115b4bb1dd8aa17 Mon Sep 17 00:00:00 2001 From: Tyler Benson <734411+tylerbenson@users.noreply.github.com> Date: Wed, 27 Aug 2025 13:05:22 -0400 Subject: [PATCH 3/3] Fix errorprone and extra test debug logging. --- .../internal/exporter/FromDiskExporterBuilder.java | 6 ------ .../contrib/disk/buffering/HttpIntegrationTest.java | 2 +- .../disk/buffering/internal/storage/StorageTest.java | 3 ++- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java index 5be99ed03..fa34df07d 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java @@ -8,7 +8,6 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; -import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.internal.http.HttpExporter; @@ -19,11 +18,9 @@ import java.util.List; import java.util.function.BiFunction; import java.util.function.Function; -import java.util.logging.Logger; public class FromDiskExporterBuilder { - private final DebugLogger logger; private final Storage storage; private final SignalTypes signalType; @@ -36,9 +33,6 @@ public FromDiskExporterBuilder(Storage storage, SignalTypes signalType) { } this.storage = storage; this.signalType = signalType; - this.logger = - DebugLogger.wrap( - Logger.getLogger(FromDiskExporterImpl.class.getName()), storage.isDebugEnabled()); } @CanIgnoreReturnValue diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/HttpIntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/HttpIntegrationTest.java index 8415e3045..39873af4b 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/HttpIntegrationTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/HttpIntegrationTest.java @@ -129,7 +129,7 @@ public class HttpIntegrationTest { void setUp() throws IOException { clock = mock(); storageConfig = - StorageConfiguration.builder().setRootDir(rootDir).setDebugEnabled(true).build(); + StorageConfiguration.builder().setRootDir(rootDir).setDebugEnabled(false).build(); spanStorage = Storage.builder(SignalTypes.spans) .setStorageConfiguration(storageConfig) diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java index d96b9a1bc..45970cfe3 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java @@ -33,6 +33,7 @@ @SuppressWarnings("unchecked") class StorageTest { + private static final boolean DEBUG_ENABLED = false; private FolderManager folderManager; private Storage storage; private Function processing; @@ -46,7 +47,7 @@ void setUp() throws IOException { writableFile = createWritableFile(); processing = mock(); when(readableFile.readAndProcess(processing)).thenReturn(ReadableResult.SUCCEEDED); - storage = new Storage(folderManager, true); + storage = new Storage(folderManager, DEBUG_ENABLED); } @AfterEach