Skip to content

Commit 65ce1e6

Browse files
authored
[format] read blobs as bytes when blob-as-descriptor is false (#6989)
1 parent 17727e7 commit 65ce1e6

File tree

4 files changed

+115
-54
lines changed

4 files changed

+115
-54
lines changed

paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,15 @@
4343
/** {@link FileFormat} for blob file. */
4444
public class BlobFileFormat extends FileFormat {
4545

46+
private final boolean blobAsDescriptor;
47+
4648
/** Creates a blob file format with {@code blobAsDescriptor} set to {@code false}. */
public BlobFileFormat() {
49+
this(false);
50+
}
51+
52+
/**
 * Creates a blob file format registered under {@code BlobFileFormatFactory.IDENTIFIER}.
 *
 * @param blobAsDescriptor flag stored on this format and handed to the reader factory it
 *     creates, which in turn passes it to each {@code BlobFormatReader}
 */
public BlobFileFormat(boolean blobAsDescriptor) {
4753
super(BlobFileFormatFactory.IDENTIFIER);
54+
this.blobAsDescriptor = blobAsDescriptor;
4855
}
4956

5057
public static boolean isBlobFile(String fileName) {
@@ -56,7 +63,7 @@ public FormatReaderFactory createReaderFactory(
5663
RowType dataSchemaRowType,
5764
RowType projectedRowType,
5865
@Nullable List<Predicate> filters) {
59-
return new BlobFormatReaderFactory();
66+
return new BlobFormatReaderFactory(blobAsDescriptor);
6067
}
6168

6269
@Override
@@ -89,10 +96,20 @@ public FormatWriter create(PositionOutputStream out, String compression) {
8996

9097
/**
 * {@code FormatReaderFactory} that builds a {@code BlobFormatReader} for each reader context,
 * forwarding the {@code blobAsDescriptor} flag it was constructed with.
 */
private static class BlobFormatReaderFactory implements FormatReaderFactory {
9198

99+
// Passed unchanged to every BlobFormatReader created by this factory.
private final boolean blobAsDescriptor;
100+
101+
/**
 * @param blobAsDescriptor flag to forward to the readers this factory creates
 */
public BlobFormatReaderFactory(boolean blobAsDescriptor) {
102+
this.blobAsDescriptor = blobAsDescriptor;
103+
}
104+
92105
/**
 * Creates a reader over the file described by {@code context}, using the context's file IO,
 * path, size and row selection together with this factory's {@code blobAsDescriptor} flag.
 *
 * @throws IOException propagated from the {@code BlobFormatReader} constructor
 */
@Override
93106
public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
94107
return new BlobFormatReader(
95-
context.fileIO(), context.filePath(), context.fileSize(), context.selection());
108+
context.fileIO(),
109+
context.filePath(),
110+
context.fileSize(),
111+
context.selection(),
112+
blobAsDescriptor);
96113
}
97114
}
98115
}

paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormatFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.format.blob;
2020

21+
import org.apache.paimon.CoreOptions;
2122
import org.apache.paimon.format.FileFormat;
2223
import org.apache.paimon.format.FileFormatFactory;
2324

@@ -33,6 +34,7 @@ public String identifier() {
3334

3435
/**
 * Builds a {@code BlobFileFormat}, reading the {@code CoreOptions.BLOB_AS_DESCRIPTOR} option
 * from the format context and passing its value to the format's constructor.
 */
@Override
3536
public FileFormat create(FormatContext formatContext) {
36-
return new BlobFileFormat();
37+
boolean blobAsDescriptor = formatContext.options().get(CoreOptions.BLOB_AS_DESCRIPTOR);
38+
return new BlobFileFormat(blobAsDescriptor);
3739
}
3840
}

paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatReader.java

Lines changed: 70 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -44,58 +44,65 @@ public class BlobFormatReader implements FileRecordReader<InternalRow> {
4444
private final long[] blobLengths;
4545
private final long[] blobOffsets;
4646
private final int[] returnedPositions;
47+
private final SeekableInputStream in;
48+
private final boolean blobAsDescriptor;
4749

4850
private boolean returned;
4951

5052
/**
 * Opens the blob file and loads its trailing index so that individual blobs can be located.
 *
 * <p>The last 5 bytes of the file are read as a footer: an int index length taken from offset
 * 0, followed by a version byte at index 4 (only version 1 is accepted). The index bytes that
 * sit directly before the footer are decompressed into per-blob lengths, and each blob's
 * offset is the running sum of the lengths before it, starting at 0. When {@code selection}
 * is non-null, the lengths, offsets and returned positions are narrowed to the selected
 * entries in iteration order.
 *
 * @param fileIO file IO used to open {@code filePath}
 * @param filePath path of the blob file
 * @param fileSize total file size in bytes; used to locate the footer and the index
 * @param selection positions to return, or {@code null} to keep every blob
 * @param blobAsDescriptor stored on the reader; {@code next()} branches on it to choose
 *     between {@code Blob.fromData} (when {@code false}) and {@code Blob.fromFile}
 * @throws IOException if reading the footer or index fails, or the version byte is not 1
 */
public BlobFormatReader(
51-
FileIO fileIO, Path filePath, long fileSize, @Nullable RoaringBitmap32 selection)
53+
FileIO fileIO,
54+
Path filePath,
55+
long fileSize,
56+
@Nullable RoaringBitmap32 selection,
57+
boolean blobAsDescriptor)
5258
throws IOException {
5359
this.fileIO = fileIO;
5460
this.filePath = filePath;
5561
this.returned = false;
56-
try (SeekableInputStream in = fileIO.newInputStream(filePath)) {
57-
in.seek(fileSize - 5);
58-
byte[] header = new byte[5];
59-
IOUtils.readFully(in, header);
60-
byte version = header[4];
61-
if (version != 1) {
62-
throw new IOException("Unsupported version: " + version);
63-
}
64-
int indexLength = BytesUtils.getInt(header, 0);
65-
66-
in.seek(fileSize - 5 - indexLength);
67-
byte[] indexBytes = new byte[indexLength];
68-
IOUtils.readFully(in, indexBytes);
69-
70-
long[] blobLengths = DeltaVarintCompressor.decompress(indexBytes);
71-
long[] blobOffsets = new long[blobLengths.length];
72-
long offset = 0;
73-
for (int i = 0; i < blobLengths.length; i++) {
74-
blobOffsets[i] = offset;
75-
offset += blobLengths[i];
76-
}
62+
this.blobAsDescriptor = blobAsDescriptor;
63+
// NOTE(review): the stream is now held in a field and only released by close(). The
// try-with-resources of the previous version is gone, so if any read below throws (for
// example the unsupported-version check) nothing in this constructor closes the stream.
// Consider closing it on failure before rethrowing.
this.in = fileIO.newInputStream(filePath);
64+
65+
in.seek(fileSize - 5);
66+
byte[] header = new byte[5];
67+
IOUtils.readFully(in, header);
68+
byte version = header[4];
69+
if (version != 1) {
70+
throw new IOException("Unsupported version: " + version);
71+
}
72+
int indexLength = BytesUtils.getInt(header, 0);
73+
74+
in.seek(fileSize - 5 - indexLength);
75+
byte[] indexBytes = new byte[indexLength];
76+
IOUtils.readFully(in, indexBytes);
77+
78+
long[] blobLengths = DeltaVarintCompressor.decompress(indexBytes);
79+
long[] blobOffsets = new long[blobLengths.length];
80+
long offset = 0;
81+
for (int i = 0; i < blobLengths.length; i++) {
82+
blobOffsets[i] = offset;
83+
offset += blobLengths[i];
84+
}
7785

78-
int[] returnedPositions = null;
79-
if (selection != null) {
80-
int cardinality = (int) selection.getCardinality();
81-
returnedPositions = new int[cardinality];
82-
long[] newLengths = new long[cardinality];
83-
long[] newOffsets = new long[cardinality];
84-
Iterator<Integer> iterator = selection.iterator();
85-
for (int i = 0; i < cardinality; i++) {
86-
Integer next = iterator.next();
87-
newLengths[i] = blobLengths[next];
88-
newOffsets[i] = blobOffsets[next];
89-
returnedPositions[i] = next;
90-
}
91-
blobLengths = newLengths;
92-
blobOffsets = newOffsets;
86+
int[] returnedPositions = null;
87+
if (selection != null) {
88+
int cardinality = (int) selection.getCardinality();
89+
returnedPositions = new int[cardinality];
90+
long[] newLengths = new long[cardinality];
91+
long[] newOffsets = new long[cardinality];
92+
Iterator<Integer> iterator = selection.iterator();
93+
for (int i = 0; i < cardinality; i++) {
94+
Integer next = iterator.next();
95+
newLengths[i] = blobLengths[next];
96+
newOffsets[i] = blobOffsets[next];
97+
returnedPositions[i] = next;
9398
}
94-
95-
this.returnedPositions = returnedPositions;
96-
this.blobLengths = blobLengths;
97-
this.blobOffsets = blobOffsets;
99+
blobLengths = newLengths;
100+
blobOffsets = newOffsets;
98101
}
102+
103+
this.returnedPositions = returnedPositions;
104+
this.blobLengths = blobLengths;
105+
this.blobOffsets = blobOffsets;
99106
}
100107

101108
@Nullable
@@ -129,12 +136,14 @@ public InternalRow next() {
129136
return null;
130137
}
131138

132-
Blob blob =
133-
Blob.fromFile(
134-
fileIO,
135-
filePath.toString(),
136-
blobOffsets[currentPosition] + 4,
137-
blobLengths[currentPosition] - 16);
139+
Blob blob;
140+
long offset = blobOffsets[currentPosition] + 4;
141+
long length = blobLengths[currentPosition] - 16;
142+
if (!blobAsDescriptor) {
143+
blob = Blob.fromData(readInlineBlob(offset, length));
144+
} else {
145+
blob = Blob.fromFile(fileIO, filePath.toString(), offset, length);
146+
}
138147
currentPosition++;
139148
return GenericRow.of(blob);
140149
}
@@ -145,5 +154,18 @@ public void releaseBatch() {}
145154
}
146155

147156
/**
 * Closes the input stream held in the {@code in} field.
 *
 * @throws IOException if closing the stream fails
 */
@Override
148-
public void close() throws IOException {}
157+
public void close() throws IOException {
158+
in.close();
159+
}
160+
161+
/**
 * Reads {@code length} bytes starting at {@code position} from the reader's input stream and
 * returns them as a new array.
 *
 * @param position offset in the file to seek to before reading
 * @param length number of bytes to read
 * @return a freshly allocated array filled from the stream
 * @throws RuntimeException wrapping any {@code IOException} raised by the seek or the read
 */
private byte[] readInlineBlob(long position, long length) {
162+
// NOTE(review): length is narrowed from long to int without a range check. A value above
// Integer.MAX_VALUE or below zero would produce a wrong or negative array size. The
// caller computes it as blobLengths[i] - 16; confirm the bound or validate it here.
byte[] blobData = new byte[(int) length];
163+
try {
164+
in.seek(position);
165+
IOUtils.readFully(in, blobData);
166+
} catch (IOException e) {
167+
// next() does not declare IOException, so the failure is rethrown unchecked with the
// cause preserved. NOTE(review): UncheckedIOException would be the more specific type.
throw new RuntimeException(e);
168+
}
169+
return blobData;
170+
}
149171
}

paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFileFormatTest.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
package org.apache.paimon.format.blob;
2020

21+
import org.apache.paimon.data.Blob;
2122
import org.apache.paimon.data.BlobData;
23+
import org.apache.paimon.data.BlobRef;
2224
import org.apache.paimon.data.GenericRow;
2325
import org.apache.paimon.format.FormatReaderContext;
2426
import org.apache.paimon.format.FormatReaderFactory;
@@ -61,8 +63,17 @@ public void beforeEach() {
6163
}
6264

6365
@Test
64-
public void test() throws IOException {
65-
BlobFileFormat format = new BlobFileFormat();
66+
public void testBlobAsDescriptor() throws IOException {
67+
innerTest(true);
68+
}
69+
70+
/**
 * Runs the shared round-trip check with {@code blobAsDescriptor} set to {@code false}; in that
 * mode {@code innerTest} asserts that every blob read back is a {@code BlobData}.
 */
@Test
71+
public void testReadBlobInlineBytes() throws IOException {
72+
innerTest(false);
73+
}
74+
75+
private void innerTest(boolean blobAsDescriptor) throws IOException {
76+
BlobFileFormat format = new BlobFileFormat(blobAsDescriptor);
6677
RowType rowType = RowType.of(DataTypes.BLOB());
6778

6879
// write
@@ -83,7 +94,16 @@ public void test() throws IOException {
8394
List<byte[]> result = new ArrayList<>();
8495
readerFactory
8596
.createReader(context)
86-
.forEachRemaining(row -> result.add(row.getBlob(0).toData()));
97+
.forEachRemaining(
98+
row -> {
99+
Blob blob = row.getBlob(0);
100+
if (blobAsDescriptor) {
101+
assertThat(blob).isInstanceOf(BlobRef.class);
102+
} else {
103+
assertThat(blob).isInstanceOf(BlobData.class);
104+
}
105+
result.add(blob.toData());
106+
});
87107

88108
// assert
89109
assertThat(result).containsExactlyElementsOf(blobs);

0 commit comments

Comments
 (0)