Skip to content

Commit e103adf

Browse files
committed
[SPARK-22703][SQL] make ColumnarRow an immutable view
## What changes were proposed in this pull request? Similar to #19842 , we should also make `ColumnarRow` an immutable view, and move forward to make `ColumnVector` public. ## How was this patch tested? Existing tests. The performance concern should be same as #19842 . Author: Wenchen Fan <[email protected]> Closes #19898 from cloud-fan/row-id.
1 parent c1e5688 commit e103adf

File tree

12 files changed

+89
-99
lines changed

12 files changed

+89
-99
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
public class AggregateHashMap {
4242

4343
private OnHeapColumnVector[] columnVectors;
44-
private ColumnarBatch batch;
44+
private MutableColumnarRow aggBufferRow;
4545
private int[] buckets;
4646
private int numBuckets;
4747
private int numRows = 0;
@@ -63,7 +63,7 @@ public AggregateHashMap(StructType schema, int capacity, double loadFactor, int
6363
this.maxSteps = maxSteps;
6464
numBuckets = (int) (capacity / loadFactor);
6565
columnVectors = OnHeapColumnVector.allocateColumns(capacity, schema);
66-
batch = new ColumnarBatch(schema, columnVectors, capacity);
66+
aggBufferRow = new MutableColumnarRow(columnVectors);
6767
buckets = new int[numBuckets];
6868
Arrays.fill(buckets, -1);
6969
}
@@ -72,14 +72,15 @@ public AggregateHashMap(StructType schema) {
7272
this(schema, DEFAULT_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_MAX_STEPS);
7373
}
7474

75-
public ColumnarRow findOrInsert(long key) {
75+
public MutableColumnarRow findOrInsert(long key) {
7676
int idx = find(key);
7777
if (idx != -1 && buckets[idx] == -1) {
7878
columnVectors[0].putLong(numRows, key);
7979
columnVectors[1].putLong(numRows, 0);
8080
buckets[idx] = numRows++;
8181
}
82-
return batch.getRow(buckets[idx]);
82+
aggBufferRow.rowId = buckets[idx];
83+
return aggBufferRow;
8384
}
8485

8586
@VisibleForTesting

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,6 @@ public ArrowColumnVector(ValueVector vector) {
323323
for (int i = 0; i < childColumns.length; ++i) {
324324
childColumns[i] = new ArrowColumnVector(mapVector.getVectorById(i));
325325
}
326-
resultStruct = new ColumnarRow(childColumns);
327326
} else {
328327
throw new UnsupportedOperationException();
329328
}

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -157,18 +157,16 @@ public abstract class ColumnVector implements AutoCloseable {
157157
/**
158158
* Returns a utility object to get structs.
159159
*/
160-
public ColumnarRow getStruct(int rowId) {
161-
resultStruct.rowId = rowId;
162-
return resultStruct;
160+
public final ColumnarRow getStruct(int rowId) {
161+
return new ColumnarRow(this, rowId);
163162
}
164163

165164
/**
166165
* Returns a utility object to get structs.
167166
* provided to keep API compatibility with InternalRow for code generation
168167
*/
169-
public ColumnarRow getStruct(int rowId, int size) {
170-
resultStruct.rowId = rowId;
171-
return resultStruct;
168+
public final ColumnarRow getStruct(int rowId, int size) {
169+
return getStruct(rowId);
172170
}
173171

174172
/**
@@ -216,11 +214,6 @@ public MapData getMap(int ordinal) {
216214
*/
217215
protected DataType type;
218216

219-
/**
220-
* Reusable Struct holder for getStruct().
221-
*/
222-
protected ColumnarRow resultStruct;
223-
224217
/**
225218
* The Dictionary for this column.
226219
*

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.*;
2020

21+
import org.apache.spark.sql.catalyst.InternalRow;
2122
import org.apache.spark.sql.types.StructType;
2223

2324
/**
@@ -40,10 +41,10 @@ public final class ColumnarBatch {
4041
private final StructType schema;
4142
private final int capacity;
4243
private int numRows;
43-
final ColumnVector[] columns;
44+
private final ColumnVector[] columns;
4445

45-
// Staging row returned from getRow.
46-
final ColumnarRow row;
46+
// Staging row returned from `getRow`.
47+
private final MutableColumnarRow row;
4748

4849
/**
4950
* Called to close all the columns in this batch. It is not valid to access the data after
@@ -58,10 +59,10 @@ public void close() {
5859
/**
5960
* Returns an iterator over the rows in this batch. This skips rows that are filtered out.
6061
*/
61-
public Iterator<ColumnarRow> rowIterator() {
62+
public Iterator<InternalRow> rowIterator() {
6263
final int maxRows = numRows;
63-
final ColumnarRow row = new ColumnarRow(columns);
64-
return new Iterator<ColumnarRow>() {
64+
final MutableColumnarRow row = new MutableColumnarRow(columns);
65+
return new Iterator<InternalRow>() {
6566
int rowId = 0;
6667

6768
@Override
@@ -70,7 +71,7 @@ public boolean hasNext() {
7071
}
7172

7273
@Override
73-
public ColumnarRow next() {
74+
public InternalRow next() {
7475
if (rowId >= maxRows) {
7576
throw new NoSuchElementException();
7677
}
@@ -133,9 +134,8 @@ public void setNumRows(int numRows) {
133134
/**
134135
* Returns the row in this batch at `rowId`. Returned row is reused across calls.
135136
*/
136-
public ColumnarRow getRow(int rowId) {
137-
assert(rowId >= 0);
138-
assert(rowId < numRows);
137+
public InternalRow getRow(int rowId) {
138+
assert(rowId >= 0 && rowId < numRows);
139139
row.rowId = rowId;
140140
return row;
141141
}
@@ -144,6 +144,6 @@ public ColumnarBatch(StructType schema, ColumnVector[] columns, int capacity) {
144144
this.schema = schema;
145145
this.columns = columns;
146146
this.capacity = capacity;
147-
this.row = new ColumnarRow(columns);
147+
this.row = new MutableColumnarRow(columns);
148148
}
149149
}

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarRow.java

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,30 +28,32 @@
2828
* to be reused, callers should copy the data out if it needs to be stored.
2929
*/
3030
public final class ColumnarRow extends InternalRow {
31-
protected int rowId;
32-
private final ColumnVector[] columns;
33-
34-
// Ctor used if this is a struct.
35-
ColumnarRow(ColumnVector[] columns) {
36-
this.columns = columns;
31+
// The data for this row. E.g. the value of 3rd int field is `data.getChildColumn(3).getInt(rowId)`.
32+
private final ColumnVector data;
33+
private final int rowId;
34+
private final int numFields;
35+
36+
ColumnarRow(ColumnVector data, int rowId) {
37+
assert (data.dataType() instanceof StructType);
38+
this.data = data;
39+
this.rowId = rowId;
40+
this.numFields = ((StructType) data.dataType()).size();
3741
}
3842

39-
public ColumnVector[] columns() { return columns; }
40-
4143
@Override
42-
public int numFields() { return columns.length; }
44+
public int numFields() { return numFields; }
4345

4446
/**
4547
* Revisit this. This is expensive. This is currently only used in test paths.
4648
*/
4749
@Override
4850
public InternalRow copy() {
49-
GenericInternalRow row = new GenericInternalRow(columns.length);
51+
GenericInternalRow row = new GenericInternalRow(numFields);
5052
for (int i = 0; i < numFields(); i++) {
5153
if (isNullAt(i)) {
5254
row.setNullAt(i);
5355
} else {
54-
DataType dt = columns[i].dataType();
56+
DataType dt = data.getChildColumn(i).dataType();
5557
if (dt instanceof BooleanType) {
5658
row.setBoolean(i, getBoolean(i));
5759
} else if (dt instanceof ByteType) {
@@ -91,65 +93,65 @@ public boolean anyNull() {
9193
}
9294

9395
@Override
94-
public boolean isNullAt(int ordinal) { return columns[ordinal].isNullAt(rowId); }
96+
public boolean isNullAt(int ordinal) { return data.getChildColumn(ordinal).isNullAt(rowId); }
9597

9698
@Override
97-
public boolean getBoolean(int ordinal) { return columns[ordinal].getBoolean(rowId); }
99+
public boolean getBoolean(int ordinal) { return data.getChildColumn(ordinal).getBoolean(rowId); }
98100

99101
@Override
100-
public byte getByte(int ordinal) { return columns[ordinal].getByte(rowId); }
102+
public byte getByte(int ordinal) { return data.getChildColumn(ordinal).getByte(rowId); }
101103

102104
@Override
103-
public short getShort(int ordinal) { return columns[ordinal].getShort(rowId); }
105+
public short getShort(int ordinal) { return data.getChildColumn(ordinal).getShort(rowId); }
104106

105107
@Override
106-
public int getInt(int ordinal) { return columns[ordinal].getInt(rowId); }
108+
public int getInt(int ordinal) { return data.getChildColumn(ordinal).getInt(rowId); }
107109

108110
@Override
109-
public long getLong(int ordinal) { return columns[ordinal].getLong(rowId); }
111+
public long getLong(int ordinal) { return data.getChildColumn(ordinal).getLong(rowId); }
110112

111113
@Override
112-
public float getFloat(int ordinal) { return columns[ordinal].getFloat(rowId); }
114+
public float getFloat(int ordinal) { return data.getChildColumn(ordinal).getFloat(rowId); }
113115

114116
@Override
115-
public double getDouble(int ordinal) { return columns[ordinal].getDouble(rowId); }
117+
public double getDouble(int ordinal) { return data.getChildColumn(ordinal).getDouble(rowId); }
116118

117119
@Override
118120
public Decimal getDecimal(int ordinal, int precision, int scale) {
119-
if (columns[ordinal].isNullAt(rowId)) return null;
120-
return columns[ordinal].getDecimal(rowId, precision, scale);
121+
if (data.getChildColumn(ordinal).isNullAt(rowId)) return null;
122+
return data.getChildColumn(ordinal).getDecimal(rowId, precision, scale);
121123
}
122124

123125
@Override
124126
public UTF8String getUTF8String(int ordinal) {
125-
if (columns[ordinal].isNullAt(rowId)) return null;
126-
return columns[ordinal].getUTF8String(rowId);
127+
if (data.getChildColumn(ordinal).isNullAt(rowId)) return null;
128+
return data.getChildColumn(ordinal).getUTF8String(rowId);
127129
}
128130

129131
@Override
130132
public byte[] getBinary(int ordinal) {
131-
if (columns[ordinal].isNullAt(rowId)) return null;
132-
return columns[ordinal].getBinary(rowId);
133+
if (data.getChildColumn(ordinal).isNullAt(rowId)) return null;
134+
return data.getChildColumn(ordinal).getBinary(rowId);
133135
}
134136

135137
@Override
136138
public CalendarInterval getInterval(int ordinal) {
137-
if (columns[ordinal].isNullAt(rowId)) return null;
138-
final int months = columns[ordinal].getChildColumn(0).getInt(rowId);
139-
final long microseconds = columns[ordinal].getChildColumn(1).getLong(rowId);
139+
if (data.getChildColumn(ordinal).isNullAt(rowId)) return null;
140+
final int months = data.getChildColumn(ordinal).getChildColumn(0).getInt(rowId);
141+
final long microseconds = data.getChildColumn(ordinal).getChildColumn(1).getLong(rowId);
140142
return new CalendarInterval(months, microseconds);
141143
}
142144

143145
@Override
144146
public ColumnarRow getStruct(int ordinal, int numFields) {
145-
if (columns[ordinal].isNullAt(rowId)) return null;
146-
return columns[ordinal].getStruct(rowId);
147+
if (data.getChildColumn(ordinal).isNullAt(rowId)) return null;
148+
return data.getChildColumn(ordinal).getStruct(rowId);
147149
}
148150

149151
@Override
150152
public ColumnarArray getArray(int ordinal) {
151-
if (columns[ordinal].isNullAt(rowId)) return null;
152-
return columns[ordinal].getArray(rowId);
153+
if (data.getChildColumn(ordinal).isNullAt(rowId)) return null;
154+
return data.getChildColumn(ordinal).getArray(rowId);
153155
}
154156

155157
@Override

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,24 @@
2828

2929
/**
3030
* A mutable version of {@link ColumnarRow}, which is used in the vectorized hash map for hash
31-
* aggregate.
31+
* aggregate, and {@link ColumnarBatch} to save object creation.
3232
*
3333
* Note that this class intentionally has a lot of duplicated code with {@link ColumnarRow}, to
3434
* avoid java polymorphism overhead by keeping {@link ColumnarRow} and this class final classes.
3535
*/
3636
public final class MutableColumnarRow extends InternalRow {
3737
public int rowId;
38-
private final WritableColumnVector[] columns;
38+
private final ColumnVector[] columns;
39+
private final WritableColumnVector[] writableColumns;
3940

40-
public MutableColumnarRow(WritableColumnVector[] columns) {
41+
public MutableColumnarRow(ColumnVector[] columns) {
4142
this.columns = columns;
43+
this.writableColumns = null;
44+
}
45+
46+
public MutableColumnarRow(WritableColumnVector[] writableColumns) {
47+
this.columns = writableColumns;
48+
this.writableColumns = writableColumns;
4249
}
4350

4451
@Override
@@ -225,54 +232,54 @@ public void update(int ordinal, Object value) {
225232

226233
@Override
227234
public void setNullAt(int ordinal) {
228-
columns[ordinal].putNull(rowId);
235+
writableColumns[ordinal].putNull(rowId);
229236
}
230237

231238
@Override
232239
public void setBoolean(int ordinal, boolean value) {
233-
columns[ordinal].putNotNull(rowId);
234-
columns[ordinal].putBoolean(rowId, value);
240+
writableColumns[ordinal].putNotNull(rowId);
241+
writableColumns[ordinal].putBoolean(rowId, value);
235242
}
236243

237244
@Override
238245
public void setByte(int ordinal, byte value) {
239-
columns[ordinal].putNotNull(rowId);
240-
columns[ordinal].putByte(rowId, value);
246+
writableColumns[ordinal].putNotNull(rowId);
247+
writableColumns[ordinal].putByte(rowId, value);
241248
}
242249

243250
@Override
244251
public void setShort(int ordinal, short value) {
245-
columns[ordinal].putNotNull(rowId);
246-
columns[ordinal].putShort(rowId, value);
252+
writableColumns[ordinal].putNotNull(rowId);
253+
writableColumns[ordinal].putShort(rowId, value);
247254
}
248255

249256
@Override
250257
public void setInt(int ordinal, int value) {
251-
columns[ordinal].putNotNull(rowId);
252-
columns[ordinal].putInt(rowId, value);
258+
writableColumns[ordinal].putNotNull(rowId);
259+
writableColumns[ordinal].putInt(rowId, value);
253260
}
254261

255262
@Override
256263
public void setLong(int ordinal, long value) {
257-
columns[ordinal].putNotNull(rowId);
258-
columns[ordinal].putLong(rowId, value);
264+
writableColumns[ordinal].putNotNull(rowId);
265+
writableColumns[ordinal].putLong(rowId, value);
259266
}
260267

261268
@Override
262269
public void setFloat(int ordinal, float value) {
263-
columns[ordinal].putNotNull(rowId);
264-
columns[ordinal].putFloat(rowId, value);
270+
writableColumns[ordinal].putNotNull(rowId);
271+
writableColumns[ordinal].putFloat(rowId, value);
265272
}
266273

267274
@Override
268275
public void setDouble(int ordinal, double value) {
269-
columns[ordinal].putNotNull(rowId);
270-
columns[ordinal].putDouble(rowId, value);
276+
writableColumns[ordinal].putNotNull(rowId);
277+
writableColumns[ordinal].putDouble(rowId, value);
271278
}
272279

273280
@Override
274281
public void setDecimal(int ordinal, Decimal value, int precision) {
275-
columns[ordinal].putNotNull(rowId);
276-
columns[ordinal].putDecimal(rowId, value, precision);
282+
writableColumns[ordinal].putNotNull(rowId);
283+
writableColumns[ordinal].putDecimal(rowId, value, precision);
277284
}
278285
}

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ protected void reserveInternal(int newCapacity) {
547547
} else if (type instanceof LongType || type instanceof DoubleType ||
548548
DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) {
549549
this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8);
550-
} else if (resultStruct != null) {
550+
} else if (childColumns != null) {
551551
// Nothing to store.
552552
} else {
553553
throw new RuntimeException("Unhandled " + type);

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ protected void reserveInternal(int newCapacity) {
558558
if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, capacity);
559559
doubleData = newData;
560560
}
561-
} else if (resultStruct != null) {
561+
} else if (childColumns != null) {
562562
// Nothing to store.
563563
} else {
564564
throw new RuntimeException("Unhandled " + type);

0 commit comments

Comments
 (0)