Skip to content

Commit 98fa296

Browse files
authored
Add ByteBuffer field type marshaling support to exporter. (#6686)
1 parent 7e4da16 commit 98fa296

File tree

6 files changed

+103
-10
lines changed

6 files changed

+103
-10
lines changed

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/CodedOutputStream.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import io.opentelemetry.api.internal.ConfigUtil;
4545
import java.io.IOException;
4646
import java.io.OutputStream;
47+
import java.nio.ByteBuffer;
4748

4849
/**
4950
* Protobuf wire encoder.
@@ -56,7 +57,7 @@
5657
//
5758
// Differences
5859
// - No support for Message/Lite
59-
// - No support for ByteString or ByteBuffer
60+
// - No support for ByteString
6061
// - No support for message set extensions
6162
// - No support for Unsafe
6263
// - No support for Java String, only UTF-8 bytes
@@ -329,6 +330,11 @@ public static int computeByteArraySizeNoTag(final byte[] value) {
329330
return computeLengthDelimitedFieldSize(value.length);
330331
}
331332

333+
/** Compute the number of bytes that would be needed to encode a {@code bytes} field. */
334+
public static int computeByteBufferSizeNoTag(final ByteBuffer value) {
335+
return computeLengthDelimitedFieldSize(value.capacity());
336+
}
337+
332338
static int computeLengthDelimitedFieldSize(int fieldLength) {
333339
return computeUInt32SizeNoTag(fieldLength) + fieldLength;
334340
}
@@ -375,6 +381,8 @@ static long encodeZigZag64(final long n) {
375381
abstract void writeByteArrayNoTag(final byte[] value, final int offset, final int length)
376382
throws IOException;
377383

384+
abstract void writeByteBufferNoTag(final ByteBuffer value) throws IOException;
385+
378386
// =================================================================
379387

380388
/** Abstract base class for buffered encoders. */
@@ -487,6 +495,49 @@ void writeByteArrayNoTag(final byte[] value, int offset, int length) throws IOEx
487495
write(value, offset, length);
488496
}
489497

498+
@Override
499+
void writeByteBufferNoTag(final ByteBuffer value) throws IOException {
500+
writeUInt32NoTag(value.capacity());
501+
if (value.hasArray()) {
502+
write(value.array(), value.arrayOffset(), value.capacity());
503+
} else {
504+
write((ByteBuffer) value.duplicate().clear());
505+
}
506+
}
507+
508+
void write(ByteBuffer value) throws IOException {
509+
int length = value.remaining();
510+
if (limit - position >= length) {
511+
// We have room in the current buffer.
512+
value.get(buffer, position, length);
513+
position += length;
514+
totalBytesWritten += length;
515+
} else {
516+
// Write extends past current buffer. Fill the rest of this buffer and
517+
// flush.
518+
final int bytesWritten = limit - position;
519+
value.get(buffer, position, bytesWritten);
520+
length -= bytesWritten;
521+
position = limit;
522+
totalBytesWritten += bytesWritten;
523+
doFlush();
524+
525+
// Now deal with the rest.
526+
// Since we have an output stream, this is our buffer
527+
// and buffer offset == 0
528+
while (length > limit) {
529+
// Copy data into the buffer before writing it to OutputStream.
530+
value.get(buffer, 0, limit);
531+
out.write(buffer, 0, limit);
532+
length -= limit;
533+
totalBytesWritten += limit;
534+
}
535+
value.get(buffer, 0, length);
536+
position = length;
537+
totalBytesWritten += length;
538+
}
539+
}
540+
490541
@Override
491542
void write(byte value) throws IOException {
492543
if (position == limit) {

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/JsonSerializer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.fasterxml.jackson.core.JsonGenerator;
1010
import java.io.IOException;
1111
import java.io.OutputStream;
12+
import java.nio.ByteBuffer;
1213
import java.nio.charset.StandardCharsets;
1314
import java.util.List;
1415

@@ -126,6 +127,13 @@ public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
126127
generator.writeBinaryField(field.getJsonName(), value);
127128
}
128129

130+
@Override
131+
public void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException {
132+
byte[] data = new byte[value.capacity()];
133+
((ByteBuffer) value.duplicate().clear()).get(data);
134+
generator.writeBinaryField(field.getJsonName(), data);
135+
}
136+
129137
@Override
130138
protected void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException {
131139
generator.writeObjectFieldStart(field.getJsonName());

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/MarshalerUtil.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.io.ByteArrayOutputStream;
1414
import java.io.IOException;
1515
import java.io.UncheckedIOException;
16+
import java.nio.ByteBuffer;
1617
import java.nio.charset.StandardCharsets;
1718
import java.util.ArrayList;
1819
import java.util.Collection;
@@ -346,6 +347,14 @@ public static int sizeBytes(ProtoFieldInfo field, byte[] message) {
346347
return field.getTagSize() + CodedOutputStream.computeByteArraySizeNoTag(message);
347348
}
348349

350+
/** Returns the size of a bytes field based on the buffer's capacity. */
351+
public static int sizeByteBuffer(ProtoFieldInfo field, ByteBuffer message) {
352+
if (message.capacity() == 0) {
353+
return 0;
354+
}
355+
return field.getTagSize() + CodedOutputStream.computeByteBufferSizeNoTag(message);
356+
}
357+
349358
/** Returns the size of a enum field. */
350359
// Assumes OTLP always defines the first item in an enum with number 0, which it does and will.
351360
public static int sizeEnum(ProtoFieldInfo field, ProtoEnumInfo enumValue) {

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/ProtoSerializer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.opentelemetry.api.trace.TraceId;
1111
import java.io.IOException;
1212
import java.io.OutputStream;
13+
import java.nio.ByteBuffer;
1314
import java.util.HashMap;
1415
import java.util.List;
1516
import java.util.Map;
@@ -168,6 +169,12 @@ public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
168169
output.writeByteArrayNoTag(value);
169170
}
170171

172+
@Override
173+
public void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException {
174+
output.writeUInt32NoTag(field.getTag());
175+
output.writeByteBufferNoTag(value);
176+
}
177+
171178
@Override
172179
protected void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException {
173180
output.writeUInt32NoTag(field.getTag());

exporters/common/src/main/java/io/opentelemetry/exporter/internal/marshal/Serializer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.opentelemetry.api.common.Attributes;
1010
import io.opentelemetry.sdk.internal.DynamicPrimitiveLongList;
1111
import java.io.IOException;
12+
import java.nio.ByteBuffer;
1213
import java.util.Collection;
1314
import java.util.List;
1415
import java.util.Map;
@@ -253,8 +254,22 @@ public void serializeBytes(ProtoFieldInfo field, byte[] value) throws IOExceptio
253254
writeBytes(field, value);
254255
}
255256

257+
/**
258+
* Serializes a protobuf {@code bytes} field. Writes all content of the ByteBuffer regardless of
259+
* the current position and limit. Does not alter the position or limit of the provided
260+
* ByteBuffer.
261+
*/
262+
public void serializeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException {
263+
if (value.capacity() == 0) {
264+
return;
265+
}
266+
writeByteBuffer(field, value);
267+
}
268+
256269
public abstract void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException;
257270

271+
public abstract void writeByteBuffer(ProtoFieldInfo field, ByteBuffer value) throws IOException;
272+
258273
protected abstract void writeStartMessage(ProtoFieldInfo field, int protoMessageSize)
259274
throws IOException;
260275

exporters/otlp/profiles/src/main/java/io/opentelemetry/exporter/otlp/profiles/ProfileContainerMarshaler.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.opentelemetry.exporter.internal.otlp.KeyValueMarshaler;
1212
import io.opentelemetry.proto.profiles.v1experimental.internal.ProfileContainer;
1313
import java.io.IOException;
14+
import java.nio.ByteBuffer;
1415

1516
final class ProfileContainerMarshaler extends MarshalerWithSize {
1617

@@ -20,17 +21,19 @@ final class ProfileContainerMarshaler extends MarshalerWithSize {
2021
private final KeyValueMarshaler[] attributeMarshalers;
2122
private final int droppedAttributesCount;
2223
private final byte[] originalPayloadFormatUtf8;
23-
private final byte[] originalPayload;
24+
private final ByteBuffer originalPayload;
2425
private final ProfileMarshaler profileMarshaler;
2526

2627
static ProfileContainerMarshaler create(ProfileContainerData profileContainerData) {
2728
int droppedAttributesCount =
2829
profileContainerData.getTotalAttributeCount() - profileContainerData.getAttributes().size();
2930

30-
// Not ideal, but this will do for now. ByteBuffer support in
31-
// Serialzer/CodedOutputStream/MarshalerUtilwill follow in a separate step.
32-
byte[] originalPayload = new byte[profileContainerData.getOriginalPayload().remaining()];
33-
profileContainerData.getOriginalPayload().get(originalPayload);
31+
ByteBuffer originalPayload = profileContainerData.getOriginalPayload();
32+
if (originalPayload == null) {
33+
originalPayload = ByteBuffer.allocate(0);
34+
} else {
35+
originalPayload = originalPayload.duplicate().asReadOnlyBuffer();
36+
}
3437

3538
return new ProfileContainerMarshaler(
3639
profileContainerData.getProfileIdBytes(),
@@ -50,7 +53,7 @@ private ProfileContainerMarshaler(
5053
KeyValueMarshaler[] attributeMarshalers,
5154
int droppedAttributesCount,
5255
byte[] originalPayloadFormat,
53-
byte[] originalPayload,
56+
ByteBuffer originalPayload,
5457
ProfileMarshaler profileMarshaler) {
5558
super(
5659
calculateSize(
@@ -80,7 +83,7 @@ protected void writeTo(Serializer output) throws IOException {
8083
output.serializeRepeatedMessage(ProfileContainer.ATTRIBUTES, attributeMarshalers);
8184
output.serializeUInt32(ProfileContainer.DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
8285
output.serializeString(ProfileContainer.ORIGINAL_PAYLOAD_FORMAT, originalPayloadFormatUtf8);
83-
output.serializeBytes(ProfileContainer.ORIGINAL_PAYLOAD, originalPayload);
86+
output.serializeByteBuffer(ProfileContainer.ORIGINAL_PAYLOAD, originalPayload);
8487
output.serializeMessage(ProfileContainer.PROFILE, profileMarshaler);
8588
}
8689

@@ -91,7 +94,7 @@ private static int calculateSize(
9194
KeyValueMarshaler[] attributeMarshalers,
9295
int droppedAttributesCount,
9396
byte[] originalPayloadFormat,
94-
byte[] originalPayload,
97+
ByteBuffer originalPayload,
9598
ProfileMarshaler profileMarshaler) {
9699
int size;
97100
size = 0;
@@ -103,7 +106,7 @@ private static int calculateSize(
103106
MarshalerUtil.sizeUInt32(ProfileContainer.DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
104107
size +=
105108
MarshalerUtil.sizeBytes(ProfileContainer.ORIGINAL_PAYLOAD_FORMAT, originalPayloadFormat);
106-
size += MarshalerUtil.sizeBytes(ProfileContainer.ORIGINAL_PAYLOAD, originalPayload);
109+
size += MarshalerUtil.sizeByteBuffer(ProfileContainer.ORIGINAL_PAYLOAD, originalPayload);
107110
size += MarshalerUtil.sizeMessage(ProfileContainer.PROFILE, profileMarshaler);
108111
return size;
109112
}

0 commit comments

Comments
 (0)