Commit c1a5f94

viirya authored and dongjoon-hyun committed
[SPARK-30112][SQL] Allow insert overwrite same table if using dynamic partition overwrite
### What changes were proposed in this pull request?

This patch proposes to allow INSERT OVERWRITE into the same table when dynamic partition overwrite is used.

### Why are the changes needed?

Currently, INSERT OVERWRITE cannot write to the same table it reads from, even under dynamic partition overwrite. But dynamic partition overwrite does not delete partition directories ahead of time: data is written to staging directories and moved into the final partition directories once the write job finishes. It is therefore safe to insert overwrite into the same table under dynamic partition overwrite. This lets users read data from a table and insert overwrite into that same table. Because this is currently disallowed, users must write to a temporary location and then move the data back into the table.

### Does this PR introduce any user-facing change?

Yes. Users can insert overwrite into the same table when using dynamic partition overwrite.

### How was this patch tested?

Unit test.

Closes apache#26752 from viirya/dynamic-overwrite-same-table.

Lead-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
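
To make the user-facing change concrete: with dynamic partition overwrite enabled, a query may read from the very table it overwrites. A minimal sketch, assuming a `SparkSession` named `spark` and a hypothetical table `events(value int, dt int)` partitioned by `dt` (names are illustrative, not from the patch):

```scala
// Enable dynamic partition overwrite for the session.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Read from and overwrite partitions of the same table in one statement;
// before this patch this failed with
// "Cannot overwrite a path that is also being read from."
spark.sql(
  """INSERT OVERWRITE TABLE events PARTITION (dt)
    |SELECT value + 1, dt FROM events""".stripMargin)
```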
1 parent c8ed71b commit c1a5f94

3 files changed (+74, −16 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 9 additions & 3 deletions
```diff
@@ -188,15 +188,13 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
       }
 
       val outputPath = t.location.rootPaths.head
-      if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath)
-
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
 
       val partitionSchema = actualQuery.resolve(
         t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
       val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
 
-      InsertIntoHadoopFsRelationCommand(
+      val insertCommand = InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitions,
         i.ifPartitionNotExists,
@@ -209,6 +207,14 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         table,
         Some(t.location),
         actualQuery.output.map(_.name))
+
+      // For dynamic partition overwrite, we do not delete partition directories ahead.
+      // We write to staging directories and move to final partition directories after the
+      // write job is done, so it is ok for outputPath to overwrite an input path.
+      if (overwrite && !insertCommand.dynamicPartitionOverwrite) {
+        DDLUtils.verifyNotReadPath(actualQuery, outputPath)
+      }
+      insertCommand
     }
   }
```
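
The new comment explains why skipping `verifyNotReadPath` is safe here: dynamic partition overwrite writes to a staging directory and only moves files into the final partition directories when the job commits. As a toy illustration of that mechanism using plain JVM file operations (not Spark's actual commit protocol; the paths are made up):

```scala
import java.nio.file.{Files, Paths, StandardCopyOption}

object StagingMoveSketch extends App {
  // New partition data lands in a staging directory first, so files that the
  // query is still reading are never deleted or truncated up front.
  val staging = Files.createTempDirectory("spark-staging-")
  val newFile = Files.write(staging.resolve("part-00000"), "new rows".getBytes)

  // Only after the write "job" succeeds is the output moved into the final
  // partition directory, replacing the old contents of that partition.
  val partitionDir = Files.createDirectories(Paths.get("/tmp/table/part1=1"))
  Files.move(newFile, partitionDir.resolve(newFile.getFileName),
    StandardCopyOption.REPLACE_EXISTING)
}
```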

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 16 additions & 13 deletions
```diff
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -60,6 +61,21 @@ case class InsertIntoHadoopFsRelationCommand(
   extends DataWritingCommand {
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 
+  private lazy val parameters = CaseInsensitiveMap(options)
+
+  private[sql] lazy val dynamicPartitionOverwrite: Boolean = {
+    val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
+      // scalastyle:off caselocale
+      .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
+      // scalastyle:on caselocale
+      .getOrElse(SQLConf.get.partitionOverwriteMode)
+    val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+    // This config only makes sense when we are overwriting a partitioned dataset with dynamic
+    // partition columns.
+    enableDynamicOverwrite && mode == SaveMode.Overwrite &&
+      staticPartitions.size < partitionColumns.length
+  }
+
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow that
     SchemaUtils.checkColumnNameDuplication(
@@ -90,19 +106,6 @@ case class InsertIntoHadoopFsRelationCommand(
         fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
     }
 
-    val parameters = CaseInsensitiveMap(options)
-
-    val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
-      // scalastyle:off caselocale
-      .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
-      // scalastyle:on caselocale
-      .getOrElse(sparkSession.sessionState.conf.partitionOverwriteMode)
-    val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
-    // This config only makes sense when we are overwriting a partitioned dataset with dynamic
-    // partition columns.
-    val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
-      staticPartitions.size < partitionColumns.length
-
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
```
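
Read on its own, the condition captured by the new `dynamicPartitionOverwrite` lazy val boils down to the following standalone sketch (simplified: the config/option lookup is replaced by a plain boolean parameter):

```scala
import org.apache.spark.sql.SaveMode

// Simplified sketch of the predicate above: dynamic partition overwrite
// applies only when the DYNAMIC mode is selected, the write is an overwrite,
// and at least one partition column is dynamic, i.e. not pinned by a static
// PARTITION(col=value) spec.
def isDynamicPartitionOverwrite(
    dynamicModeEnabled: Boolean,
    saveMode: SaveMode,
    numStaticPartitions: Int,
    numPartitionColumns: Int): Boolean = {
  dynamicModeEnabled && saveMode == SaveMode.Overwrite &&
    numStaticPartitions < numPartitionColumns
}
```

Hoisting this out of `run` into a `private[sql] lazy val` is the key move of the refactoring: it lets `DataSourceAnalysis` consult the flag on the constructed command before deciding whether to run the read-path check.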

sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 49 additions & 0 deletions
```diff
@@ -270,6 +270,55 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       "INSERT OVERWRITE to a table while querying it should not be allowed.")
   }
 
+  test("SPARK-30112: it is allowed to write to a table while querying it for " +
+    "dynamic partition overwrite.") {
+    Seq(PartitionOverwriteMode.DYNAMIC.toString,
+        PartitionOverwriteMode.STATIC.toString).foreach { mode =>
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> mode) {
+        withTable("insertTable") {
+          sql(
+            """
+              |CREATE TABLE insertTable(i int, part1 int, part2 int) USING PARQUET
+              |PARTITIONED BY (part1, part2)
+            """.stripMargin)
+
+          sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2=1) SELECT 1")
+          checkAnswer(spark.table("insertTable"), Row(1, 1, 1))
+          sql("INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2=2) SELECT 2")
+          checkAnswer(spark.table("insertTable"), Row(1, 1, 1) :: Row(2, 1, 2) :: Nil)
+
+          if (mode == PartitionOverwriteMode.DYNAMIC.toString) {
+            sql(
+              """
+                |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2)
+                |SELECT i + 1, part2 FROM insertTable
+              """.stripMargin)
+            checkAnswer(spark.table("insertTable"), Row(2, 1, 1) :: Row(3, 1, 2) :: Nil)
+
+            sql(
+              """
+                |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2)
+                |SELECT i + 1, part2 + 1 FROM insertTable
+              """.stripMargin)
+            checkAnswer(spark.table("insertTable"),
+              Row(2, 1, 1) :: Row(3, 1, 2) :: Row(4, 1, 3) :: Nil)
+          } else {
+            val message = intercept[AnalysisException] {
+              sql(
+                """
+                  |INSERT OVERWRITE TABLE insertTable PARTITION(part1=1, part2)
+                  |SELECT i + 1, part2 FROM insertTable
+                """.stripMargin)
+            }.getMessage
+            assert(
+              message.contains("Cannot overwrite a path that is also being read from."),
+              "INSERT OVERWRITE to a table while querying it should not be allowed.")
+          }
+        }
+      }
+    }
+  }
+
   test("Caching") {
     // write something to the jsonTable
     sql(
```
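
Beyond the session setting the test toggles with `withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> mode)` (i.e. `spark.sql.sources.partitionOverwriteMode`), the mode can also be selected per write via a `DataFrameWriter` option, which is what the `parameters.get("partitionOverwriteMode")` lookup in the lazy val serves and which takes precedence over the session config. A sketch of that form, with a hypothetical output path:

```scala
// The "partitionOverwriteMode" option lands in the CaseInsensitiveMap(options)
// consulted by InsertIntoHadoopFsRelationCommand.dynamicPartitionOverwrite.
val df = spark.range(2)
  .selectExpr("CAST(id AS int) AS i", "1 AS part1", "CAST(id AS int) AS part2")

df.write
  .option("partitionOverwriteMode", "dynamic")
  .mode("overwrite")
  .partitionBy("part1", "part2")
  .parquet("/tmp/events_table")  // hypothetical path
```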
