Skip to content

Commit 5876190

Browse files
committed
Make the disk buffered exporting more efficient
The previous implementation would load the protobuf encoded bytes from disk, deserialize it, then send it through the delegate exporter pipeline which would likely re-serialize it into the exact same format. This PR reduces the CPU burden by providing an optional alternate path which bypasses the exporter pipeline. Instead it takes the serialized bytes from disk and sends it straight to an HttpExporter or GrpcExporter, which only takes a `Marshaler` class allowing the serialized bytes to be passed straight through. Since these exporters like to know the number of serialized elements that are being passed in, I added a function (with the help of Claude) that counts the serialized elements within the protobuf encoded byte array. A future optimization would be to allow streaming the bytes directly from disk without reading them back into a byte array.
1 parent 9e432e8 commit 5876190

File tree

14 files changed

+823
-74
lines changed

14 files changed

+823
-74
lines changed

buildSrc/src/main/kotlin/otel.spotless-conventions.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ plugins {
55
spotless {
66
java {
77
googleJavaFormat()
8+
toggleOffOn()
89
licenseHeaderFile(rootProject.file("buildscripts/spotless.license.java"), "(package|import|public|// Includes work from:)")
910
target("src/**/*.java")
1011
}

disk-buffering/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ dependencies {
2222
annotationProcessor("com.google.auto.value:auto-value")
2323
testImplementation("org.mockito:mockito-inline")
2424
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
25+
testImplementation("io.opentelemetry:opentelemetry-exporter-otlp")
26+
testImplementation("io.opentelemetry:opentelemetry-exporter-sender-okhttp")
2527

2628
protos("io.opentelemetry.proto:opentelemetry-proto:1.7.0-alpha@jar")
2729
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,49 @@
99
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
1010
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
1111
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
12+
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
13+
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
14+
import io.opentelemetry.exporter.internal.http.HttpExporter;
15+
import io.opentelemetry.exporter.internal.marshal.Marshaler;
1216
import io.opentelemetry.sdk.logs.data.LogRecordData;
1317
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
1418
import java.io.IOException;
1519
import java.util.concurrent.TimeUnit;
1620

1721
public class LogRecordFromDiskExporter implements FromDiskExporter {
1822

19-
private final FromDiskExporterImpl<LogRecordData> delegate;
23+
private final FromDiskExporterImpl delegate;
2024

2125
public static LogRecordFromDiskExporter create(LogRecordExporter exporter, Storage storage)
2226
throws IOException {
23-
FromDiskExporterImpl<LogRecordData> delegate =
24-
FromDiskExporterImpl.<LogRecordData>builder(storage)
25-
.setDeserializer(SignalDeserializer.ofLogs())
26-
.setExportFunction(exporter::export)
27+
FromDiskExporterImpl delegate =
28+
FromDiskExporterImpl.<LogRecordData>builder(storage, SignalTypes.logs)
29+
.setExportFunction(exporter::export, SignalDeserializer.ofLogs())
2730
.build();
2831
return new LogRecordFromDiskExporter(delegate);
2932
}
3033

31-
private LogRecordFromDiskExporter(FromDiskExporterImpl<LogRecordData> delegate) {
34+
public static LogRecordFromDiskExporter create(HttpExporter<Marshaler> exporter, Storage storage)
35+
throws IOException {
36+
FromDiskExporterImpl delegate =
37+
FromDiskExporterImpl.<LogRecordData>builder(storage, SignalTypes.logs)
38+
.setExporter(exporter)
39+
.build();
40+
return new LogRecordFromDiskExporter(delegate);
41+
}
42+
43+
// Private because untested.
44+
@SuppressWarnings("unused")
45+
private static LogRecordFromDiskExporter create(GrpcExporter<Marshaler> exporter, Storage storage)
46+
throws IOException {
47+
FromDiskExporterImpl delegate =
48+
FromDiskExporterImpl.<LogRecordData>builder(storage, SignalTypes.logs)
49+
.setExporter(exporter)
50+
.build();
51+
return new LogRecordFromDiskExporter(delegate);
52+
}
53+
54+
private LogRecordFromDiskExporter(FromDiskExporterImpl delegate) {
3255
this.delegate = delegate;
3356
}
3457

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,49 @@
99
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
1010
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
1111
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
12+
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
13+
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
14+
import io.opentelemetry.exporter.internal.http.HttpExporter;
15+
import io.opentelemetry.exporter.internal.marshal.Marshaler;
1216
import io.opentelemetry.sdk.metrics.data.MetricData;
1317
import io.opentelemetry.sdk.metrics.export.MetricExporter;
1418
import java.io.IOException;
1519
import java.util.concurrent.TimeUnit;
1620

1721
public class MetricFromDiskExporter implements FromDiskExporter {
1822

19-
private final FromDiskExporterImpl<MetricData> delegate;
23+
private final FromDiskExporterImpl delegate;
2024

2125
public static MetricFromDiskExporter create(MetricExporter exporter, Storage storage)
2226
throws IOException {
23-
FromDiskExporterImpl<MetricData> delegate =
24-
FromDiskExporterImpl.<MetricData>builder(storage)
25-
.setDeserializer(SignalDeserializer.ofMetrics())
26-
.setExportFunction(exporter::export)
27+
FromDiskExporterImpl delegate =
28+
FromDiskExporterImpl.<MetricData>builder(storage, SignalTypes.metrics)
29+
.setExportFunction(exporter::export, SignalDeserializer.ofMetrics())
2730
.build();
2831
return new MetricFromDiskExporter(delegate);
2932
}
3033

31-
private MetricFromDiskExporter(FromDiskExporterImpl<MetricData> delegate) {
34+
public static MetricFromDiskExporter create(HttpExporter<Marshaler> exporter, Storage storage)
35+
throws IOException {
36+
FromDiskExporterImpl delegate =
37+
FromDiskExporterImpl.<MetricData>builder(storage, SignalTypes.metrics)
38+
.setExporter(exporter)
39+
.build();
40+
return new MetricFromDiskExporter(delegate);
41+
}
42+
43+
// Private because untested.
44+
@SuppressWarnings("unused")
45+
private static MetricFromDiskExporter create(GrpcExporter<Marshaler> exporter, Storage storage)
46+
throws IOException {
47+
FromDiskExporterImpl delegate =
48+
FromDiskExporterImpl.<MetricData>builder(storage, SignalTypes.metrics)
49+
.setExporter(exporter)
50+
.build();
51+
return new MetricFromDiskExporter(delegate);
52+
}
53+
54+
private MetricFromDiskExporter(FromDiskExporterImpl delegate) {
3255
this.delegate = delegate;
3356
}
3457

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,49 @@
99
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
1010
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
1111
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
12+
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
13+
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
14+
import io.opentelemetry.exporter.internal.http.HttpExporter;
15+
import io.opentelemetry.exporter.internal.marshal.Marshaler;
1216
import io.opentelemetry.sdk.trace.data.SpanData;
1317
import io.opentelemetry.sdk.trace.export.SpanExporter;
1418
import java.io.IOException;
1519
import java.util.concurrent.TimeUnit;
1620

1721
public class SpanFromDiskExporter implements FromDiskExporter {
1822

19-
private final FromDiskExporterImpl<SpanData> delegate;
23+
private final FromDiskExporterImpl delegate;
2024

2125
public static SpanFromDiskExporter create(SpanExporter exporter, Storage storage)
2226
throws IOException {
23-
FromDiskExporterImpl<SpanData> delegate =
24-
FromDiskExporterImpl.<SpanData>builder(storage)
25-
.setDeserializer(SignalDeserializer.ofSpans())
26-
.setExportFunction(exporter::export)
27+
FromDiskExporterImpl delegate =
28+
FromDiskExporterImpl.<SpanData>builder(storage, SignalTypes.spans)
29+
.setExportFunction(exporter::export, SignalDeserializer.ofSpans())
2730
.build();
2831
return new SpanFromDiskExporter(delegate);
2932
}
3033

31-
private SpanFromDiskExporter(FromDiskExporterImpl<SpanData> delegate) {
34+
public static SpanFromDiskExporter create(HttpExporter<Marshaler> exporter, Storage storage)
35+
throws IOException {
36+
FromDiskExporterImpl delegate =
37+
FromDiskExporterImpl.<SpanData>builder(storage, SignalTypes.spans)
38+
.setExporter(exporter)
39+
.build();
40+
return new SpanFromDiskExporter(delegate);
41+
}
42+
43+
// Private because untested.
44+
@SuppressWarnings("unused")
45+
private static SpanFromDiskExporter create(GrpcExporter<Marshaler> exporter, Storage storage)
46+
throws IOException {
47+
FromDiskExporterImpl delegate =
48+
FromDiskExporterImpl.<SpanData>builder(storage, SignalTypes.spans)
49+
.setExporter(exporter)
50+
.build();
51+
return new SpanFromDiskExporter(delegate);
52+
}
53+
54+
private SpanFromDiskExporter(FromDiskExporterImpl delegate) {
3255
this.delegate = delegate;
3356
}
3457

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,51 +5,80 @@
55

66
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
77

8-
import static java.util.Collections.emptyList;
9-
108
import com.google.errorprone.annotations.CanIgnoreReturnValue;
119
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
1210
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
11+
import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger;
12+
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
13+
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
14+
import io.opentelemetry.exporter.internal.http.HttpExporter;
15+
import io.opentelemetry.exporter.internal.marshal.Marshaler;
1316
import io.opentelemetry.sdk.common.CompletableResultCode;
1417
import java.io.IOException;
1518
import java.util.Collection;
19+
import java.util.List;
20+
import java.util.function.BiFunction;
1621
import java.util.function.Function;
17-
import org.jetbrains.annotations.NotNull;
22+
import java.util.logging.Logger;
1823

19-
public class FromDiskExporterBuilder<T> {
24+
public class FromDiskExporterBuilder<EXPORT_DATA> {
2025

21-
private SignalDeserializer<T> serializer = noopDeserializer();
26+
private final DebugLogger logger;
2227
private final Storage storage;
28+
private final SignalTypes signalType;
2329

24-
private Function<Collection<T>, CompletableResultCode> exportFunction =
25-
x -> CompletableResultCode.ofFailure();
30+
private BiFunction<ProtoByteArrayMarshaler, Integer, CompletableResultCode> exportFunction =
31+
(x, i) -> CompletableResultCode.ofFailure();
2632

27-
public FromDiskExporterBuilder(Storage storage) {
33+
public FromDiskExporterBuilder(Storage storage, SignalTypes signalType) {
2834
if (storage == null) {
2935
throw new NullPointerException("Storage cannot be null");
3036
}
3137
this.storage = storage;
38+
this.signalType = signalType;
39+
this.logger =
40+
DebugLogger.wrap(
41+
Logger.getLogger(FromDiskExporterImpl.class.getName()), storage.isDebugEnabled());
3242
}
3343

34-
@NotNull
35-
private static <T> SignalDeserializer<T> noopDeserializer() {
36-
return x -> emptyList();
44+
@CanIgnoreReturnValue
45+
public FromDiskExporterBuilder<EXPORT_DATA> setExportFunction(
46+
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
47+
SignalDeserializer<EXPORT_DATA> deserializer) {
48+
if (!deserializer.signalType().equals(signalType.name())) {
49+
throw new IllegalArgumentException(
50+
deserializer.signalType() + " does not match " + signalType);
51+
}
52+
this.exportFunction =
53+
(exportRequest, itemCount) -> {
54+
try {
55+
List<EXPORT_DATA> telemetry = deserializer.deserialize(exportRequest.getBytes());
56+
return exportFunction.apply(telemetry);
57+
} catch (IOException e) {
58+
return CompletableResultCode.ofExceptionalFailure(e);
59+
}
60+
};
61+
return this;
3762
}
3863

64+
/**
65+
* The provided HttpExporter should _NOT_ be configured to send JSON. The data is serialized to
66+
* disk in protobuf format and sent directly to the provided exporter as a serialized payload.
67+
*/
3968
@CanIgnoreReturnValue
40-
public FromDiskExporterBuilder<T> setDeserializer(SignalDeserializer<T> serializer) {
41-
this.serializer = serializer;
69+
public FromDiskExporterBuilder<EXPORT_DATA> setExporter(HttpExporter<Marshaler> exporter) {
70+
// Any way we can assert the exporter is not configured for JSON?
71+
this.exportFunction = exporter::export;
4272
return this;
4373
}
4474

4575
@CanIgnoreReturnValue
46-
public FromDiskExporterBuilder<T> setExportFunction(
47-
Function<Collection<T>, CompletableResultCode> exportFunction) {
48-
this.exportFunction = exportFunction;
76+
public FromDiskExporterBuilder<EXPORT_DATA> setExporter(GrpcExporter<Marshaler> exporter) {
77+
this.exportFunction = exporter::export;
4978
return this;
5079
}
5180

52-
public FromDiskExporterImpl<T> build() throws IOException {
53-
return new FromDiskExporterImpl<>(serializer, exportFunction, storage);
81+
public FromDiskExporterImpl build() throws IOException {
82+
return new FromDiskExporterImpl(exportFunction, storage, signalType);
5483
}
5584
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,43 +6,46 @@
66
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
77

88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException;
9-
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
109
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
1110
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
1211
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
1312
import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger;
13+
import io.opentelemetry.contrib.disk.buffering.internal.utils.ProtobufTools;
14+
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
15+
import io.opentelemetry.exporter.internal.marshal.ProtoFieldInfo;
16+
import io.opentelemetry.proto.collector.logs.v1.internal.ExportLogsServiceRequest;
17+
import io.opentelemetry.proto.collector.metrics.v1.internal.ExportMetricsServiceRequest;
18+
import io.opentelemetry.proto.collector.trace.v1.internal.ExportTraceServiceRequest;
1419
import io.opentelemetry.sdk.common.CompletableResultCode;
1520
import java.io.IOException;
16-
import java.util.Collection;
17-
import java.util.List;
1821
import java.util.concurrent.TimeUnit;
19-
import java.util.function.Function;
22+
import java.util.function.BiFunction;
2023
import java.util.logging.Logger;
2124

2225
/**
2326
* Signal-type generic class that can read telemetry previously buffered on disk and send it to
2427
* another delegated exporter.
2528
*/
26-
public final class FromDiskExporterImpl<EXPORT_DATA> implements FromDiskExporter {
29+
public final class FromDiskExporterImpl implements FromDiskExporter {
2730
private final DebugLogger logger;
2831
private final Storage storage;
29-
private final SignalDeserializer<EXPORT_DATA> deserializer;
30-
private final Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction;
32+
private final SignalTypes signalType;
33+
private final BiFunction<ProtoByteArrayMarshaler, Integer, CompletableResultCode> exportFunction;
3134

3235
FromDiskExporterImpl(
33-
SignalDeserializer<EXPORT_DATA> deserializer,
34-
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
35-
Storage storage) {
36-
this.deserializer = deserializer;
36+
BiFunction<ProtoByteArrayMarshaler, Integer, CompletableResultCode> exportFunction,
37+
Storage storage,
38+
SignalTypes signalType) {
3739
this.exportFunction = exportFunction;
3840
this.storage = storage;
41+
this.signalType = signalType;
3942
this.logger =
4043
DebugLogger.wrap(
4144
Logger.getLogger(FromDiskExporterImpl.class.getName()), storage.isDebugEnabled());
4245
}
4346

44-
public static <T> FromDiskExporterBuilder<T> builder(Storage storage) {
45-
return new FromDiskExporterBuilder<>(storage);
47+
public static <T> FromDiskExporterBuilder<T> builder(Storage storage, SignalTypes signalType) {
48+
return new FromDiskExporterBuilder<>(storage, signalType);
4649
}
4750

4851
/**
@@ -56,25 +59,37 @@ public static <T> FromDiskExporterBuilder<T> builder(Storage storage) {
5659
*/
5760
@Override
5861
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
59-
logger.log("Attempting to export " + deserializer.signalType() + " batch from disk.");
62+
logger.log("Attempting to export " + signalType.name() + " batch from disk.");
6063
ReadableResult result =
6164
storage.readAndProcess(
6265
bytes -> {
63-
logger.log(
64-
"Read "
65-
+ bytes.length
66-
+ " "
67-
+ deserializer.signalType()
68-
+ " bytes from storage.");
66+
logger.log("Read " + bytes.length + " " + signalType.name() + " bytes from storage.");
67+
ProtoFieldInfo field;
68+
switch (signalType) {
69+
case metrics:
70+
field = ExportMetricsServiceRequest.RESOURCE_METRICS;
71+
break;
72+
case logs:
73+
field = ExportLogsServiceRequest.RESOURCE_LOGS;
74+
break;
75+
case spans:
76+
field = ExportTraceServiceRequest.RESOURCE_SPANS;
77+
break;
78+
default:
79+
throw new IllegalStateException("Unsupported signal type: " + signalType);
80+
}
81+
int itemCount = 0;
6982
try {
70-
List<EXPORT_DATA> telemetry = deserializer.deserialize(bytes);
71-
logger.log(
72-
"Now exporting batch of " + telemetry.size() + " " + deserializer.signalType());
73-
CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit);
74-
return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER;
83+
itemCount = ProtobufTools.countRepeatedField(bytes, field.getFieldNumber());
7584
} catch (DeserializationException e) {
7685
return ProcessResult.CONTENT_INVALID;
7786
}
87+
logger.log("Now exporting batch of " + itemCount + " " + signalType.name());
88+
CompletableResultCode join =
89+
exportFunction
90+
.apply(new ProtoByteArrayMarshaler(bytes), itemCount)
91+
.join(timeout, unit);
92+
return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER;
7893
});
7994
return result == ReadableResult.SUCCEEDED;
8095
}

0 commit comments

Comments
 (0)