Skip to content

Commit b370375

Browse files
aokolnychyigengliangwang
authored andcommitted
[SPARK-55631][SQL] ALTER TABLE must invalidate cache for DSv2 tables
### What changes were proposed in this pull request? This PR makes ALTER TABLE command invalidate cache. ### Why are the changes needed? These changes are needed to reflect changes made in the session correctly. ### Does this PR introduce _any_ user-facing change? ALTER TABLE commands will now invalidate cache. ### How was this patch tested? This PR comes with test that would previously fail. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #54427 from aokolnychyi/spark-55631. Authored-by: Anton Okolnychyi <aokolnychyi@apache.org> Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent d73c6cb commit b370375

File tree

3 files changed

+26
-3
lines changed

3 files changed

+26
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
3131
case class AlterTableExec(
3232
catalog: TableCatalog,
3333
ident: Identifier,
34-
changes: Seq[TableChange]) extends LeafV2CommandExec {
34+
changes: Seq[TableChange],
35+
refreshCache: () => Unit) extends LeafV2CommandExec {
3536

3637
override def output: Seq[Attribute] = Seq.empty
3738

@@ -42,7 +43,7 @@ case class AlterTableExec(
4243
case e: IllegalArgumentException if !e.isInstanceOf[SparkThrowable] =>
4344
throw QueryExecutionErrors.unsupportedTableChangeError(e)
4445
}
45-
46+
refreshCache()
4647
Seq.empty
4748
}
4849
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
579579
val table = a.table.asInstanceOf[ResolvedTable]
580580
ResolveTableConstraints.validateCatalogForTableChange(
581581
a.changes, table.catalog, table.identifier)
582-
AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil
582+
AlterTableExec(
583+
table.catalog,
584+
table.identifier,
585+
a.changes,
586+
recacheTable(table, includeTimeTravel = false)) :: Nil
583587

584588
case CreateIndex(ResolvedTable(_, _, table, _),
585589
indexName, indexType, ifNotExists, columns, properties) =>

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2663,6 +2663,24 @@ class CachedTableSuite extends QueryTest
26632663
}
26642664
}
26652665

2666+
test("ALTER TABLE invalidates cached table") {
2667+
val t = "testcat.tbl"
2668+
withTable(t) {
2669+
sql(s"CREATE TABLE $t (id int, data string) USING foo")
2670+
sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
2671+
2672+
sql(s"CACHE TABLE $t")
2673+
assertCached(sql(s"SELECT * FROM $t"))
2674+
checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, "a"), Row(2, "b")))
2675+
2676+
sql(s"ALTER TABLE $t ADD COLUMN new_col int")
2677+
2678+
val result = sql(s"SELECT * FROM $t ORDER BY id")
2679+
assertCached(result)
2680+
checkAnswer(result, Seq(Row(1, "a", null), Row(2, "b", null)))
2681+
}
2682+
}
2683+
26662684
private def cacheManager = spark.sharedState.cacheManager
26672685

26682686
private def pinTable(

0 commit comments

Comments
 (0)