Skip to content

Commit 4fdf2a9

Browse files
committed
format code & update case
1 parent 12d5c13 commit 4fdf2a9

File tree

6 files changed

+114
-15
lines changed

6 files changed

+114
-15
lines changed

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -410,10 +410,10 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
410410
execution.execute();
411411

412412
// Check the order and content of all received events
413-
String[] outputEvents = outCaptor.toString().trim().split("\n");
413+
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
414414
assertThat(outputEvents)
415415
.containsExactly(
416-
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL,`opts` BIGINT NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
416+
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING,`rk` STRING NOT NULL,`opts` BIGINT NOT NULL}, primaryKeys=col1, partitionKeys=col12, comment=null, options=({key1=value1})}",
417417
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I, 1], op=INSERT, meta=({op_ts=1})}",
418418
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I, 2], op=INSERT, meta=({op_ts=2})}",
419419
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, existedColumnName=col2}]}",
@@ -1250,10 +1250,10 @@ void testTransformMergingWithRouteChangeOrder(ValuesDataSink.SinkApi sinkApi) th
12501250
.physicalColumn("gender", DataTypes.STRING())
12511251
.primaryKey("id")
12521252
.build());
1253-
String[] outputEvents = outCaptor.toString().trim().split("\n");
1253+
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
12541254
assertThat(outputEvents)
12551255
.containsExactly(
1256-
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}",
1256+
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, comment=null, options=()}",
12571257
"AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=last_name}]}",
12581258
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}}",
12591259
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name, null], op=INSERT, meta=()}",
@@ -1392,36 +1392,36 @@ void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkA
13921392
execution.execute();
13931393

13941394
// Check the order and content of all received events
1395-
String[] outputEvents = outCaptor.toString().trim().split("\n");
1395+
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
13961396

13971397
String[] expected =
13981398
Stream.of(
13991399
// Merging timestamp with different precision
1400-
"CreateTableEvent{tableId={}_table_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}",
1400+
"CreateTableEvent{tableId={}_table_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, comment=null, options=()}",
14011401
"DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
14021402
"AlterColumnTypeEvent{tableId={}_table_timestamp_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}}",
14031403
"DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
14041404
"DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[101, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}",
14051405
"DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[102, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
14061406

14071407
// Merging zoned timestamp with different precision
1408-
"CreateTableEvent{tableId={}_table_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0) WITH TIME ZONE}, primaryKeys=id, options=()}",
1408+
"CreateTableEvent{tableId={}_table_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0) WITH TIME ZONE}, primaryKeys=id, comment=null, options=()}",
14091409
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[3, Alice, 17, 2020-01-01T14:28:57Z], op=INSERT, meta=()}",
14101410
"AlterColumnTypeEvent{tableId={}_table_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}, oldTypeMapping={birthday=TIMESTAMP(0) WITH TIME ZONE}}",
14111411
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[4, Alice, 17, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}",
14121412
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[103, Zen, 19, 2020-01-01T14:28:57Z], op=INSERT, meta=()}",
14131413
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[104, Zen, 19, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}",
14141414

14151415
// Merging local-zoned timestamp with different precision
1416-
"CreateTableEvent{tableId={}_table_local_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP_LTZ(0)}, primaryKeys=id, options=()}",
1416+
"CreateTableEvent{tableId={}_table_local_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP_LTZ(0)}, primaryKeys=id, comment=null, options=()}",
14171417
"DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[5, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
14181418
"AlterColumnTypeEvent{tableId={}_table_local_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP_LTZ(9)}, oldTypeMapping={birthday=TIMESTAMP_LTZ(0)}}",
14191419
"DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[6, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
14201420
"DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[105, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}",
14211421
"DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[106, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
14221422

14231423
// Merging all
1424-
"CreateTableEvent{tableId={}_everything_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}",
1424+
"CreateTableEvent{tableId={}_everything_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, comment=null, options=()}",
14251425
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
14261426
"AlterColumnTypeEvent{tableId={}_everything_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}}",
14271427
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
@@ -1489,11 +1489,11 @@ void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi sinkApi) thr
14891489
execution.execute();
14901490

14911491
// Check the order and content of all received events
1492-
String[] outputEvents = outCaptor.toString().trim().split("\n");
1492+
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
14931493

