Skip to content

Commit 5a02e3a

Browse files
cloud-fan authored and gatorsmile committed
[SPARK-22602][SQL] remove ColumnVector#loadBytes
## What changes were proposed in this pull request?

`ColumnVector#loadBytes` is only used as an optimization for reading UTF8String in `WritableColumnVector`. This PR moves this optimization to `WritableColumnVector` and simplifies it.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes #19815 from cloud-fan/load-bytes.
1 parent d49d9e4 commit 5a02e3a

File tree

6 files changed

+24
-61
lines changed

6 files changed

+24
-61
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,6 @@ public int getArrayOffset(int rowId) {
240240
return accessor.getArrayOffset(rowId);
241241
}
242242

243-
@Override
244-
public void loadBytes(ColumnarArray array) {
245-
throw new UnsupportedOperationException();
246-
}
247-
248243
//
249244
// APIs dealing with Decimals
250245
//

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,6 @@ public final ColumnarArray getArray(int rowId) {
180180
return resultArray;
181181
}
182182

183-
/**
184-
* Loads the data into array.byteArray.
185-
*/
186-
public abstract void loadBytes(ColumnarArray array);
187-
188183
/**
189184
* Returns the value for rowId.
190185
*/
@@ -198,7 +193,8 @@ public MapData getMap(int ordinal) {
198193
public abstract Decimal getDecimal(int rowId, int precision, int scale);
199194

200195
/**
201-
* Returns the UTF8String for rowId.
196+
* Returns the UTF8String for rowId. Note that the returned UTF8String may point to the data of
197+
* this column vector, please copy it if you want to keep it after this column vector is freed.
202198
*/
203199
public abstract UTF8String getUTF8String(int rowId);
204200

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,6 @@ public final class ColumnarArray extends ArrayData {
3333
public int length;
3434
public int offset;
3535

36-
// Populate if binary data is required for the Array. This is stored here as an optimization
37-
// for string data.
38-
public byte[] byteArray;
39-
public int byteArrayOffset;
40-
41-
// Reused staging buffer, used for loading from offheap.
42-
protected byte[] tmpByteArray = new byte[1];
43-
4436
ColumnarArray(ColumnVector data) {
4537
this.data = data;
4638
}

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.apache.spark.sql.types.*;
2525
import org.apache.spark.unsafe.Platform;
26+
import org.apache.spark.unsafe.types.UTF8String;
2627

2728
/**
2829
* Column data backed using offheap memory.
@@ -75,16 +76,14 @@ public OffHeapColumnVector(int capacity, DataType type) {
7576
reset();
7677
}
7778

79+
/**
80+
* Returns the off heap pointer for the values buffer.
81+
*/
7882
@VisibleForTesting
7983
public long valuesNativeAddress() {
8084
return data;
8185
}
8286

83-
@VisibleForTesting
84-
public long nullsNativeAddress() {
85-
return nulls;
86-
}
87-
8887
@Override
8988
public void close() {
9089
super.close();
@@ -207,6 +206,11 @@ public byte[] getBytes(int rowId, int count) {
207206
return array;
208207
}
209208

209+
@Override
210+
protected UTF8String getBytesAsUTF8String(int rowId, int count) {
211+
return UTF8String.fromAddress(null, data + rowId, count);
212+
}
213+
210214
//
211215
// APIs dealing with shorts
212216
//
@@ -524,15 +528,6 @@ public int putByteArray(int rowId, byte[] value, int offset, int length) {
524528
return result;
525529
}
526530

527-
@Override
528-
public void loadBytes(ColumnarArray array) {
529-
if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length];
530-
Platform.copyMemory(
531-
null, data + array.offset, array.tmpByteArray, Platform.BYTE_ARRAY_OFFSET, array.length);
532-
array.byteArray = array.tmpByteArray;
533-
array.byteArrayOffset = 0;
534-
}
535-
536531
// Split out the slow path.
537532
@Override
538533
protected void reserveInternal(int newCapacity) {

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.spark.sql.types.*;
2424
import org.apache.spark.unsafe.Platform;
25+
import org.apache.spark.unsafe.types.UTF8String;
2526

2627
/**
2728
* A column backed by an in memory JVM array. This stores the NULLs as a byte per value
@@ -203,6 +204,11 @@ public byte[] getBytes(int rowId, int count) {
203204
return array;
204205
}
205206

207+
@Override
208+
protected UTF8String getBytesAsUTF8String(int rowId, int count) {
209+
return UTF8String.fromBytes(byteData, rowId, count);
210+
}
211+
206212
//
207213
// APIs dealing with Shorts
208214
//
@@ -484,12 +490,6 @@ public void putArray(int rowId, int offset, int length) {
484490
arrayLengths[rowId] = length;
485491
}
486492

487-
@Override
488-
public void loadBytes(ColumnarArray array) {
489-
array.byteArray = byteData;
490-
array.byteArrayOffset = array.offset;
491-
}
492-
493493
//
494494
// APIs dealing with Byte Arrays
495495
//

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -280,18 +280,6 @@ public final int putByteArray(int rowId, byte[] value) {
280280
return putByteArray(rowId, value, 0, value.length);
281281
}
282282

283-
/**
284-
* Returns the value for rowId.
285-
*/
286-
private ColumnarArray getByteArray(int rowId) {
287-
ColumnarArray array = getArray(rowId);
288-
array.data.loadBytes(array);
289-
return array;
290-
}
291-
292-
/**
293-
* Returns the decimal for rowId.
294-
*/
295283
@Override
296284
public Decimal getDecimal(int rowId, int precision, int scale) {
297285
if (precision <= Decimal.MAX_INT_DIGITS()) {
@@ -318,30 +306,27 @@ public void putDecimal(int rowId, Decimal value, int precision) {
318306
}
319307
}
320308

321-
/**
322-
* Returns the UTF8String for rowId.
323-
*/
324309
@Override
325310
public UTF8String getUTF8String(int rowId) {
326311
if (dictionary == null) {
327-
ColumnarArray a = getByteArray(rowId);
328-
return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
312+
return arrayData().getBytesAsUTF8String(getArrayOffset(rowId), getArrayLength(rowId));
329313
} else {
330314
byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
331315
return UTF8String.fromBytes(bytes);
332316
}
333317
}
334318

335319
/**
336-
* Returns the byte array for rowId.
320+
* Gets the values of bytes from [rowId, rowId + count), as a UTF8String.
321+
* This method is similar to {@link ColumnVector#getBytes(int, int)}, but can save data copy as
322+
* UTF8String is used as a pointer.
337323
*/
324+
protected abstract UTF8String getBytesAsUTF8String(int rowId, int count);
325+
338326
@Override
339327
public byte[] getBinary(int rowId) {
340328
if (dictionary == null) {
341-
ColumnarArray array = getByteArray(rowId);
342-
byte[] bytes = new byte[array.length];
343-
System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
344-
return bytes;
329+
return arrayData().getBytes(getArrayOffset(rowId), getArrayLength(rowId));
345330
} else {
346331
return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
347332
}

0 commit comments

Comments
 (0)