Skip to content

Commit bb318a5

Browse files
authored
feat: [iceberg] delete rows support using selection vectors (#2346)
1 parent bd6b9fa commit bb318a5

File tree

7 files changed

+625
-119
lines changed

7 files changed

+625
-119
lines changed
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.vector;
21+
22+
import org.apache.arrow.memory.BufferAllocator;
23+
import org.apache.arrow.vector.IntVector;
24+
import org.apache.arrow.vector.dictionary.DictionaryProvider;
25+
import org.apache.spark.sql.vectorized.ColumnVector;
26+
import org.apache.spark.sql.vectorized.ColumnarArray;
27+
import org.apache.spark.sql.vectorized.ColumnarMap;
28+
import org.apache.spark.unsafe.types.UTF8String;
29+
30+
/**
31+
* A zero-copy selection vector that extends CometVector. This implementation stores the original
32+
* data vector and selection indices as separate CometVectors, providing zero-copy access to the
33+
* underlying data.
34+
*
35+
* <p>If the original vector has values [v0, v1, v2, v3, v4, v5, v6, v7] and the selection indices
36+
* are [0, 1, 3, 4, 5, 7], then this selection vector will logically represent [v0, v1, v3, v4, v5,
37+
* v7] without actually copying the data.
38+
*
39+
* <p>Most of the implementations of CometVector methods are implemented for completeness. We don't
40+
* use this class except to transfer the original data and the selection indices to the native code.
41+
*/
42+
public class CometSelectionVector extends CometVector {
43+
/** The original vector containing all values */
44+
private final CometVector values;
45+
46+
/**
47+
* The valid indices in the values vector. This array is converted into an Arrow vector so we can
48+
* transfer the data to native in one JNI call. This is used to represent the rowid mapping used
49+
* by Iceberg
50+
*/
51+
private final int[] selectionIndices;
52+
53+
/**
54+
* The indices vector containing selection indices. This is currently allocated by the JVM side
55+
* unlike the values vector which is allocated on the native side
56+
*/
57+
private final CometVector indices;
58+
59+
/**
60+
* Number of selected elements. The indices array may have a length greater than this but only
61+
* numValues elements in the array are valid
62+
*/
63+
private final int numValues;
64+
65+
/**
66+
* Creates a new selection vector from the given vector and indices.
67+
*
68+
* @param values The original vector to select from
69+
* @param indices The indices to select from the original vector
70+
* @param numValues The number of valid values in the indices array
71+
* @throws IllegalArgumentException if any index is out of bounds
72+
*/
73+
public CometSelectionVector(CometVector values, int[] indices, int numValues) {
74+
// Use the values vector's datatype, useDecimal128, and dictionary provider
75+
super(values.dataType(), values.useDecimal128);
76+
77+
this.values = values;
78+
this.selectionIndices = indices;
79+
this.numValues = numValues;
80+
81+
// Validate indices
82+
int originalLength = values.numValues();
83+
for (int idx : indices) {
84+
if (idx < 0 || idx >= originalLength) {
85+
throw new IllegalArgumentException(
86+
String.format(
87+
"Index %d is out of bounds for vector of length %d", idx, originalLength));
88+
}
89+
}
90+
91+
// Create indices vector
92+
BufferAllocator allocator = values.getValueVector().getAllocator();
93+
IntVector indicesVector = new IntVector("selection_indices", allocator);
94+
indicesVector.allocateNew(numValues);
95+
for (int i = 0; i < numValues; i++) {
96+
indicesVector.set(i, indices[i]);
97+
}
98+
indicesVector.setValueCount(numValues);
99+
100+
this.indices =
101+
CometVector.getVector(indicesVector, values.useDecimal128, values.getDictionaryProvider());
102+
}
103+
104+
/**
105+
* Returns the index in the values vector for the given selection vector index.
106+
*
107+
* @param selectionIndex Index in the selection vector
108+
* @return The corresponding index in the original vector
109+
* @throws IndexOutOfBoundsException if selectionIndex is out of bounds
110+
*/
111+
private int getValuesIndex(int selectionIndex) {
112+
if (selectionIndex < 0 || selectionIndex >= numValues) {
113+
throw new IndexOutOfBoundsException(
114+
String.format(
115+
"Selection index %d is out of bounds for selection vector of length %d",
116+
selectionIndex, numValues));
117+
}
118+
return indices.getInt(selectionIndex);
119+
}
120+
121+
/**
122+
* Returns a reference to the values vector.
123+
*
124+
* @return The CometVector containing the values
125+
*/
126+
public CometVector getValues() {
127+
return values;
128+
}
129+
130+
/**
131+
* Returns the indices vector.
132+
*
133+
* @return The CometVector containing the indices
134+
*/
135+
public CometVector getIndices() {
136+
return indices;
137+
}
138+
139+
/**
140+
* Returns the selected indices.
141+
*
142+
* @return Array of selected indices
143+
*/
144+
private int[] getSelectedIndices() {
145+
return selectionIndices;
146+
}
147+
148+
@Override
149+
public int numValues() {
150+
return numValues;
151+
}
152+
153+
@Override
154+
public void setNumValues(int numValues) {
155+
// For selection vectors, we don't allow changing the number of values
156+
// as it would break the mapping between selection indices and values
157+
throw new UnsupportedOperationException("CometSelectionVector doesn't support setNumValues");
158+
}
159+
160+
@Override
161+
public void setNumNulls(int numNulls) {
162+
// For selection vectors, null count should be delegated to the underlying values vector
163+
// The selection doesn't change the null semantics
164+
values.setNumNulls(numNulls);
165+
}
166+
167+
@Override
168+
public boolean hasNull() {
169+
return values.hasNull();
170+
}
171+
172+
@Override
173+
public int numNulls() {
174+
return values.numNulls();
175+
}
176+
177+
// ColumnVector method implementations - delegate to original vector with index mapping
178+
@Override
179+
public boolean isNullAt(int rowId) {
180+
return values.isNullAt(getValuesIndex(rowId));
181+
}
182+
183+
@Override
184+
public boolean getBoolean(int rowId) {
185+
return values.getBoolean(getValuesIndex(rowId));
186+
}
187+
188+
@Override
189+
public byte getByte(int rowId) {
190+
return values.getByte(getValuesIndex(rowId));
191+
}
192+
193+
@Override
194+
public short getShort(int rowId) {
195+
return values.getShort(getValuesIndex(rowId));
196+
}
197+
198+
@Override
199+
public int getInt(int rowId) {
200+
return values.getInt(getValuesIndex(rowId));
201+
}
202+
203+
@Override
204+
public long getLong(int rowId) {
205+
return values.getLong(getValuesIndex(rowId));
206+
}
207+
208+
@Override
209+
public long getLongDecimal(int rowId) {
210+
return values.getLongDecimal(getValuesIndex(rowId));
211+
}
212+
213+
@Override
214+
public float getFloat(int rowId) {
215+
return values.getFloat(getValuesIndex(rowId));
216+
}
217+
218+
@Override
219+
public double getDouble(int rowId) {
220+
return values.getDouble(getValuesIndex(rowId));
221+
}
222+
223+
@Override
224+
public UTF8String getUTF8String(int rowId) {
225+
return values.getUTF8String(getValuesIndex(rowId));
226+
}
227+
228+
@Override
229+
public byte[] getBinary(int rowId) {
230+
return values.getBinary(getValuesIndex(rowId));
231+
}
232+
233+
@Override
234+
public ColumnarArray getArray(int rowId) {
235+
return values.getArray(getValuesIndex(rowId));
236+
}
237+
238+
@Override
239+
public ColumnarMap getMap(int rowId) {
240+
return values.getMap(getValuesIndex(rowId));
241+
}
242+
243+
@Override
244+
public ColumnVector getChild(int ordinal) {
245+
// Return the child from the original vector
246+
return values.getChild(ordinal);
247+
}
248+
249+
@Override
250+
public DictionaryProvider getDictionaryProvider() {
251+
return values.getDictionaryProvider();
252+
}
253+
254+
@Override
255+
public CometVector slice(int offset, int length) {
256+
if (offset < 0 || length < 0 || offset + length > numValues) {
257+
throw new IllegalArgumentException("Invalid slice parameters");
258+
}
259+
// Get the current indices and slice them
260+
int[] currentIndices = getSelectedIndices();
261+
int[] slicedIndices = new int[length];
262+
// This is not a very efficient version of slicing, but that is
263+
// not important because we are not likely to use it.
264+
System.arraycopy(currentIndices, offset, slicedIndices, 0, length);
265+
return new CometSelectionVector(values, slicedIndices, length);
266+
}
267+
268+
@Override
269+
public org.apache.arrow.vector.ValueVector getValueVector() {
270+
return values.getValueVector();
271+
}
272+
273+
@Override
274+
public void close() {
275+
// Close both the values and indices vectors
276+
values.close();
277+
indices.close();
278+
}
279+
}

common/src/main/java/org/apache/comet/vector/HasRowIdMapping.java

Lines changed: 0 additions & 39 deletions
This file was deleted.

common/src/main/scala/org/apache/comet/vector/NativeUtil.scala

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,30 @@ class NativeUtil {
9696

9797
(0 until batch.numCols()).foreach { index =>
9898
batch.column(index) match {
99+
case selectionVector: CometSelectionVector =>
100+
// For CometSelectionVector, export only the values vector
101+
val valuesVector = selectionVector.getValues
102+
val valueVector = valuesVector.getValueVector
103+
104+
// Use the selection vector's logical row count
105+
numRows += selectionVector.numValues()
106+
107+
val provider = if (valueVector.getField.getDictionary != null) {
108+
valuesVector.getDictionaryProvider
109+
} else {
110+
null
111+
}
112+
113+
// The array and schema structures are allocated by native side.
114+
// Don't need to deallocate them here.
115+
val arrowSchema = ArrowSchema.wrap(schemaAddrs(index))
116+
val arrowArray = ArrowArray.wrap(arrayAddrs(index))
117+
Data.exportVector(
118+
allocator,
119+
getFieldVector(valueVector, "export"),
120+
provider,
121+
arrowArray,
122+
arrowSchema)
99123
case a: CometVector =>
100124
val valueVector = a.getValueVector
101125

@@ -133,9 +157,40 @@ class NativeUtil {
133157
// the Arrow arrays. For example, Iceberg column reader will skip deleted rows internally in
134158
// its `CometVector` implementation. The `ColumnarBatch` returned by the reader will report
135159
// logical number of rows which is less than actual number of rows due to row deletion.
160+
// Similarly, CometSelectionVector represents a different number of logical rows than the
161+
// underlying vector.
136162
numRows.headOption.getOrElse(batch.numRows())
137163
}
138164

165+
/**
 * Exports one CometVector into Arrow C Data Interface structures located at the given addresses.
 *
 * The dictionary provider of the vector is handed to the export only when the field of the
 * underlying Arrow vector carries a dictionary encoding; otherwise `null` is passed.
 *
 * @param vector
 *   The CometVector to export
 * @param arrayAddr
 *   The address of the ArrowArray structure
 * @param schemaAddr
 *   The address of the ArrowSchema structure
 */
def exportSingleVector(vector: CometVector, arrayAddr: Long, schemaAddr: Long): Unit = {
  val underlying = vector.getValueVector

  // A provider is only needed for dictionary-encoded fields.
  val dictionaries =
    if (underlying.getField.getDictionary == null) null
    else vector.getDictionaryProvider

  // Wrap the structures at the supplied addresses (schema first, then array).
  val schemaStruct = ArrowSchema.wrap(schemaAddr)
  val arrayStruct = ArrowArray.wrap(arrayAddr)

  Data.exportVector(
    allocator,
    getFieldVector(underlying, "export"),
    dictionaries,
    arrayStruct,
    schemaStruct)
}
193+
139194
/**
140195
* Gets the next batch from native execution.
141196
*

0 commit comments

Comments
 (0)