Skip to content

Commit b5a2ed6

Browse files
imback82cloud-fan
authored andcommitted
[SPARK-29851][SQL] V2 catalog: Change default behavior of dropping namespace to cascade
### What changes were proposed in this pull request? Currently, `SupportsNamespaces.dropNamespace` drops a namespace only if it is empty. Thus, to implement a cascading drop, one needs to iterate all objects (tables, view, etc.) within the namespace (including its sub-namespaces recursively) and drop them one by one. This can have a negative impact on the performance when there are large number of objects. Instead, this PR proposes to change the default behavior of dropping a namespace to cascading such that implementing cascading/non-cascading drop is simpler without performance penalties. ### Why are the changes needed? The new behavior makes implementing cascading/non-cascading drop simple without performance penalties. ### Does this PR introduce any user-facing change? Yes. The default behavior of `SupportsNamespaces.dropNamespace` is now cascading. ### How was this patch tested? Added new unit tests. Closes apache#26476 from imback82/drop_ns_cascade. Authored-by: Terry Kim <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent f926809 commit b5a2ed6

File tree

7 files changed

+67
-40
lines changed

7 files changed

+67
-40
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,16 +131,14 @@ void alterNamespace(
131131
NamespaceChange... changes) throws NoSuchNamespaceException;
132132

133133
/**
134-
* Drop a namespace from the catalog.
134+
* Drop a namespace from the catalog, recursively dropping all objects within the namespace.
135135
* <p>
136-
* This operation may be rejected by the catalog implementation if the namespace is not empty by
137-
* throwing {@link IllegalStateException}. If the catalog implementation does not support this
138-
* operation, it may throw {@link UnsupportedOperationException}.
136+
* If the catalog implementation does not support this operation, it may throw
137+
* {@link UnsupportedOperationException}.
139138
*
140139
* @param namespace a multi-part namespace
141140
* @return true if the namespace was dropped
142141
* @throws NoSuchNamespaceException If the namespace does not exist (optional)
143-
* @throws IllegalStateException If the namespace is not empty
144142
* @throws UnsupportedOperationException If drop is not a supported operation
145143
*/
146144
boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException;

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
183183
c.properties)
184184

185185
case DropNamespaceStatement(NonSessionCatalog(catalog, nameParts), ifExists, cascade) =>
186-
DropNamespace(catalog.asNamespaceCatalog, nameParts, ifExists, cascade)
186+
DropNamespace(catalog, nameParts, ifExists, cascade)
187187

188188
case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) =>
189189
ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
2020
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, Star, UnresolvedException}
2121
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Unevaluable}
2222
import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
23-
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog, TableChange}
23+
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, SupportsNamespaces, TableCatalog, TableChange}
2424
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
2525
import org.apache.spark.sql.connector.expressions.Transform
2626
import org.apache.spark.sql.types.{DataType, StringType, StructType}
@@ -250,7 +250,7 @@ case class CreateNamespace(
250250
* The logical plan of the DROP NAMESPACE command that works for v2 catalogs.
251251
*/
252252
case class DropNamespace(
253-
catalog: SupportsNamespaces,
253+
catalog: CatalogPlugin,
254254
namespace: Seq[String],
255255
ifExists: Boolean,
256256
cascade: Boolean) extends Command

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
3131
class BasicInMemoryTableCatalog extends TableCatalog {
3232
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
3333

34+
protected val namespaces: util.Map[List[String], Map[String, String]] =
35+
new ConcurrentHashMap[List[String], Map[String, String]]()
36+
3437
protected val tables: util.Map[Identifier, InMemoryTable] =
3538
new ConcurrentHashMap[Identifier, InMemoryTable]()
3639

@@ -74,6 +77,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
7477

7578
val table = new InMemoryTable(s"$name.${ident.quoted}", schema, partitions, properties)
7679
tables.put(ident, table)
80+
namespaces.putIfAbsent(ident.namespace.toList, Map())
7781
table
7882
}
7983

@@ -120,11 +124,6 @@ class BasicInMemoryTableCatalog extends TableCatalog {
120124
}
121125

122126
class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamespaces {
123-
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
124-
125-
protected val namespaces: util.Map[List[String], Map[String, String]] =
126-
new ConcurrentHashMap[List[String], Map[String, String]]()
127-
128127
private def allNamespaces: Seq[Seq[String]] = {
129128
(tables.keySet.asScala.map(_.namespace.toSeq) ++ namespaces.keySet.asScala).toSeq.distinct
130129
}
@@ -181,9 +180,8 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp
181180
}
182181

183182
override def dropNamespace(namespace: Array[String]): Boolean = {
184-
if (listTables(namespace).nonEmpty) {
185-
throw new IllegalStateException(s"Cannot delete non-empty namespace: ${namespace.quoted}")
186-
}
183+
listNamespaces(namespace).map(dropNamespace)
184+
listTables(namespace).map(dropTable)
187185
Option(namespaces.remove(namespace.toList)).isDefined
188186
}
189187
}

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -847,19 +847,16 @@ class TableCatalogSuite extends SparkFunSuite {
847847
assert(catalog.namespaceExists(testNs) === false)
848848
}
849849

