Skip to content

Commit cb34923

Browse files
committed
Use a more efficient serializer and write bytes directly to disk
Instead of an intermediary byte array. The OTel Java SDK has a hand-coded and optimized serializer implementation that is well tested and used extensively. This PR switches to using that serializer. It is also more efficient to serialize directly to the file output stream instead of an intermediary byte array, so I used the same marshaling strategy used by the SDK and passed the marshaler down to the WritableFile.
1 parent 0f23b50 commit cb34923

28 files changed

+274
-150
lines changed

disk-buffering/build.gradle.kts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ val protos by configurations.creating
1818
dependencies {
1919
api("io.opentelemetry:opentelemetry-sdk")
2020
implementation("io.opentelemetry:opentelemetry-api-incubator")
21+
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
2122
compileOnly("com.google.auto.value:auto-value-annotations")
2223
annotationProcessor("com.google.auto.value:auto-value")
2324
signature("com.toasttab.android:gummy-bears-api-21:0.12.0:coreLib@signature")
@@ -63,9 +64,10 @@ wire {
6364
}
6465

6566
root(
66-
"opentelemetry.proto.trace.v1.TracesData",
67-
"opentelemetry.proto.metrics.v1.MetricsData",
68-
"opentelemetry.proto.logs.v1.LogsData",
67+
// These are the types used by the Java SDK's OTLP exporters.
68+
"opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest",
69+
"opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest",
70+
"opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest",
6971
)
7072
}
7173

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
7+
8+
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
9+
import java.io.IOException;
10+
import java.io.OutputStream;
11+
import java.util.Collection;
12+
13+
class NoopSerializer<T> implements SignalSerializer<T> {
14+
15+
@Override
16+
public NoopSerializer<T> initialize(Collection<T> data) {
17+
return this;
18+
}
19+
20+
@Override
21+
public void writeBinaryTo(OutputStream output) throws IOException {}
22+
23+
@Override
24+
public int getBinarySerializedSize() {
25+
return 0;
26+
}
27+
28+
@Override
29+
public void reset() {}
30+
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,11 @@ public static <T> ToDiskExporterBuilder<T> builder(Storage storage) {
3838
return new ToDiskExporterBuilder<>(storage);
3939
}
4040

41-
public CompletableResultCode export(Collection<EXPORT_DATA> data) {
41+
public synchronized CompletableResultCode export(Collection<EXPORT_DATA> data) {
4242
logger.log("Intercepting exporter batch.", Level.FINER);
4343
try {
44-
if (storage.write(serializer.serialize(data))) {
44+
serializer.initialize(data);
45+
if (storage.write(serializer)) {
4546
return CompletableResultCode.ofSuccess();
4647
}
4748
logger.log("Could not store batch in disk. Exporting it right away.");
@@ -52,6 +53,8 @@ public CompletableResultCode export(Collection<EXPORT_DATA> data) {
5253
Level.WARNING,
5354
e);
5455
return exportFunction.apply(data);
56+
} finally {
57+
serializer.reset();
5558
}
5659
}
5760

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
public final class ToDiskExporterBuilder<T> {
1616

17-
private SignalSerializer<T> serializer = ts -> new byte[0];
17+
private SignalSerializer<T> serializer = new NoopSerializer<T>();
1818

1919
private final Storage storage;
2020

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.ProtoLogsDataMapper;
99
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
10-
import io.opentelemetry.proto.logs.v1.LogsData;
10+
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
1111
import io.opentelemetry.sdk.logs.data.LogRecordData;
1212
import java.io.IOException;
1313
import java.util.List;
@@ -24,7 +24,8 @@ static LogRecordDataDeserializer getInstance() {
2424
@Override
2525
public List<LogRecordData> deserialize(byte[] source) throws DeserializationException {
2626
try {
27-
return ProtoLogsDataMapper.getInstance().fromProto(LogsData.ADAPTER.decode(source));
27+
return ProtoLogsDataMapper.getInstance()
28+
.fromProto(ExportLogsServiceRequest.ADAPTER.decode(source));
2829
} catch (IOException e) {
2930
throw new DeserializationException(e);
3031
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics.ProtoMetricsDataMapper;
99
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
10-
import io.opentelemetry.proto.metrics.v1.MetricsData;
10+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
1111
import io.opentelemetry.sdk.metrics.data.MetricData;
1212
import java.io.IOException;
1313
import java.util.List;
@@ -24,7 +24,8 @@ static MetricDataDeserializer getInstance() {
2424
@Override
2525
public List<MetricData> deserialize(byte[] source) throws DeserializationException {
2626
try {
27-
return ProtoMetricsDataMapper.getInstance().fromProto(MetricsData.ADAPTER.decode(source));
27+
return ProtoMetricsDataMapper.getInstance()
28+
.fromProto(ExportMetricsServiceRequest.ADAPTER.decode(source));
2829
} catch (IOException e) {
2930
throw new DeserializationException(e);
3031
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.ProtoSpansDataMapper;
99
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
10-
import io.opentelemetry.proto.trace.v1.TracesData;
10+
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
1111
import io.opentelemetry.sdk.trace.data.SpanData;
1212
import java.io.IOException;
1313
import java.util.List;
@@ -24,7 +24,8 @@ static SpanDataDeserializer getInstance() {
2424
@Override
2525
public List<SpanData> deserialize(byte[] source) throws DeserializationException {
2626
try {
27-
return ProtoSpansDataMapper.getInstance().fromProto(TracesData.ADAPTER.decode(source));
27+
return ProtoSpansDataMapper.getInstance()
28+
.fromProto(ExportTraceServiceRequest.ADAPTER.decode(source));
2829
} catch (IOException e) {
2930
throw new DeserializationException(e);
3031
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/common/ByteStringMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ public static ByteStringMapper getInstance() {
1616
}
1717

1818
public ByteString stringToProto(String source) {
19-
return ByteString.encodeUtf8(source);
19+
return ByteString.decodeHex(source);
2020
}
2121

2222
public String protoToString(ByteString source) {
23-
return source.utf8();
23+
return source.hex();
2424
}
2525
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs;
77

88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper;
9+
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
910
import io.opentelemetry.proto.logs.v1.LogRecord;
10-
import io.opentelemetry.proto.logs.v1.LogsData;
1111
import io.opentelemetry.proto.logs.v1.ResourceLogs;
1212
import io.opentelemetry.proto.logs.v1.ScopeLogs;
1313
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@@ -19,7 +19,7 @@
1919

2020
public final class ProtoLogsDataMapper
2121
extends BaseProtoSignalsDataMapper<
22-
LogRecordData, LogRecord, LogsData, ResourceLogs, ScopeLogs> {
22+
LogRecordData, LogRecord, ExportLogsServiceRequest, ResourceLogs, ScopeLogs> {
2323

2424
private static final ProtoLogsDataMapper INSTANCE = new ProtoLogsDataMapper();
2525

@@ -39,12 +39,12 @@ protected LogRecordData protoToSignalItem(
3939
}
4040

4141
@Override
42-
protected List<ResourceLogs> getProtoResources(LogsData logsData) {
42+
protected List<ResourceLogs> getProtoResources(ExportLogsServiceRequest logsData) {
4343
return logsData.resource_logs;
4444
}
4545

4646
@Override
47-
protected LogsData createProtoData(
47+
protected ExportLogsServiceRequest createProtoData(
4848
Map<Resource, Map<InstrumentationScopeInfo, List<LogRecord>>> itemsByResource) {
4949
List<ResourceLogs> items = new ArrayList<>();
5050
itemsByResource.forEach(
@@ -58,7 +58,7 @@ protected LogsData createProtoData(
5858
}
5959
items.add(resourceLogsBuilder.build());
6060
});
61-
return new LogsData.Builder().resource_logs(items).build();
61+
return new ExportLogsServiceRequest.Builder().resource_logs(items).build();
6262
}
6363

6464
private ScopeLogs.Builder createProtoScopeBuilder(InstrumentationScopeInfo scopeInfo) {

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics;
77

88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper;
9+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
910
import io.opentelemetry.proto.metrics.v1.Metric;
10-
import io.opentelemetry.proto.metrics.v1.MetricsData;
1111
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
1212
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
1313
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@@ -19,7 +19,7 @@
1919

2020
public final class ProtoMetricsDataMapper
2121
extends BaseProtoSignalsDataMapper<
22-
MetricData, Metric, MetricsData, ResourceMetrics, ScopeMetrics> {
22+
MetricData, Metric, ExportMetricsServiceRequest, ResourceMetrics, ScopeMetrics> {
2323

2424
private static final ProtoMetricsDataMapper INSTANCE = new ProtoMetricsDataMapper();
2525

@@ -39,12 +39,12 @@ protected MetricData protoToSignalItem(
3939
}
4040

4141
@Override
42-
protected List<ResourceMetrics> getProtoResources(MetricsData protoData) {
42+
protected List<ResourceMetrics> getProtoResources(ExportMetricsServiceRequest protoData) {
4343
return protoData.resource_metrics;
4444
}
4545

4646
@Override
47-
protected MetricsData createProtoData(
47+
protected ExportMetricsServiceRequest createProtoData(
4848
Map<Resource, Map<InstrumentationScopeInfo, List<Metric>>> itemsByResource) {
4949
List<ResourceMetrics> items = new ArrayList<>();
5050
itemsByResource.forEach(
@@ -58,7 +58,7 @@ protected MetricsData createProtoData(
5858
}
5959
items.add(resourceMetricsBuilder.build());
6060
});
61-
return new MetricsData.Builder().resource_metrics(items).build();
61+
return new ExportMetricsServiceRequest.Builder().resource_metrics(items).build();
6262
}
6363

6464
private ScopeMetrics.Builder createProtoScopeBuilder(InstrumentationScopeInfo scopeInfo) {

0 commit comments

Comments
 (0)