@@ -40,14 +40,14 @@ import org.apache.spark.util.ThreadUtils
40
40
abstract class ParquetFileSplitter {

  /**
   * Builds a function that maps each Parquet file to the splits it should be
   * read as, given the pushed-down `filters`.
   *
   * @param filters data-source filters that implementations may use to prune
   *                row-group splits
   * @return a function from a file's status to its planned splits
   */
  def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit])

  /**
   * Produces one split spanning an entire file.
   *
   * @param path   location of the file
   * @param length total file length in bytes
   * @return a single whole-file split with no host locality hints
   */
  def singleFileSplit(path: Path, length: Long): Seq[FileSplit] =
    Seq(new FileSplit(path, 0L, length, Array.empty))
}
/**
 * Default splitter used when no `_metadata` summary is available: every file
 * is read as a single whole-file split, regardless of the supplied filters.
 */
object ParquetDefaultFileSplitter extends ParquetFileSplitter {

  /** Ignores `filters` and maps each file status to one whole-file split. */
  override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) =
    (stat: FileStatus) => singleFileSplit(stat.getPath, stat.getLen)
}
@@ -82,18 +82,20 @@ class ParquetMetadataFileSplitter(
82
82
(applied, unapplied, filteredBlocks)
83
83
}
84
84
85
+ // Group eligible splits by file Path.
85
86
val eligible = applyParquetFilter(unapplied, filteredBlocks).map { bmd =>
86
87
val blockPath = new Path (root, bmd.getPath)
87
88
new FileSplit (blockPath, bmd.getStartingPos, bmd.getCompressedSize, Array .empty)
88
- }
89
+ }.groupBy(_.getPath)
89
90
90
91
val statFilter : (FileStatus => Seq [FileSplit ]) = { stat =>
91
- if (referencedFiles.contains(stat.getPath)) {
92
- eligible.filter(_.getPath == stat.getPath)
92
+ val filePath = stat.getPath
93
+ if (referencedFiles.contains(filePath)) {
94
+ eligible.getOrElse(filePath, Nil )
93
95
} else {
94
96
log.warn(s " Found _metadata file for $root, " +
95
- s " but no entries for blocks in ${stat.getPath }. Retaining whole file. " )
96
- singleFileSplit(stat)
97
+ s " but no entries for blocks in ${filePath }. Retaining whole file. " )
98
+ singleFileSplit(filePath, stat.getLen )
97
99
}
98
100
}
99
101
statFilter
0 commit comments