Skip to content

Commit e3fd93f

Browse files
cloud-fangatorsmile
authored andcommitted
[SPARK-22604][SQL] remove the get address methods from ColumnVector
## What changes were proposed in this pull request? `nullsNativeAddress` and `valuesNativeAddress` are only used in tests and benchmark, no need to be top class API. ## How was this patch tested? existing tests Author: Wenchen Fan <[email protected]> Closes #19818 from cloud-fan/minor.
1 parent 7022190 commit e3fd93f

File tree

6 files changed

+47
-99
lines changed

6 files changed

+47
-99
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,6 @@ public boolean anyNullsSet() {
5959
return numNulls() > 0;
6060
}
6161

62-
@Override
63-
public long nullsNativeAddress() {
64-
throw new RuntimeException("Cannot get native address for arrow column");
65-
}
66-
67-
@Override
68-
public long valuesNativeAddress() {
69-
throw new RuntimeException("Cannot get native address for arrow column");
70-
}
71-
7262
@Override
7363
public void close() {
7464
if (childColumns != null) {

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,6 @@ public abstract class ColumnVector implements AutoCloseable {
6262
*/
6363
public abstract boolean anyNullsSet();
6464

65-
/**
66-
* Returns the off heap ptr for the arrays backing the NULLs and values buffer. Only valid
67-
* to call for off heap columns.
68-
*/
69-
public abstract long nullsNativeAddress();
70-
public abstract long valuesNativeAddress();
71-
7265
/**
7366
* Returns whether the value at rowId is NULL.
7467
*/

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.nio.ByteBuffer;
2020
import java.nio.ByteOrder;
2121

22+
import com.google.common.annotations.VisibleForTesting;
23+
2224
import org.apache.spark.sql.types.*;
2325
import org.apache.spark.unsafe.Platform;
2426

@@ -73,12 +75,12 @@ public OffHeapColumnVector(int capacity, DataType type) {
7375
reset();
7476
}
7577

76-
@Override
78+
@VisibleForTesting
7779
public long valuesNativeAddress() {
7880
return data;
7981
}
8082

81-
@Override
83+
@VisibleForTesting
8284
public long nullsNativeAddress() {
8385
return nulls;
8486
}

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,6 @@ public OnHeapColumnVector(int capacity, DataType type) {
7979
reset();
8080
}
8181

82-
@Override
83-
public long valuesNativeAddress() {
84-
throw new RuntimeException("Cannot get native address for on heap column");
85-
}
86-
@Override
87-
public long nullsNativeAddress() {
88-
throw new RuntimeException("Cannot get native address for on heap column");
89-
}
90-
9182
@Override
9283
public void close() {
9384
super.close();

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,6 @@ import org.apache.spark.util.collection.BitSet
3636
* Benchmark to low level memory access using different ways to manage buffers.
3737
*/
3838
object ColumnarBatchBenchmark {
39-
40-
def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): WritableColumnVector = {
41-
if (memMode == MemoryMode.OFF_HEAP) {
42-
new OffHeapColumnVector(capacity, dt)
43-
} else {
44-
new OnHeapColumnVector(capacity, dt)
45-
}
46-
}
47-
4839
// This benchmark reads and writes an array of ints.
4940
// TODO: there is a big (2x) penalty for a random access API for off heap.
5041
// Note: carefully if modifying this code. It's hard to reason about the JIT.
@@ -151,7 +142,7 @@ object ColumnarBatchBenchmark {
151142

152143
// Access through the column API with on heap memory
153144
val columnOnHeap = { i: Int =>
154-
val col = allocate(count, IntegerType, MemoryMode.ON_HEAP)
145+
val col = new OnHeapColumnVector(count, IntegerType)
155146
var sum = 0L
156147
for (n <- 0L until iters) {
157148
var i = 0
@@ -170,7 +161,7 @@ object ColumnarBatchBenchmark {
170161

171162
// Access through the column API with off heap memory
172163
def columnOffHeap = { i: Int => {
173-
val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP)
164+
val col = new OffHeapColumnVector(count, IntegerType)
174165
var sum = 0L
175166
for (n <- 0L until iters) {
176167
var i = 0
@@ -189,7 +180,7 @@ object ColumnarBatchBenchmark {
189180

190181
// Access by directly getting the buffer backing the column.
191182
val columnOffheapDirect = { i: Int =>
192-
val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP)
183+
val col = new OffHeapColumnVector(count, IntegerType)
193184
var sum = 0L
194185
for (n <- 0L until iters) {
195186
var addr = col.valuesNativeAddress()
@@ -255,7 +246,7 @@ object ColumnarBatchBenchmark {
255246

256247
// Adding values by appending, instead of putting.
257248
val onHeapAppend = { i: Int =>
258-
val col = allocate(count, IntegerType, MemoryMode.ON_HEAP)
249+
val col = new OnHeapColumnVector(count, IntegerType)
259250
var sum = 0L
260251
for (n <- 0L until iters) {
261252
var i = 0
@@ -330,7 +321,7 @@ object ColumnarBatchBenchmark {
330321
for (n <- 0L until iters) {
331322
var i = 0
332323
while (i < count) {
333-
if (i % 2 == 0) b(i) = 1;
324+
if (i % 2 == 0) b(i) = 1
334325
i += 1
335326
}
336327
i = 0
@@ -351,18 +342,18 @@ object ColumnarBatchBenchmark {
351342
}
352343

353344
def stringAccess(iters: Long): Unit = {
354-
val chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
345+
val chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
355346
val random = new Random(0)
356347

357348
def randomString(min: Int, max: Int): String = {
358349
val len = random.nextInt(max - min) + min
359350
val sb = new StringBuilder(len)
360351
var i = 0
361352
while (i < len) {
362-
sb.append(chars.charAt(random.nextInt(chars.length())));
353+
sb.append(chars.charAt(random.nextInt(chars.length())))
363354
i += 1
364355
}
365-
return sb.toString
356+
sb.toString
366357
}
367358

368359
val minString = 3
@@ -373,7 +364,12 @@ object ColumnarBatchBenchmark {
373364
.map(_.getBytes(StandardCharsets.UTF_8)).toArray
374365

375366
def column(memoryMode: MemoryMode) = { i: Int =>
376-
val column = allocate(count, BinaryType, memoryMode)
367+
val column = if (memoryMode == MemoryMode.OFF_HEAP) {
368+
new OffHeapColumnVector(count, BinaryType)
369+
} else {
370+
new OnHeapColumnVector(count, BinaryType)
371+
}
372+
377373
var sum = 0L
378374
for (n <- 0L until iters) {
379375
var i = 0

sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala

Lines changed: 29 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,19 @@ class ColumnarBatchSuite extends SparkFunSuite {
5050
name: String,
5151
size: Int,
5252
dt: DataType)(
53-
block: (WritableColumnVector, MemoryMode) => Unit): Unit = {
53+
block: WritableColumnVector => Unit): Unit = {
5454
test(name) {
5555
Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { mode =>
5656
val vector = allocate(size, dt, mode)
57-
try block(vector, mode) finally {
57+
try block(vector) finally {
5858
vector.close()
5959
}
6060
}
6161
}
6262
}
6363

6464
testVector("Null APIs", 1024, IntegerType) {
65-
(column, memMode) =>
65+
column =>
6666
val reference = mutable.ArrayBuffer.empty[Boolean]
6767
var idx = 0
6868
assert(!column.anyNullsSet())
@@ -121,15 +121,11 @@ class ColumnarBatchSuite extends SparkFunSuite {
121121

122122
reference.zipWithIndex.foreach { v =>
123123
assert(v._1 == column.isNullAt(v._2))
124-
if (memMode == MemoryMode.OFF_HEAP) {
125-
val addr = column.nullsNativeAddress()
126-
assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
127-
}
128124
}
129125
}
130126

131127
testVector("Byte APIs", 1024, ByteType) {
132-
(column, memMode) =>
128+
column =>
133129
val reference = mutable.ArrayBuffer.empty[Byte]
134130

135131
var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toByte).toArray
@@ -173,16 +169,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
173169
idx += 3
174170

175171
reference.zipWithIndex.foreach { v =>
176-
assert(v._1 == column.getByte(v._2), "MemoryMode" + memMode)
177-
if (memMode == MemoryMode.OFF_HEAP) {
178-
val addr = column.valuesNativeAddress()
179-
assert(v._1 == Platform.getByte(null, addr + v._2))
180-
}
172+
assert(v._1 == column.getByte(v._2), "VectorType=" + column.getClass.getSimpleName)
181173
}
182174
}
183175

184176
testVector("Short APIs", 1024, ShortType) {
185-
(column, memMode) =>
177+
column =>
186178
val seed = System.currentTimeMillis()
187179
val random = new Random(seed)
188180
val reference = mutable.ArrayBuffer.empty[Short]
@@ -248,16 +240,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
248240
}
249241

250242
reference.zipWithIndex.foreach { v =>
251-
assert(v._1 == column.getShort(v._2), "Seed = " + seed + " Mem Mode=" + memMode)
252-
if (memMode == MemoryMode.OFF_HEAP) {
253-
val addr = column.valuesNativeAddress()
254-
assert(v._1 == Platform.getShort(null, addr + 2 * v._2))
255-
}
243+
assert(v._1 == column.getShort(v._2),
244+
"Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
256245
}
257246
}
258247

259248
testVector("Int APIs", 1024, IntegerType) {
260-
(column, memMode) =>
249+
column =>
261250
val seed = System.currentTimeMillis()
262251
val random = new Random(seed)
263252
val reference = mutable.ArrayBuffer.empty[Int]
@@ -329,16 +318,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
329318
}
330319

331320
reference.zipWithIndex.foreach { v =>
332-
assert(v._1 == column.getInt(v._2), "Seed = " + seed + " Mem Mode=" + memMode)
333-
if (memMode == MemoryMode.OFF_HEAP) {
334-
val addr = column.valuesNativeAddress()
335-
assert(v._1 == Platform.getInt(null, addr + 4 * v._2))
336-
}
321+
assert(v._1 == column.getInt(v._2),
322+
"Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
337323
}
338324
}
339325

340326
testVector("Long APIs", 1024, LongType) {
341-
(column, memMode) =>
327+
column =>
342328
val seed = System.currentTimeMillis()
343329
val random = new Random(seed)
344330
val reference = mutable.ArrayBuffer.empty[Long]
@@ -413,16 +399,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
413399

414400
reference.zipWithIndex.foreach { v =>
415401
assert(v._1 == column.getLong(v._2), "idx=" + v._2 +
416-
" Seed = " + seed + " MemMode=" + memMode)
417-
if (memMode == MemoryMode.OFF_HEAP) {
418-
val addr = column.valuesNativeAddress()
419-
assert(v._1 == Platform.getLong(null, addr + 8 * v._2))
420-
}
402+
" Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
421403
}
422404
}
423405

424406
testVector("Float APIs", 1024, FloatType) {
425-
(column, memMode) =>
407+
column =>
426408
val seed = System.currentTimeMillis()
427409
val random = new Random(seed)
428410
val reference = mutable.ArrayBuffer.empty[Float]
@@ -500,16 +482,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
500482
}
501483

502484
reference.zipWithIndex.foreach { v =>
503-
assert(v._1 == column.getFloat(v._2), "Seed = " + seed + " MemMode=" + memMode)
504-
if (memMode == MemoryMode.OFF_HEAP) {
505-
val addr = column.valuesNativeAddress()
506-
assert(v._1 == Platform.getFloat(null, addr + 4 * v._2))
507-
}
485+
assert(v._1 == column.getFloat(v._2),
486+
"Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
508487
}
509488
}
510489

511490
testVector("Double APIs", 1024, DoubleType) {
512-
(column, memMode) =>
491+
column =>
513492
val seed = System.currentTimeMillis()
514493
val random = new Random(seed)
515494
val reference = mutable.ArrayBuffer.empty[Double]
@@ -587,16 +566,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
587566
}
588567

589568
reference.zipWithIndex.foreach { v =>
590-
assert(v._1 == column.getDouble(v._2), "Seed = " + seed + " MemMode=" + memMode)
591-
if (memMode == MemoryMode.OFF_HEAP) {
592-
val addr = column.valuesNativeAddress()
593-
assert(v._1 == Platform.getDouble(null, addr + 8 * v._2))
594-
}
569+
assert(v._1 == column.getDouble(v._2),
570+
"Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
595571
}
596572
}
597573

598574
testVector("String APIs", 6, StringType) {
599-
(column, memMode) =>
575+
column =>
600576
val reference = mutable.ArrayBuffer.empty[String]
601577

602578
assert(column.arrayData().elementsAppended == 0)
@@ -643,17 +619,17 @@ class ColumnarBatchSuite extends SparkFunSuite {
643619
assert(column.arrayData().elementsAppended == 17 + (s + s).length)
644620

645621
reference.zipWithIndex.foreach { v =>
646-
assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
647-
assert(v._1 == column.getUTF8String(v._2).toString,
648-
"MemoryMode" + memMode)
622+
val errMsg = "VectorType=" + column.getClass.getSimpleName
623+
assert(v._1.length == column.getArrayLength(v._2), errMsg)
624+
assert(v._1 == column.getUTF8String(v._2).toString, errMsg)
649625
}
650626

651627
column.reset()
652628
assert(column.arrayData().elementsAppended == 0)
653629
}
654630

655631
testVector("Int Array", 10, new ArrayType(IntegerType, true)) {
656-
(column, _) =>
632+
column =>
657633

658634
// Fill the underlying data with all the arrays back to back.
659635
val data = column.arrayData()
@@ -763,7 +739,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
763739
testVector(
764740
"Struct Column",
765741
10,
766-
new StructType().add("int", IntegerType).add("double", DoubleType)) { (column, _) =>
742+
new StructType().add("int", IntegerType).add("double", DoubleType)) { column =>
767743
val c1 = column.getChildColumn(0)
768744
val c2 = column.getChildColumn(1)
769745
assert(c1.dataType() == IntegerType)
@@ -789,7 +765,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
789765
}
790766

791767
testVector("Nest Array in Array", 10, new ArrayType(new ArrayType(IntegerType, true), true)) {
792-
(column, _) =>
768+
column =>
793769
val childColumn = column.arrayData()
794770
val data = column.arrayData().arrayData()
795771
(0 until 6).foreach {
@@ -822,7 +798,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
822798
testVector(
823799
"Nest Struct in Array",
824800
10,
825-
new ArrayType(structType, true)) { (column, _) =>
801+
new ArrayType(structType, true)) { column =>
826802
val data = column.arrayData()
827803
val c0 = data.getChildColumn(0)
828804
val c1 = data.getChildColumn(1)
@@ -851,7 +827,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
851827
10,
852828
new StructType()
853829
.add("int", IntegerType)
854-
.add("array", new ArrayType(IntegerType, true))) { (column, _) =>
830+
.add("array", new ArrayType(IntegerType, true))) { column =>
855831
val c0 = column.getChildColumn(0)
856832
val c1 = column.getChildColumn(1)
857833
c0.putInt(0, 0)
@@ -880,7 +856,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
880856
testVector(
881857
"Nest Struct in Struct",
882858
10,
883-
new StructType().add("int", IntegerType).add("struct", subSchema)) { (column, _) =>
859+
new StructType().add("int", IntegerType).add("struct", subSchema)) { column =>
884860
val c0 = column.getChildColumn(0)
885861
val c1 = column.getChildColumn(1)
886862
c0.putInt(0, 0)

0 commit comments

Comments
 (0)