14941494
String[] expected =
14951495
Stream.of(
1496-
"CreateTableEvent{tableId={}, schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, primaryKeys=id, options=()}",
1496+
"CreateTableEvent{tableId={}, schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, primaryKeys=id, comment=null, options=()}",
14971497
"DataChangeEvent{tableId={}, before=[], after=[1, Alice, 17, 1], op=INSERT, meta=()}",
14981498
"AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=SMALLINT}, oldTypeMapping={fav_num=TINYINT}}",
14991499
"DataChangeEvent{tableId={}, before=[], after=[2, Alice, 17, 22], op=INSERT, meta=()}",

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ limitations under the License.
2828

2929
<properties>
3030
<doris.connector.version>24.0.1</doris.connector.version>
31-
<testcontainers.version>1.18.3</testcontainers.version>
3231
<mysql.connector.version>8.0.26</mysql.connector.version>
3332
</properties>
3433

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ public class MySqlDataSourceOptions {
281281
.withDescription(
282282
"List of readable metadata from SourceRecord to be passed to downstream, split by `,`. "
283283
+ "Available readable metadata are: op_ts.");
284-
284+
285285
@Experimental
286286
public static final ConfigOption<Boolean> INCLUDE_COMMENTS_ENABLED =
287287
ConfigOptions.key("include-comments.enabled")

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
6868
private transient CustomMySqlAntlrDdlParser customParser;
6969

7070
private List<MySqlReadableMetadata> readableMetadataList;
71-
71+
7272
public MySqlEventDeserializer(
7373
DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
7474
this(changelogMode, includeSchemaChanges, new ArrayList<>(), false);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,14 @@
7575
import java.util.stream.Collectors;
7676
import java.util.stream.Stream;
7777

78+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
79+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
80+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
81+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
7882
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
83+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
84+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
85+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
7986
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
8087
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
8188
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults;
@@ -931,6 +938,98 @@ public void testDanglingDropTableEventInBinlog() throws Exception {
931938
actual.stream().map(Object::toString).collect(Collectors.toList()));
932939
}
933940

941+
@Test
942+
public void testIncludeComments() throws Exception {
943+
env.setParallelism(1);
944+
inventoryDatabase.createAndInitialize();
945+
TableId tableId =
946+
TableId.tableId(inventoryDatabase.getDatabaseName(), "products_with_comments");
947+
948+
String createTableSql =
949+
String.format(
950+
"CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n"
951+
+ " id INTEGER NOT NULL AUTO_INCREMENT COMMENT 'column comment of id' PRIMARY KEY,\n"
952+
+ " name VARCHAR(255) NOT NULL DEFAULT 'flink' COMMENT 'column comment of name',\n"
953+
+ " weight FLOAT(6) COMMENT 'column comment of weight'\n"
954+
+ ")\n"
955+
+ "COMMENT 'table comment of products';",
956+
inventoryDatabase.getDatabaseName(), "products_with_comments");
957+
executeSql(inventoryDatabase, createTableSql);
958+
959+
Map<String, String> options = new HashMap<>();
960+
options.put(HOSTNAME.key(), MYSQL8_CONTAINER.getHost());
961+
options.put(PORT.key(), String.valueOf(MYSQL8_CONTAINER.getDatabasePort()));
962+
options.put(USERNAME.key(), TEST_USER);
963+
options.put(PASSWORD.key(), TEST_PASSWORD);
964+
options.put(SERVER_TIME_ZONE.key(), "UTC");
965+
options.put(INCLUDE_COMMENTS_ENABLED.key(), "true");
966+
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".products_with_comments");
967+
Factory.Context context =
968+
new FactoryHelper.DefaultContext(
969+
Configuration.fromMap(options), null, this.getClass().getClassLoader());
970+
971+
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
972+
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
973+
FlinkSourceProvider sourceProvider =
974+
(FlinkSourceProvider) dataSource.getEventSourceProvider();
975+
976+
CloseableIterator<Event> events =
977+
env.fromSource(
978+
sourceProvider.getSource(),
979+
WatermarkStrategy.noWatermarks(),
980+
MySqlDataSourceFactory.IDENTIFIER,
981+
new EventTypeInfo())
982+
.executeAndCollect();
983+
Thread.sleep(5_000);
984+
985+
// add some column
986+
String addColumnSql =
987+
String.format(
988+
"ALTER TABLE `%s`.`products_with_comments` ADD COLUMN `description` VARCHAR(512) comment 'column comment of description';",
989+
inventoryDatabase.getDatabaseName());
990+
executeSql(inventoryDatabase, addColumnSql);
991+
992+
List<Event> expectedEvents = getEventsWithComments(tableId);
993+
List<Event> actual = fetchResults(events, expectedEvents.size());
994+
assertEqualsInAnyOrder(
995+
expectedEvents.stream().map(Object::toString).collect(Collectors.toList()),
996+
actual.stream().map(Object::toString).collect(Collectors.toList()));
997+
}
998+
999+
private void executeSql(UniqueDatabase database, String sql) throws SQLException {
1000+
try (Connection connection = database.getJdbcConnection();
1001+
Statement statement = connection.createStatement()) {
1002+
statement.execute(sql);
1003+
}
1004+
}
1005+
1006+
private List<Event> getEventsWithComments(TableId tableId) {
1007+
return Arrays.asList(
1008+
new CreateTableEvent(
1009+
tableId,
1010+
Schema.newBuilder()
1011+
.physicalColumn(
1012+
"id", DataTypes.INT().notNull(), "column comment of id")
1013+
.physicalColumn(
1014+
"name",
1015+
DataTypes.VARCHAR(255).notNull(),
1016+
"column comment of name",
1017+
"flink")
1018+
.physicalColumn(
1019+
"weight", DataTypes.FLOAT(), "column comment of weight")
1020+
.primaryKey(Collections.singletonList("id"))
1021+
.comment("table comment of products")
1022+
.build()),
1023+
new AddColumnEvent(
1024+
tableId,
1025+
Collections.singletonList(
1026+
new AddColumnEvent.ColumnWithPosition(
1027+
Column.physicalColumn(
1028+
"description",
1029+
DataTypes.VARCHAR(512),
1030+
"column comment of description")))));
1031+
}
1032+
9341033
private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
9351034
return new CreateTableEvent(
9361035
tableId,
@@ -1196,4 +1295,4 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
11961295
inventoryDatabase.getDatabaseName()));
11971296
return expected;
11981297
}
1199-
}
1298+
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveExcepti
178178
}
179179
builder.partitionKeys(partitionKeys)
180180
.primaryKey(primaryKeys)
181+
.comment(schema.comment())
181182
.options(tableOptions)
182183
.options(schema.options());
183184
catalog.createTable(

0 commit comments

Comments (0)