850-
test("dropNamespace: fail if not empty") {
850+
test("dropNamespace: drop even if it's not empty") {
851851
val catalog = newCatalog()
852852

853853
catalog.createNamespace(testNs, Map("property" -> "value").asJava)
854854
catalog.createTable(testIdent, schema, Array.empty, emptyProps)
855855

856-
val exc = intercept[IllegalStateException] {
857-
catalog.dropNamespace(testNs)
858-
}
856+
assert(catalog.dropNamespace(testNs))
859857

860-
assert(exc.getMessage.contains(testNs.quoted))
861-
assert(catalog.namespaceExists(testNs) === true)
862-
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value"))
858+
assert(!catalog.namespaceExists(testNs))
859+
assert(catalog.listTables(testNs).isEmpty)
863860
}
864861

865862
test("alterNamespace: basic behavior") {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,36 @@ import org.apache.spark.SparkException
2121
import org.apache.spark.sql.catalyst.InternalRow
2222
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
2323
import org.apache.spark.sql.catalyst.expressions.Attribute
24-
import org.apache.spark.sql.connector.catalog.SupportsNamespaces
24+
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, SupportsNamespaces}
2525

2626
/**
2727
* Physical plan node for dropping a namespace.
2828
*/
2929
case class DropNamespaceExec(
30-
catalog: SupportsNamespaces,
30+
catalog: CatalogPlugin,
3131
namespace: Seq[String],
3232
ifExists: Boolean,
3333
cascade: Boolean)
3434
extends V2CommandExec {
3535
override protected def run(): Seq[InternalRow] = {
36+
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
37+
38+
val nsCatalog = catalog.asNamespaceCatalog
3639
val ns = namespace.toArray
37-
if (catalog.namespaceExists(ns)) {
38-
try {
39-
catalog.dropNamespace(ns)
40-
} catch {
41-
case e: IllegalStateException if cascade =>
40+
if (nsCatalog.namespaceExists(ns)) {
41+
// The default behavior of `SupportsNamespace.dropNamespace()` is cascading,
42+
// so make sure the namespace to drop is empty.
43+
if (!cascade) {
44+
if (catalog.asTableCatalog.listTables(ns).nonEmpty
45+
|| nsCatalog.listNamespaces(ns).nonEmpty) {
4246
throw new SparkException(
43-
"Cascade option for droping namespace is not supported in V2 catalog", e)
47+
s"Cannot drop a non-empty namespace: ${namespace.quoted}. " +
48+
"Use CASCADE option to drop a non-empty namespace.")
49+
}
50+
}
51+
52+
if (!nsCatalog.dropNamespace(ns)) {
53+
throw new SparkException(s"Failed to drop a namespace: ${namespace.quoted}.")
4454
}
4555
} else if (!ifExists) {
4656
throw new NoSuchNamespaceException(ns)

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -822,20 +822,44 @@ class DataSourceV2SQLSuite
822822
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq())
823823
}
824824

825-
test("DropNamespace: drop non-empty namespace") {
825+
test("DropNamespace: drop non-empty namespace with a non-cascading mode") {
826826
sql("CREATE TABLE testcat.ns1.table (id bigint) USING foo")
827+
sql("CREATE TABLE testcat.ns1.ns2.table (id bigint) USING foo")
827828
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1"))
829+
testShowNamespaces("SHOW NAMESPACES IN testcat.ns1", Seq("ns1.ns2"))
828830

829-
val e1 = intercept[IllegalStateException] {
830-
sql("DROP NAMESPACE testcat.ns1")
831+
def assertDropFails(): Unit = {
832+
val e = intercept[SparkException] {
833+
sql("DROP NAMESPACE testcat.ns1")
834+
}
835+
assert(e.getMessage.contains("Cannot drop a non-empty namespace: ns1"))
831836
}
832-
assert(e1.getMessage.contains("Cannot delete non-empty namespace: ns1"))
833837

834-
val e2 = intercept[SparkException] {
835-
sql("DROP NAMESPACE testcat.ns1 CASCADE")
836-
}
837-
assert(e2.getMessage.contains(
838-
"Cascade option for droping namespace is not supported in V2 catalog"))
838+
// testcat.ns1.table is present, thus testcat.ns1 cannot be dropped.
839+
assertDropFails()
840+
sql("DROP TABLE testcat.ns1.table")
841+
842+
// testcat.ns1.ns2.table is present, thus testcat.ns1 cannot be dropped.
843+
assertDropFails()
844+
sql("DROP TABLE testcat.ns1.ns2.table")
845+
846+
// testcat.ns1.ns2 namespace is present, thus testcat.ns1 cannot be dropped.
847+
assertDropFails()
848+
sql("DROP NAMESPACE testcat.ns1.ns2")
849+
850+
// Now that testcat.ns1 is empty, it can be dropped.
851+
sql("DROP NAMESPACE testcat.ns1")
852+
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq())
853+
}
854+
855+
test("DropNamespace: drop non-empty namespace with a cascade mode") {
856+
sql("CREATE TABLE testcat.ns1.table (id bigint) USING foo")
857+
sql("CREATE TABLE testcat.ns1.ns2.table (id bigint) USING foo")
858+
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1"))
859+
testShowNamespaces("SHOW NAMESPACES IN testcat.ns1", Seq("ns1.ns2"))
860+
861+
sql("DROP NAMESPACE testcat.ns1 CASCADE")
862+
testShowNamespaces("SHOW NAMESPACES IN testcat", Seq())
839863
}
840864

841865
test("DropNamespace: test handling of 'IF EXISTS'") {

0 commit comments

Comments
 (0)