Skip to content

Commit d7bdc6a

Browse files
xy_xincloud-fan
authored and committed
[SPARK-29835][SQL] Remove the unnecessary conversion from Statement to LogicalPlan for DELETE/UPDATE
### What changes were proposed in this pull request? The current parse and analyze flow for DELETE is: 1, the SQL string will be firstly parsed to `DeleteFromStatement`; 2, the `DeleteFromStatement` be converted to `DeleteFromTable`. However, the SQL string can be parsed to `DeleteFromTable` directly, where a `DeleteFromStatement` seems to be redundant. It is the same for UPDATE. This pr removes the unnecessary `DeleteFromStatement` and `UpdateTableStatement`. ### Why are the changes needed? This makes the codes for DELETE and UPDATE cleaner, and keep align with MERGE INTO. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existed tests and new tests. Closes apache#26464 from xianyinxin/SPARK-29835. Authored-by: xy_xin <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent b5a2ed6 commit d7bdc6a

File tree

9 files changed

+98
-99
lines changed

9 files changed

+98
-99
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -93,19 +93,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
9393
s"Can not specify catalog `${catalog.name}` for view ${tableName.quoted} " +
9494
s"because view support in catalog has not been implemented yet")
9595

96-
case DeleteFromStatement(
97-
nameParts @ NonSessionCatalog(catalog, tableName), tableAlias, condition) =>
98-
val r = UnresolvedV2Relation(nameParts, catalog.asTableCatalog, tableName.asIdentifier)
99-
val aliased = tableAlias.map(SubqueryAlias(_, r)).getOrElse(r)
100-
DeleteFromTable(aliased, condition)
101-
102-
case u @ UpdateTableStatement(
103-
nameParts @ CatalogAndIdentifierParts(catalog, tableName), _, _, _, _) =>
104-
val r = UnresolvedV2Relation(nameParts, catalog.asTableCatalog, tableName.asIdentifier)
105-
val aliased = u.tableAlias.map(SubqueryAlias(_, r)).getOrElse(r)
106-
val columns = u.columns.map(UnresolvedAttribute(_))
107-
UpdateTable(aliased, columns, u.values, u.condition)
108-
10996
case DescribeTableStatement(
11097
nameParts @ NonSessionCatalog(catalog, tableName), partitionSpec, isExtended) =>
11198
if (partitionSpec.nonEmpty) {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -357,35 +357,29 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
357357

358358
override def visitDeleteFromTable(
359359
ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) {
360-
val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
360+
val table = UnresolvedRelation(visitMultipartIdentifier(ctx.multipartIdentifier()))
361361
val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "DELETE")
362+
val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
362363
val predicate = if (ctx.whereClause() != null) {
363364
Some(expression(ctx.whereClause().booleanExpression()))
364365
} else {
365366
None
366367
}
367-
368-
DeleteFromStatement(tableId, tableAlias, predicate)
368+
DeleteFromTable(aliasedTable, predicate)
369369
}
370370

371371
override def visitUpdateTable(ctx: UpdateTableContext): LogicalPlan = withOrigin(ctx) {
372-
val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
372+
val table = UnresolvedRelation(visitMultipartIdentifier(ctx.multipartIdentifier()))
373373
val tableAlias = getTableAliasWithoutColumnAlias(ctx.tableAlias(), "UPDATE")
374-
val (attrs, values) = ctx.setClause().assignmentList().assignment().asScala.map {
375-
kv => visitMultipartIdentifier(kv.key) -> expression(kv.value)
376-
}.unzip
374+
val aliasedTable = tableAlias.map(SubqueryAlias(_, table)).getOrElse(table)
375+
val assignments = withAssignments(ctx.setClause().assignmentList())
377376
val predicate = if (ctx.whereClause() != null) {
378377
Some(expression(ctx.whereClause().booleanExpression()))
379378
} else {
380379
None
381380
}
382381

383-
UpdateTableStatement(
384-
tableId,
385-
tableAlias,
386-
attrs,
387-
values,
388-
predicate)
382+
UpdateTable(aliasedTable, assignments, predicate)
389383
}
390384

391385
private def withAssignments(assignCtx: SqlBaseParser.AssignmentListContext): Seq[Assignment] =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -277,24 +277,6 @@ case class DescribeColumnStatement(
277277
colNameParts: Seq[String],
278278
isExtended: Boolean) extends ParsedStatement
279279

