Commit e14e60c
[SPARK-51771][SQL] Add DSv2 APIs for ALTER TABLE ADD/DROP CONSTRAINT
### What changes were proposed in this pull request?

This PR adds the following DSv2 TableChange as per the SPIP [doc](https://docs.google.com/document/d/1EHjB4W1LjiXxsK_G7067j9pPX0y15LUF1Z5DlUPoPIo/):

* AddConstraint
* DropConstraint

### Why are the changes needed?

For constraints support in Spark.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50561 from gengliangwang/alterConstraint.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent a551080 commit e14e60c
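
For orientation, here is a minimal sketch of how the two new changes flow through the DSv2 API, assuming an already-resolved `TableCatalog` and `Identifier`. The names `catalog`, `ident`, and the `positive_id` constraint are illustrative, not part of this commit:

```scala
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.constraints.Constraint

// Roughly what ALTER TABLE ... ADD/DROP CONSTRAINT resolves to.
def alterConstraints(catalog: TableCatalog, ident: Identifier): Unit = {
  // ALTER TABLE t ADD CONSTRAINT positive_id CHECK (id > 0)
  val check = Constraint.check("positive_id").predicateSql("id > 0").build()
  catalog.alterTable(ident, TableChange.addConstraint(check, null))

  // ALTER TABLE t DROP CONSTRAINT positive_id (RESTRICT mode by default)
  catalog.alterTable(ident, TableChange.dropConstraint("positive_id", false, false))
}
```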

File tree

10 files changed: +314 −14 lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 14 additions & 0 deletions

```diff
@@ -811,6 +811,20 @@
     },
     "sqlState" : "XX000"
   },
+  "CONSTRAINT_ALREADY_EXISTS" : {
+    "message" : [
+      "Constraint '<constraintName>' already exists. Please delete the existing constraint first.",
+      "Existing constraint:",
+      "<oldConstraint>"
+    ],
+    "sqlState" : "42710"
+  },
+  "CONSTRAINT_DOES_NOT_EXIST" : {
+    "message" : [
+      "Cannot drop nonexistent constraint <constraintName> from table <tableName>."
+    ],
+    "sqlState" : "42704"
+  },
   "CONVERSION_INVALID_INPUT" : {
     "message" : [
       "The value <str> (<fmt>) cannot be converted to <targetType> because it is malformed. Correct the value as per the syntax, or change its format. Use <suggestion> to tolerate malformed input and return NULL instead."
```

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java

Lines changed: 6 additions & 0 deletions

```diff
@@ -89,4 +89,10 @@ default Map<String, String> properties() {
    * Returns the constraints for this table.
    */
   default Constraint[] constraints() { return new Constraint[0]; }
+
+  /**
+   * Returns the current table version if implementation supports versioning.
+   * If the table is not versioned, null can be returned here.
+   */
+  default String currentVersion() { return null; }
 }
```
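
The new `currentVersion()` hook pairs with the `validatedTableVersion` field of `AddConstraint` below. A sketch of the intended connector-side use, where `table` and `constraint` are assumed inputs rather than anything defined in this commit:

```scala
import org.apache.spark.sql.connector.catalog.{Table, TableChange}
import org.apache.spark.sql.connector.catalog.constraints.Constraint

// Pin the table snapshot a constraint was validated against; null simply
// means the table is unversioned, which the default implementation returns.
def addValidatedConstraint(table: Table, constraint: Constraint): TableChange =
  TableChange.addConstraint(constraint, table.currentVersion())
```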

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java

Lines changed: 94 additions & 0 deletions

```diff
@@ -22,6 +22,7 @@
 import javax.annotation.Nullable;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.catalog.constraints.Constraint;
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.types.DataType;
 
@@ -297,6 +298,21 @@ public int hashCode() {
     }
   }
 
+  /**
+   * Create a TableChange for adding a new table constraint
+   */
+  static TableChange addConstraint(Constraint constraint, String validatedTableVersion) {
+    return new AddConstraint(constraint, validatedTableVersion);
+  }
+
+  /**
+   * Create a TableChange for dropping a table constraint
+   */
+  static TableChange dropConstraint(String name, boolean ifExists, boolean cascade) {
+    DropConstraint.Mode mode = cascade ? DropConstraint.Mode.CASCADE : DropConstraint.Mode.RESTRICT;
+    return new DropConstraint(name, ifExists, mode);
+  }
+
   /**
    * A TableChange to remove a table property.
    * <p>
@@ -787,4 +803,82 @@ public int hashCode() {
       return Arrays.hashCode(clusteringColumns);
     }
   }
+
+  /** A TableChange to alter table and add a constraint. */
+  final class AddConstraint implements TableChange {
+    private final Constraint constraint;
+    private final String validatedTableVersion;
+
+    private AddConstraint(Constraint constraint, String validatedTableVersion) {
+      this.constraint = constraint;
+      this.validatedTableVersion = validatedTableVersion;
+    }
+
+    public Constraint constraint() {
+      return constraint;
+    }
+
+    public String validatedTableVersion() {
+      return validatedTableVersion;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      AddConstraint that = (AddConstraint) o;
+      return constraint.equals(that.constraint) &&
+        Objects.equals(validatedTableVersion, that.validatedTableVersion);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(constraint, validatedTableVersion);
+    }
+  }
+
+  /** A TableChange to alter table and drop a constraint. */
+  final class DropConstraint implements TableChange {
+    private final String name;
+    private final boolean ifExists;
+    private final Mode mode;
+
+    /**
+     * Defines modes for dropping a constraint.
+     * <p>
+     * RESTRICT - Prevents dropping a constraint if it is referenced by other objects.
+     * CASCADE - Automatically drops objects that depend on the constraint.
+     */
+    public enum Mode { RESTRICT, CASCADE }
+
+    private DropConstraint(String name, boolean ifExists, Mode mode) {
+      this.name = name;
+      this.ifExists = ifExists;
+      this.mode = mode;
+    }
+
+    public String name() {
+      return name;
+    }
+
+    public boolean ifExists() {
+      return ifExists;
+    }
+
+    public Mode mode() {
+      return mode;
+    }
+
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      DropConstraint that = (DropConstraint) o;
+      return that.name.equals(name) && that.ifExists == ifExists && mode == that.mode;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(name, ifExists, mode);
+    }
+  }
 }
```
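
A short sketch of the two factory methods in use; the constraint name and predicate are illustrative, and the builder calls mirror the ones in this PR's tests:

```scala
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.constraints.Constraint

val check = Constraint.check("positive_id").predicateSql("id > 0").build()

// validatedTableVersion may be null when no snapshot version is recorded.
val add = TableChange.addConstraint(check, null)

// cascade = true maps to DropConstraint.Mode.CASCADE; false means RESTRICT.
val dropRestrict = TableChange.dropConstraint("positive_id", false, false)
val dropCascade  = TableChange.dropConstraint("positive_id", true, true)
```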

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveTableConstraints.scala

Lines changed: 22 additions & 2 deletions

```diff
@@ -16,12 +16,13 @@
  */
 package org.apache.spark.sql.catalyst.util
 
-import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableCatalogCapability, TableChange}
 import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.errors.QueryCompilationErrors
 
 object ResolveTableConstraints {
-  // Fails if the given catalog does not support table constraint.
+  // Validates that the catalog supports create/replace table with constraints.
+  // Throws an exception if unsupported.
   def validateCatalogForTableConstraint(
       constraints: Seq[Constraint],
       catalog: TableCatalog,
@@ -32,4 +33,23 @@ object ResolveTableConstraints {
         catalog, ident, "table constraint")
     }
   }
+
+  // Validates that the catalog supports ALTER TABLE ADD/DROP CONSTRAINT operations.
+  // Throws an exception if unsupported.
+  def validateCatalogForTableChange(
+      tableChanges: Seq[TableChange],
+      catalog: TableCatalog,
+      ident: Identifier): Unit = {
+    // Check if the table changes contain table constraints.
+    val hasTableConstraint = tableChanges.exists {
+      case _: TableChange.AddConstraint => true
+      case _: TableChange.DropConstraint => true
+      case _ => false
+    }
+    if (hasTableConstraint &&
+      !catalog.capabilities().contains(TableCatalogCapability.SUPPORT_TABLE_CONSTRAINT)) {
+      throw QueryCompilationErrors.unsupportedTableOperationError(
+        catalog, ident, "table constraint")
+    }
+  }
 }
```
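
A hypothetical call site for the new check, mirroring how an analyzer rule would guard ALTER TABLE constraint changes (`catalog`, `ident`, and the constraint name are illustrative):

```scala
import org.apache.spark.sql.catalyst.util.ResolveTableConstraints
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableChange}

def guardDropConstraint(catalog: TableCatalog, ident: Identifier): Unit = {
  val changes = Seq(TableChange.dropConstraint("pk", false, false))
  // Throws unsupportedTableOperationError unless the catalog advertises
  // TableCatalogCapability.SUPPORT_TABLE_CONSTRAINT.
  ResolveTableConstraints.validateCatalogForTableChange(changes, catalog, ident)
}
```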

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala

Lines changed: 45 additions & 1 deletion

```diff
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector.catalog
 
 import java.util
-import java.util.Collections
+import java.util.{Collections, Locale}
 
 import scala.jdk.CollectionConverters._
 
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
 import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.{ClusterByTransform, LiteralValue, Transform}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -296,6 +297,49 @@ private[sql] object CatalogV2Util {
     }
   }
 
+  /**
+   * Extracts and validates table constraints from a sequence of table changes.
+   */
+  def collectConstraintChanges(
+      table: Table,
+      changes: Seq[TableChange]): Array[Constraint] = {
+    val constraints = table.constraints()
+
+    def findExistingConstraint(name: String): Option[Constraint] = {
+      constraints.find(_.name.toLowerCase(Locale.ROOT) == name.toLowerCase(Locale.ROOT))
+    }
+
+    changes.foldLeft(constraints) { (constraints, change) =>
+      change match {
+        case add: AddConstraint =>
+          val newConstraint = add.constraint
+          val existingConstraint = findExistingConstraint(newConstraint.name)
+          if (existingConstraint.isDefined) {
+            throw new AnalysisException(
+              errorClass = "CONSTRAINT_ALREADY_EXISTS",
+              messageParameters =
+                Map("constraintName" -> existingConstraint.get.name,
+                  "oldConstraint" -> existingConstraint.get.toDDL))
+          }
+          constraints :+ newConstraint
+
+        case drop: DropConstraint =>
+          val existingConstraint = findExistingConstraint(drop.name)
+          if (existingConstraint.isEmpty && !drop.ifExists) {
+            throw new AnalysisException(
+              errorClass = "CONSTRAINT_DOES_NOT_EXIST",
+              messageParameters =
+                Map("constraintName" -> drop.name, "tableName" -> table.name()))
+          }
+          constraints.filterNot(_.name == drop.name)
+
+        case _ =>
+          // ignore non-constraint changes
+          constraints
+      }
+    }.toArray
+  }
+
   private def addField(
       schema: StructType,
       field: StructField,
```
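
One consequence of the helper's `ifExists` handling, sketched below. Note that `CatalogV2Util` is `private[sql]`, so the snippet assumes code living in Spark's sql tree, and `table` is an assumed input:

```scala
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table, TableChange}
import org.apache.spark.sql.connector.catalog.constraints.Constraint

// Dropping an unknown name with ifExists = true is a no-op: the original
// constraint set comes back instead of CONSTRAINT_DOES_NOT_EXIST being thrown.
def dropIfExists(table: Table): Array[Constraint] =
  CatalogV2Util.collectConstraintChanges(
    table, Seq(TableChange.dropConstraint("missing_constraint", true, false)))
```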

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala

Lines changed: 112 additions & 7 deletions

```diff
@@ -23,6 +23,7 @@ import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, SparkUnsupportedOperationException}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -58,6 +59,14 @@ class CatalogSuite extends SparkFunSuite {
   private val testIdentNewQuoted = testIdentNew.asMultipartIdentifier
     .map(part => quoteIdentifier(part)).mkString(".")
 
+  private val constraints: Array[Constraint] = Array(
+    Constraint.primaryKey("pk", Array(FieldReference.column("id"))).build(),
+    Constraint.check("chk").predicateSql("id > 0").build(),
+    Constraint.unique("uk", Array(FieldReference.column("data"))).build(),
+    Constraint.foreignKey("fk", Array(FieldReference.column("data")), testIdentNew,
+      Array(FieldReference.column("id"))).build()
+  )
+
   test("Catalogs can load the catalog") {
     val catalog = newCatalog()
 
@@ -174,13 +183,6 @@
     val columns = Array(
       Column.create("id", IntegerType, false),
       Column.create("data", StringType))
-    val constraints: Array[Constraint] = Array(
-      Constraint.primaryKey("pk", Array(FieldReference.column("id"))).build(),
-      Constraint.check("chk").predicateSql("id > 0").build(),
-      Constraint.unique("uk", Array(FieldReference.column("data"))).build(),
-      Constraint.foreignKey("fk", Array(FieldReference.column("data")), testIdentNew,
-        Array(FieldReference.column("id"))).build()
-    )
     val tableInfo = new TableInfo.Builder()
       .withColumns(columns)
       .withPartitions(emptyTrans)
@@ -845,6 +847,109 @@
     checkErrorTableNotFound(exc, testIdentQuoted)
   }
 
+  test("alterTable: add constraint") {
+    val catalog = newCatalog()
+
+    val tableColumns = Array(
+      Column.create("id", IntegerType, false),
+      Column.create("data", StringType))
+    val tableInfo = new TableInfo.Builder()
+      .withColumns(tableColumns)
+      .build()
+    val table = catalog.createTable(testIdent, tableInfo)
+
+    assert(table.constraints.isEmpty)
+
+    for ((constraint, index) <- constraints.zipWithIndex) {
+      val updated = catalog.alterTable(testIdent, TableChange.addConstraint(constraint, null))
+      assert(updated.constraints.length === index + 1)
+      assert(updated.constraints.apply(index) === constraint)
+    }
+  }
+
+  test("alterTable: add existing constraint should fail") {
+    val catalog = newCatalog()
+
+    val tableColumns = Array(
+      Column.create("id", IntegerType, false),
+      Column.create("data", StringType))
+    val tableInfo = new TableInfo.Builder()
+      .withColumns(tableColumns)
+      .withConstraints(constraints)
+      .build()
+    val table = catalog.createTable(testIdent, tableInfo)
+
+    assert(table.constraints.length === constraints.length)
+
+    for (constraint <- constraints) {
+      checkError(
+        exception = intercept[AnalysisException] {
+          catalog.alterTable(testIdent, TableChange.addConstraint(constraint, null))
+        },
+        condition = "CONSTRAINT_ALREADY_EXISTS",
+        parameters = Map("constraintName" -> constraint.name, "oldConstraint" -> constraint.toDDL))
+    }
+  }
+
+  test("alterTable: drop constraint") {
+    val catalog = newCatalog()
+
+    val tableColumns = Array(
+      Column.create("id", IntegerType, false),
+      Column.create("data", StringType))
+    val tableInfo = new TableInfo.Builder()
+      .withColumns(tableColumns)
+      .withConstraints(constraints)
+      .build()
+    val table = catalog.createTable(testIdent, tableInfo)
+
+    assert(table.constraints.length === constraints.length)
+
+    for ((constraint, index) <- constraints.zipWithIndex) {
+      val updated =
+        catalog.alterTable(testIdent, TableChange.dropConstraint(constraint.name(), false, false))
+      assert(updated.constraints.length === constraints.length - index - 1)
+    }
+  }
+
+  test("alterTable: drop non-existing constraint") {
+    val catalog = newCatalog()
+
+    val tableColumns = Array(
+      Column.create("id", IntegerType, false),
+      Column.create("data", StringType))
+    val tableInfo = new TableInfo.Builder()
+      .withColumns(tableColumns)
+      .withConstraints(constraints)
+      .build()
+    val table = catalog.createTable(testIdent, tableInfo)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        catalog.alterTable(testIdent,
+          TableChange.dropConstraint("missing_constraint", false, false))
+      },
+      condition = "CONSTRAINT_DOES_NOT_EXIST",
+      parameters = Map("constraintName" -> "missing_constraint",
+        "tableName" -> table.name()))
+  }
+
+  test("alterTable: drop non-existing constraint if exists") {
+    val catalog = newCatalog()
+
+    val tableColumns = Array(
+      Column.create("id", IntegerType, false),
+      Column.create("data", StringType))
+    val tableInfo = new TableInfo.Builder()
+      .withColumns(tableColumns)
+      .withConstraints(constraints)
+      .build()
+    catalog.createTable(testIdent, tableInfo)
+    val updated = catalog.alterTable(testIdent,
+      TableChange.dropConstraint("missing_constraint", true, false))
+    assert(updated.constraints.length === constraints.length)
+  }
+
   test("dropTable") {
     val catalog = newCatalog()
 
```
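
Assuming a standard Spark checkout, the new tests can typically be run with `build/sbt "catalyst/testOnly *CatalogSuite"`, since the suite lives in the sql/catalyst module.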
