Skip to content

Commit 2b8de4b

Browse files
authored
Use a more efficient serializer and write bytes directly to disk (#2138)
1 parent 50fa886 commit 2b8de4b

28 files changed

+274
-150
lines changed

disk-buffering/build.gradle.kts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ val protos by configurations.creating
1717
dependencies {
1818
api("io.opentelemetry:opentelemetry-sdk")
1919
implementation("io.opentelemetry:opentelemetry-api-incubator")
20+
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
2021
compileOnly("com.google.auto.value:auto-value-annotations")
2122
annotationProcessor("com.google.auto.value:auto-value")
2223
testImplementation("org.mockito:mockito-inline")
@@ -47,9 +48,10 @@ wire {
4748
}
4849

4950
root(
50-
"opentelemetry.proto.trace.v1.TracesData",
51-
"opentelemetry.proto.metrics.v1.MetricsData",
52-
"opentelemetry.proto.logs.v1.LogsData",
51+
// These are the types used by the Java SDK's OTLP exporters.
52+
"opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest",
53+
"opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest",
54+
"opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest",
5355
)
5456
}
5557

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
7+
8+
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
9+
import java.io.IOException;
10+
import java.io.OutputStream;
11+
import java.util.Collection;
12+
13+
class NoopSerializer<T> implements SignalSerializer<T> {
14+
15+
@Override
16+
public NoopSerializer<T> initialize(Collection<T> data) {
17+
return this;
18+
}
19+
20+
@Override
21+
public void writeBinaryTo(OutputStream output) throws IOException {}
22+
23+
@Override
24+
public int getBinarySerializedSize() {
25+
return 0;
26+
}
27+
28+
@Override
29+
public void reset() {}
30+
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,11 @@ public static <T> ToDiskExporterBuilder<T> builder(Storage storage) {
3838
return new ToDiskExporterBuilder<>(storage);
3939
}
4040

41-
public CompletableResultCode export(Collection<EXPORT_DATA> data) {
41+
public synchronized CompletableResultCode export(Collection<EXPORT_DATA> data) {
4242
logger.log("Intercepting exporter batch.", Level.FINER);
4343
try {
44-
if (storage.write(serializer.serialize(data))) {
44+
serializer.initialize(data);
45+
if (storage.write(serializer)) {
4546
return CompletableResultCode.ofSuccess();
4647
}
4748
logger.log("Could not store batch in disk. Exporting it right away.");
@@ -52,6 +53,8 @@ public CompletableResultCode export(Collection<EXPORT_DATA> data) {
5253
Level.WARNING,
5354
e);
5455
return exportFunction.apply(data);
56+
} finally {
57+
serializer.reset();
5558
}
5659
}
5760

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
public final class ToDiskExporterBuilder<T> {
1616

17-
private SignalSerializer<T> serializer = ts -> new byte[0];
17+
private SignalSerializer<T> serializer = new NoopSerializer<T>();
1818

1919
private final Storage storage;
2020

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.ProtoLogsDataMapper;
99
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
10-
import io.opentelemetry.proto.logs.v1.LogsData;
10+
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
1111
import io.opentelemetry.sdk.logs.data.LogRecordData;
1212
import java.io.IOException;
1313
import java.util.List;
@@ -24,7 +24,8 @@ static LogRecordDataDeserializer getInstance() {
2424
@Override
2525
public List<LogRecordData> deserialize(byte[] source) throws DeserializationException {
2626
try {
27-
return ProtoLogsDataMapper.getInstance().fromProto(LogsData.ADAPTER.decode(source));
27+
return ProtoLogsDataMapper.getInstance()
28+
.fromProto(ExportLogsServiceRequest.ADAPTER.decode(source));
2829
} catch (IOException e) {
2930
throw new DeserializationException(e);
3031
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics.ProtoMetricsDataMapper;
99
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
10-
import io.opentelemetry.proto.metrics.v1.MetricsData;
10+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
1111
import io.opentelemetry.sdk.metrics.data.MetricData;
1212
import java.io.IOException;
1313
import java.util.List;
@@ -24,7 +24,8 @@ static MetricDataDeserializer getInstance() {
2424
@Override
2525
public List<MetricData> deserialize(byte[] source) throws DeserializationException {
2626
try {
27-
return ProtoMetricsDataMapper.getInstance().fromProto(MetricsData.ADAPTER.decode(source));
27+
return ProtoMetricsDataMapper.getInstance()
28+
.fromProto(ExportMetricsServiceRequest.ADAPTER.decode(source));
2829
} catch (IOException e) {
2930
throw new DeserializationException(e);
3031
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.ProtoSpansDataMapper;
99
import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes;
10-
import io.opentelemetry.proto.trace.v1.TracesData;
10+
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
1111
import io.opentelemetry.sdk.trace.data.SpanData;
1212
import java.io.IOException;
1313
import java.util.List;
@@ -24,7 +24,8 @@ static SpanDataDeserializer getInstance() {
2424
@Override
2525
public List<SpanData> deserialize(byte[] source) throws DeserializationException {
2626
try {
27-
return ProtoSpansDataMapper.getInstance().fromProto(TracesData.ADAPTER.decode(source));
27+
return ProtoSpansDataMapper.getInstance()
28+
.fromProto(ExportTraceServiceRequest.ADAPTER.decode(source));
2829
} catch (IOException e) {
2930
throw new DeserializationException(e);
3031
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/common/ByteStringMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ public static ByteStringMapper getInstance() {
1616
}
1717

1818
public ByteString stringToProto(String source) {
19-
return ByteString.encodeUtf8(source);
19+
return ByteString.decodeHex(source);
2020
}
2121

2222
public String protoToString(ByteString source) {
23-
return source.utf8();
23+
return source.hex();
2424
}
2525
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/ProtoLogsDataMapper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs;
77

88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper;
9+
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
910
import io.opentelemetry.proto.logs.v1.LogRecord;
10-
import io.opentelemetry.proto.logs.v1.LogsData;
1111
import io.opentelemetry.proto.logs.v1.ResourceLogs;
1212
import io.opentelemetry.proto.logs.v1.ScopeLogs;
1313
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@@ -19,7 +19,7 @@
1919

2020
public final class ProtoLogsDataMapper
2121
extends BaseProtoSignalsDataMapper<
22-
LogRecordData, LogRecord, LogsData, ResourceLogs, ScopeLogs> {
22+
LogRecordData, LogRecord, ExportLogsServiceRequest, ResourceLogs, ScopeLogs> {
2323

2424
private static final ProtoLogsDataMapper INSTANCE = new ProtoLogsDataMapper();
2525

@@ -39,12 +39,12 @@ protected LogRecordData protoToSignalItem(
3939
}
4040

4141
@Override
42-
protected List<ResourceLogs> getProtoResources(LogsData logsData) {
42+
protected List<ResourceLogs> getProtoResources(ExportLogsServiceRequest logsData) {
4343
return logsData.resource_logs;
4444
}
4545

4646
@Override
47-
protected LogsData createProtoData(
47+
protected ExportLogsServiceRequest createProtoData(
4848
Map<Resource, Map<InstrumentationScopeInfo, List<LogRecord>>> itemsByResource) {
4949
List<ResourceLogs> items = new ArrayList<>();
5050
itemsByResource.forEach(
@@ -58,7 +58,7 @@ protected LogsData createProtoData(
5858
}
5959
items.add(resourceLogsBuilder.build());
6060
});
61-
return new LogsData.Builder().resource_logs(items).build();
61+
return new ExportLogsServiceRequest.Builder().resource_logs(items).build();
6262
}
6363

6464
private ScopeLogs.Builder createProtoScopeBuilder(InstrumentationScopeInfo scopeInfo) {

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics;
77

88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.BaseProtoSignalsDataMapper;
9+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
910
import io.opentelemetry.proto.metrics.v1.Metric;
10-
import io.opentelemetry.proto.metrics.v1.MetricsData;
1111
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
1212
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
1313
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@@ -19,7 +19,7 @@
1919

2020
public final class ProtoMetricsDataMapper
2121
extends BaseProtoSignalsDataMapper<
22-
MetricData, Metric, MetricsData, ResourceMetrics, ScopeMetrics> {
22+
MetricData, Metric, ExportMetricsServiceRequest, ResourceMetrics, ScopeMetrics> {
2323

2424
private static final ProtoMetricsDataMapper INSTANCE = new ProtoMetricsDataMapper();
2525

@@ -39,12 +39,12 @@ protected MetricData protoToSignalItem(
3939
}
4040

4141
@Override
42-
protected List<ResourceMetrics> getProtoResources(MetricsData protoData) {
42+
protected List<ResourceMetrics> getProtoResources(ExportMetricsServiceRequest protoData) {
4343
return protoData.resource_metrics;
4444
}
4545

4646
@Override
47-
protected MetricsData createProtoData(
47+
protected ExportMetricsServiceRequest createProtoData(
4848
Map<Resource, Map<InstrumentationScopeInfo, List<Metric>>> itemsByResource) {
4949
List<ResourceMetrics> items = new ArrayList<>();
5050
itemsByResource.forEach(
@@ -58,7 +58,7 @@ protected MetricsData createProtoData(
5858
}
5959
items.add(resourceMetricsBuilder.build());
6060
});
61-
return new MetricsData.Builder().resource_metrics(items).build();
61+
return new ExportMetricsServiceRequest.Builder().resource_metrics(items).build();
6262
}
6363

6464
private ScopeMetrics.Builder createProtoScopeBuilder(InstrumentationScopeInfo scopeInfo) {

0 commit comments

Comments
 (0)