Skip to content

Commit 342b6d1

Browse files
committed
[FLINK-38683][table] Add schema to SHOW CREATE MATERIALIZED TABLE output
This closes #27236.
1 parent 7c60e51 commit 342b6d1

File tree

3 files changed

+75
-60
lines changed

3 files changed

+75
-60
lines changed

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,20 @@ public static String buildShowCreateTableRow(
116116
public static String buildShowCreateMaterializedTableRow(
117117
ResolvedCatalogMaterializedTable table,
118118
ObjectIdentifier tableIdentifier,
119-
boolean isTemporary) {
119+
boolean isTemporary,
120+
SqlFactory sqlFactory) {
120121
validateTableKind(table, tableIdentifier, TableKind.MATERIALIZED_TABLE);
121-
Optional<String> primaryKeys = extractFormattedPrimaryKey(table, PRINT_INDENT);
122122
StringBuilder sb =
123123
new StringBuilder()
124124
.append(
125125
buildCreateFormattedPrefix(
126-
"MATERIALIZED TABLE",
127-
isTemporary,
128-
tableIdentifier,
129-
primaryKeys.isPresent()));
130-
primaryKeys.ifPresent(s -> sb.append(s).append("\n)\n"));
126+
"MATERIALIZED TABLE", isTemporary, tableIdentifier, true));
127+
sb.append(extractFormattedColumns(table, PRINT_INDENT));
128+
extractFormattedWatermarkSpecs(table, PRINT_INDENT, sqlFactory)
129+
.ifPresent(watermarkSpecs -> sb.append(",\n").append(watermarkSpecs));
130+
extractFormattedPrimaryKey(table, PRINT_INDENT)
131+
.ifPresent(pk -> sb.append(",\n").append(pk));
132+
sb.append("\n)\n");
131133
extractComment(table).ifPresent(c -> sb.append(formatComment(c)).append("\n"));
132134
table.getDistribution()
133135
.map(TableDistribution::toString)

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ public TableResultInternal execute(Context ctx) {
5555
tableIdentifier.asSerializableString())));
5656
String resultRow =
5757
ShowCreateUtil.buildShowCreateMaterializedTableRow(
58-
table.getResolvedTable(), tableIdentifier, table.isTemporary());
58+
table.getResolvedTable(),
59+
tableIdentifier,
60+
table.isTemporary(),
61+
ctx.getCatalogManager().getSqlFactory());
5962

6063
return buildStringArrayResult("result", new String[] {resultRow});
6164
}

flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java

