
Commit 4dfd746

gengliangwang authored and cloud-fan committed
[SPARK-23896][SQL] Improve PartitioningAwareFileIndex
## What changes were proposed in this pull request?

Currently `PartitioningAwareFileIndex` accepts an optional parameter `userPartitionSchema`. If provided, it combines the inferred partition schema with that parameter. However:

1. To get `userPartitionSchema`, we need to combine the inferred partition schema with `userSpecifiedSchema`.
2. To get the inferred partition schema, we have to create a temporary file index.

Only after that can the final `PartitioningAwareFileIndex` be created. This can be improved by passing `userSpecifiedSchema` directly to `PartitioningAwareFileIndex`. With this change we reduce redundant code and avoid parsing the file partitions twice.

## How was this patch tested?

Unit test.

Author: Gengliang Wang <[email protected]>

Closes apache#21004 from gengliangwang/PartitioningAwareFileIndex.
1 parent a83ae0d commit 4dfd746
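To make the refactor concrete, here is a minimal, standalone sketch (not code from this patch; the object name and the `resolve` parameter are illustrative) of the SPARK-18510 rule that the change moves into `PartitioningAwareFileIndex`: each inferred partition column is replaced by the same-named field from the user-specified schema when one exists, otherwise the inferred field is kept.

```scala
import org.apache.spark.sql.types._

// Minimal sketch of the combine-inferred-and-user-specified-schema rule (illustrative only).
object CombinePartitionSchemaSketch {
  // `resolve` stands in for Spark's case-sensitivity-aware resolver.
  def combine(
      inferred: StructType,
      userSpecifiedSchema: Option[StructType],
      resolve: (String, String) => Boolean = (a: String, b: String) => a.equalsIgnoreCase(b))
    : StructType = {
    val resolved = inferred.map { partitionField =>
      // Prefer the user-specified field of the same name, fall back to the inferred field.
      userSpecifiedSchema
        .flatMap(_.find(f => resolve(f.name, partitionField.name)))
        .getOrElse(partitionField)
    }
    StructType(resolved)
  }

  def main(args: Array[String]): Unit = {
    val inferred = StructType(Seq(StructField("dt", StringType), StructField("hour", IntegerType)))
    val user = StructType(Seq(StructField("dt", DateType)))
    // `dt` takes DateType from the user schema; `hour` keeps its inferred IntegerType.
    println(combine(inferred, Some(user)))
  }
}
```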

File tree

7 files changed (+103, -108 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ class CatalogFileIndex(
         sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
     } else {
       new InMemoryFileIndex(
-        sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
+        sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None)
     }
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 56 additions & 77 deletions
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -103,24 +102,6 @@ case class DataSource(
       bucket.sortColumnNames, "in the sort definition", equality)
   }
 
-  /**
-   * In the read path, only managed tables by Hive provide the partition columns properly when
-   * initializing this class. All other file based data sources will try to infer the partitioning,
-   * and then cast the inferred types to user specified dataTypes if the partition columns exist
-   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or
-   * inconsistent data types as reported in SPARK-21463.
-   * @param fileIndex A FileIndex that will perform partition inference
-   * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema`
-   */
-  private def combineInferredAndUserSpecifiedPartitionSchema(fileIndex: FileIndex): StructType = {
-    val resolved = fileIndex.partitionSchema.map { partitionField =>
-      // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
-      userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
-        partitionField)
-    }
-    StructType(resolved)
-  }
-
   /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
    * it. In the read path, only managed tables by Hive provide the partition columns properly when
@@ -140,31 +121,26 @@ case class DataSource(
    *     be any further inference in any triggers.
    *
    * @param format the file format object for this DataSource
-   * @param fileStatusCache the shared cache for file statuses to speed up listing
+   * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
    *         columns.
    */
   private def getOrInferFileFormatSchema(
       format: FileFormat,
-      fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
-    // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
+      fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
+    // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
     // in streaming mode, we have already inferred and registered partition columns, we will
     // never have to materialize the lazy val below
-    lazy val tempFileIndex = {
-      val allPaths = caseInsensitiveOptions.get("path") ++ paths
-      val hadoopConf = sparkSession.sessionState.newHadoopConf()
-      val globbedPaths = allPaths.toSeq.flatMap { path =>
-        val hdfsPath = new Path(path)
-        val fs = hdfsPath.getFileSystem(hadoopConf)
-        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-      }.toArray
-      new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
+    lazy val tempFileIndex = fileIndex.getOrElse {
+      val globbedPaths =
+        checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
+      createInMemoryFileIndex(globbedPaths)
     }
+
     val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
-      combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex)
+      tempFileIndex.partitionSchema
     } else {
       // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
       // partitioning
@@ -356,13 +332,7 @@ case class DataSource(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
             sparkSession.sessionState.newHadoopConf()) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
-        val tempFileCatalog = new MetadataLogFileIndex(sparkSession, basePath, None)
-        val fileCatalog = if (userSpecifiedSchema.nonEmpty) {
-          val partitionSchema = combineInferredAndUserSpecifiedPartitionSchema(tempFileCatalog)
-          new MetadataLogFileIndex(sparkSession, basePath, Option(partitionSchema))
-        } else {
-          tempFileCatalog
-        }
+        val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
         val dataSchema = userSpecifiedSchema.orElse {
           format.inferSchema(
             sparkSession,
@@ -384,24 +354,23 @@ case class DataSource(
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
-        val allPaths = caseInsensitiveOptions.get("path") ++ paths
-        val hadoopConf = sparkSession.sessionState.newHadoopConf()
-        val globbedPaths = allPaths.flatMap(
-          DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray
-
-        val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
-
-        val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
-            catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
+        val globbedPaths =
+          checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
+        val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+          catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
+          catalogTable.get.partitionColumnNames.nonEmpty
+        val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
           val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
-          new CatalogFileIndex(
+          val index = new CatalogFileIndex(
             sparkSession,
             catalogTable.get,
             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
+          (index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema)
         } else {
-          new InMemoryFileIndex(
-            sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
+          val index = createInMemoryFileIndex(globbedPaths)
+          val (resultDataSchema, resultPartitionSchema) =
            getOrInferFileFormatSchema(format, Some(index))
+          (index, resultDataSchema, resultPartitionSchema)
         }
 
         HadoopFsRelation(
@@ -552,6 +521,40 @@ case class DataSource(
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
   }
+
+  /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
+  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    new InMemoryFileIndex(
+      sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
+  }
+
+  /**
+   * Checks and returns files in all the paths.
+   */
+  private def checkAndGlobPathIfNecessary(
+      checkEmptyGlobPath: Boolean,
+      checkFilesExist: Boolean): Seq[Path] = {
+    val allPaths = caseInsensitiveOptions.get("path") ++ paths
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    allPaths.flatMap { path =>
+      val hdfsPath = new Path(path)
+      val fs = hdfsPath.getFileSystem(hadoopConf)
+      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+
+      if (checkEmptyGlobPath && globPath.isEmpty) {
+        throw new AnalysisException(s"Path does not exist: $qualified")
+      }
+
+      // Sufficient to check head of the globPath seq for non-glob scenario
+      // Don't need to check once again if files exist in streaming mode
+      if (checkFilesExist && !fs.exists(globPath.head)) {
+        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+      }
+      globPath
+    }.toSeq
+  }
 }
 
 object DataSource extends Logging {
@@ -699,30 +702,6 @@ object DataSource extends Logging {
       locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath)
   }
 
-  /**
-   * If `path` is a file pattern, return all the files that match it. Otherwise, return itself.
-   * If `checkFilesExist` is `true`, also check the file existence.
-   */
-  private def checkAndGlobPathIfNecessary(
-      hadoopConf: Configuration,
-      path: String,
-      checkFilesExist: Boolean): Seq[Path] = {
-    val hdfsPath = new Path(path)
-    val fs = hdfsPath.getFileSystem(hadoopConf)
-    val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-    val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-
-    if (globPath.isEmpty) {
-      throw new AnalysisException(s"Path does not exist: $qualified")
-    }
-    // Sufficient to check head of the globPath seq for non-glob scenario
-    // Don't need to check once again if files exist in streaming mode
-    if (checkFilesExist && !fs.exists(globPath.head)) {
-      throw new AnalysisException(s"Path does not exist: ${globPath.head}")
-    }
-    globPath
-  }
-
   /**
    * Called before writing into a FileFormat based data source to make sure the
   * supplied schema is not empty.
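For readers less familiar with the Hadoop calls used by the new `checkAndGlobPathIfNecessary` helper above, here is a rough, standalone sketch of the same glob-then-validate pattern using only the plain Hadoop `FileSystem` API. The object and method names are made up for illustration; Spark's own helper goes through `SparkHadoopUtil.globPathIfNecessary` and raises `AnalysisException` instead.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative sketch only, not part of the patch.
object GlobAndCheckSketch {
  def globAndCheck(pathString: String, conf: Configuration): Seq[Path] = {
    val hdfsPath = new Path(pathString)
    val fs = hdfsPath.getFileSystem(conf)
    // Qualify the path against the file system's URI and working directory.
    val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
    // globStatus returns null for a non-glob path that does not exist and an
    // empty array for a glob pattern with no matches; treat both as "no files".
    val matched = Option(fs.globStatus(qualified)).map(_.toSeq).getOrElse(Nil).map(_.getPath)
    if (matched.isEmpty) {
      throw new IllegalArgumentException(s"Path does not exist: $qualified")
    }
    matched
  }
}
```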

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala

Lines changed: 4 additions & 4 deletions
@@ -41,17 +41,17 @@ import org.apache.spark.util.SerializableConfiguration
  * @param rootPathsSpecified the list of root table paths to scan (some of which might be
  *                           filtered out later)
  * @param parameters as set of options to control discovery
- * @param partitionSchema an optional partition schema that will be use to provide types for the
- *                        discovered partitions
+ * @param userSpecifiedSchema an optional user specified schema that will be use to provide
+ *                            types for the discovered partitions
  */
 class InMemoryFileIndex(
     sparkSession: SparkSession,
     rootPathsSpecified: Seq[Path],
     parameters: Map[String, String],
-    partitionSchema: Option[StructType],
+    userSpecifiedSchema: Option[StructType],
     fileStatusCache: FileStatusCache = NoopCache)
   extends PartitioningAwareFileIndex(
-    sparkSession, parameters, partitionSchema, fileStatusCache) {
+    sparkSession, parameters, userSpecifiedSchema, fileStatusCache) {
 
   // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
   // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 35 additions & 19 deletions
@@ -34,13 +34,13 @@ import org.apache.spark.sql.types.{StringType, StructType}
  * It provides the necessary methods to parse partition data based on a set of files.
  *
  * @param parameters as set of options to control partition discovery
- * @param userPartitionSchema an optional partition schema that will be use to provide types for
- *                            the discovered partitions
+ * @param userSpecifiedSchema an optional user specified schema that will be use to provide
+ *                            types for the discovered partitions
  */
 abstract class PartitioningAwareFileIndex(
     sparkSession: SparkSession,
     parameters: Map[String, String],
-    userPartitionSchema: Option[StructType],
+    userSpecifiedSchema: Option[StructType],
     fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging {
   import PartitioningAwareFileIndex.BASE_PATH_PARAM
 
@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
     val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
     val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
       .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-    userPartitionSchema match {
+    val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+      leafDirs,
+      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+      basePaths = basePaths,
+      timeZoneId = timeZoneId)
+    userSpecifiedSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
-        val spec = PartitioningUtils.parsePartitions(
-          leafDirs,
-          typeInference = false,
-          basePaths = basePaths,
-          timeZoneId = timeZoneId)
+        val userPartitionSchema =
+          combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec)
 
-        // Without auto inference, all of value in the `row` should be null or in StringType,
         // we need to cast into the data type that user specified.
         def castPartitionValuesToUserSchema(row: InternalRow) = {
           InternalRow((0 until row.numFields).map { i =>
+            val dt = inferredPartitionSpec.partitionColumns.fields(i).dataType
             Cast(
-              Literal.create(row.getUTF8String(i), StringType),
-              userProvidedSchema.fields(i).dataType,
+              Literal.create(row.get(i, dt), dt),
+              userPartitionSchema.fields(i).dataType,
               Option(timeZoneId)).eval()
           }: _*)
         }
 
-        PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
+        PartitionSpec(userPartitionSchema, inferredPartitionSpec.partitions.map { part =>
           part.copy(values = castPartitionValuesToUserSchema(part.values))
         })
       case _ =>
-        PartitioningUtils.parsePartitions(
-          leafDirs,
-          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
-          basePaths = basePaths,
-          timeZoneId = timeZoneId)
+        inferredPartitionSpec
     }
   }
 
@@ -236,6 +233,25 @@ abstract class PartitioningAwareFileIndex(
     val name = path.getName
     !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
   }
+
+  /**
+   * In the read path, only managed tables by Hive provide the partition columns properly when
+   * initializing this class. All other file based data sources will try to infer the partitioning,
+   * and then cast the inferred types to user specified dataTypes if the partition columns exist
+   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or
+   * inconsistent data types as reported in SPARK-21463.
+   * @param spec A partition inference result
+   * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema`
+   */
+  private def combineInferredAndUserSpecifiedPartitionSchema(spec: PartitionSpec): StructType = {
+    val equality = sparkSession.sessionState.conf.resolver
+    val resolved = spec.partitionColumns.map { partitionField =>
+      // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
+      userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
+        partitionField)
+    }
+    StructType(resolved)
+  }
 }
 
 object PartitioningAwareFileIndex {
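At the user level, the `userSpecifiedSchema` parameter threaded through these classes is what a `spark.read.schema(...)` call ends up supplying. A hypothetical spark-shell session (paths and column names are made up) illustrating the behavior the change preserves, where the type a user declares for a partition column wins over the inferred one:

```scala
// Hypothetical spark-shell example; `spark` is the session provided by the shell.
import org.apache.spark.sql.types._
import spark.implicits._

// Write a small dataset partitioned by `part`; without a user schema, the partition
// values below would normally be inferred as integers.
Seq(("a", "1"), ("b", "2")).toDF("value", "part")
  .write.mode("overwrite").partitionBy("part").parquet("/tmp/user_schema_demo")

// Read it back with a user-specified schema: `part` is reported as string,
// taken from the user schema rather than from partition type inference.
val userSchema = StructType(Seq(
  StructField("value", StringType),
  StructField("part", StringType)))
spark.read.schema(userSchema).parquet("/tmp/user_schema_demo").printSchema()
```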

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala

Lines changed: 5 additions & 5 deletions
@@ -30,14 +30,14 @@ import org.apache.spark.sql.types.StructType
  * A [[FileIndex]] that generates the list of files to processing by reading them from the
  * metadata log files generated by the [[FileStreamSink]].
  *
- * @param userPartitionSchema an optional partition schema that will be use to provide types for
- *                            the discovered partitions
+ * @param userSpecifiedSchema an optional user specified schema that will be use to provide
+ *                            types for the discovered partitions
  */
 class MetadataLogFileIndex(
     sparkSession: SparkSession,
     path: Path,
-    userPartitionSchema: Option[StructType])
-  extends PartitioningAwareFileIndex(sparkSession, Map.empty, userPartitionSchema) {
+    userSpecifiedSchema: Option[StructType])
+  extends PartitioningAwareFileIndex(sparkSession, Map.empty, userSpecifiedSchema) {
 
   private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
   logInfo(s"Reading streaming file log from $metadataDirectory")
@@ -51,7 +51,7 @@ class MetadataLogFileIndex(
   }
 
   override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
-    allFilesFromLog.toArray.groupBy(_.getPath.getParent)
+    allFilesFromLog.groupBy(_.getPath.getParent)
   }
 
   override def rootPaths: Seq[Path] = path :: Nil

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala

Lines changed: 1 addition & 1 deletion
@@ -401,7 +401,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
       sparkSession = spark,
       rootPathsSpecified = Seq(new Path(tempDir)),
       parameters = Map.empty[String, String],
-      partitionSchema = None)
+      userSpecifiedSchema = None)
     // This should not fail.
     fileCatalog.listLeafFiles(Seq(new Path(tempDir)))

sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -419,7 +419,7 @@ class PartitionedTablePerfStatsSuite
       HiveCatalogMetrics.reset()
       spark.read.load(dir.getAbsolutePath)
       assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
-      assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
+      assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
     }
   }
 }
