Skip to content

Commit b4400c7

Browse files
authored
[SPARK-28098][SQL] Support reading partitioned Hive tables with subdirectories (#40)
1 parent d0718cb commit b4400c7

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3040,6 +3040,14 @@ object SQLConf {
30403040
.booleanConf
30413041
.createWithDefault(false)
30423042

3043+
val READ_PARTITION_WITH_SUBDIRECTORY_ENABLED =
3044+
buildConf("spark.sql.sources.readPartitionWithSubdirectory.enabled")
3045+
.doc("When set to true, Spark SQL could read the files of " +
3046+
" partitioned hive table from subdirectories under root path of table")
3047+
.booleanConf
3048+
.createWithDefault(true)
3049+
3050+
30433051
/**
30443052
* Holds information about keys that have been deprecated.
30453053
*
@@ -3694,6 +3702,9 @@ class SQLConf extends Serializable with Logging {
36943702

36953703
def charVarcharAsString: Boolean = getConf(SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING)
36963704

3705+
def readPartitionWithSubdirectoryEnabled: Boolean =
3706+
getConf(READ_PARTITION_WITH_SUBDIRECTORY_ENABLED)
3707+
36973708
/** ********************** SQLConf functionality methods ************ */
36983709

36993710
/** Set Spark SQL configuration properties. */

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ class InMemoryFileIndex(
5959
override val rootPaths =
6060
rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, hadoopConf))
6161

62+
val readPartitionWithSubdirectoryEnabled =
63+
sparkSession.sessionState.conf.readPartitionWithSubdirectoryEnabled
64+
6265
@volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
6366
@volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
6467
@volatile private var cachedPartitionSpec: PartitionSpec = _
@@ -94,10 +97,23 @@ class InMemoryFileIndex(
9497
val files = listLeafFiles(rootPaths)
9598
cachedLeafFiles =
9699
new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
97-
cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
100+
cachedLeafDirToChildrenFiles =
101+
if (readPartitionWithSubdirectoryEnabled) {
102+
files.toArray.groupBy(file => getRootPathsLeafDir(file.getPath.getParent))
103+
} else {
104+
files.toArray.groupBy(_.getPath.getParent)
105+
}
98106
cachedPartitionSpec = null
99107
}
100108

109+
/**
 * Walks up the ancestor chain of `path` until it reaches a directory that is one of
 * this index's `rootPaths`, and returns that root. Used when grouping leaf files so
 * that files in subdirectories are attributed to their table root directory.
 *
 * Bug fix: the original recursed unconditionally on `path.getParent`. For a path that
 * is not a descendant of any configured root, `getParent` eventually returns null at
 * the filesystem root and the next call throws a NullPointerException. We now stop at
 * the top-most ancestor and return it, keeping grouping deterministic instead of
 * crashing.
 */
@scala.annotation.tailrec
private def getRootPathsLeafDir(path: Path): Path = {
  val parent = path.getParent
  if (rootPaths.contains(path) || parent == null) {
    // Either a configured root was found, or we ran out of ancestors.
    path
  } else {
    getRootPathsLeafDir(parent)
  }
}
116+
101117
override def equals(other: Any): Boolean = other match {
102118
case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
103119
case _ => false

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.Striped
2525
import org.apache.hadoop.fs.Path
2626

2727
import org.apache.spark.SparkException
28+
import org.apache.spark.deploy.SparkHadoopUtil
2829
import org.apache.spark.internal.Logging
2930
import org.apache.spark.sql.{AnalysisException, SparkSession}
3031
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
@@ -241,7 +242,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
241242
LogicalRelation(
242243
DataSource(
243244
sparkSession = sparkSession,
244-
paths = rootPath.toString :: Nil,
245+
paths = getDirectoryPathSeq(rootPath),
245246
userSpecifiedSchema = Option(updatedTable.dataSchema),
246247
bucketSpec = None,
247248
options = options,
@@ -277,6 +278,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
277278
result.copy(output = newOutput)
278279
}
279280

281+
/**
 * Resolves the list of directory paths that back a partitioned Hive table.
 *
 * When `spark.sql.sources.readPartitionWithSubdirectory.enabled` is set, every leaf
 * directory under `rootPath` is listed (via `SparkHadoopUtil.listLeafDirStatuses`) so
 * files nested in subdirectories are picked up; otherwise only the table root itself
 * is returned, matching the previous behavior.
 */
private def getDirectoryPathSeq(rootPath: Path): Seq[String] = {
  val readSubdirectories =
    sparkSession.sessionState.conf.readPartitionWithSubdirectoryEnabled

  if (!readSubdirectories) {
    // Feature disabled: the data source reads from the table root only.
    rootPath.toString :: Nil
  } else {
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    val fs = rootPath.getFileSystem(hadoopConf)
    val leafDirs = SparkHadoopUtil.get.listLeafDirStatuses(fs, rootPath)
    leafDirs.map(status => status.getPath.toString)
  }
}
292+
280293
private def inferIfNeeded(
281294
relation: HiveTableRelation,
282295
options: Map[String, String],

0 commit comments

Comments
 (0)