
Commit 3d822d7

gengliangwang and dongjoon-hyun authored and committed
[SPARK-26447][SQL] Allow OrcColumnarBatchReader to return less partition columns
## What changes were proposed in this pull request?

Currently OrcColumnarBatchReader returns all the partition column values in the batch read. In data source V2, we can improve it by returning only the required partition column values.

This PR is part of apache#23383. As cloud-fan suggested, it is split into a new PR to make review easier.

Also, this PR doesn't improve `OrcFileFormat`, since in the method `buildReaderWithPartitionValues`, the `requiredSchema` filters out all the partition columns, so we can't know which partition column is required.

## How was this patch tested?

Unit test

Closes apache#23387 from gengliangwang/refactorOrcColumnarBatch.

Lead-authored-by: Gengliang Wang <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
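To make the new contract concrete, here is a minimal sketch (mirroring the `OrcColumnarBatchReaderSuite` added by this commit) of a reader that is asked for one data column and one of two partition columns; the column names and values are the test's, not a real table's:

```scala
import org.apache.orc.TypeDescription
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String.fromString

val dataSchema = StructType.fromDDL("col1 int, col2 int")
val partitionSchema = StructType.fromDDL("p1 string, p2 string")
val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2"))
val orcFileSchema = TypeDescription.fromString("struct<col1:int,col2:int>")

// Result row layout is (col1, p1): col1 comes from ORC column 0, p1 from the directory name.
val reader = new OrcColumnarBatchReader(false, false, 4096)
reader.initBatch(
  orcFileSchema,
  Array(dataSchema.fields(0), partitionSchema.fields(0)), // requiredFields, partition field included
  Array(0, -1),  // requestedDataColIds: the p1 slot has no data column
  Array(-1, 0),  // requestedPartitionColIds: only p1 is requested, p2 is never materialized
  partitionValues)
// reader.columnarBatch now holds an ORC-backed vector for col1 and a constant vector for p1 only.
```

Before this change the reader always appended every partition column to the batch; with the per-slot ids it only populates the ones the scan asked for.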
1 parent 151b94a commit 3d822d7

File tree: 3 files changed, +136 -47 lines changed


sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java

Lines changed: 50 additions & 43 deletions
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.stream.IntStream;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -58,17 +59,23 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
 
   /**
    * The column IDs of the physical ORC file schema which are required by this reader.
-   * -1 means this required column doesn't exist in the ORC file.
+   * -1 means this required column is partition column, or it doesn't exist in the ORC file.
+   * Ideally partition column should never appear in the physical file, and should only appear
+   * in the directory name. However, Spark allows partition columns inside physical file,
+   * but Spark will discard the values from the file, and use the partition value got from
+   * directory name. The column order will be reserved though.
    */
-  private int[] requestedColIds;
+  @VisibleForTesting
+  public int[] requestedDataColIds;
 
   // Record reader from ORC row batch.
   private org.apache.orc.RecordReader recordReader;
 
   private StructField[] requiredFields;
 
   // The result columnar batch for vectorized execution by whole-stage codegen.
-  private ColumnarBatch columnarBatch;
+  @VisibleForTesting
+  public ColumnarBatch columnarBatch;
 
   // Writable column vectors of the result columnar batch.
   private WritableColumnVector[] columnVectors;
@@ -143,75 +150,75 @@ public void initialize(
   /**
    * Initialize columnar batch by setting required schema and partition information.
    * With this information, this creates ColumnarBatch with the full schema.
+   *
+   * @param orcSchema Schema from ORC file reader.
+   * @param requiredFields All the fields that are required to return, including partition fields.
+   * @param requestedDataColIds Requested column ids from orcSchema. -1 if not existed.
+   * @param requestedPartitionColIds Requested column ids from partition schema. -1 if not existed.
+   * @param partitionValues Values of partition columns.
    */
   public void initBatch(
       TypeDescription orcSchema,
-      int[] requestedColIds,
       StructField[] requiredFields,
-      StructType partitionSchema,
+      int[] requestedDataColIds,
+      int[] requestedPartitionColIds,
       InternalRow partitionValues) {
     batch = orcSchema.createRowBatch(capacity);
     assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
-
+    assert(requiredFields.length == requestedDataColIds.length);
+    assert(requiredFields.length == requestedPartitionColIds.length);
+    // If a required column is also partition column, use partition value and don't read from file.
+    for (int i = 0; i < requiredFields.length; i++) {
+      if (requestedPartitionColIds[i] != -1) {
+        requestedDataColIds[i] = -1;
+      }
+    }
     this.requiredFields = requiredFields;
-    this.requestedColIds = requestedColIds;
-    assert(requiredFields.length == requestedColIds.length);
+    this.requestedDataColIds = requestedDataColIds;
 
     StructType resultSchema = new StructType(requiredFields);
-    for (StructField f : partitionSchema.fields()) {
-      resultSchema = resultSchema.add(f);
-    }
-
     if (copyToSpark) {
       if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
         columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
       } else {
         columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
       }
 
-      // Initialize the missing columns once.
+      // Initialize the partition columns and missing columns once.
       for (int i = 0; i < requiredFields.length; i++) {
-        if (requestedColIds[i] == -1) {
+        if (requestedPartitionColIds[i] != -1) {
+          ColumnVectorUtils.populate(columnVectors[i],
+            partitionValues, requestedPartitionColIds[i]);
+          columnVectors[i].setIsConstant();
+        } else if (requestedDataColIds[i] == -1) {
          columnVectors[i].putNulls(0, capacity);
          columnVectors[i].setIsConstant();
        }
      }
 
-      if (partitionValues.numFields() > 0) {
-        int partitionIdx = requiredFields.length;
-        for (int i = 0; i < partitionValues.numFields(); i++) {
-          ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
-          columnVectors[i + partitionIdx].setIsConstant();
-        }
-      }
-
       columnarBatch = new ColumnarBatch(columnVectors);
     } else {
       // Just wrap the ORC column vector instead of copying it to Spark column vector.
       orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
 
       for (int i = 0; i < requiredFields.length; i++) {
         DataType dt = requiredFields[i].dataType();
-        int colId = requestedColIds[i];
-        // Initialize the missing columns once.
-        if (colId == -1) {
-          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
-          missingCol.putNulls(0, capacity);
-          missingCol.setIsConstant();
-          orcVectorWrappers[i] = missingCol;
-        } else {
-          orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
-        }
-      }
-
-      if (partitionValues.numFields() > 0) {
-        int partitionIdx = requiredFields.length;
-        for (int i = 0; i < partitionValues.numFields(); i++) {
-          DataType dt = partitionSchema.fields()[i].dataType();
+        if (requestedPartitionColIds[i] != -1) {
           OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
-          ColumnVectorUtils.populate(partitionCol, partitionValues, i);
+          ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
           partitionCol.setIsConstant();
-          orcVectorWrappers[partitionIdx + i] = partitionCol;
+          orcVectorWrappers[i] = partitionCol;
+        } else {
+          int colId = requestedDataColIds[i];
+          // Initialize the missing columns once.
+          if (colId == -1) {
+            OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
+            missingCol.putNulls(0, capacity);
+            missingCol.setIsConstant();
+            orcVectorWrappers[i] = missingCol;
+          } else {
+            orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
+          }
        }
      }
 
@@ -233,7 +240,7 @@ private boolean nextBatch() throws IOException {
 
     if (!copyToSpark) {
       for (int i = 0; i < requiredFields.length; i++) {
-        if (requestedColIds[i] != -1) {
+        if (requestedDataColIds[i] != -1) {
          ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
        }
      }
@@ -248,8 +255,8 @@ private boolean nextBatch() throws IOException {
       StructField field = requiredFields[i];
       WritableColumnVector toColumn = columnVectors[i];
 
-      if (requestedColIds[i] >= 0) {
-        ColumnVector fromColumn = batch.cols[requestedColIds[i]];
+      if (requestedDataColIds[i] >= 0) {
+        ColumnVector fromColumn = batch.cols[requestedDataColIds[i]];
 
         if (fromColumn.isRepeating) {
           putRepeatingValues(batchSize, field, fromColumn, toColumn);
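For readers following the new `-1` convention above, here is a small standalone Scala illustration (values borrowed from the new test suite below) of the normalization `initBatch` performs when a required column is both a data column in the file and a partition column:

```scala
// Required fields are (col1, col2, p1, p2); p1 and p2 also happen to exist inside the ORC file.
val requestedDataColIds      = Array(0, 1, 0, 0)    // file column ids, including the file copies of p1/p2
val requestedPartitionColIds = Array(-1, -1, 0, 1)  // p1 and p2 are supplied by the partition values

// Same rule as the loop in initBatch: the partition value wins, so the file copy is ignored.
val normalized = requestedDataColIds.zip(requestedPartitionColIds).map {
  case (_, partId) if partId != -1 => -1
  case (dataId, _) => dataId
}
assert(normalized.sameElements(Array(0, 1, -1, -1)))  // the expectation in the first new test
```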

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala

Lines changed: 6 additions & 4 deletions
@@ -206,13 +206,15 @@ class OrcFileFormat
       // after opening a file.
       val iter = new RecordReaderIterator(batchReader)
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-
+      val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1)
+      val requestedPartitionColIds =
+        Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length)
       batchReader.initialize(fileSplit, taskAttemptContext)
       batchReader.initBatch(
         reader.getSchema,
-        requestedColIds,
-        requiredSchema.fields,
-        partitionSchema,
+        resultSchema.fields,
+        requestedDataColIds,
+        requestedPartitionColIds,
         file.partitionValues)
 
       iter.asInstanceOf[Iterator[InternalRow]]
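As a worked example of the two arrays built above, under hypothetical values: suppose `requiredSchema` contains two data columns that map to ORC columns 0 and 2, and `partitionSchema` has two partition columns.

```scala
val requestedColIds = Array(0, 2)  // one entry per requiredSchema field, as before this patch
val requiredSchemaLength = 2       // hypothetical requiredSchema size
val partitionSchemaLength = 2      // hypothetical partitionSchema size (p1, p2)

val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchemaLength)(-1)
val requestedPartitionColIds =
  Array.fill(requiredSchemaLength)(-1) ++ Range(0, partitionSchemaLength)

// resultSchema = requiredSchema ++ partitionSchema, so the reader sees four required fields.
assert(requestedDataColIds.sameElements(Array(0, 2, -1, -1)))      // partition slots read nothing from the file
assert(requestedPartitionColIds.sameElements(Array(-1, -1, 0, 1))) // data slots take nothing from partition values
```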
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.orc.TypeDescription
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String.fromString
+
+class OrcColumnarBatchReaderSuite extends QueryTest with SQLTestUtils with SharedSQLContext {
+  private val dataSchema = StructType.fromDDL("col1 int, col2 int")
+  private val partitionSchema = StructType.fromDDL("p1 string, p2 string")
+  private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2"))
+  private val orcFileSchemaList = Seq(
+    "struct<col1:int,col2:int>", "struct<col1:int,col2:int,p1:string,p2:string>",
+    "struct<col1:int,col2:int,p1:string>", "struct<col1:int,col2:int,p2:string>")
+  orcFileSchemaList.foreach { case schema =>
+    val orcFileSchema = TypeDescription.fromString(schema)
+
+    val isConstant = classOf[WritableColumnVector].getDeclaredField("isConstant")
+    isConstant.setAccessible(true)
+
+    def getReader(
+        requestedDataColIds: Array[Int],
+        requestedPartitionColIds: Array[Int],
+        resultFields: Array[StructField]): OrcColumnarBatchReader = {
+      val reader = new OrcColumnarBatchReader(false, false, 4096)
+      reader.initBatch(
+        orcFileSchema,
+        resultFields,
+        requestedDataColIds,
+        requestedPartitionColIds,
+        partitionValues)
+      reader
+    }
+
+    test(s"all partitions are requested: $schema") {
+      val requestedDataColIds = Array(0, 1, 0, 0)
+      val requestedPartitionColIds = Array(-1, -1, 0, 1)
+      val reader = getReader(requestedDataColIds, requestedPartitionColIds,
+        dataSchema.fields ++ partitionSchema.fields)
+      assert(reader.requestedDataColIds === Array(0, 1, -1, -1))
+    }
+
+    test(s"initBatch should initialize requested partition columns only: $schema") {
+      val requestedDataColIds = Array(0, -1) // only `col1` is requested, `col2` doesn't exist
+      val requestedPartitionColIds = Array(-1, 0) // only `p1` is requested
+      val reader = getReader(requestedDataColIds, requestedPartitionColIds,
+        Array(dataSchema.fields(0), partitionSchema.fields(0)))
+      val batch = reader.columnarBatch
+      assert(batch.numCols() === 2)
+
+      assert(batch.column(0).isInstanceOf[OrcColumnVector])
+      assert(batch.column(1).isInstanceOf[OnHeapColumnVector])
+
+      val p1 = batch.column(1).asInstanceOf[OnHeapColumnVector]
+      assert(isConstant.get(p1).asInstanceOf[Boolean]) // Partition column is constant.
+      assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0))
+    }
+  }
+}
