-
Notifications
You must be signed in to change notification settings - Fork 29.1k
[SPARK-26447][SQL]Allow OrcColumnarBatchReader to return less partition columns #23387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
799f429
49ae28b
a3a5741
1b09dae
b87ea1e
1b58df8
5ed34d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,9 +58,13 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> { | |
|
|
||
| /** | ||
| * The column IDs of the physical ORC file schema which are required by this reader. | ||
| * -1 means this required column doesn't exist in the ORC file. | ||
| * -1 means this required column is partition column, or it doesn't exist in the ORC file. | ||
| * Ideally, partition columns should never appear in the physical file, and should only | ||
| * appear in the directory name. However, Spark allows partition columns inside the | ||
| * physical file, but Spark will discard the values from the file, and use the partition | ||
| * values obtained from the directory name. The column order will be preserved though. | ||
| */ | ||
| private int[] requestedColIds; | ||
| private int[] requestedDataColIds; | ||
|
|
||
| // Record reader from ORC row batch. | ||
| private org.apache.orc.RecordReader recordReader; | ||
|
|
@@ -143,75 +147,75 @@ public void initialize( | |
| /** | ||
| * Initialize columnar batch by setting required schema and partition information. | ||
| * With this information, this creates ColumnarBatch with the full schema. | ||
| * | ||
| * @param orcSchema Schema from ORC file reader. | ||
| * @param requiredFields All the fields that are required to return, including partition fields. | ||
| * @param requestedDataColIds Requested column ids from orcSchema. -1 if the column does not exist. | ||
| * @param requestedPartitionColIds Requested column ids from the partition schema. -1 if the column does not exist. | ||
| * @param partitionValues Values of partition columns. | ||
| */ | ||
| public void initBatch( | ||
| TypeDescription orcSchema, | ||
| int[] requestedColIds, | ||
| StructField[] requiredFields, | ||
| StructType partitionSchema, | ||
| int[] requestedDataColIds, | ||
| int[] requestedPartitionColIds, | ||
| InternalRow partitionValues) { | ||
| batch = orcSchema.createRowBatch(capacity); | ||
| assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`. | ||
|
|
||
| assert(requiredFields.length == requestedDataColIds.length); | ||
| assert(requiredFields.length == requestedPartitionColIds.length); | ||
| // If a required column is also partition column, use partition value and don't read from file. | ||
| for (int i = 0; i < requiredFields.length; i++) { | ||
| if (requestedPartitionColIds[i] != -1) { | ||
| requestedDataColIds[i] = -1; | ||
| } | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this loop work as expected? The intention seems to be clear, but here, we initialized them like the following: val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1)
val requestedPartitionColIds = Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length)So, logically, in this
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. This is because in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L191 Now it can be easily fixed in ORC V2, but to fix
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The suggested test suite also covers this logic. |
||
| this.requiredFields = requiredFields; | ||
| this.requestedColIds = requestedColIds; | ||
| assert(requiredFields.length == requestedColIds.length); | ||
| this.requestedDataColIds = requestedDataColIds; | ||
|
|
||
| StructType resultSchema = new StructType(requiredFields); | ||
| for (StructField f : partitionSchema.fields()) { | ||
| resultSchema = resultSchema.add(f); | ||
| } | ||
|
|
||
| if (copyToSpark) { | ||
| if (MEMORY_MODE == MemoryMode.OFF_HEAP) { | ||
| columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema); | ||
| } else { | ||
| columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema); | ||
| } | ||
|
|
||
| // Initialize the missing columns once. | ||
| // Initialize the partition columns and missing columns once. | ||
| for (int i = 0; i < requiredFields.length; i++) { | ||
| if (requestedColIds[i] == -1) { | ||
| if (requestedPartitionColIds[i] != -1) { | ||
| ColumnVectorUtils.populate(columnVectors[i], | ||
| partitionValues, requestedPartitionColIds[i]); | ||
| columnVectors[i].setIsConstant(); | ||
| } else if (requestedDataColIds[i] == -1) { | ||
| columnVectors[i].putNulls(0, capacity); | ||
| columnVectors[i].setIsConstant(); | ||
| } | ||
| } | ||
|
|
||
| if (partitionValues.numFields() > 0) { | ||
| int partitionIdx = requiredFields.length; | ||
| for (int i = 0; i < partitionValues.numFields(); i++) { | ||
| ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i); | ||
| columnVectors[i + partitionIdx].setIsConstant(); | ||
| } | ||
| } | ||
|
|
||
| columnarBatch = new ColumnarBatch(columnVectors); | ||
| } else { | ||
| // Just wrap the ORC column vector instead of copying it to Spark column vector. | ||
| orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()]; | ||
|
|
||
| for (int i = 0; i < requiredFields.length; i++) { | ||
| DataType dt = requiredFields[i].dataType(); | ||
| int colId = requestedColIds[i]; | ||
| // Initialize the missing columns once. | ||
| if (colId == -1) { | ||
| OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt); | ||
| missingCol.putNulls(0, capacity); | ||
| missingCol.setIsConstant(); | ||
| orcVectorWrappers[i] = missingCol; | ||
| } else { | ||
| orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]); | ||
| } | ||
| } | ||
|
|
||
| if (partitionValues.numFields() > 0) { | ||
| int partitionIdx = requiredFields.length; | ||
| for (int i = 0; i < partitionValues.numFields(); i++) { | ||
| DataType dt = partitionSchema.fields()[i].dataType(); | ||
| if (requestedPartitionColIds[i] != -1) { | ||
| OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt); | ||
| ColumnVectorUtils.populate(partitionCol, partitionValues, i); | ||
| ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]); | ||
| partitionCol.setIsConstant(); | ||
| orcVectorWrappers[partitionIdx + i] = partitionCol; | ||
| orcVectorWrappers[i] = partitionCol; | ||
| } else { | ||
| int colId = requestedDataColIds[i]; | ||
| // Initialize the missing columns once. | ||
| if (colId == -1) { | ||
| OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt); | ||
| missingCol.putNulls(0, capacity); | ||
| missingCol.setIsConstant(); | ||
| orcVectorWrappers[i] = missingCol; | ||
| } else { | ||
| orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -233,7 +237,7 @@ private boolean nextBatch() throws IOException { | |
|
|
||
| if (!copyToSpark) { | ||
| for (int i = 0; i < requiredFields.length; i++) { | ||
| if (requestedColIds[i] != -1) { | ||
| if (requestedDataColIds[i] != -1) { | ||
| ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize); | ||
| } | ||
| } | ||
|
|
@@ -248,8 +252,8 @@ private boolean nextBatch() throws IOException { | |
| StructField field = requiredFields[i]; | ||
| WritableColumnVector toColumn = columnVectors[i]; | ||
|
|
||
| if (requestedColIds[i] >= 0) { | ||
| ColumnVector fromColumn = batch.cols[requestedColIds[i]]; | ||
| if (requestedDataColIds[i] >= 0) { | ||
| ColumnVector fromColumn = batch.cols[requestedDataColIds[i]]; | ||
|
|
||
| if (fromColumn.isRepeating) { | ||
| putRepeatingValues(batchSize, field, fromColumn, toColumn); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need more comments here.
Ideally, partition columns should never appear in the physical file, and should only appear in the directory name. However, Spark is OK with partition columns inside the physical file, but Spark will discard the values from the file, and use the partition values obtained from the directory name. The column order will be preserved though.