Lines changed: 62 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ class ShowCreateUtilTest {
6868

6969
private static final ResolvedSchema ONE_COLUMN_SCHEMA_WITH_PRIMARY_KEY =
7070
new ResolvedSchema(
71-
List.of(Column.physical("id", DataTypes.INT())),
71+
List.of(
72+
Column.physical("id", DataTypes.INT()),
73+
Column.metadata("mt_column", DataTypes.STRING(), null, true)),
7274
List.of(),
7375
UniqueConstraint.primaryKey("pk", List.of("id")),
7476
List.of());
@@ -104,12 +106,13 @@ void showCreateView(
104106
@ParameterizedTest(name = "{index}: {2}")
105107
@MethodSource("argsForShowCreateMaterializedTable")
106108
void showCreateMaterializedTable(
107-
ResolvedCatalogMaterializedTable materializedTable,
108-
boolean isTemporary,
109-
String expected) {
109+
ResolvedCatalogMaterializedTable materializedTable, String expected) {
110110
final String createMaterializedTableString =
111111
ShowCreateUtil.buildShowCreateMaterializedTableRow(
112-
materializedTable, MATERIALIZED_TABLE_IDENTIFIER, isTemporary);
112+
materializedTable,
113+
MATERIALIZED_TABLE_IDENTIFIER,
114+
false,
115+
DefaultSqlFactory.INSTANCE);
113116
assertThat(createMaterializedTableString).isEqualTo(expected);
114117
}
115118

@@ -274,55 +277,62 @@ private static Collection<Arguments> argsForShowCreateTable() {
274277

275278
private static Collection<Arguments> argsForShowCreateMaterializedTable() {
276279
final Collection<Arguments> argList = new ArrayList<>();
277-
addTemporaryAndPermanent(
278-
argList,
279-
createResolvedMaterialized(
280-
ONE_COLUMN_SCHEMA,
281-
null,
282-
List.of(),
283-
null,
284-
IntervalFreshness.ofMinute("1"),
285-
RefreshMode.CONTINUOUS,
286-
"SELECT 1"),
287-
"CREATE %sMATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName`\n"
288-
+ "FRESHNESS = INTERVAL '1' MINUTE\n"
289-
+ "REFRESH_MODE = CONTINUOUS\n"
290-
+ "AS SELECT 1\n");
280+
argList.add(
281+
Arguments.of(
282+
createResolvedMaterialized(
283+
ONE_COLUMN_SCHEMA,
284+
null,
285+
List.of(),
286+
null,
287+
IntervalFreshness.ofMinute("1"),
288+
RefreshMode.CONTINUOUS,
289+
"SELECT 1"),
290+
"CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n"
291+
+ " `id` INT\n"
292+
+ ")\n"
293+
+ "FRESHNESS = INTERVAL '1' MINUTE\n"
294+
+ "REFRESH_MODE = CONTINUOUS\n"
295+
+ "AS SELECT 1\n"));
291296

292-
addTemporaryAndPermanent(
293-
argList,
294-
createResolvedMaterialized(
295-
ONE_COLUMN_SCHEMA_WITH_PRIMARY_KEY,
296-
null,
297-
List.of(),
298-
null,
299-
IntervalFreshness.ofMinute("1"),
300-
RefreshMode.CONTINUOUS,
301-
"SELECT 1"),
302-
"CREATE %sMATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n"
303-
+ " CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED\n"
304-
+ ")\n"
305-
+ "FRESHNESS = INTERVAL '1' MINUTE\n"
306-
+ "REFRESH_MODE = CONTINUOUS\n"
307-
+ "AS SELECT 1\n");
297+
argList.add(
298+
Arguments.of(
299+
createResolvedMaterialized(
300+
ONE_COLUMN_SCHEMA_WITH_PRIMARY_KEY,
301+
null,
302+
List.of(),
303+
null,
304+
IntervalFreshness.ofMinute("1"),
305+
RefreshMode.CONTINUOUS,
306+
"SELECT 1"),
307+
"CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n"
308+
+ " `id` INT,\n"
309+
+ " `mt_column` VARCHAR(2147483647) METADATA VIRTUAL,\n"
310+
+ " CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED\n"
311+
+ ")\n"
312+
+ "FRESHNESS = INTERVAL '1' MINUTE\n"
313+
+ "REFRESH_MODE = CONTINUOUS\n"
314+
+ "AS SELECT 1\n"));
308315

309-
addTemporaryAndPermanent(
310-
argList,
311-
createResolvedMaterialized(
312-
TWO_COLUMNS_SCHEMA,
313-
"Materialized table comment",
314-
List.of("id"),
315-
TableDistribution.of(TableDistribution.Kind.HASH, 5, List.of("id")),
316-
IntervalFreshness.ofMinute("3"),
317-
RefreshMode.FULL,
318-
"SELECT id, name FROM tbl_a"),
319-
"CREATE %sMATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName`\n"
320-
+ "COMMENT 'Materialized table comment'\n"
321-
+ "DISTRIBUTED BY HASH(`id`) INTO 5 BUCKETS\n"
322-
+ "PARTITIONED BY (`id`)\n"
323-
+ "FRESHNESS = INTERVAL '3' MINUTE\n"
324-
+ "REFRESH_MODE = FULL\n"
325-
+ "AS SELECT id, name FROM tbl_a\n");
316+
argList.add(
317+
Arguments.of(
318+
createResolvedMaterialized(
319+
TWO_COLUMNS_SCHEMA,
320+
"Materialized table comment",
321+
List.of("id"),
322+
TableDistribution.of(TableDistribution.Kind.HASH, 5, List.of("id")),
323+
IntervalFreshness.ofMinute("3"),
324+
RefreshMode.FULL,
325+
"SELECT id, name FROM tbl_a"),
326+
"CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n"
327+
+ " `id` INT,\n"
328+
+ " `name` VARCHAR(2147483647)\n"
329+
+ ")\n"
330+
+ "COMMENT 'Materialized table comment'\n"
331+
+ "DISTRIBUTED BY HASH(`id`) INTO 5 BUCKETS\n"
332+
+ "PARTITIONED BY (`id`)\n"
333+
+ "FRESHNESS = INTERVAL '3' MINUTE\n"
334+
+ "REFRESH_MODE = FULL\n"
335+
+ "AS SELECT id, name FROM tbl_a\n"));
326336

327337
return argList;
328338
}

0 commit comments

Comments
 (0)