Skip to content

Commit 08648c5

Browse files
committed
[FLINK-38963][table] Altering query should be possible if MATERIALIZED TABLE schema contains non persisted columns
This closes apache#27460.
1 parent 721cb0a commit 08648c5

File tree

2 files changed

+92
-46
lines changed

2 files changed

+92
-46
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.HashSet;
4444
import java.util.List;
4545
import java.util.Set;
46+
import java.util.stream.Collectors;
4647

4748
/** The utils for materialized table. */
4849
@Internal
@@ -116,9 +117,10 @@ public static RefreshMode fromLogicalRefreshModeToRefreshMode(
116117

117118
public static List<Column> validateAndExtractNewColumns(
118119
ResolvedSchema oldSchema, ResolvedSchema newSchema) {
119-
List<Column> newAddedColumns = new ArrayList<>();
120-
int originalColumnSize = oldSchema.getColumns().size();
121-
int newColumnSize = newSchema.getColumns().size();
120+
final List<Column> newColumns = getPersistedColumns(newSchema);
121+
final List<Column> oldColumns = getPersistedColumns(oldSchema);
122+
final int originalColumnSize = oldColumns.size();
123+
final int newColumnSize = newColumns.size();
122124

123125
if (originalColumnSize > newColumnSize) {
124126
throw new ValidationException(
@@ -129,9 +131,9 @@ public static List<Column> validateAndExtractNewColumns(
129131
originalColumnSize, newColumnSize));
130132
}
131133

132-
for (int i = 0; i < oldSchema.getColumns().size(); i++) {
133-
Column oldColumn = oldSchema.getColumns().get(i);
134-
Column newColumn = newSchema.getColumns().get(i);
134+
for (int i = 0; i < oldColumns.size(); i++) {
135+
Column oldColumn = oldColumns.get(i);
136+
Column newColumn = newColumns.get(i);
135137
if (!oldColumn.equals(newColumn)) {
136138
throw new ValidationException(
137139
String.format(
@@ -142,7 +144,8 @@ public static List<Column> validateAndExtractNewColumns(
142144
}
143145
}
144146

145-
for (int i = oldSchema.getColumns().size(); i < newSchema.getColumns().size(); i++) {
147+
final List<Column> newAddedColumns = new ArrayList<>();
148+
for (int i = oldColumns.size(); i < newColumns.size(); i++) {
146149
Column newColumn = newSchema.getColumns().get(i);
147150
newAddedColumns.add(newColumn.copy(newColumn.getDataType().nullable()));
148151
}
@@ -203,6 +206,12 @@ private static void throwIfPersistedColumnNotUsedByQuery(
203206
}
204207
}
205208

209+
private static List<Column> getPersistedColumns(ResolvedSchema schema) {
210+
return schema.getColumns().stream()
211+
.filter(Column::isPersisted)
212+
.collect(Collectors.toList());
213+
}
214+
206215
private static void throwPersistedColumnNotUsedException(String type, String columnName) {
207216
throw new ValidationException(
208217
String.format(PERSISTED_COLUMN_NOT_USED_IN_QUERY, type, columnName));

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java

Lines changed: 76 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
import org.junit.jupiter.params.provider.MethodSource;
6060

6161
import java.util.ArrayList;
62-
import java.util.Arrays;
6362
import java.util.Collection;
6463
import java.util.Collections;
6564
import java.util.HashMap;
@@ -94,7 +93,7 @@ void before() throws TableAlreadyExistException, DatabaseNotExistException {
9493
CatalogTable.newBuilder()
9594
.schema(tableSchema)
9695
.comment("")
97-
.partitionKeys(Arrays.asList("b", "c"))
96+
.partitionKeys(List.of("b", "c"))
9897
.options(options)
9998
.build();
10099
catalog.createTable(path3, catalogTable, true);
@@ -167,6 +166,18 @@ void before() throws TableAlreadyExistException, DatabaseNotExistException {
167166
+ "AS SELECT t1.* FROM t1";
168167

169168
createMaterializedTableInCatalog(sqlWithoutConstraint, "base_mtbl_without_constraint");
169+
170+
// MATERIALIZED TABLE with non persisted columns
171+
final String sqlWithNonPersisted =
172+
"CREATE MATERIALIZED TABLE base_mtbl_with_non_persisted (\n"
173+
+ " m STRING METADATA VIRTUAL,"
174+
+ " calc AS 'a' || 'b'"
175+
+ ")\n"
176+
+ "FRESHNESS = INTERVAL '30' SECOND\n"
177+
+ "REFRESH_MODE = FULL\n"
178+
+ "AS SELECT 1";
179+
180+
createMaterializedTableInCatalog(sqlWithNonPersisted, "base_mtbl_with_non_persisted");
170181
}
171182

172183
@Test
@@ -503,15 +514,12 @@ void testAlterMaterializedTableAsQuery() throws TableNotExistException {
503514
AlterMaterializedTableAsQueryOperation op =
504515
(AlterMaterializedTableAsQueryOperation) operation;
505516
assertThat(op.getTableChanges())
506-
.isEqualTo(
507-
Arrays.asList(
508-
TableChange.add(
509-
Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))),
510-
TableChange.add(
511-
Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))),
512-
TableChange.modifyDefinitionQuery(
513-
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
514-
+ "FROM `builtin`.`default`.`t3` AS `t3`")));
517+
.containsExactly(
518+
TableChange.add(Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))),
519+
TableChange.add(Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))),
520+
TableChange.modifyDefinitionQuery(
521+
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
522+
+ "FROM `builtin`.`default`.`t3` AS `t3`"));
515523
assertThat(operation.asSummaryString())
516524
.isEqualTo(
517525
"ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
@@ -547,12 +555,11 @@ void testAlterMaterializedTableAsQuery() throws TableNotExistException {
547555
.collect(Collectors.toList());
548556
// added column should be a nullable column.
549557
assertThat(addedColumn)
550-
.isEqualTo(
551-
Arrays.asList(
552-
new Schema.UnresolvedPhysicalColumn(
553-
"e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
554-
new Schema.UnresolvedPhysicalColumn(
555-
"f", DataTypes.VARCHAR(Integer.MAX_VALUE))));
558+
.containsExactly(
559+
new Schema.UnresolvedPhysicalColumn(
560+
"e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
561+
new Schema.UnresolvedPhysicalColumn(
562+
"f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
556563
}
557564

558565
@Test
@@ -562,12 +569,11 @@ void testAlterMaterializedTableAsQueryWithConflictColumnName() {
562569
(AlterMaterializedTableAsQueryOperation) parse(sql5);
563570

564571
assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
565-
.isEqualTo(
566-
Arrays.asList(
567-
TableChange.add(Column.physical("a0", DataTypes.INT())),
568-
TableChange.modifyDefinitionQuery(
569-
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`c` AS `a`\n"
570-
+ "FROM `builtin`.`default`.`t3` AS `t3`")));
572+
.containsExactly(
573+
TableChange.add(Column.physical("a0", DataTypes.INT())),
574+
TableChange.modifyDefinitionQuery(
575+
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`c` AS `a`\n"
576+
+ "FROM `builtin`.`default`.`t3` AS `t3`"));
571577
}
572578

573579
@Test
@@ -648,15 +654,12 @@ void testCreateOrAlterMaterializedTableForExistingTable() throws TableNotExistEx
648654
AlterMaterializedTableAsQueryOperation op =
649655
(AlterMaterializedTableAsQueryOperation) operation;
650656
assertThat(op.getTableChanges())
651-
.isEqualTo(
652-
Arrays.asList(
653-
TableChange.add(
654-
Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))),
655-
TableChange.add(
656-
Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))),
657-
TableChange.modifyDefinitionQuery(
658-
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
659-
+ "FROM `builtin`.`default`.`t3` AS `t3`")));
657+
.containsExactly(
658+
TableChange.add(Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))),
659+
TableChange.add(Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))),
660+
TableChange.modifyDefinitionQuery(
661+
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
662+
+ "FROM `builtin`.`default`.`t3` AS `t3`"));
660663
assertThat(operation.asSummaryString())
661664
.isEqualTo(
662665
"ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
@@ -692,12 +695,11 @@ void testCreateOrAlterMaterializedTableForExistingTable() throws TableNotExistEx
692695
.collect(Collectors.toList());
693696
// added column should be a nullable column.
694697
assertThat(addedColumn)
695-
.isEqualTo(
696-
Arrays.asList(
697-
new Schema.UnresolvedPhysicalColumn(
698-
"e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
699-
new Schema.UnresolvedPhysicalColumn(
700-
"f", DataTypes.VARCHAR(Integer.MAX_VALUE))));
698+
.containsExactly(
699+
new Schema.UnresolvedPhysicalColumn(
700+
"e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
701+
new Schema.UnresolvedPhysicalColumn(
702+
"f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
701703
}
702704

703705
private static Collection<TestSpec> testDataForCreateAlterMaterializedTableFailedCase() {
@@ -1051,6 +1053,7 @@ private static Collection<TestSpec> alterSuccessCase() {
10511053
list.addAll(alterAddSchemaSuccessCase());
10521054
list.addAll(alterModifySchemaSuccessCase());
10531055
list.addAll(alterDropSchemaSuccessCase());
1056+
list.addAll(alterQuerySuccessCase());
10541057
return list;
10551058
}
10561059

@@ -1177,6 +1180,40 @@ private static Collection<TestSpec> alterDropSchemaSuccessCase() {
11771180
return list;
11781181
}
11791182

1183+
private static Collection<TestSpec> alterQuerySuccessCase() {
1184+
final Collection<TestSpec> list = new ArrayList<>();
1185+
1186+
list.add(
1187+
TestSpec.withExpectedSchema(
1188+
"ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted AS SELECT 1",
1189+
"(\n"
1190+
+ " `m` STRING METADATA VIRTUAL,\n"
1191+
+ " `calc` AS 'a' || 'b',\n"
1192+
+ " `EXPR$0` INT NOT NULL\n"
1193+
+ ")"));
1194+
1195+
list.add(
1196+
TestSpec.withExpectedSchema(
1197+
"ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted AS SELECT 2, 'a' AS sec",
1198+
"(\n"
1199+
+ " `m` STRING METADATA VIRTUAL,\n"
1200+
+ " `calc` AS 'a' || 'b',\n"
1201+
+ " `EXPR$0` INT NOT NULL,\n"
1202+
+ " `sec` CHAR(1)\n"
1203+
+ ")"));
1204+
1205+
list.add(
1206+
TestSpec.withExpectedSchema(
1207+
"CREATE OR ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted AS SELECT 2, 'a' AS sec",
1208+
"(\n"
1209+
+ " `m` STRING METADATA VIRTUAL,\n"
1210+
+ " `calc` AS 'a' || 'b',\n"
1211+
+ " `EXPR$0` INT NOT NULL,\n"
1212+
+ " `sec` CHAR(1)\n"
1213+
+ ")"));
1214+
return list;
1215+
}
1216+
11801217
private static Collection<Arguments> testDataWithDifferentSchemasSuccessCase() {
11811218
final Collection<Arguments> list = new ArrayList<>();
11821219
list.addAll(createOrAlter(CREATE_OPERATION));
@@ -1259,7 +1296,7 @@ private CatalogMaterializedTable.Builder getDefaultMaterializedTableBuilder() {
12591296
.build())
12601297
.comment("materialized table comment")
12611298
.options(Map.of("connector", "filesystem", "format", "json"))
1262-
.partitionKeys(Arrays.asList("a", "d"))
1299+
.partitionKeys(List.of("a", "d"))
12631300
.originalQuery("SELECT *\nFROM `t1`")
12641301
.expandedQuery(
12651302
"SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"

0 commit comments

Comments
 (0)