280-
/**
281-
* A DELETE FROM statement, as parsed from SQL.
282-
*/
283-
case class DeleteFromStatement(
284-
tableName: Seq[String],
285-
tableAlias: Option[String],
286-
condition: Option[Expression]) extends ParsedStatement
287-
288-
/**
289-
* A UPDATE tbl_name statement, as parsed from SQL.
290-
*/
291-
case class UpdateTableStatement(
292-
tableName: Seq[String],
293-
tableAlias: Option[String],
294-
columns: Seq[Seq[String]],
295-
values: Seq[Expression],
296-
condition: Option[Expression]) extends ParsedStatement
297-
298280
/**
299281
* An INSERT INTO statement, as parsed from SQL.
300282
*

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,7 @@ case class DeleteFromTable(
290290
*/
291291
case class UpdateTable(
292292
table: LogicalPlan,
293-
columns: Seq[Expression],
294-
values: Seq[Expression],
293+
assignments: Seq[Assignment],
295294
condition: Option[Expression]) extends Command with SupportsSubquery {
296295
override def children: Seq[LogicalPlan] = table :: Nil
297296
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -772,17 +772,15 @@ class DDLParserSuite extends AnalysisTest {
772772

773773
test("delete from table: delete all") {
774774
parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
775-
DeleteFromStatement(
776-
Seq("testcat", "ns1", "ns2", "tbl"),
777-
None,
775+
DeleteFromTable(
776+
UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
778777
None))
779778
}
780779

781780
test("delete from table: with alias and where clause") {
782781
parseCompare("DELETE FROM testcat.ns1.ns2.tbl AS t WHERE t.a = 2",
783-
DeleteFromStatement(
784-
Seq("testcat", "ns1", "ns2", "tbl"),
785-
Some("t"),
782+
DeleteFromTable(
783+
SubqueryAlias("t", UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"))),
786784
Some(EqualTo(UnresolvedAttribute("t.a"), Literal(2)))))
787785
}
788786

@@ -798,13 +796,12 @@ class DDLParserSuite extends AnalysisTest {
798796
parseCompare(
799797
"""
800798
|UPDATE testcat.ns1.ns2.tbl
801-
|SET t.a='Robert', t.b=32
799+
|SET a='Robert', b=32
802800
""".stripMargin,
803-
UpdateTableStatement(
804-
Seq("testcat", "ns1", "ns2", "tbl"),
805-
None,
806-
Seq(Seq("t", "a"), Seq("t", "b")),
807-
Seq(Literal("Robert"), Literal(32)),
801+
UpdateTable(
802+
UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
803+
Seq(Assignment(UnresolvedAttribute("a"), Literal("Robert")),
804+
Assignment(UnresolvedAttribute("b"), Literal(32))),
808805
None))
809806
}
810807

@@ -815,11 +812,10 @@ class DDLParserSuite extends AnalysisTest {
815812
|SET t.a='Robert', t.b=32
816813
|WHERE t.c=2
817814
""".stripMargin,
818-
UpdateTableStatement(
819-
Seq("testcat", "ns1", "ns2", "tbl"),
820-
Some("t"),
821-
Seq(Seq("t", "a"), Seq("t", "b")),
822-
Seq(Literal("Robert"), Literal(32)),
815+
UpdateTable(
816+
SubqueryAlias("t", UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"))),
817+
Seq(Assignment(UnresolvedAttribute("t.a"), Literal("Robert")),
818+
Assignment(UnresolvedAttribute("t.b"), Literal(32))),
823819
Some(EqualTo(UnresolvedAttribute("t.c"), Literal(2)))))
824820
}
825821

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -158,17 +158,6 @@ class ResolveSessionCatalog(
158158
case AlterViewUnsetPropertiesStatement(SessionCatalog(catalog, tableName), keys, ifExists) =>
159159
AlterTableUnsetPropertiesCommand(tableName.asTableIdentifier, keys, ifExists, isView = true)
160160

161-
case DeleteFromStatement(
162-
nameParts @ SessionCatalog(catalog, tableName), tableAlias, condition) =>
163-
loadTable(catalog, tableName.asIdentifier).collect {
164-
case v1Table: V1Table =>
165-
throw new AnalysisException("DELETE FROM is only supported with v2 tables.")
166-
}.getOrElse {
167-
val r = UnresolvedV2Relation(nameParts, catalog.asTableCatalog, tableName.asIdentifier)
168-
val aliased = tableAlias.map(SubqueryAlias(_, r)).getOrElse(r)
169-
DeleteFromTable(aliased, condition)
170-
}
171-
172161
case DescribeTableStatement(
173162
nameParts @ SessionCatalog(catalog, tableName), partitionSpec, isExtended) =>
174163
loadTable(catalog, tableName.asIdentifier).collect {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -157,19 +157,25 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
157157
OverwritePartitionsDynamicExec(
158158
r.table.asWritable, writeOptions.asOptions, planLater(query)) :: Nil
159159

160-
case DeleteFromTable(DataSourceV2ScanRelation(table, _, output), condition) =>
161-
if (condition.exists(SubqueryExpression.hasSubquery)) {
162-
throw new AnalysisException(
163-
s"Delete by condition with subquery is not supported: $condition")
160+
case DeleteFromTable(relation, condition) =>
161+
relation match {
162+
case DataSourceV2ScanRelation(table, _, output) =>
163+
if (condition.exists(SubqueryExpression.hasSubquery)) {
164+
throw new AnalysisException(
165+
s"Delete by condition with subquery is not supported: $condition")
166+
}
167+
// fail if any filter cannot be converted.
168+
// correctness depends on removing all matching data.
169+
val filters = DataSourceStrategy.normalizeFilters(condition.toSeq, output)
170+
.flatMap(splitConjunctivePredicates(_).map {
171+
f => DataSourceStrategy.translateFilter(f).getOrElse(
172+
throw new AnalysisException(s"Exec update failed:" +
173+
s" cannot translate expression to source filter: $f"))
174+
}).toArray
175+
DeleteFromTableExec(table.asDeletable, filters) :: Nil
176+
case _ =>
177+
throw new AnalysisException("DELETE is only supported with v2 tables.")
164178
}
165-
// fail if any filter cannot be converted. correctness depends on removing all matching data.
166-
val filters = DataSourceStrategy.normalizeFilters(condition.toSeq, output)
167-
.flatMap(splitConjunctivePredicates(_).map {
168-
f => DataSourceStrategy.translateFilter(f).getOrElse(
169-
throw new AnalysisException(s"Exec update failed:" +
170-
s" cannot translate expression to source filter: $f"))
171-
}).toArray
172-
DeleteFromTableExec(table.asDeletable, filters) :: Nil
173179

174180
case WriteToContinuousDataSource(writer, query) =>
175181
WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,21 @@ class DataSourceV2SQLSuite
13131313
}
13141314
}
13151315

1316+
test("DeleteFrom: DELETE is only supported with v2 tables") {
1317+
// unset this config to use the default v2 session catalog.
1318+
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
1319+
val v1Table = "tbl"
1320+
withTable(v1Table) {
1321+
sql(s"CREATE TABLE $v1Table" +
1322+
s" USING ${classOf[SimpleScanSource].getName} OPTIONS (from=0,to=1)")
1323+
val exc = intercept[AnalysisException] {
1324+
sql(s"DELETE FROM $v1Table WHERE i = 2")
1325+
}
1326+
1327+
assert(exc.getMessage.contains("DELETE is only supported with v2 tables"))
1328+
}
1329+
}
1330+
13161331
test("UPDATE TABLE") {
13171332
val t = "testcat.ns1.ns2.tbl"
13181333
withTable(t) {
@@ -1326,7 +1341,7 @@ class DataSourceV2SQLSuite
13261341
// UPDATE non-existing table
13271342
assertAnalysisError(
13281343
"UPDATE dummy SET name='abc'",
1329-
"Table not found")
1344+
"Table or view not found")
13301345

13311346
// UPDATE non-existing column
13321347
assertAnalysisError(

sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer, EmptyFunc
3030
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
3131
import org.apache.spark.sql.catalyst.expressions.{EqualTo, IntegerLiteral, StringLiteral}
3232
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
33-
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, Assignment, CreateTableAsSelect, CreateV2Table, DeleteAction, DescribeTable, DropTable, InsertAction, LogicalPlan, MergeIntoTable, Project, SubqueryAlias, UpdateAction, UpdateTable}
33+
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, Assignment, CreateTableAsSelect, CreateV2Table, DeleteAction, DeleteFromTable, DescribeTable, DropTable, InsertAction, LogicalPlan, MergeIntoTable, Project, SubqueryAlias, UpdateAction, UpdateTable}
3434
import org.apache.spark.sql.connector.InMemoryTableProvider
3535
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCatalog, TableChange, V1Table}
3636
import org.apache.spark.sql.execution.datasources.CreateTable
@@ -854,8 +854,41 @@ class PlanResolutionSuite extends AnalysisTest {
854854
assert(parsed4.isInstanceOf[DescribeTableCommand])
855855
}
856856

857+
test("DELETE FROM") {
858+
Seq("v2Table", "testcat.tab").foreach { tblName =>
859+
val sql1 = s"DELETE FROM $tblName"
860+
val sql2 = s"DELETE FROM $tblName where name='Robert'"
861+
val sql3 = s"DELETE FROM $tblName AS t where t.name='Robert'"
862+
863+
val parsed1 = parseAndResolve(sql1)
864+
val parsed2 = parseAndResolve(sql2)
865+
val parsed3 = parseAndResolve(sql3)
866+
867+
parsed1 match {
868+
case DeleteFromTable(_: DataSourceV2Relation, None) =>
869+
case _ => fail("Expect DeleteFromTable, bug got:\n" + parsed1.treeString)
870+
}
871+
872+
parsed2 match {
873+
case DeleteFromTable(
874+
_: DataSourceV2Relation,
875+
Some(EqualTo(name: UnresolvedAttribute, StringLiteral("Robert")))) =>
876+
assert(name.name == "name")
877+
case _ => fail("Expect DeleteFromTable, bug got:\n" + parsed1.treeString)
878+
}
879+
880+
parsed3 match {
881+
case DeleteFromTable(
882+
SubqueryAlias(AliasIdentifier("t", None), _: DataSourceV2Relation),
883+
Some(EqualTo(name: UnresolvedAttribute, StringLiteral("Robert")))) =>
884+
assert(name.name == "t.name")
885+
case _ => fail("Expect DeleteFromTable, bug got:\n" + parsed1.treeString)
886+
}
887+
}
888+
}
889+
857890
test("UPDATE TABLE") {
858-
Seq("v1Table", "v2Table", "testcat.tab").foreach { tblName =>
891+
Seq("v2Table", "testcat.tab").foreach { tblName =>
859892
val sql1 = s"UPDATE $tblName SET name='Robert', age=32"
860893
val sql2 = s"UPDATE $tblName AS t SET name='Robert', age=32"
861894
val sql3 = s"UPDATE $tblName AS t SET name='Robert', age=32 WHERE p=1"
@@ -865,10 +898,10 @@ class PlanResolutionSuite extends AnalysisTest {
865898
val parsed3 = parseAndResolve(sql3)
866899

867900
parsed1 match {
868-
case u @ UpdateTable(
901+
case UpdateTable(
869902
_: DataSourceV2Relation,
870-
Seq(name: UnresolvedAttribute, age: UnresolvedAttribute),
871-
Seq(StringLiteral("Robert"), IntegerLiteral(32)),
903+
Seq(Assignment(name: UnresolvedAttribute, StringLiteral("Robert")),
904+
Assignment(age: UnresolvedAttribute, IntegerLiteral(32))),
872905
None) =>
873906
assert(name.name == "name")
874907
assert(age.name == "age")
@@ -879,8 +912,8 @@ class PlanResolutionSuite extends AnalysisTest {
879912
parsed2 match {
880913
case UpdateTable(
881914
SubqueryAlias(AliasIdentifier("t", None), _: DataSourceV2Relation),
882-
Seq(name: UnresolvedAttribute, age: UnresolvedAttribute),
883-
Seq(StringLiteral("Robert"), IntegerLiteral(32)),
915+
Seq(Assignment(name: UnresolvedAttribute, StringLiteral("Robert")),
916+
Assignment(age: UnresolvedAttribute, IntegerLiteral(32))),
884917
None) =>
885918
assert(name.name == "name")
886919
assert(age.name == "age")
@@ -891,8 +924,8 @@ class PlanResolutionSuite extends AnalysisTest {
891924
parsed3 match {
892925
case UpdateTable(
893926
SubqueryAlias(AliasIdentifier("t", None), _: DataSourceV2Relation),
894-
Seq(name: UnresolvedAttribute, age: UnresolvedAttribute),
895-
Seq(StringLiteral("Robert"), IntegerLiteral(32)),
927+
Seq(Assignment(name: UnresolvedAttribute, StringLiteral("Robert")),
928+
Assignment(age: UnresolvedAttribute, IntegerLiteral(32))),
896929
Some(EqualTo(p: UnresolvedAttribute, IntegerLiteral(1)))) =>
897930
assert(name.name == "name")
898931
assert(age.name == "age")
@@ -906,7 +939,7 @@ class PlanResolutionSuite extends AnalysisTest {
906939
val parsed = parseAndResolve(sql)
907940
parsed match {
908941
case u: UpdateTable =>
909-
assert(u.table.isInstanceOf[UnresolvedV2Relation])
942+
assert(u.table.isInstanceOf[UnresolvedRelation])
910943
case _ => fail("Expect UpdateTable, but got:\n" + parsed.treeString)
911944
}
912945
}
@@ -1193,8 +1226,6 @@ class PlanResolutionSuite extends AnalysisTest {
11931226
assert(u.sourceTable.isInstanceOf[UnresolvedRelation])
11941227
case _ => fail("Expect MergeIntoTable, but got:\n" + parsed.treeString)
11951228
}
1196-
1197-
// TODO: v1 table is not supported.
11981229
}
11991230

12001231
// TODO: add tests for more commands.

0 commit comments

Comments
 (0)