Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ plugins {
spotless {
java {
googleJavaFormat()
toggleOffOn()
licenseHeaderFile(rootProject.file("buildscripts/spotless.license.java"), "(package|import|public|// Includes work from:)")
target("src/**/*.java")
}
Expand Down
2 changes: 2 additions & 0 deletions disk-buffering/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ dependencies {
annotationProcessor("com.google.auto.value:auto-value")
testImplementation("org.mockito:mockito-inline")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
testImplementation("io.opentelemetry:opentelemetry-exporter-otlp")
testImplementation("io.opentelemetry:opentelemetry-exporter-sender-okhttp")

protos("io.opentelemetry.proto:opentelemetry-proto:1.7.0-alpha@jar")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,49 @@
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class LogRecordFromDiskExporter implements FromDiskExporter {

private final FromDiskExporterImpl<LogRecordData> delegate;
private final FromDiskExporterImpl delegate;

public static LogRecordFromDiskExporter create(LogRecordExporter exporter, Storage storage)
throws IOException {
FromDiskExporterImpl<LogRecordData> delegate =
FromDiskExporterImpl.<LogRecordData>builder(storage)
.setDeserializer(SignalDeserializer.ofLogs())
.setExportFunction(exporter::export)
FromDiskExporterImpl delegate =
FromDiskExporterImpl.<LogRecordData>builder(storage, SignalTypes.logs)
.setExportFunction(exporter::export, SignalDeserializer.ofLogs())
.build();
return new LogRecordFromDiskExporter(delegate);
}

private LogRecordFromDiskExporter(FromDiskExporterImpl<LogRecordData> delegate) {
public static LogRecordFromDiskExporter create(HttpExporter<Marshaler> exporter, Storage storage)
throws IOException {
FromDiskExporterImpl delegate =
FromDiskExporterImpl.<LogRecordData>builder(storage, SignalTypes.logs)
.setExporter(exporter)
.build();
return new LogRecordFromDiskExporter(delegate);
}

// Private because untested.
@SuppressWarnings("unused")
private static LogRecordFromDiskExporter create(GrpcExporter<Marshaler> exporter, Storage storage)
throws IOException {
FromDiskExporterImpl delegate =
FromDiskExporterImpl.<LogRecordData>builder(storage, SignalTypes.logs)
.setExporter(exporter)
.build();
return new LogRecordFromDiskExporter(delegate);
}

private LogRecordFromDiskExporter(FromDiskExporterImpl delegate) {
this.delegate = delegate;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,49 @@
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class MetricFromDiskExporter implements FromDiskExporter {

private final FromDiskExporterImpl<MetricData> delegate;
private final FromDiskExporterImpl delegate;

public static MetricFromDiskExporter create(MetricExporter exporter, Storage storage)
throws IOException {
FromDiskExporterImpl<MetricData> delegate =
FromDiskExporterImpl.<MetricData>builder(storage)
.setDeserializer(SignalDeserializer.ofMetrics())
.setExportFunction(exporter::export)
FromDiskExporterImpl delegate =
FromDiskExporterImpl.<MetricData>builder(storage, SignalTypes.metrics)
.setExportFunction(exporter::export, SignalDeserializer.ofMetrics())
.build();
return new MetricFromDiskExporter(delegate);
}

private MetricFromDiskExporter(FromDiskExporterImpl<MetricData> delegate) {
public static MetricFromDiskExporter create(HttpExporter<Marshaler> exporter, Storage storage)
throws IOException {
FromDiskExporterImpl delegate =
FromDiskExporterImpl.<MetricData>builder(storage, SignalTypes.metrics)
.setExporter(exporter)
.build();
return new MetricFromDiskExporter(delegate);
}

// Private because untested.
@SuppressWarnings("unused")
private static MetricFromDiskExporter create(GrpcExporter<Marshaler> exporter, Storage storage)
throws IOException {
FromDiskExporterImpl delegate =
FromDiskExporterImpl.<MetricData>builder(storage, SignalTypes.metrics)
.setExporter(exporter)
.build();
return new MetricFromDiskExporter(delegate);
}

private MetricFromDiskExporter(FromDiskExporterImpl delegate) {
this.delegate = delegate;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,49 @@
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class SpanFromDiskExporter implements FromDiskExporter {

private final FromDiskExporterImpl<SpanData> delegate;
private final FromDiskExporterImpl delegate;

public static SpanFromDiskExporter create(SpanExporter exporter, Storage storage)
throws IOException {
FromDiskExporterImpl<SpanData> delegate =
FromDiskExporterImpl.<SpanData>builder(storage)
.setDeserializer(SignalDeserializer.ofSpans())
.setExportFunction(exporter::export)
FromDiskExporterImpl delegate =
FromDiskExporterImpl.<SpanData>builder(storage, SignalTypes.spans)
.setExportFunction(exporter::export, SignalDeserializer.ofSpans())
.build();
return new SpanFromDiskExporter(delegate);
}

private SpanFromDiskExporter(FromDiskExporterImpl<SpanData> delegate) {
public static SpanFromDiskExporter create(HttpExporter<Marshaler> exporter, Storage storage)
throws IOException {
FromDiskExporterImpl delegate =
FromDiskExporterImpl.<SpanData>builder(storage, SignalTypes.spans)
.setExporter(exporter)
.build();
return new SpanFromDiskExporter(delegate);
}

// Private because untested.
@SuppressWarnings("unused")
private static SpanFromDiskExporter create(GrpcExporter<Marshaler> exporter, Storage storage)
throws IOException {
FromDiskExporterImpl delegate =
FromDiskExporterImpl.<SpanData>builder(storage, SignalTypes.spans)
.setExporter(exporter)
.build();
return new SpanFromDiskExporter(delegate);
}

private SpanFromDiskExporter(FromDiskExporterImpl delegate) {
this.delegate = delegate;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,74 @@

package io.opentelemetry.contrib.disk.buffering.internal.exporter;

import static java.util.Collections.emptyList;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;

public class FromDiskExporterBuilder<T> {
public class FromDiskExporterBuilder<EXPORT_DATA> {

private SignalDeserializer<T> serializer = noopDeserializer();
private final Storage storage;
private final SignalTypes signalType;

private Function<Collection<T>, CompletableResultCode> exportFunction =
x -> CompletableResultCode.ofFailure();
private BiFunction<ProtoByteArrayMarshaler, Integer, CompletableResultCode> exportFunction =
(x, i) -> CompletableResultCode.ofFailure();

public FromDiskExporterBuilder(Storage storage) {
public FromDiskExporterBuilder(Storage storage, SignalTypes signalType) {
if (storage == null) {
throw new NullPointerException("Storage cannot be null");
}
this.storage = storage;
this.signalType = signalType;
}

@NotNull
private static <T> SignalDeserializer<T> noopDeserializer() {
return x -> emptyList();
@CanIgnoreReturnValue
public FromDiskExporterBuilder<EXPORT_DATA> setExportFunction(
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
SignalDeserializer<EXPORT_DATA> deserializer) {
if (!deserializer.signalType().equals(signalType.name())) {
throw new IllegalArgumentException(
deserializer.signalType() + " does not match " + signalType);
}
this.exportFunction =
(exportRequest, itemCount) -> {
try {
List<EXPORT_DATA> telemetry = deserializer.deserialize(exportRequest.getBytes());
return exportFunction.apply(telemetry);
} catch (IOException e) {
return CompletableResultCode.ofExceptionalFailure(e);
}
};
return this;
}

/**
* The provided HttpExporter should _NOT_ be configured to send JSON. The data is serialized to
* disk in protobuf format and sent directly to the provided exporter as a serialized payload.
*/
@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setDeserializer(SignalDeserializer<T> serializer) {
this.serializer = serializer;
public FromDiskExporterBuilder<EXPORT_DATA> setExporter(HttpExporter<Marshaler> exporter) {
// Any way we can assert the exporter is not configured for JSON?
this.exportFunction = exporter::export;
return this;
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setExportFunction(
Function<Collection<T>, CompletableResultCode> exportFunction) {
this.exportFunction = exportFunction;
public FromDiskExporterBuilder<EXPORT_DATA> setExporter(GrpcExporter<Marshaler> exporter) {
this.exportFunction = exporter::export;
return this;
}

public FromDiskExporterImpl<T> build() throws IOException {
return new FromDiskExporterImpl<>(serializer, exportFunction, storage);
public FromDiskExporterImpl build() throws IOException {
return new FromDiskExporterImpl(exportFunction, storage, signalType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,46 @@
package io.opentelemetry.contrib.disk.buffering.internal.exporter;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger;
import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools;
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
import io.opentelemetry.exporter.internal.marshal.ProtoFieldInfo;
import io.opentelemetry.proto.collector.logs.v1.internal.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.internal.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.internal.ExportTraceServiceRequest;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.logging.Logger;

/**
* Signal-type generic class that can read telemetry previously buffered on disk and send it to
* another delegated exporter.
*/
public final class FromDiskExporterImpl<EXPORT_DATA> implements FromDiskExporter {
public final class FromDiskExporterImpl implements FromDiskExporter {
private final DebugLogger logger;
private final Storage storage;
private final SignalDeserializer<EXPORT_DATA> deserializer;
private final Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction;
private final SignalTypes signalType;
private final BiFunction<ProtoByteArrayMarshaler, Integer, CompletableResultCode> exportFunction;

FromDiskExporterImpl(
SignalDeserializer<EXPORT_DATA> deserializer,
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
Storage storage) {
this.deserializer = deserializer;
BiFunction<ProtoByteArrayMarshaler, Integer, CompletableResultCode> exportFunction,
Storage storage,
SignalTypes signalType) {
this.exportFunction = exportFunction;
this.storage = storage;
this.signalType = signalType;
this.logger =
DebugLogger.wrap(
Logger.getLogger(FromDiskExporterImpl.class.getName()), storage.isDebugEnabled());
}

public static <T> FromDiskExporterBuilder<T> builder(Storage storage) {
return new FromDiskExporterBuilder<>(storage);
public static <T> FromDiskExporterBuilder<T> builder(Storage storage, SignalTypes signalType) {
return new FromDiskExporterBuilder<>(storage, signalType);
}

/**
Expand All @@ -56,25 +59,37 @@ public static <T> FromDiskExporterBuilder<T> builder(Storage storage) {
*/
@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
logger.log("Attempting to export " + deserializer.signalType() + " batch from disk.");
logger.log("Attempting to export " + signalType.name() + " batch from disk.");
ReadableResult result =
storage.readAndProcess(
bytes -> {
logger.log(
"Read "
+ bytes.length
+ " "
+ deserializer.signalType()
+ " bytes from storage.");
logger.log("Read " + bytes.length + " " + signalType.name() + " bytes from storage.");
ProtoFieldInfo field;
switch (signalType) {
case metrics:
field = ExportMetricsServiceRequest.RESOURCE_METRICS;
break;
case logs:
field = ExportLogsServiceRequest.RESOURCE_LOGS;
break;
case spans:
field = ExportTraceServiceRequest.RESOURCE_SPANS;
break;
default:
throw new IllegalStateException("Unsupported signal type: " + signalType);
}
int itemCount = 0;
try {
List<EXPORT_DATA> telemetry = deserializer.deserialize(bytes);
logger.log(
"Now exporting batch of " + telemetry.size() + " " + deserializer.signalType());
CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit);
return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER;
itemCount = ProtobufTools.countRepeatedField(bytes, field.getFieldNumber());
} catch (DeserializationException e) {
return ProcessResult.CONTENT_INVALID;
}
logger.log("Now exporting batch of " + itemCount + " " + signalType.name());
CompletableResultCode join =
exportFunction
.apply(new ProtoByteArrayMarshaler(bytes), itemCount)
.join(timeout, unit);
return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER;
});
return result == ReadableResult.SUCCEEDED;
}
Expand Down
Loading
Loading