Skip to content

Commit e2ec97d

Browse files
committed
11
1 parent 032dcf8 commit e2ec97d

File tree

3 files changed

+46
-4
lines changed

3 files changed

+46
-4
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1364,6 +1364,7 @@ case class DropPartitions(
13641364
parts: Seq[PartitionSpec],
13651365
ifExists: Boolean,
13661366
purge: Boolean) extends V2PartitionCommand {
1367+
override def allowPartialPartitionSpec: Boolean = true
13671368
override protected def withNewChildInternal(newChild: LogicalPlan): DropPartitions =
13681369
copy(table = newChild)
13691370
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropPartitionExec.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.execution.datasources.v2
1919

20+
import scala.collection.mutable
21+
2022
import org.apache.spark.sql.catalyst.InternalRow
2123
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, ResolvedPartitionSpec}
2224
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -37,15 +39,23 @@ case class DropPartitionExec(
3739
override def output: Seq[Attribute] = Seq.empty
3840

3941
override protected def run(): Seq[InternalRow] = {
40-
val (existsPartIdents, notExistsPartIdents) =
41-
partSpecs.map(_.ident).partition(table.partitionExists)
42+
val existsPartIdents = mutable.Set[InternalRow]()
43+
val notExistsPartIdents = mutable.Set[InternalRow]()
44+
partSpecs.foreach { partSpec =>
45+
val partIdents = table.listPartitionIdentifiers(partSpec.names.toArray, partSpec.ident)
46+
if (partIdents.nonEmpty) {
47+
existsPartIdents.addAll(partIdents)
48+
} else {
49+
notExistsPartIdents.add(partSpec.ident)
50+
}
51+
}
4252

4353
if (notExistsPartIdents.nonEmpty && !ignoreIfNotExists) {
4454
throw new NoSuchPartitionsException(
45-
table.name(), notExistsPartIdents, table.partitionSchema())
55+
table.name(), notExistsPartIdents.toSeq, table.partitionSchema())
4656
}
4757

48-
val isTableAltered = existsPartIdents match {
58+
val isTableAltered = existsPartIdents.toSeq match {
4959
case Seq() => false // Nothing will be done
5060
case Seq(partIdent) =>
5161
if (purge) table.purgePartition(partIdent) else table.dropPartition(partIdent)

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3042,6 +3042,37 @@ class DataSourceV2SQLSuiteV1Filter
30423042
}
30433043
}
30443044

3045+
test("Drop multi-level partition table with partial partition spec") {
  val t = "testpart.tbl"
  withTable(t) {
    sql(s"CREATE TABLE $t (id string) USING foo PARTITIONED BY (p1 int, p2 int, p3 int)")
    sql(s"INSERT INTO $t VALUES ('a', 1, 11, 111), ('b', 1, 22, 222), ('c', 2, 22, 333)")

    // Re-query the table's partition listing each time it is checked.
    def currentPartitions = sql(s"SHOW PARTITIONS $t")

    // All three partitions exist right after the insert.
    checkAnswer(
      currentPartitions,
      Seq(Row("p1=1/p2=11/p3=111"), Row("p1=1/p2=22/p3=222"), Row("p1=2/p2=22/p3=333")))

    // A partial spec (p1 only) drops every partition matching it.
    sql(s"ALTER TABLE $t DROP PARTITION (p1=1)")
    checkAnswer(currentPartitions, Seq(Row("p1=2/p2=22/p3=333")))

    // A partial spec that matches nothing fails without IF EXISTS ...
    val dropMissing = intercept[AnalysisException] {
      sql(s"ALTER TABLE $t DROP PARTITION (p1=4)")
    }
    checkError(
      exception = dropMissing,
      condition = "PARTITIONS_NOT_FOUND",
      sqlState = Some("428FT"),
      parameters = Map(
        "partitionList" -> "PARTITION (`p1` = 4)",
        "tableName" -> t.split("\\.").map(part => s"`$part`").mkString(".")))

    // ... and is a silent no-op with IF EXISTS: the remaining partition is untouched.
    sql(s"ALTER TABLE $t DROP IF EXISTS PARTITION (p1=4)")
    checkAnswer(currentPartitions, Seq(Row("p1=2/p2=22/p3=333")))
  }
}
3075+
30453076
test("Check HasPartitionStatistics from InMemoryPartitionTable") {
30463077
val t = "testpart.tbl"
30473078
withTable(t) {

0 commit comments

Comments
 (0)