Skip to content

Commit 1c6574d

Browse files
authored
[FLINK-38470][table] Make CreateMaterializedTableOperation return ResolvedCatalogMaterializedTable
1 parent b4eaf3c commit 1c6574d

File tree

4 files changed

+17
-24
lines changed

4 files changed

+17
-24
lines changed

flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ private ResultFetcher callCreateMaterializedTableOperation(
197197
OperationExecutor operationExecutor,
198198
OperationHandle handle,
199199
CreateMaterializedTableOperation createMaterializedTableOperation) {
200-
CatalogMaterializedTable materializedTable =
200+
ResolvedCatalogMaterializedTable materializedTable =
201201
createMaterializedTableOperation.getCatalogMaterializedTable();
202202
if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
203203
createMaterializedTableInContinuousMode(
@@ -220,7 +220,7 @@ private void createMaterializedTableInContinuousMode(
220220

221221
ObjectIdentifier materializedTableIdentifier =
222222
createMaterializedTableOperation.getTableIdentifier();
223-
CatalogMaterializedTable catalogMaterializedTable =
223+
ResolvedCatalogMaterializedTable catalogMaterializedTable =
224224
createMaterializedTableOperation.getCatalogMaterializedTable();
225225

226226
try {
@@ -257,7 +257,7 @@ private void createMaterializedTableInFullMode(
257257

258258
ObjectIdentifier materializedTableIdentifier =
259259
createMaterializedTableOperation.getTableIdentifier();
260-
CatalogMaterializedTable catalogMaterializedTable =
260+
ResolvedCatalogMaterializedTable catalogMaterializedTable =
261261
createMaterializedTableOperation.getCatalogMaterializedTable();
262262

263263
// convert duration to cron expression

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,19 +51,19 @@ public class SqlCreateMaterializedTable extends SqlCreate {
5151

5252
private final SqlIdentifier tableName;
5353

54-
private final SqlCharStringLiteral comment;
54+
private final @Nullable SqlTableConstraint tableConstraint;
5555

56-
private final SqlTableConstraint tableConstraint;
56+
private final @Nullable SqlCharStringLiteral comment;
5757

58-
private final SqlDistribution distribution;
58+
private final @Nullable SqlDistribution distribution;
5959

6060
private final SqlNodeList partitionKeyList;
6161

6262
private final SqlNodeList propertyList;
6363

6464
private final SqlIntervalLiteral freshness;
6565

66-
@Nullable private final SqlLiteral refreshMode;
66+
private final @Nullable SqlLiteral refreshMode;
6767

6868
private final SqlNode asQuery;
6969

@@ -80,8 +80,8 @@ public SqlCreateMaterializedTable(
8080
SqlNode asQuery) {
8181
super(OPERATOR, pos, false, false);
8282
this.tableName = requireNonNull(tableName, "tableName should not be null");
83-
this.comment = comment;
8483
this.tableConstraint = tableConstraint;
84+
this.comment = comment;
8585
this.distribution = distribution;
8686
this.partitionKeyList =
8787
requireNonNull(partitionKeyList, "partitionKeyList should not be null");
@@ -124,7 +124,7 @@ public Optional<SqlTableConstraint> getTableConstraint() {
124124
return Optional.ofNullable(tableConstraint);
125125
}
126126

127-
public SqlDistribution getDistribution() {
127+
public @Nullable SqlDistribution getDistribution() {
128128
return distribution;
129129
}
130130

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.table.api.internal.TableResultImpl;
2323
import org.apache.flink.table.api.internal.TableResultInternal;
24-
import org.apache.flink.table.catalog.CatalogMaterializedTable;
2524
import org.apache.flink.table.catalog.ObjectIdentifier;
2625
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
2726
import org.apache.flink.table.operations.Operation;
@@ -38,7 +37,7 @@ public class CreateMaterializedTableOperation
3837
implements CreateOperation, MaterializedTableOperation {
3938

4039
private final ObjectIdentifier tableIdentifier;
41-
private final CatalogMaterializedTable materializedTable;
40+
private final ResolvedCatalogMaterializedTable materializedTable;
4241

4342
public CreateMaterializedTableOperation(
4443
ObjectIdentifier tableIdentifier, ResolvedCatalogMaterializedTable materializedTable) {
@@ -57,7 +56,7 @@ public ObjectIdentifier getTableIdentifier() {
5756
return tableIdentifier;
5857
}
5958

60-
public CatalogMaterializedTable getCatalogMaterializedTable() {
59+
public ResolvedCatalogMaterializedTable getCatalogMaterializedTable() {
6160
return materializedTable;
6261
}
6362

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,7 @@ void testCreateMaterializedTable() {
126126
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
127127

128128
CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
129-
CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
130-
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
129+
ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
131130

132131
Map<String, String> options = new HashMap<>();
133132
options.put("connector", "filesystem");
@@ -152,8 +151,7 @@ void testCreateMaterializedTable() {
152151
.definitionQuery("SELECT *\n" + "FROM `builtin`.`default`.`t1`")
153152
.build();
154153

155-
assertThat(((ResolvedCatalogMaterializedTable) materializedTable).getOrigin())
156-
.isEqualTo(expected);
154+
assertThat(materializedTable.getOrigin()).isEqualTo(expected);
157155
}
158156

159157
@Test
@@ -202,8 +200,7 @@ void testContinuousRefreshMode() {
202200
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
203201

204202
CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
205-
CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
206-
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
203+
ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
207204

208205
assertThat(materializedTable.getLogicalRefreshMode())
209206
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
@@ -220,8 +217,7 @@ void testContinuousRefreshMode() {
220217
assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class);
221218

222219
CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2;
223-
CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();
224-
assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class);
220+
ResolvedCatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();
225221

226222
assertThat(materializedTable2.getLogicalRefreshMode())
227223
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS);
@@ -240,8 +236,7 @@ void testFullRefreshMode() {
240236
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
241237

242238
CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
243-
CatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
244-
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
239+
ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
245240

246241
assertThat(materializedTable.getLogicalRefreshMode())
247242
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
@@ -258,8 +253,7 @@ void testFullRefreshMode() {
258253
assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class);
259254

260255
CreateMaterializedTableOperation op2 = (CreateMaterializedTableOperation) operation2;
261-
CatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();
262-
assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class);
256+
ResolvedCatalogMaterializedTable materializedTable2 = op2.getCatalogMaterializedTable();
263257

264258
assertThat(materializedTable2.getLogicalRefreshMode())
265259
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL);

0 commit comments

Comments
 (0)