diff --git a/buildSrc/src/main/kotlin/otel.spotless-conventions.gradle.kts b/buildSrc/src/main/kotlin/otel.spotless-conventions.gradle.kts index b1c39dcd0..ea50e8f59 100644 --- a/buildSrc/src/main/kotlin/otel.spotless-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/otel.spotless-conventions.gradle.kts @@ -5,6 +5,7 @@ plugins { spotless { java { googleJavaFormat() + toggleOffOn() licenseHeaderFile(rootProject.file("buildscripts/spotless.license.java"), "(package|import|public|// Includes work from:)") target("src/**/*.java") } diff --git a/disk-buffering/build.gradle.kts b/disk-buffering/build.gradle.kts index 0a883cf73..89570508c 100644 --- a/disk-buffering/build.gradle.kts +++ b/disk-buffering/build.gradle.kts @@ -22,6 +22,8 @@ dependencies { annotationProcessor("com.google.auto.value:auto-value") testImplementation("org.mockito:mockito-inline") testImplementation("io.opentelemetry:opentelemetry-sdk-testing") + testImplementation("io.opentelemetry:opentelemetry-exporter-otlp") + testImplementation("io.opentelemetry:opentelemetry-exporter-sender-okhttp") protos("io.opentelemetry.proto:opentelemetry-proto:1.7.0-alpha@jar") } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java index c26a383d6..0383b8f98 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java @@ -9,6 +9,10 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.exporter.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.internal.http.HttpExporter; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; import java.io.IOException; @@ -16,19 +20,38 @@ public class LogRecordFromDiskExporter implements FromDiskExporter { - private final FromDiskExporterImpl delegate; + private final FromDiskExporterImpl delegate; public static LogRecordFromDiskExporter create(LogRecordExporter exporter, Storage storage) throws IOException { - FromDiskExporterImpl delegate = - FromDiskExporterImpl.builder(storage) - .setDeserializer(SignalDeserializer.ofLogs()) - .setExportFunction(exporter::export) + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.logs) + .setExportFunction(exporter::export, SignalDeserializer.ofLogs()) .build(); return new LogRecordFromDiskExporter(delegate); } - private LogRecordFromDiskExporter(FromDiskExporterImpl delegate) { + public static LogRecordFromDiskExporter create(HttpExporter exporter, Storage storage) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.logs) + .setExporter(exporter) + .build(); + return new LogRecordFromDiskExporter(delegate); + } + + // Private because untested. + @SuppressWarnings("unused") + private static LogRecordFromDiskExporter create(GrpcExporter exporter, Storage storage) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.logs) + .setExporter(exporter) + .build(); + return new LogRecordFromDiskExporter(delegate); + } + + private LogRecordFromDiskExporter(FromDiskExporterImpl delegate) { this.delegate = delegate; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java index 8bb4f3dcd..369da882e 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java @@ -9,6 +9,10 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.exporter.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.internal.http.HttpExporter; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.io.IOException; @@ -16,19 +20,38 @@ public class MetricFromDiskExporter implements FromDiskExporter { - private final FromDiskExporterImpl delegate; + private final FromDiskExporterImpl delegate; public static MetricFromDiskExporter create(MetricExporter exporter, Storage storage) throws IOException { - FromDiskExporterImpl delegate = - FromDiskExporterImpl.builder(storage) - .setDeserializer(SignalDeserializer.ofMetrics()) - .setExportFunction(exporter::export) + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.metrics) + .setExportFunction(exporter::export, SignalDeserializer.ofMetrics()) .build(); return new MetricFromDiskExporter(delegate); } - private MetricFromDiskExporter(FromDiskExporterImpl delegate) { + public static MetricFromDiskExporter create(HttpExporter exporter, Storage storage) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.metrics) + .setExporter(exporter) + .build(); + return new MetricFromDiskExporter(delegate); + } + + // Private because untested. + @SuppressWarnings("unused") + private static MetricFromDiskExporter create(GrpcExporter exporter, Storage storage) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.metrics) + .setExporter(exporter) + .build(); + return new MetricFromDiskExporter(delegate); + } + + private MetricFromDiskExporter(FromDiskExporterImpl delegate) { this.delegate = delegate; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java index e3c7992ba..72611968c 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java @@ -9,6 +9,10 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.exporter.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.internal.http.HttpExporter; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.io.IOException; @@ -16,19 +20,38 @@ public class SpanFromDiskExporter implements FromDiskExporter { - private final FromDiskExporterImpl delegate; + private final FromDiskExporterImpl delegate; public static SpanFromDiskExporter create(SpanExporter exporter, Storage storage) throws IOException { - FromDiskExporterImpl delegate = - FromDiskExporterImpl.builder(storage) - .setDeserializer(SignalDeserializer.ofSpans()) - .setExportFunction(exporter::export) + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.spans) + .setExportFunction(exporter::export, SignalDeserializer.ofSpans()) .build(); return new SpanFromDiskExporter(delegate); } - private SpanFromDiskExporter(FromDiskExporterImpl delegate) { + public static SpanFromDiskExporter create(HttpExporter exporter, Storage storage) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.spans) + .setExporter(exporter) + .build(); + return new SpanFromDiskExporter(delegate); + } + + // Private because untested. + @SuppressWarnings("unused") + private static SpanFromDiskExporter create(GrpcExporter exporter, Storage storage) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder(storage, SignalTypes.spans) + .setExporter(exporter) + .build(); + return new SpanFromDiskExporter(delegate); + } + + private SpanFromDiskExporter(FromDiskExporterImpl delegate) { this.delegate = delegate; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java index a91ded1f3..fa34df07d 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java @@ -5,51 +5,74 @@ package io.opentelemetry.contrib.disk.buffering.internal.exporter; -import static java.util.Collections.emptyList; - import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.exporter.internal.grpc.GrpcExporter; +import io.opentelemetry.exporter.internal.http.HttpExporter; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.IOException; import java.util.Collection; +import java.util.List; +import java.util.function.BiFunction; import java.util.function.Function; -import org.jetbrains.annotations.NotNull; -public class FromDiskExporterBuilder { +public class FromDiskExporterBuilder { - private SignalDeserializer serializer = noopDeserializer(); private final Storage storage; + private final SignalTypes signalType; - private Function, CompletableResultCode> exportFunction = - x -> CompletableResultCode.ofFailure(); + private BiFunction exportFunction = + (x, i) -> CompletableResultCode.ofFailure(); - public FromDiskExporterBuilder(Storage storage) { + public FromDiskExporterBuilder(Storage storage, SignalTypes signalType) { if (storage == null) { throw new NullPointerException("Storage cannot be null"); } this.storage = storage; + this.signalType = signalType; } - @NotNull - private static SignalDeserializer noopDeserializer() { - return x -> emptyList(); + @CanIgnoreReturnValue + public FromDiskExporterBuilder setExportFunction( + Function, CompletableResultCode> exportFunction, + SignalDeserializer deserializer) { + if (!deserializer.signalType().equals(signalType.name())) { + throw new IllegalArgumentException( + deserializer.signalType() + " does not match " + signalType); + } + this.exportFunction = + (exportRequest, itemCount) -> { + try { + List telemetry = deserializer.deserialize(exportRequest.getBytes()); + return exportFunction.apply(telemetry); + } catch (IOException e) { + return CompletableResultCode.ofExceptionalFailure(e); + } + }; + return this; } + /** + * The provided HttpExporter should _NOT_ be configured to send JSON. The data is serialized to + * disk in protobuf format and sent directly to the provided exporter as a serialized payload. + */ @CanIgnoreReturnValue - public FromDiskExporterBuilder setDeserializer(SignalDeserializer serializer) { - this.serializer = serializer; + public FromDiskExporterBuilder setExporter(HttpExporter exporter) { + // Any way we can assert the exporter is not configured for JSON? + this.exportFunction = exporter::export; return this; } @CanIgnoreReturnValue - public FromDiskExporterBuilder setExportFunction( - Function, CompletableResultCode> exportFunction) { - this.exportFunction = exportFunction; + public FromDiskExporterBuilder setExporter(GrpcExporter exporter) { + this.exportFunction = exporter::export; return this; } - public FromDiskExporterImpl build() throws IOException { - return new FromDiskExporterImpl<>(serializer, exportFunction, storage); + public FromDiskExporterImpl build() throws IOException { + return new FromDiskExporterImpl(exportFunction, storage, signalType); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java index 5ba5c2390..850673340 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java @@ -6,43 +6,46 @@ package io.opentelemetry.contrib.disk.buffering.internal.exporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; -import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; +import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.exporter.internal.marshal.ProtoFieldInfo; +import io.opentelemetry.proto.collector.logs.v1.internal.ExportLogsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.internal.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.trace.v1.internal.ExportTraceServiceRequest; import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.IOException; -import java.util.Collection; -import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.logging.Logger; /** * Signal-type generic class that can read telemetry previously buffered on disk and send it to * another delegated exporter. */ -public final class FromDiskExporterImpl implements FromDiskExporter { +public final class FromDiskExporterImpl implements FromDiskExporter { private final DebugLogger logger; private final Storage storage; - private final SignalDeserializer deserializer; - private final Function, CompletableResultCode> exportFunction; + private final SignalTypes signalType; + private final BiFunction exportFunction; FromDiskExporterImpl( - SignalDeserializer deserializer, - Function, CompletableResultCode> exportFunction, - Storage storage) { - this.deserializer = deserializer; + BiFunction exportFunction, + Storage storage, + SignalTypes signalType) { this.exportFunction = exportFunction; this.storage = storage; + this.signalType = signalType; this.logger = DebugLogger.wrap( Logger.getLogger(FromDiskExporterImpl.class.getName()), storage.isDebugEnabled()); } - public static FromDiskExporterBuilder builder(Storage storage) { - return new FromDiskExporterBuilder<>(storage); + public static FromDiskExporterBuilder builder(Storage storage, SignalTypes signalType) { + return new FromDiskExporterBuilder<>(storage, signalType); } /** @@ -56,25 +59,37 @@ public static FromDiskExporterBuilder builder(Storage storage) { */ @Override public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException { - logger.log("Attempting to export " + deserializer.signalType() + " batch from disk."); + logger.log("Attempting to export " + signalType.name() + " batch from disk."); ReadableResult result = storage.readAndProcess( bytes -> { - logger.log( - "Read " - + bytes.length - + " " - + deserializer.signalType() - + " bytes from storage."); + logger.log("Read " + bytes.length + " " + signalType.name() + " bytes from storage."); + ProtoFieldInfo field; + switch (signalType) { + case metrics: + field = ExportMetricsServiceRequest.RESOURCE_METRICS; + break; + case logs: + field = ExportLogsServiceRequest.RESOURCE_LOGS; + break; + case spans: + field = ExportTraceServiceRequest.RESOURCE_SPANS; + break; + default: + throw new IllegalStateException("Unsupported signal type: " + signalType); + } + int itemCount = 0; try { - List telemetry = deserializer.deserialize(bytes); - logger.log( - "Now exporting batch of " + telemetry.size() + " " + deserializer.signalType()); - CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit); - return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER; + itemCount = ProtobufTools.countRepeatedField(bytes, field.getFieldNumber()); } catch (DeserializationException e) { return ProcessResult.CONTENT_INVALID; } + logger.log("Now exporting batch of " + itemCount + " " + signalType.name()); + CompletableResultCode join = + exportFunction + .apply(new ProtoByteArrayMarshaler(bytes), itemCount) + .join(timeout, unit); + return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER; }); return result == ReadableResult.SUCCEEDED; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ProtoByteArrayMarshaler.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ProtoByteArrayMarshaler.java new file mode 100644 index 000000000..75b9c2276 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ProtoByteArrayMarshaler.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.exporter; + +import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; +import io.opentelemetry.exporter.internal.marshal.Serializer; +import java.io.IOException; + +class ProtoByteArrayMarshaler extends MarshalerWithSize { + + private final byte[] bytes; + + ProtoByteArrayMarshaler(byte[] bytes) { + super(bytes.length); + this.bytes = bytes; + } + + @Override + protected void writeTo(Serializer output) throws IOException { + output.writeSerializedMessage(bytes, ""); + } + + public byte[] getBytes() { + return bytes; + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java index f8e1d9729..29d30e83f 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java @@ -12,4 +12,8 @@ public class DeserializationException extends IOException { public DeserializationException(Throwable cause) { super(cause); } + + public DeserializationException(String message) { + super(message); + } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufTools.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufTools.java index 2788f1d72..453b2d6ba 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufTools.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufTools.java @@ -6,12 +6,19 @@ package io.opentelemetry.contrib.disk.buffering.internal.utils; import com.squareup.wire.ProtoAdapter; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public final class ProtobufTools { + // Wire types + private static final int WIRETYPE_VARINT = 0; + private static final int WIRETYPE_FIXED64 = 1; + private static final int WIRETYPE_LENGTH_DELIMITED = 2; + private static final int WIRETYPE_FIXED32 = 5; + private ProtobufTools() {} public static void writeRawVarint32(int value, OutputStream out) throws IOException { @@ -56,4 +63,103 @@ public static int readRawVarint32(int firstByte, InputStream input) throws IOExc public static int toUnsignedInt(byte x) { return ((int) x) & 0xff; } + + /** + * Counts repeated field occurrences in a protobuf-encoded byte array. Handles both packed and + * unpacked encodings. + */ + public static int countRepeatedField(byte[] data, int targetField) + throws DeserializationException { + int i = 0; + int count = 0; + + while (i < data.length) { + // Read tag (varint) + long[] tagAndLen = readVarint(data, i); + long tag = tagAndLen[0]; + int n = (int) tagAndLen[1]; + if (tag == 0) { + break; // end marker + } + + i += n; + int fieldNumber = (int) (tag >>> 3); + int wireType = (int) (tag & 0x07); + + if (fieldNumber == targetField) { + switch (wireType) { + case WIRETYPE_VARINT: + long[] v = readVarint(data, i); + i += (int) v[1]; + count++; + break; + case WIRETYPE_FIXED64: + i += 8; + count++; + break; + case WIRETYPE_FIXED32: + i += 4; + count++; + break; + case WIRETYPE_LENGTH_DELIMITED: + long[] lres = readVarint(data, i); + int len = (int) lres[0]; + int ln = (int) lres[1]; + i += ln; + + // Each length-delimited field occurrence counts as one element + // (message, string, bytes, or unpacked repeated field element) + count++; + i += len; + break; + default: + throw new DeserializationException("Unsupported wire type: " + wireType); + } + } else { + // skip unknown field + i = skipField(data, i, wireType); + } + } + return count; + } + + /** Reads a varint starting at offset. Returns [value, lengthInBytes]. */ + private static long[] readVarint(byte[] data, int offset) throws DeserializationException { + long result = 0; + int shift = 0; + int i = offset; + while (i < data.length) { + byte b = data[i++]; + result |= (long) (b & 0x7F) << shift; + if ((b & 0x80) == 0) { + return new long[] {result, i - offset}; + } + shift += 7; + if (shift > 64) { + throw new DeserializationException("Varint too long"); + } + } + throw new DeserializationException("Truncated varint"); + } + + /** Skips a field of given wire type. */ + private static int skipField(byte[] data, int offset, int wireType) + throws DeserializationException { + switch (wireType) { + case WIRETYPE_VARINT: + long[] v = readVarint(data, offset); + return offset + (int) v[1]; + case WIRETYPE_FIXED64: + return offset + 8; + case WIRETYPE_FIXED32: + return offset + 4; + case WIRETYPE_LENGTH_DELIMITED: + long[] lres = readVarint(data, offset); + int len = (int) lres[0]; + int ln = (int) lres[1]; + return offset + ln + len; + default: + throw new DeserializationException("Unsupported wire type: " + wireType); + } + } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java index e7995c675..180ec9728 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java @@ -38,7 +38,7 @@ class FromDiskExporterImplTest { private SpanExporter wrapped; private SignalDeserializer deserializer; private Clock clock; - private FromDiskExporterImpl exporter; + private FromDiskExporterImpl exporter; private final List deserializedData = Collections.emptyList(); @TempDir File rootDir; private static final String STORAGE_FOLDER_NAME = SignalTypes.spans.name(); @@ -51,9 +51,8 @@ void setUp() throws IOException { wrapped = mock(); exporter = FromDiskExporterImpl.builder( - TestData.getStorage(rootDir, SignalTypes.spans, clock)) - .setDeserializer(deserializer) - .setExportFunction(wrapped::export) + TestData.getStorage(rootDir, SignalTypes.spans, clock), SignalTypes.spans) + .setExportFunction(wrapped::export, deserializer) .build(); } @@ -104,6 +103,7 @@ private void createDummyFile() throws IOException { private void setUpSerializer() throws DeserializationException { deserializer = mock(); when(deserializer.deserialize(any())).thenReturn(deserializedData); + when(deserializer.signalType()).thenReturn(SignalTypes.spans.name()); } private static Clock createClockMock() { diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/HttpIntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/HttpIntegrationTest.java new file mode 100644 index 000000000..39873af4b --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/HttpIntegrationTest.java @@ -0,0 +1,284 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterBuilder; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; +import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter; +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.internal.StandardComponentId; +import io.opentelemetry.sdk.logs.SdkLoggerProvider; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.LogRecordExporter; +import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class HttpIntegrationTest { + private Tracer tracer; + private SdkMeterProvider meterProvider; + private Meter meter; + private Logger logger; + private Clock clock; + @TempDir File rootDir; + private static final long INITIAL_TIME_IN_MILLIS = 1000; + private static final long NOW_NANOS = MILLISECONDS.toNanos(INITIAL_TIME_IN_MILLIS); + private static final String TRACES_PATH = "/v1/traces"; + private static final String METRICS_PATH = "/v1/metrics"; + private static final String LOGS_PATH = "/v1/logs"; + private StorageConfiguration storageConfig; + private Storage spanStorage; + private final List exportedData = new CopyOnWriteArrayList<>(); + private final CountDownLatch latch = new CountDownLatch(1); + + final HttpHandler tracesHandler = + exchange -> { + InputStream requestBody = exchange.getRequestBody(); + ExportTraceServiceRequest otlpData = ExportTraceServiceRequest.ADAPTER.decode(requestBody); + exportedData.addAll(otlpData.resource_spans); + latch.countDown(); + String response = "OK"; + exchange.sendResponseHeaders(200, response.length()); + OutputStream responseBody = exchange.getResponseBody(); + responseBody.write(response.getBytes(UTF_8)); + responseBody.close(); + }; + + final HttpHandler logsHandler = + exchange -> { + InputStream requestBody = exchange.getRequestBody(); + ExportLogsServiceRequest otlpData = ExportLogsServiceRequest.ADAPTER.decode(requestBody); + exportedData.addAll(otlpData.resource_logs); + latch.countDown(); + String response = "OK"; + exchange.sendResponseHeaders(200, response.length()); + OutputStream responseBody = exchange.getResponseBody(); + responseBody.write(response.getBytes(UTF_8)); + responseBody.close(); + }; + + final HttpHandler metricsHandler = + exchange -> { + InputStream requestBody = exchange.getRequestBody(); + ExportMetricsServiceRequest otlpData = + ExportMetricsServiceRequest.ADAPTER.decode(requestBody); + exportedData.addAll(otlpData.resource_metrics); + latch.countDown(); + String response = "OK"; + exchange.sendResponseHeaders(200, response.length()); + OutputStream responseBody = exchange.getResponseBody(); + responseBody.write(response.getBytes(UTF_8)); + responseBody.close(); + }; + private HttpServer mockServer; + private int port; + + @BeforeEach + void setUp() throws IOException { + clock = mock(); + storageConfig = + StorageConfiguration.builder().setRootDir(rootDir).setDebugEnabled(false).build(); + spanStorage = + Storage.builder(SignalTypes.spans) + .setStorageConfiguration(storageConfig) + .setStorageClock(clock) + .build(); + + when(clock.now()).thenReturn(NOW_NANOS); + + mockServer = HttpServer.create(new InetSocketAddress(0), 0); + port = mockServer.getAddress().getPort(); + mockServer.createContext(TRACES_PATH, tracesHandler); + mockServer.createContext(LOGS_PATH, logsHandler); + mockServer.createContext(METRICS_PATH, metricsHandler); + mockServer.start(); + + // Setting up spans + OtlpHttpSpanExporter spanExporter = + OtlpHttpSpanExporter.builder() + .setEndpoint("http://localhost:" + port + TRACES_PATH) + .build(); + ToDiskExporter toDiskSpanExporter = + buildToDiskExporter(SignalSerializer.ofSpans(), spanExporter::export); + SpanToDiskExporter spanToDiskExporter = new SpanToDiskExporter(toDiskSpanExporter); + tracer = createTracerProvider(spanToDiskExporter).get("SpanInstrumentationScope"); + + // Setting up metrics + OtlpHttpMetricExporter metricExporter = + OtlpHttpMetricExporter.builder() + .setEndpoint("http://localhost:" + port + METRICS_PATH) + .build(); + ToDiskExporter toDiskMetricExporter = + buildToDiskExporter(SignalSerializer.ofMetrics(), metricExporter::export); + MetricToDiskExporter metricToDiskExporter = + new MetricToDiskExporter(toDiskMetricExporter, metricExporter); + meterProvider = createMeterProvider(metricToDiskExporter); + meter = meterProvider.get("MetricInstrumentationScope"); + + // Setting up logs + OtlpHttpLogRecordExporter logRecordExporter = + OtlpHttpLogRecordExporter.builder() + .setEndpoint("http://localhost:" + port + LOGS_PATH) + .build(); + ToDiskExporter toDiskLogExporter = + buildToDiskExporter(SignalSerializer.ofLogs(), logRecordExporter::export); + LogRecordToDiskExporter logToDiskExporter = new LogRecordToDiskExporter(toDiskLogExporter); + logger = createLoggerProvider(logToDiskExporter).get("LogInstrumentationScope"); + } + + @AfterEach + void tearDown() throws IOException { + spanStorage.close(); + mockServer.stop(0); + } + + @NotNull + private ToDiskExporter buildToDiskExporter( + SignalSerializer serializer, Function, CompletableResultCode> exporter) { + return ToDiskExporter.builder(spanStorage) + .setSerializer(serializer) + .setExportFunction(exporter) + .build(); + } + + @NotNull + private static FromDiskExporterImpl buildFromDiskExporter( + FromDiskExporterBuilder builder, + StandardComponentId.ExporterType exporterType, + String endpoint) + throws IOException { + return builder.setExporter(new HttpExporterBuilder<>(exporterType, endpoint).build()).build(); + } + + @Test + void verifySpansIntegration() throws IOException, InterruptedException { + Span span = tracer.spanBuilder("Span name").startSpan(); + span.end(); + FromDiskExporterImpl fromDiskExporter = + buildFromDiskExporter( + FromDiskExporterImpl.builder(spanStorage, SignalTypes.spans), + StandardComponentId.ExporterType.OTLP_HTTP_SPAN_EXPORTER, + "http://localhost:" + port + TRACES_PATH); + assertExporter(fromDiskExporter); + } + + @Test + void verifyMetricsIntegration() throws IOException, InterruptedException { + meter.counterBuilder("Counter").build().add(2); + meterProvider.forceFlush(); + + FromDiskExporterImpl fromDiskExporter = + buildFromDiskExporter( + FromDiskExporterImpl.builder(spanStorage, SignalTypes.metrics), + StandardComponentId.ExporterType.OTLP_HTTP_METRIC_EXPORTER, + "http://localhost:" + port + METRICS_PATH); + assertExporter(fromDiskExporter); + } + + @Test + void verifyLogRecordsIntegration() throws IOException, InterruptedException { + logger.logRecordBuilder().setBody("I'm a log!").emit(); + + FromDiskExporterImpl fromDiskExporter = + buildFromDiskExporter( + FromDiskExporterImpl.builder(spanStorage, SignalTypes.logs), + StandardComponentId.ExporterType.OTLP_HTTP_LOG_EXPORTER, + "http://localhost:" + port + LOGS_PATH); + assertExporter(fromDiskExporter); + } + + private void assertExporter(FromDiskExporterImpl exporter) + throws IOException, InterruptedException { + // Verify no data has been received in the original exporter until this point. + assertEquals(0, exportedData.size()); + + // Go to the future when we can read the stored items. + fastForwardTimeByMillis(storageConfig.getMinFileAgeForReadMillis()); + + // Read and send stored data. + assertTrue(exporter.exportStoredBatch(1, TimeUnit.SECONDS)); + assertTrue(latch.await(10, SECONDS)); + + // Now the data must have been delegated to the original exporter. + assertEquals(1, exportedData.size()); + + // Bonus: Try to read again, no more data should be available. + assertFalse(exporter.exportStoredBatch(1, TimeUnit.SECONDS)); + assertEquals(1, exportedData.size()); + } + + @SuppressWarnings("DirectInvocationOnMock") + private void fastForwardTimeByMillis(long milliseconds) { + when(clock.now()).thenReturn(NOW_NANOS + MILLISECONDS.toNanos(milliseconds)); + } + + private static SdkTracerProvider createTracerProvider(SpanExporter exporter) { + return SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(exporter)) + .build(); + } + + private static SdkMeterProvider createMeterProvider(MetricExporter exporter) { + return SdkMeterProvider.builder() + .registerMetricReader(PeriodicMetricReader.create(exporter)) + .build(); + } + + private static SdkLoggerProvider createLoggerProvider(LogRecordExporter exporter) { + return SdkLoggerProvider.builder() + .addLogRecordProcessor(SimpleLogRecordProcessor.create(exporter)) + .build(); + } +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java index b46cba12f..a2a61acdb 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java @@ -119,21 +119,21 @@ private ToDiskExporter buildToDiskExporter( } @NotNull - private static FromDiskExporterImpl buildFromDiskExporter( + private static FromDiskExporterImpl buildFromDiskExporter( FromDiskExporterBuilder builder, Function, CompletableResultCode> exportFunction, SignalDeserializer deserializer) throws IOException { - return builder.setExportFunction(exportFunction).setDeserializer(deserializer).build(); + return builder.setExportFunction(exportFunction, deserializer).build(); } @Test void verifySpansIntegration() throws IOException { Span span = tracer.spanBuilder("Span name").startSpan(); span.end(); - FromDiskExporterImpl fromDiskExporter = + FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(spanStorage), + FromDiskExporterImpl.builder(spanStorage, SignalTypes.spans), memorySpanExporter::export, SignalDeserializer.ofSpans()); assertExporter(fromDiskExporter, () -> memorySpanExporter.getFinishedSpanItems().size()); @@ -144,9 +144,9 @@ void verifyMetricsIntegration() throws IOException { meter.counterBuilder("Counter").build().add(2); meterProvider.forceFlush(); - FromDiskExporterImpl fromDiskExporter = + FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(spanStorage), + FromDiskExporterImpl.builder(spanStorage, SignalTypes.metrics), memoryMetricExporter::export, SignalDeserializer.ofMetrics()); assertExporter(fromDiskExporter, () -> memoryMetricExporter.getFinishedMetricItems().size()); @@ -156,16 +156,16 @@ void verifyMetricsIntegration() throws IOException { void verifyLogRecordsIntegration() throws IOException { logger.logRecordBuilder().setBody("I'm a log!").emit(); - FromDiskExporterImpl fromDiskExporter = + FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(spanStorage), + FromDiskExporterImpl.builder(spanStorage, SignalTypes.logs), memoryLogRecordExporter::export, SignalDeserializer.ofLogs()); assertExporter( fromDiskExporter, () -> memoryLogRecordExporter.getFinishedLogRecordItems().size()); } - private void assertExporter(FromDiskExporterImpl exporter, Supplier finishedItems) + private void assertExporter(FromDiskExporterImpl exporter, Supplier finishedItems) throws IOException { // Verify no data has been received in the original exporter until this point. assertEquals(0, finishedItems.get()); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java index d96b9a1bc..45970cfe3 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java @@ -33,6 +33,7 @@ @SuppressWarnings("unchecked") class StorageTest { + private static final boolean DEBUG_ENABLED = false; private FolderManager folderManager; private Storage storage; private Function processing; @@ -46,7 +47,7 @@ void setUp() throws IOException { writableFile = createWritableFile(); processing = mock(); when(readableFile.readAndProcess(processing)).thenReturn(ReadableResult.SUCCEEDED); - storage = new Storage(folderManager, true); + storage = new Storage(folderManager, DEBUG_ENABLED); } @AfterEach diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufToolsTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufToolsTest.java new file mode 100644 index 000000000..bc6477143 --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/utils/ProtobufToolsTest.java @@ -0,0 +1,215 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.utils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; +import org.junit.jupiter.api.Test; + +class ProtobufToolsTest { + + @Test + void countRepeatedField_emptyData() throws DeserializationException { + byte[] data = {}; + int count = ProtobufTools.countRepeatedField(data, 1); + assertThat(count).isEqualTo(0); + } + + @Test + void countRepeatedField_singleVarintField() throws DeserializationException { + // Field 1, wire type 0 (varint), value 42 + byte[] data = {0x08, 0x2A}; // tag=8 (field 1, wire type 0), value=42 + int count = ProtobufTools.countRepeatedField(data, 1); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_multipleVarintFields() throws DeserializationException { + // Field 1 appears 3 times with different values + byte[] data = { + 0x08, 0x01, // field 1, value 1 + 0x08, 0x02, // field 1, value 2 + 0x08, 0x03 // field 1, value 3 + }; + int count = ProtobufTools.countRepeatedField(data, 1); + assertThat(count).isEqualTo(3); + } + + @Test + void countRepeatedField_singleFixed32Field() throws DeserializationException { + // Field 2, wire type 5 (fixed32), value 0x12345678 + byte[] data = { + 0x15, 0x78, 0x56, 0x34, 0x12 + }; // tag=21 (field 2, wire type 5), little-endian value + int count = ProtobufTools.countRepeatedField(data, 2); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_multipleFixed32Fields() throws DeserializationException { + // Field 2 appears twice + byte[] data = { + // spotless:off + 0x15, 0x78, 0x56, 0x34, 0x12, // field 2, value 0x12345678 + 0x15, (byte) 0x87, 0x65, 0x43, 0x21 // field 2, value 0x21436587 + // spotless:on + }; + int count = ProtobufTools.countRepeatedField(data, 2); + assertThat(count).isEqualTo(2); + } + + @Test + void countRepeatedField_singleFixed64Field() throws DeserializationException { + // Field 3, wire type 1 (fixed64) + byte[] data = { + 0x19, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08 + }; // tag=25 (field 3, wire type 1) + int count = ProtobufTools.countRepeatedField(data, 3); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_multipleLengthDelimitedFields() throws DeserializationException { + // Field 4 appears twice as length-delimited (strings/messages) + byte[] data = { + 0x22, 0x05, 'h', 'e', 'l', 'l', 'o', // field 4, length 5, "hello" + 0x22, 0x05, 'w', 'o', 'r', 'l', 'd' // field 4, length 5, "world" + }; + int count = ProtobufTools.countRepeatedField(data, 4); + assertThat(count).isEqualTo(2); + } + + @Test + void countRepeatedField_targetFieldNotPresent() throws DeserializationException { + // Only field 1 is present, but we're looking for field 2 + byte[] data = {0x08, 0x2A}; // field 1, value 42 + int count = ProtobufTools.countRepeatedField(data, 2); + assertThat(count).isEqualTo(0); + } + + @Test + void countRepeatedField_skipOtherFields() throws DeserializationException { + // Multiple fields present, but we only count field 2 + byte[] data = { + // spotless:off + 0x08, 0x01, // field 1, varint + 0x15, 0x78, 0x56, 0x34, 0x12, // field 2, fixed32 (target) + 0x1A, 0x05, 'h', 'e', 'l', 'l', 'o', // field 3, string + 0x15, (byte) 0x87, 0x65, 0x43, 0x21, // field 2, fixed32 (target) + 0x20, 0x64 // field 4, varint + // spotless:on + }; + int count = ProtobufTools.countRepeatedField(data, 2); + assertThat(count).isEqualTo(2); + } + + @Test + void countRepeatedField_largeVarint() throws DeserializationException { + // Test with a large varint that uses multiple bytes + byte[] data = { + 0x08, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0x0F + }; // field 1, large varint + int count = ProtobufTools.countRepeatedField(data, 1); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_emptyLengthDelimited() throws DeserializationException { + // Field with empty length-delimited content + byte[] data = {0x12, 0x00}; // field 2, length 0 + int count = ProtobufTools.countRepeatedField(data, 2); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_nestedMessage() throws DeserializationException { + // Field containing a nested message with its own fields + byte[] nestedMessage = {0x08, 0x01, 0x10, 0x02}; // inner field 1=1, inner field 2=2 + byte[] data = new byte[2 + nestedMessage.length]; + data[0] = 0x1A; // field 3, wire type 2 (length-delimited) + data[1] = (byte) nestedMessage.length; + System.arraycopy(nestedMessage, 0, data, 2, nestedMessage.length); + + int count = ProtobufTools.countRepeatedField(data, 3); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_truncatedVarint() { + // Incomplete varint at end of data + byte[] data = {0x08, (byte) 0xFF}; // field 1, incomplete varint (missing continuation) + assertThatThrownBy(() -> ProtobufTools.countRepeatedField(data, 1)) + .isInstanceOf(DeserializationException.class) + .hasMessageContaining("Truncated varint"); + } + + @Test + void countRepeatedField_truncatedLengthDelimited() throws DeserializationException { + // Length-delimited field with length > remaining data + byte[] data = {0x12, 0x10, 0x01, 0x02}; // field 2, claims length 16 but only 2 bytes follow + int count = ProtobufTools.countRepeatedField(data, 2); + assertThat(count).isEqualTo(1); + } + + @Test + void countRepeatedField_invalidWireType() { + // Wire type 6 is not valid + byte[] data = {0x0E, 0x01}; // field 1, wire type 6 (invalid) + assertThatThrownBy(() -> ProtobufTools.countRepeatedField(data, 1)) + .isInstanceOf(DeserializationException.class) + .hasMessageContaining("Unsupported wire type: 6"); + } + + @Test + void countRepeatedField_startGroupWireType() { + // Wire type 3 (START_GROUP) is no longer supported + byte[] data = {0x0B}; // field 1, wire type 3 (START_GROUP) + assertThatThrownBy(() -> ProtobufTools.countRepeatedField(data, 1)) + .isInstanceOf(DeserializationException.class) + .hasMessageContaining("Unsupported wire type: 3"); + } + + @Test + void countRepeatedField_endGroupWireType() { + // Wire type 4 (END_GROUP) is no longer supported + byte[] data = {0x0C}; // field 1, wire type 4 (END_GROUP) + assertThatThrownBy(() -> ProtobufTools.countRepeatedField(data, 1)) + .isInstanceOf(DeserializationException.class) + .hasMessageContaining("Unsupported wire type: 4"); + } + + @Test + void countRepeatedField_realWorldExample() throws DeserializationException { + // Simulate a more realistic protobuf message structure + // message ExportRequest { + // repeated SpanData spans = 1; + // string service_name = 2; + // } + // With 3 spans and a service name + byte[] data = { + // spotless:off + // First span (field 1, length-delimited) + 0x0A, 0x04, 0x08, 0x01, 0x10, 0x02, // span 1: some inner fields + // Second span (field 1, length-delimited) + 0x0A, 0x04, 0x08, 0x03, 0x10, 0x04, // span 2: some inner fields + // Service name (field 2, length-delimited) + 0x12, 0x07, 's', 'e', 'r', 'v', 'i', 'c', 'e', + // Third span (field 1, length-delimited) + 0x0A, 0x04, 0x08, 0x05, 0x10, 0x06 // span 3: some inner fields + // spotless:on + }; + + // Count spans (field 1) + int spanCount = ProtobufTools.countRepeatedField(data, 1); + assertThat(spanCount).isEqualTo(3); + + // Count service names (field 2) + int serviceNameCount = ProtobufTools.countRepeatedField(data, 2); + assertThat(serviceNameCount).isEqualTo(1); + } +}