
Commit cd219d1

[FLINK-38355][table] Support CREATE OR ALTER MATERIALIZED TABLE syntax
1 parent 863b756 commit cd219d1

File tree: 31 files changed, +988 -327 lines


docs/content.zh/docs/dev/table/materialized-table/statements.md

Lines changed: 67 additions & 3 deletions
@@ -27,14 +27,14 @@ under the License.
 # Materialized Table Statements
 
 Flink SQL currently supports the following materialized table operations:
-- [CREATE MATERIALIZED TABLE](#create-materialized-table)
+- [CREATE [OR ALTER] MATERIALIZED TABLE](#create-or-alter-materialized-table)
 - [ALTER MATERIALIZED TABLE](#alter-materialized-table)
 - [DROP MATERIALIZED TABLE](#drop-materialized-table)
 
-# CREATE MATERIALIZED TABLE
+# CREATE [OR ALTER] MATERIALIZED TABLE
 
 ```
-CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
+CREATE [OR ALTER] MATERIALIZED TABLE [catalog_name.][db_name.]table_name
 
 [(
     { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
@@ -228,6 +228,30 @@ CREATE MATERIALIZED TABLE my_materialized_table
 AS SELECT * FROM kafka_catalog.db1.kafka_table;
 ```
 
+## OR ALTER
+
+The `OR ALTER` clause provides create-or-update semantics:
+
+- **If the materialized table does not exist**: Creates a new materialized table with the specified options
+- **If the materialized table exists**: Modifies the query definition (behaves like `ALTER MATERIALIZED TABLE AS`)
+
+This is particularly useful in declarative deployment scenarios where you want to define the desired state without first checking whether the materialized table already exists.
+
+**Behavior when the materialized table exists:**
+
+The operation updates the materialized table similarly to [ALTER MATERIALIZED TABLE AS](#as-select_statement-1):
+
+**Full mode:**
+1. Updates the schema and query definition
+2. The materialized table is refreshed using the new query when the next refresh job is triggered
+
+**Continuous mode:**
+1. Pauses the currently running refresh job
+2. Updates the schema and query definition
+3. Starts a new refresh job from the beginning
+
+See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
+
 ## Examples
 
 Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes.
@@ -313,6 +337,46 @@ It might happen that types of columns are not the same, in that case implicit ca
 If implicit casting is not supported for some of the combinations, a validation error will be thrown.
 Also, it is worth noting that reordering can also be done here.
 
+Create or alter a materialized table, executing the same statement twice:
+
+```sql
+-- First execution: creates the materialized table
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+FRESHNESS = INTERVAL '10' SECOND
+AS
+SELECT
+    user_id,
+    COUNT(*) AS event_count,
+    SUM(amount) AS total_amount
+FROM
+    kafka_catalog.db1.events
+WHERE
+    event_type = 'purchase'
+GROUP BY
+    user_id;
+
+-- Second execution: alters the query definition (adds the avg_amount column)
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+FRESHNESS = INTERVAL '10' SECOND
+AS
+SELECT
+    user_id,
+    COUNT(*) AS event_count,
+    SUM(amount) AS total_amount,
+    AVG(amount) AS avg_amount -- Add a new nullable column at the end
+FROM
+    kafka_catalog.db1.events
+WHERE
+    event_type = 'purchase'
+GROUP BY
+    user_id;
+```
+
+<span class="label label-danger">Note</span>
+- When altering an existing materialized table, schema evolution currently only supports appending `nullable` columns to the end of the original materialized table's schema.
+- In continuous mode, the new refresh job will not restore from the state of the original refresh job when altering.
+- All limitations of both the CREATE and ALTER operations apply.
+
 ## Limitations
 - Does not support explicitly specifying physical columns which are not used in the query
 - Does not support referencing temporary tables, temporary views, or temporary functions in the select query

docs/content/docs/dev/table/materialized-table/statements.md

Lines changed: 67 additions & 3 deletions
@@ -27,14 +27,14 @@ under the License.
 # Materialized Table Statements
 
 Flink SQL currently supports the following Materialized Table statements:
-- [CREATE MATERIALIZED TABLE](#create-materialized-table)
+- [CREATE [OR ALTER] MATERIALIZED TABLE](#create-or-alter-materialized-table)
 - [ALTER MATERIALIZED TABLE](#alter-materialized-table)
 - [DROP MATERIALIZED TABLE](#drop-materialized-table)
 
-# CREATE MATERIALIZED TABLE
+# CREATE [OR ALTER] MATERIALIZED TABLE
 
 ```
-CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
+CREATE [OR ALTER] MATERIALIZED TABLE [catalog_name.][db_name.]table_name
 
 [(
     { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
@@ -228,6 +228,30 @@ CREATE MATERIALIZED TABLE my_materialized_table
 AS SELECT * FROM kafka_catalog.db1.kafka_table;
 ```
 
+## OR ALTER
+
+The `OR ALTER` clause provides create-or-update semantics:
+
+- **If the materialized table does not exist**: Creates a new materialized table with the specified options
+- **If the materialized table exists**: Modifies the query definition (behaves like `ALTER MATERIALIZED TABLE AS`)
+
+This is particularly useful in declarative deployment scenarios where you want to define the desired state without first checking whether the materialized table already exists.
+
+**Behavior when the materialized table exists:**
+
+The operation updates the materialized table similarly to [ALTER MATERIALIZED TABLE AS](#as-select_statement-1):
+
+**Full mode:**
+1. Updates the schema and query definition
+2. The materialized table is refreshed using the new query when the next refresh job is triggered
+
+**Continuous mode:**
+1. Pauses the currently running refresh job
+2. Updates the schema and query definition
+3. Starts a new refresh job from the beginning
+
+See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
+
 ## Examples
 
 Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes.
@@ -311,6 +335,46 @@ It might happen that types of columns are not the same, in that case implicit ca
 If implicit casting is not supported for some of the combinations, a validation error will be thrown.
 Also, it is worth noting that reordering can also be done here.
 
+Create or alter a materialized table, executing the same statement twice:
+
+```sql
+-- First execution: creates the materialized table
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+FRESHNESS = INTERVAL '10' SECOND
+AS
+SELECT
+    user_id,
+    COUNT(*) AS event_count,
+    SUM(amount) AS total_amount
+FROM
+    kafka_catalog.db1.events
+WHERE
+    event_type = 'purchase'
+GROUP BY
+    user_id;
+
+-- Second execution: alters the query definition (adds the avg_amount column)
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+FRESHNESS = INTERVAL '10' SECOND
+AS
+SELECT
+    user_id,
+    COUNT(*) AS event_count,
+    SUM(amount) AS total_amount,
+    AVG(amount) AS avg_amount -- Add a new nullable column at the end
+FROM
+    kafka_catalog.db1.events
+WHERE
+    event_type = 'purchase'
+GROUP BY
+    user_id;
+```
+
+<span class="label label-danger">Note</span>
+- When altering an existing materialized table, schema evolution currently only supports appending `nullable` columns to the end of the original materialized table's schema.
+- In continuous mode, the new refresh job will not restore from the state of the original refresh job when altering.
+- All limitations of both the CREATE and ALTER operations apply.
+
 ## Limitations
 
 - Does not support explicitly specifying physical columns which are not used in the query

flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java

Lines changed: 7 additions & 8 deletions
@@ -265,7 +265,7 @@ private void createMaterializedTableInFullMode(
         CreateRefreshWorkflow createRefreshWorkflow =
                 new CreatePeriodicRefreshWorkflow(
                         materializedTableIdentifier,
-                        catalogMaterializedTable.getDefinitionQuery(),
+                        catalogMaterializedTable.getExpandedQuery(),
                         cronExpression,
                         getSessionInitializationConf(operationExecutor),
                         Collections.emptyMap(),
@@ -569,7 +569,7 @@ private void executeContinuousRefreshJob(
         String insertStatement =
                 getInsertStatement(
                         materializedTableIdentifier,
-                        catalogMaterializedTable.getDefinitionQuery(),
+                        catalogMaterializedTable.getExpandedQuery(),
                         dynamicOptions);
 
         JobExecutionResult result =
@@ -651,7 +651,7 @@ public ResultFetcher refreshMaterializedTable(
         String insertStatement =
                 getRefreshStatement(
                         materializedTableIdentifier,
-                        materializedTable.getDefinitionQuery(),
+                        materializedTable.getExpandedQuery(),
                         refreshPartitions,
                         dynamicOptions);
 
@@ -868,8 +868,8 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
             LOG.warn(
                     "Failed to start the continuous refresh job for materialized table {} using new query {}, rollback to origin query {}.",
                     tableIdentifier,
-                    op.getCatalogMaterializedTable().getDefinitionQuery(),
-                    suspendMaterializedTable.getDefinitionQuery(),
+                    op.getCatalogMaterializedTable().getExpandedQuery(),
+                    suspendMaterializedTable.getExpandedQuery(),
                     e);
 
             AlterMaterializedTableChangeOperation rollbackChangeOperation =
@@ -891,7 +891,7 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
             throw new SqlExecutionException(
                     String.format(
                             "Failed to start the continuous refresh job using new query %s when altering materialized table %s select query.",
-                            op.getCatalogMaterializedTable().getDefinitionQuery(),
+                            op.getCatalogMaterializedTable().getExpandedQuery(),
                             tableIdentifier),
                     e);
         }
@@ -944,8 +944,7 @@ private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedT
                             oldMaterializedTable.getSerializedRefreshHandler()));
         } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
             rollbackChanges.add(
-                    TableChange.modifyDefinitionQuery(
-                            oldMaterializedTable.getDefinitionQuery()));
+                    TableChange.modifyDefinitionQuery(oldMaterializedTable.getExpandedQuery()));
         } else {
             throw new ValidationException(
                     String.format(

flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java

Lines changed: 5 additions & 5 deletions
@@ -1268,7 +1268,7 @@ void testAlterMaterializedTableAsQueryInFullMode() throws Exception {
                 .isEqualTo(
                         Collections.singletonList(
                                 Column.physical("order_amount_sum", DataTypes.INT())));
-        assertThat(newTable.getDefinitionQuery())
+        assertThat(newTable.getExpandedQuery())
                 .isEqualTo(
                         String.format(
                                 "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\n"
@@ -1342,7 +1342,7 @@ void testAlterMaterializedTableAddDistribution() throws Exception {
                         TEST_DEFAULT_DATABASE,
                         "users_shops"));
 
-        assertThat(newTable.getDefinitionQuery()).isEqualTo(oldTable.getDefinitionQuery());
+        assertThat(newTable.getExpandedQuery()).isEqualTo(oldTable.getExpandedQuery());
 
         // the refresh handler in full mode should be the same as the old one
         assertThat(oldTable.getSerializedRefreshHandler())
@@ -1408,7 +1408,7 @@ void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws Excep
                 .isEqualTo(
                         Collections.singletonList(
                                 Column.physical("order_amount_sum", DataTypes.INT())));
-        assertThat(newTable.getDefinitionQuery())
+        assertThat(newTable.getExpandedQuery())
                 .isEqualTo(
                         String.format(
                                 "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\n"
@@ -1495,7 +1495,7 @@ void testAlterMaterializedTableAsQueryInContinuousMode(@TempDir Path temporaryPa
                 .isEqualTo(oldTable.getResolvedSchema().getPrimaryKey());
        assertThat(newTable.getResolvedSchema().getWatermarkSpecs())
                 .isEqualTo(oldTable.getResolvedSchema().getWatermarkSpecs());
-        assertThat(newTable.getDefinitionQuery())
+        assertThat(newTable.getExpandedQuery())
                 .isEqualTo(
                         String.format(
                                 "SELECT COALESCE(`tmp`.`user_id`, CAST(0 AS BIGINT)) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
@@ -1594,7 +1594,7 @@ void testAlterMaterializedTableAsQueryInContinuousModeWithSuspendStatus(
 
         assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema()))
                 .isEqualTo(Collections.singletonList(Column.physical("pv", DataTypes.INT())));
-        assertThat(newTable.getDefinitionQuery())
+        assertThat(newTable.getExpandedQuery())
                 .isEqualTo(
                         String.format(
                                 "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"

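For context on the getDefinitionQuery() to getExpandedQuery() switch in the two files above: the definition query is the SELECT as the user wrote it, while the expanded query is the validated, fully-qualified form that the rewritten test assertions check. A minimal sketch of the difference, with purely illustrative table and column names (the exact expansion is produced by the planner), presumably so the refresh statement no longer depends on the session's current catalog and database:

```sql
-- Definition query: the SELECT as written in CREATE [OR ALTER] MATERIALIZED TABLE ... AS
SELECT user_id, COUNT(*) AS event_count
FROM events
GROUP BY user_id;

-- Expanded query (illustrative only): identifiers resolved against the catalog/database
-- and quoted, independent of the session's current defaults
SELECT `events`.`user_id`, COUNT(*) AS `event_count`
FROM `kafka_catalog`.`db1`.`events` AS `events`
GROUP BY `events`.`user_id`;
```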
flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd

Lines changed: 1 addition & 1 deletion
@@ -76,9 +76,9 @@
     "org.apache.flink.sql.parser.ddl.SqlCreateCatalog"
     "org.apache.flink.sql.parser.ddl.SqlCreateDatabase"
     "org.apache.flink.sql.parser.ddl.SqlCreateFunction"
-    "org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable"
     "org.apache.flink.sql.parser.ddl.SqlCreateModel"
     "org.apache.flink.sql.parser.ddl.SqlCreateModelAs"
+    "org.apache.flink.sql.parser.ddl.SqlCreateOrAlterMaterializedTable"
     "org.apache.flink.sql.parser.ddl.SqlCreateTable"
     "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
     "org.apache.flink.sql.parser.ddl.SqlCreateTableAs"

flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl

Lines changed: 12 additions & 5 deletions
@@ -1852,9 +1852,9 @@ SqlNode SqlReplaceTable() :
 }
 
 /**
-* Parses a CREATE MATERIALIZED TABLE statement.
+* Parses a CREATE [OR ALTER] MATERIALIZED TABLE statement.
 */
-SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporary) :
+SqlCreate SqlCreateOrAlterMaterializedTable(Span s, boolean replace, boolean isTemporary) :
 {
     final SqlParserPos startPos = s.pos();
     SqlIdentifier tableName;
@@ -1870,8 +1870,14 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar
     SqlNode asQuery = null;
     SqlParserPos pos = startPos;
     boolean isColumnsIdentifiersOnly = false;
+    boolean isOrAlter = false;
 }
 {
+    [
+        <OR> <ALTER> {
+            isOrAlter = true;
+        }
+    ]
     <MATERIALIZED>
     {
         if (isTemporary) {
@@ -1946,7 +1952,7 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar
     <AS>
     asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
     {
-        return new SqlCreateMaterializedTable(
+        return new SqlCreateOrAlterMaterializedTable(
             startPos.plus(getPos()),
             tableName,
            columnList,
@@ -1958,7 +1964,8 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar
            propertyList,
            (SqlIntervalLiteral) freshness,
            refreshMode,
-           asQuery);
+           asQuery,
+           isOrAlter);
     }
 }
 
@@ -2646,7 +2653,7 @@ SqlCreate SqlCreateExtended(Span s, boolean replace) :
     (
         create = SqlCreateCatalog(s, replace)
    |
-        create = SqlCreateMaterializedTable(s, replace, isTemporary)
+        create = SqlCreateOrAlterMaterializedTable(s, replace, isTemporary)
    |
        create = SqlCreateTable(s, replace, isTemporary)
    |
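To make the renamed production concrete, here is a minimal sketch of the two statement shapes SqlCreateOrAlterMaterializedTable now covers; `my_mt`, `src`, and the `FRESHNESS` interval are placeholder values:

```sql
-- Parsed with isOrAlter = false: plain CREATE MATERIALIZED TABLE
CREATE MATERIALIZED TABLE my_mt
FRESHNESS = INTERVAL '1' MINUTE
AS SELECT id, amount FROM src;

-- Parsed with isOrAlter = true: create-or-update semantics
CREATE OR ALTER MATERIALIZED TABLE my_mt
FRESHNESS = INTERVAL '1' MINUTE
AS SELECT id, amount, ts FROM src;
```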

flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj

Lines changed: 1 addition & 0 deletions
@@ -4353,6 +4353,7 @@ SqlCreate SqlCreate() :
 {
     <CREATE> { s = span(); }
     [
+        LOOKAHEAD(2)
         <OR> <REPLACE> {
             replace = true;
         }
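The added LOOKAHEAD(2) is needed because, after CREATE, the OR token alone no longer identifies the branch; the parser must peek at the following token to tell OR REPLACE apart from the new OR ALTER. A sketch of the two prefixes, with placeholder names (and assuming OR REPLACE is used with a statement form that supports it):

```sql
CREATE OR REPLACE TABLE t AS SELECT * FROM src;   -- OR followed by REPLACE

CREATE OR ALTER MATERIALIZED TABLE mt
FRESHNESS = INTERVAL '1' MINUTE
AS SELECT * FROM src;                             -- OR followed by ALTER
```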

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java

Lines changed: 2 additions & 2 deletions
@@ -29,8 +29,8 @@
 import static java.util.Objects.requireNonNull;
 
 /**
- * Abstract class to describe statements like ALTER MATERIALIZED TABLE [catalogName.]
- * [dataBasesName.]tableName ...
+ * Abstract class to describe statements like ALTER MATERIALIZED TABLE
+ * [catalogName.][dataBasesName.]tableName ...
  */
 public abstract class SqlAlterMaterializedTable extends SqlCall {

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableAsQuery.java

Lines changed: 2 additions & 0 deletions
@@ -52,7 +52,9 @@ public List<SqlNode> getOperandList() {
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         super.unparse(writer, leftPrec, rightPrec);
+        writer.newlineAndIndent();
         writer.keyword("AS");
+        writer.newlineAndIndent();
         asQuery.unparse(writer, leftPrec, rightPrec);
     }
 }
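Roughly, the effect of the two added writer.newlineAndIndent() calls on the unparsed text of an ALTER MATERIALIZED TABLE ... AS statement; a sketch with a placeholder table and query, ignoring exact quoting and indentation:

```sql
-- Before: the whole statement is unparsed on a single line
ALTER MATERIALIZED TABLE mt AS SELECT id, amount FROM src;

-- After: AS and the query each start on a new line
ALTER MATERIALIZED TABLE mt
AS
SELECT id, amount FROM src;
```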
