
Commit 4e6fc69

gatorsmile authored and cloud-fan committed
[SPARK-4131][FOLLOW-UP] Support "Writing data into the filesystem from queries"
## What changes were proposed in this pull request?

This PR cleans up the code introduced in apache#18975.

## How was this patch tested?

N/A

Author: gatorsmile <[email protected]>

Closes apache#19225 from gatorsmile/refactorSPARK-4131.
1 parent c76153c · commit 4e6fc69

3 files changed: 10 additions, 5 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 5 additions & 1 deletion
```diff
@@ -801,7 +801,11 @@ object DDLUtils {
   val HIVE_PROVIDER = "hive"
 
   def isHiveTable(table: CatalogTable): Boolean = {
-    table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
+    isHiveTable(table.provider)
+  }
+
+  def isHiveTable(provider: Option[String]): Boolean = {
+    provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
   }
 
   def isDatasourceTable(table: CatalogTable): Boolean = {
```
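
To make the refactor concrete, here is a minimal, self-contained sketch of the overload pattern the hunk above introduces; `CatalogTable` is stubbed out for illustration, since only its `provider` field matters to this check:

```scala
import java.util.Locale

object IsHiveTableSketch {
  // Stand-in for Spark's CatalogTable; only the provider field matters here.
  case class CatalogTable(provider: Option[String])

  val HIVE_PROVIDER = "hive"

  // The table-based check now delegates to the new Option[String] overload.
  def isHiveTable(table: CatalogTable): Boolean =
    isHiveTable(table.provider)

  // Case-insensitive provider check, reusable where no CatalogTable exists.
  def isHiveTable(provider: Option[String]): Boolean =
    provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER

  def main(args: Array[String]): Unit = {
    // Both overloads agree:
    assert(isHiveTable(CatalogTable(Some("HIVE"))))
    assert(isHiveTable(Some("hive")))
    assert(!isHiveTable(None))
  }
}
```

Extracting the `Option[String]` overload is what lets the guard in HiveStrategies.scala below collapse to a one-liner.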

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 1 addition & 2 deletions
```diff
@@ -160,8 +160,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
       CreateHiveTableAsSelectCommand(tableDesc, query, mode)
 
     case InsertIntoDir(isLocal, storage, provider, child, overwrite)
-        if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER =>
-
+        if DDLUtils.isHiveTable(provider) =>
       val outputPath = new Path(storage.locationUri.get)
       if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
 
```
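
The change above is the payoff of the new overload: a multi-term inline guard becomes a single named predicate. A hedged sketch of the pattern, using simplified stand-in plan nodes rather than Spark's real ones:

```scala
import java.util.Locale

object GuardSketch {
  // Simplified stand-ins for the logical-plan nodes in the rule above.
  sealed trait LogicalPlan
  case class InsertIntoDir(isLocal: Boolean, provider: Option[String]) extends LogicalPlan
  case object SomeOtherPlan extends LogicalPlan

  def isHiveProvider(provider: Option[String]): Boolean =
    provider.exists(_.toLowerCase(Locale.ROOT) == "hive")

  // The match guard reads as one named predicate instead of an inline
  // isDefined && get.toLowerCase chain.
  def route(plan: LogicalPlan): String = plan match {
    case InsertIntoDir(_, provider) if isHiveProvider(provider) => "Hive write path"
    case _ => "other path"
  }

  def main(args: Array[String]): Unit = {
    assert(route(InsertIntoDir(isLocal = false, provider = Some("Hive"))) == "Hive write path")
    assert(route(SomeOtherPlan) == "other path")
  }
}
```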

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala

Lines changed: 4 additions & 2 deletions
```diff
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.TaskRunner
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
@@ -50,7 +51,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       hadoopConf: Configuration,
       fileSinkConf: FileSinkDesc,
       outputLocation: String,
-      partitionAttributes: Seq[Attribute] = Nil): Unit = {
+      customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
+      partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
     val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
     if (isCompressed) {
@@ -76,7 +78,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       plan = plan,
       fileFormat = new HiveFileFormat(fileSinkConf),
       committer = committer,
-      outputSpec = FileFormatWriter.OutputSpec(outputLocation, Map.empty),
+      outputSpec = FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations),
       hadoopConf = hadoopConf,
       partitionColumns = partitionAttributes,
       bucketSpec = None,
```
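
Two things change in the signature above: `saveAsHiveFile` gains a defaulted `customPartitionLocations` parameter that is forwarded into `FileFormatWriter.OutputSpec`, and the return type goes from `Unit` to `Set[String]`, surfacing the writer's result to the caller. A hedged sketch of why existing call sites keep compiling; the types here are simplified stand-ins, not Spark's:

```scala
object SaveAsHiveFileSketch {
  // Simplified stand-in; in Spark, TablePartitionSpec is Map[String, String].
  type TablePartitionSpec = Map[String, String]

  final case class OutputSpec(
      outputPath: String,
      customPartitionLocations: Map[TablePartitionSpec, String])

  // Before: (..., partitionAttributes = Nil): Unit
  // After: an extra defaulted parameter, threaded into the OutputSpec, with
  // a result returned to the caller instead of Unit.
  def saveAsHiveFile(
      outputLocation: String,
      customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
      partitionAttributes: Seq[String] = Nil): OutputSpec =
    OutputSpec(outputLocation, customPartitionLocations)

  def main(args: Array[String]): Unit = {
    // Old call sites that relied on defaults or named arguments still compile:
    assert(saveAsHiveFile("/warehouse/t").customPartitionLocations.isEmpty)
    assert(saveAsHiveFile("/warehouse/t", partitionAttributes = Seq("p"))
      .customPartitionLocations.isEmpty)
    // New call sites can direct individual partitions to custom locations:
    val custom = saveAsHiveFile(
      "/warehouse/t",
      customPartitionLocations = Map(Map("p" -> "1") -> "/custom/p=1"))
    assert(custom.customPartitionLocations.nonEmpty)
  }
}
```

Because the new parameter is inserted before an existing defaulted one, callers that pass `partitionAttributes` positionally would break; callers using defaults or named arguments, as above, are unaffected.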
