
Commit c3eb777

fix parquet splitters

1 parent 3ab7229

3 files changed, +19 -35 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
3 additions & 3 deletions

@@ -313,7 +313,7 @@ class ParquetFileFormat
     val splits = ParquetFileFormat.fileSplits.get(root,
       new Callable[ParquetFileSplitter] {
         override def call(): ParquetFileSplitter =
-          createParquetFileSplits(root, hadoopConf, schema, sparkSession)
+          createParquetFileSplits(root, hadoopConf, sparkSession)
       })
     root -> splits.buildSplitter(filters)
   }.toMap
@@ -331,11 +331,11 @@ class ParquetFileFormat
   private def createParquetFileSplits(
       root: Path,
       hadoopConf: Configuration,
-      schema: StructType,
       sparkSession: SparkSession): ParquetFileSplitter = {
     getMetadataForPath(root, hadoopConf)
       .map { meta =>
-        new ParquetMetadataFileSplitter(root, meta.getBlocks.asScala, schema, sparkSession)
+        new ParquetMetadataFileSplitter(
+          root, meta.getBlocks.asScala, meta.getFileMetaData.getSchema, sparkSession)
       }
       .getOrElse(ParquetDefaultFileSplitter)
   }
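The substance here: `createParquetFileSplits` no longer threads the Catalyst `StructType` through; the splitter instead takes the Parquet `MessageType` recorded in the metadata footer itself (`meta.getFileMetaData.getSchema`), so predicates are built against the schema the files were actually written with. As a point of reference, a minimal sketch of extracting a `MessageType` from a footer with the parquet-hadoop API; `footerSchema` is a hypothetical helper, not code from this commit:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType

// Read only the footer (no row-group data) and return the file's own
// Parquet schema, the same MessageType the splitter now hands to
// ParquetFilters instead of a converted Catalyst StructType.
def footerSchema(conf: Configuration, file: Path): MessageType = {
  val footer = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER)
  footer.getFileMetaData.getSchema
}
```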

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala
12 additions & 8 deletions

@@ -25,15 +25,15 @@ import scala.concurrent.{ExecutionContext, Future}
 import com.google.common.cache.{Cache, CacheBuilder}
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
-import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
+import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.filter2.statisticslevel.StatisticsFilter
 import org.apache.parquet.hadoop.metadata.BlockMetaData
+import org.apache.parquet.schema.MessageType
 import org.roaringbitmap.RoaringBitmap

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.ThreadUtils

@@ -54,14 +54,18 @@ object ParquetDefaultFileSplitter extends ParquetFileSplitter {
 class ParquetMetadataFileSplitter(
     val root: Path,
     val blocks: Seq[BlockMetaData],
-    val schema: StructType,
+    val parquetSchema: MessageType,
     val session: SparkSession)
   extends ParquetFileSplitter
   with Logging {

-  private val parquetFilters = new ParquetFilters(
-    session.sessionState.conf.parquetFilterPushDownDate,
-    session.sessionState.conf.isParquetINT96AsTimestamp)
+  val sqlConf = session.sessionState.conf
+  val parquetFilters = new ParquetFilters(
+    sqlConf.parquetFilterPushDownDate,
+    sqlConf.parquetFilterPushDownTimestamp,
+    sqlConf.parquetFilterPushDownDecimal,
+    sqlConf.parquetFilterPushDownStringStartWith,
+    sqlConf.parquetFilterPushDownInFilterThreshold)

   private val referencedFiles = blocks.map(bmd => new Path(root, bmd.getPath)).toSet

@@ -106,7 +110,7 @@ class ParquetMetadataFileSplitter(
   private def applyParquetFilter(
       filters: Seq[Filter],
       blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = {
-    val predicates = filters.flatMap(parquetFilters.createFilter(schema, _))
+    val predicates = filters.flatMap(parquetFilters.createFilter(parquetSchema, _))
     if (predicates.nonEmpty) {
       // Asynchronously build bitmaps
       Future {
@@ -127,7 +131,7 @@ class ParquetMetadataFileSplitter(
         .filter(filterSets.getIfPresent(_) == null)
         .flatMap { filter =>
           val bitmap = new RoaringBitmap
-          parquetFilters.createFilter(schema, filter).map((filter, _, bitmap))
+          parquetFilters.createFilter(parquetSchema, filter).map((filter, _, bitmap))
         }
       var i = 0
       val blockLen = blocks.size
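Two things change in the splitter: filter creation is keyed off the footer's `MessageType` rather than a `StructType`, and `ParquetFilters` now receives the full set of SQLConf pushdown settings (timestamp, decimal, string-prefix, and the IN threshold, alongside date). For orientation, a rough sketch of the row-group pruning this class performs via Parquet's `StatisticsFilter`, assuming a converted `FilterPredicate` is already in hand; `prune` is illustrative, not code from this commit:

```scala
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.filter2.statisticslevel.StatisticsFilter
import org.apache.parquet.hadoop.metadata.BlockMetaData

// Keep only the row groups whose column statistics might contain matches.
// StatisticsFilter.canDrop returns true when the stats prove no row in the
// block can satisfy the predicate, so that block can be skipped entirely.
def prune(blocks: Seq[BlockMetaData], predicate: FilterPredicate): Seq[BlockMetaData] =
  blocks.filterNot(block => StatisticsFilter.canDrop(predicate, block.getColumns))
```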

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
4 additions & 24 deletions

@@ -33,6 +33,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
 import org.apache.spark.unsafe.types.UTF8String

@@ -67,27 +68,8 @@ private[parquet] class ParquetFilters(

   import ParquetColumns._

-  private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
-    case LongType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(longColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Long]]))
-    case FloatType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(floatColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Float]]))
-    case DoubleType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
-    case StringType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
-    case BinaryType =>
-      (n: String, v: Set[Any]) =>
-        FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
+  private def dateToDays(date: Date): SQLDate = {
+    DateTimeUtils.fromJavaDate(date)
   }

   private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()
@@ -231,6 +213,7 @@ private[parquet] class ParquetFilters(
       (n: String, v: Any) =>
         FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
     case ParquetTimestampMicrosType if pushDownTimestamp =>
+      (n: String, v: Any) => FilterApi.lt(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
@@ -487,9 +470,6 @@ private[parquet] class ParquetFilters(
         .map(FilterApi.not)
         .map(LogicalInverseRewriter.rewrite)

-    case sources.In(name, values) if canMakeFilterOn(name) =>
-      makeInSet.lift(nameToType(name)).map(_(name, values.toSet))
-
     case sources.In(name, values) if canMakeFilterOn(name, values.head)
       && values.distinct.length <= pushDownInFilterThreshold =>
       values.distinct.flatMap { v =>
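Besides adding the `dateToDays` helper and the lambda header the timestamp-micros `lt` case was missing, this file drops the custom `makeInSet`/`SetInFilter` path, leaving `sources.In` to the threshold-guarded branch that expands a small distinct value set into OR-ed equality predicates. A simplified sketch of that strategy, where `makeEq` stands in for the per-type equality constructors and is an assumption, not a quote from the file:

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}

// Expand IN (v1, v2, ...) into eq(v1) OR eq(v2) OR ... when the distinct
// value count is at or below the pushdown threshold; otherwise skip pushdown
// and let Spark evaluate the IN after the scan.
def inToOr(
    values: Seq[Any],
    makeEq: Any => Option[FilterPredicate],  // hypothetical per-type eq builder
    threshold: Int): Option[FilterPredicate] = {
  val distinct = values.distinct
  if (distinct.length > threshold) {
    None
  } else {
    val eqs = distinct.flatMap(v => makeEq(v))
    // Push down only if every value could be converted to a predicate;
    // a partial OR would prune row groups too aggressively.
    if (eqs.length == distinct.length) {
      eqs.reduceLeftOption((l, r) => FilterApi.or(l, r))
    } else None
  }
}
```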
