Skip to content

Commit 53f4cf7

Browse files
authored
feat: Remove mutable buffers from scan partition/missing columns [iceberg] (apache#3514)
1 parent e7eae66 commit 53f4cf7

File tree

3 files changed

+371
-5
lines changed

3 files changed

+371
-5
lines changed
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.parquet;
21+
22+
import java.math.BigDecimal;
23+
24+
import org.apache.arrow.memory.BufferAllocator;
25+
import org.apache.arrow.memory.RootAllocator;
26+
import org.apache.arrow.vector.*;
27+
import org.apache.spark.sql.catalyst.InternalRow;
28+
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
29+
import org.apache.spark.sql.types.*;
30+
import org.apache.spark.unsafe.types.UTF8String;
31+
32+
import org.apache.comet.vector.CometPlainVector;
33+
import org.apache.comet.vector.CometVector;
34+
35+
/**
36+
* A column reader that returns constant vectors using Arrow Java vectors directly (no native
37+
* mutable buffers). Used for partition columns and missing columns in the native_iceberg_compat
38+
* scan path.
39+
*
40+
* <p>The vector is filled with the constant value repeated for every row in the batch. This is
41+
* necessary because the underlying Arrow vector's buffers must be large enough to match the
42+
* reported value count — otherwise variable-width types (strings, binary) would have undersized
43+
* offset buffers, causing out-of-bounds reads on the native side.
44+
*/
45+
public class ArrowConstantColumnReader extends AbstractColumnReader {
46+
private final BufferAllocator allocator = new RootAllocator();
47+
48+
private boolean isNull;
49+
private Object value;
50+
private FieldVector fieldVector;
51+
private CometPlainVector vector;
52+
private int currentSize;
53+
54+
/** Constructor for missing columns (default values from schema). */
55+
ArrowConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) {
56+
super(field.dataType(), TypeUtil.convertToParquet(field), useDecimal128, false);
57+
this.batchSize = batchSize;
58+
this.value =
59+
ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new StructField[] {field}))[
60+
0];
61+
initVector(value, batchSize);
62+
}
63+
64+
/** Constructor for partition columns with values from a row. */
65+
ArrowConstantColumnReader(
66+
StructField field, int batchSize, InternalRow values, int index, boolean useDecimal128) {
67+
super(field.dataType(), TypeUtil.convertToParquet(field), useDecimal128, false);
68+
this.batchSize = batchSize;
69+
Object v = values.get(index, field.dataType());
70+
this.value = v;
71+
initVector(v, batchSize);
72+
}
73+
74+
@Override
75+
public void setBatchSize(int batchSize) {
76+
close();
77+
this.batchSize = batchSize;
78+
initVector(value, batchSize);
79+
}
80+
81+
@Override
82+
public void readBatch(int total) {
83+
if (total != currentSize) {
84+
close();
85+
initVector(value, total);
86+
}
87+
}
88+
89+
@Override
90+
public CometVector currentBatch() {
91+
return vector;
92+
}
93+
94+
@Override
95+
public void close() {
96+
if (vector != null) {
97+
vector.close();
98+
vector = null;
99+
}
100+
if (fieldVector != null) {
101+
fieldVector.close();
102+
fieldVector = null;
103+
}
104+
}
105+
106+
private void initVector(Object value, int count) {
107+
currentSize = count;
108+
if (value == null) {
109+
isNull = true;
110+
fieldVector = createNullVector(count);
111+
} else {
112+
isNull = false;
113+
fieldVector = createFilledVector(value, count);
114+
}
115+
vector = new CometPlainVector(fieldVector, useDecimal128, false, true);
116+
}
117+
118+
/** Creates a vector of the correct type with {@code count} null values. */
119+
private FieldVector createNullVector(int count) {
120+
String name = "constant";
121+
FieldVector v;
122+
if (type == DataTypes.BooleanType) {
123+
v = new BitVector(name, allocator);
124+
} else if (type == DataTypes.ByteType) {
125+
v = new TinyIntVector(name, allocator);
126+
} else if (type == DataTypes.ShortType) {
127+
v = new SmallIntVector(name, allocator);
128+
} else if (type == DataTypes.IntegerType || type == DataTypes.DateType) {
129+
v = new IntVector(name, allocator);
130+
} else if (type == DataTypes.LongType
131+
|| type == DataTypes.TimestampType
132+
|| type == TimestampNTZType$.MODULE$) {
133+
v = new BigIntVector(name, allocator);
134+
} else if (type == DataTypes.FloatType) {
135+
v = new Float4Vector(name, allocator);
136+
} else if (type == DataTypes.DoubleType) {
137+
v = new Float8Vector(name, allocator);
138+
} else if (type == DataTypes.BinaryType) {
139+
v = new VarBinaryVector(name, allocator);
140+
} else if (type == DataTypes.StringType) {
141+
v = new VarCharVector(name, allocator);
142+
} else if (type instanceof DecimalType) {
143+
DecimalType dt = (DecimalType) type;
144+
if (!useDecimal128 && dt.precision() <= Decimal.MAX_INT_DIGITS()) {
145+
v = new IntVector(name, allocator);
146+
} else if (!useDecimal128 && dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
147+
v = new BigIntVector(name, allocator);
148+
} else {
149+
v = new DecimalVector(name, allocator, dt.precision(), dt.scale());
150+
}
151+
} else {
152+
throw new UnsupportedOperationException("Unsupported Spark type: " + type);
153+
}
154+
v.setValueCount(count);
155+
return v;
156+
}
157+
158+
/** Creates a vector filled with {@code count} copies of the given value. */
159+
private FieldVector createFilledVector(Object value, int count) {
160+
String name = "constant";
161+
if (type == DataTypes.BooleanType) {
162+
BitVector v = new BitVector(name, allocator);
163+
v.allocateNew(count);
164+
int bit = (boolean) value ? 1 : 0;
165+
for (int i = 0; i < count; i++) v.setSafe(i, bit);
166+
v.setValueCount(count);
167+
return v;
168+
} else if (type == DataTypes.ByteType) {
169+
TinyIntVector v = new TinyIntVector(name, allocator);
170+
v.allocateNew(count);
171+
byte val = (byte) value;
172+
for (int i = 0; i < count; i++) v.setSafe(i, val);
173+
v.setValueCount(count);
174+
return v;
175+
} else if (type == DataTypes.ShortType) {
176+
SmallIntVector v = new SmallIntVector(name, allocator);
177+
v.allocateNew(count);
178+
short val = (short) value;
179+
for (int i = 0; i < count; i++) v.setSafe(i, val);
180+
v.setValueCount(count);
181+
return v;
182+
} else if (type == DataTypes.IntegerType || type == DataTypes.DateType) {
183+
IntVector v = new IntVector(name, allocator);
184+
v.allocateNew(count);
185+
int val = (int) value;
186+
for (int i = 0; i < count; i++) v.setSafe(i, val);
187+
v.setValueCount(count);
188+
return v;
189+
} else if (type == DataTypes.LongType
190+
|| type == DataTypes.TimestampType
191+
|| type == TimestampNTZType$.MODULE$) {
192+
BigIntVector v = new BigIntVector(name, allocator);
193+
v.allocateNew(count);
194+
long val = (long) value;
195+
for (int i = 0; i < count; i++) v.setSafe(i, val);
196+
v.setValueCount(count);
197+
return v;
198+
} else if (type == DataTypes.FloatType) {
199+
Float4Vector v = new Float4Vector(name, allocator);
200+
v.allocateNew(count);
201+
float val = (float) value;
202+
for (int i = 0; i < count; i++) v.setSafe(i, val);
203+
v.setValueCount(count);
204+
return v;
205+
} else if (type == DataTypes.DoubleType) {
206+
Float8Vector v = new Float8Vector(name, allocator);
207+
v.allocateNew(count);
208+
double val = (double) value;
209+
for (int i = 0; i < count; i++) v.setSafe(i, val);
210+
v.setValueCount(count);
211+
return v;
212+
} else if (type == DataTypes.BinaryType) {
213+
VarBinaryVector v = new VarBinaryVector(name, allocator);
214+
v.allocateNew(count);
215+
byte[] bytes = (byte[]) value;
216+
for (int i = 0; i < count; i++) v.setSafe(i, bytes, 0, bytes.length);
217+
v.setValueCount(count);
218+
return v;
219+
} else if (type == DataTypes.StringType) {
220+
VarCharVector v = new VarCharVector(name, allocator);
221+
v.allocateNew(count);
222+
byte[] bytes = ((UTF8String) value).getBytes();
223+
for (int i = 0; i < count; i++) v.setSafe(i, bytes, 0, bytes.length);
224+
v.setValueCount(count);
225+
return v;
226+
} else if (type instanceof DecimalType) {
227+
DecimalType dt = (DecimalType) type;
228+
Decimal d = (Decimal) value;
229+
if (!useDecimal128 && dt.precision() <= Decimal.MAX_INT_DIGITS()) {
230+
IntVector v = new IntVector(name, allocator);
231+
v.allocateNew(count);
232+
int val = (int) d.toUnscaledLong();
233+
for (int i = 0; i < count; i++) v.setSafe(i, val);
234+
v.setValueCount(count);
235+
return v;
236+
} else if (!useDecimal128 && dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
237+
BigIntVector v = new BigIntVector(name, allocator);
238+
v.allocateNew(count);
239+
long val = d.toUnscaledLong();
240+
for (int i = 0; i < count; i++) v.setSafe(i, val);
241+
v.setValueCount(count);
242+
return v;
243+
} else {
244+
DecimalVector v = new DecimalVector(name, allocator, dt.precision(), dt.scale());
245+
v.allocateNew(count);
246+
BigDecimal bd = d.toJavaBigDecimal();
247+
for (int i = 0; i < count; i++) v.setSafe(i, bd);
248+
v.setValueCount(count);
249+
return v;
250+
}
251+
} else {
252+
throw new UnsupportedOperationException("Unsupported Spark type: " + type);
253+
}
254+
}
255+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.parquet;
21+
22+
import org.apache.arrow.memory.BufferAllocator;
23+
import org.apache.arrow.memory.RootAllocator;
24+
import org.apache.arrow.vector.BigIntVector;
25+
import org.apache.spark.sql.types.*;
26+
27+
import org.apache.comet.vector.CometPlainVector;
28+
import org.apache.comet.vector.CometVector;
29+
30+
/**
31+
* A column reader that computes row indices in Java and creates Arrow BigIntVectors directly (no
32+
* native mutable buffers). Used for the row index metadata column in the native_iceberg_compat scan
33+
* path.
34+
*
35+
* <p>The {@code indices} array contains alternating pairs of (start_index, count) representing
36+
* ranges of sequential row indices within each row group.
37+
*/
38+
public class ArrowRowIndexColumnReader extends AbstractColumnReader {
39+
private final BufferAllocator allocator = new RootAllocator();
40+
41+
/** Alternating (start_index, count) pairs from row groups. */
42+
private final long[] indices;
43+
44+
/** Number of row indices consumed so far across batches. */
45+
private long offset;
46+
47+
private BigIntVector fieldVector;
48+
private CometPlainVector vector;
49+
50+
public ArrowRowIndexColumnReader(StructField field, int batchSize, long[] indices) {
51+
super(field.dataType(), TypeUtil.convertToParquet(field), false, false);
52+
this.indices = indices;
53+
this.batchSize = batchSize;
54+
}
55+
56+
@Override
57+
public void setBatchSize(int batchSize) {
58+
close();
59+
this.batchSize = batchSize;
60+
}
61+
62+
@Override
63+
public void readBatch(int total) {
64+
close();
65+
66+
fieldVector = new BigIntVector("row_index", allocator);
67+
fieldVector.allocateNew(total);
68+
69+
// Port of Rust set_indices: iterate (start, count) pairs, skip offset rows, fill up to total.
70+
long skipped = 0;
71+
int filled = 0;
72+
for (int i = 0; i < indices.length && filled < total; i += 2) {
73+
long index = indices[i];
74+
long count = indices[i + 1];
75+
long skip = Math.min(count, offset - skipped);
76+
skipped += skip;
77+
if (count == skip) {
78+
continue;
79+
}
80+
long remaining = Math.min(count - skip, total - filled);
81+
for (long j = 0; j < remaining; j++) {
82+
fieldVector.setSafe(filled, index + skip + j);
83+
filled++;
84+
}
85+
}
86+
offset += filled;
87+
88+
fieldVector.setValueCount(filled);
89+
vector = new CometPlainVector(fieldVector, false, false, false);
90+
vector.setNumValues(filled);
91+
}
92+
93+
@Override
94+
public CometVector currentBatch() {
95+
return vector;
96+
}
97+
98+
@Override
99+
public void close() {
100+
if (vector != null) {
101+
vector.close();
102+
vector = null;
103+
}
104+
if (fieldVector != null) {
105+
fieldVector.close();
106+
fieldVector = null;
107+
}
108+
}
109+
}

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,8 @@ public void init() throws Throwable {
448448
// TODO(SPARK-40059): Allow users to include columns named
449449
// FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in their schemas.
450450
long[] rowIndices = FileReader.getRowIndices(blocks);
451-
columnReaders[i] = new RowIndexColumnReader(nonPartitionFields[i], capacity, rowIndices);
451+
columnReaders[i] =
452+
new ArrowRowIndexColumnReader(nonPartitionFields[i], capacity, rowIndices);
452453
hasRowIndexColumn = true;
453454
missingColumns[i] = true;
454455
} else if (optFileField.isPresent()) {
@@ -473,8 +474,8 @@ public void init() throws Throwable {
473474
+ filePath);
474475
}
475476
if (field.isPrimitive()) {
476-
ConstantColumnReader reader =
477-
new ConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128);
477+
ArrowConstantColumnReader reader =
478+
new ArrowConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128);
478479
columnReaders[i] = reader;
479480
missingColumns[i] = true;
480481
} else {
@@ -492,8 +493,9 @@ public void init() throws Throwable {
492493
for (int i = fields.size(); i < columnReaders.length; i++) {
493494
int fieldIndex = i - fields.size();
494495
StructField field = partitionFields[fieldIndex];
495-
ConstantColumnReader reader =
496-
new ConstantColumnReader(field, capacity, partitionValues, fieldIndex, useDecimal128);
496+
ArrowConstantColumnReader reader =
497+
new ArrowConstantColumnReader(
498+
field, capacity, partitionValues, fieldIndex, useDecimal128);
497499
columnReaders[i] = reader;
498500
}
499501
}

0 commit comments

Comments
 (0)