Skip to content

Commit d2b2932

Browse files
ala authored and hvanhovell committed
[SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts struct and array data
## What changes were proposed in this pull request?

`OffHeapColumnVector.reserveInternal()` will only copy already inserted values during reallocation if `data != null`. In vectors containing arrays or structs this is incorrect, since there the `data` field is not used at all. We need to check `nulls` instead.

## How was this patch tested?

Adds new tests to `ColumnVectorSuite` that reproduce the errors.

Author: Ala Luszczak <[email protected]>

Closes apache#19308 from ala/vector-realloc.
1 parent 10e37f6 commit d2b2932

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -515,7 +515,7 @@ public void loadBytes(ColumnVector.Array array) {
515515
// Split out the slow path.
516516
@Override
517517
protected void reserveInternal(int newCapacity) {
518-
int oldCapacity = (this.data == 0L) ? 0 : capacity;
518+
int oldCapacity = (nulls == 0L) ? 0 : capacity;
519519
if (this.resultArray != null) {
520520
this.lengthData =
521521
Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -198,4 +198,30 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
198198
assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 456)
199199
assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 5.67)
200200
}
201+
202+
test("[SPARK-22092] off-heap column vector reallocation corrupts array data") {
203+
val arrayType = ArrayType(IntegerType, true)
204+
testVector = new OffHeapColumnVector(8, arrayType)
205+
206+
val data = testVector.arrayData()
207+
(0 until 8).foreach(i => data.putInt(i, i))
208+
(0 until 8).foreach(i => testVector.putArray(i, i, 1))
209+
210+
// Increase vector's capacity and reallocate the data to new bigger buffers.
211+
testVector.reserve(16)
212+
213+
// Check that none of the values got lost/overwritten.
214+
val array = new ColumnVector.Array(testVector)
215+
(0 until 8).foreach { i =>
216+
assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === Array(i))
217+
}
218+
}
219+
220+
test("[SPARK-22092] off-heap column vector reallocation corrupts struct nullability") {
221+
val structType = new StructType().add("int", IntegerType).add("double", DoubleType)
222+
testVector = new OffHeapColumnVector(8, structType)
223+
(0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i))
224+
testVector.reserve(16)
225+
(0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
226+
}
201227
}

0 commit comments

Comments
 (0)