Skip to content

Commit 2eda3dd

Browse files
committed
revert case
1 parent a5e3b96 commit 2eda3dd

File tree

10 files changed

+90
-87
lines changed

10 files changed

+90
-87
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.flink.cdc.common.types.RowType;
2626
import org.apache.flink.cdc.common.utils.Preconditions;
2727

28+
import org.apache.commons.lang3.StringUtils;
29+
2830
import javax.annotation.Nullable;
2931

3032
import java.io.Serializable;
@@ -239,7 +241,9 @@ public String toString() {
239241
if (!partitionKeys.isEmpty()) {
240242
sb.append(", partitionKeys=").append(String.join(";", partitionKeys));
241243
}
242-
sb.append(", comment=").append(comment);
244+
if (StringUtils.isNotBlank(comment)) {
245+
sb.append(", comment=").append(comment);
246+
}
243247
sb.append(", options=").append(describeOptions());
244248

245249
return sb.toString();

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

Lines changed: 33 additions & 34 deletions
Large diffs are not rendered by default.

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlParallelizedPipelineITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public void testExtremeParallelizedSchemaChange() throws Exception {
189189
.map(
190190
subTaskId ->
191191
String.format(
192-
"%d> CreateTableEvent{tableId=%s.TABLE%d, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
192+
"%d> CreateTableEvent{tableId=%s.TABLE%d, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
193193
subTaskId,
194194
parallelismDatabase
195195
.getDatabaseName(),

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelperTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void testConvertEventToStr() {
5050

5151
List<RecordData.FieldGetter> fieldGetters = SchemaUtils.createFieldGetters(schema);
5252
Assert.assertEquals(
53-
"CreateTableEvent{tableId=default.default.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
53+
"CreateTableEvent{tableId=default.default.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
5454
ValuesDataSinkHelper.convertEventToStr(
5555
new CreateTableEvent(tableId, schema), fieldGetters));
5656

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,12 @@ public void testSyncWholeDatabase() throws Exception {
126126
mysqlInventoryDatabase.getDatabaseName()));
127127

128128
validateResult(
129-
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, comment=null, options=()}",
129+
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
130130
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
131131
"DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}",
132132
"DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}",
133133
"DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}",
134-
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, comment=null, options=()}",
134+
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}",
135135
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
136136
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}",
137137
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}",
@@ -235,12 +235,12 @@ public void testSchemaChangeEvents() throws Exception {
235235
mysqlInventoryDatabase.getDatabaseName()));
236236

237237
validateResult(
238-
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, comment=null, options=()}",
238+
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
239239
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
240240
"DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}",
241241
"DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}",
242242
"DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}",
243-
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, comment=null, options=()}",
243+
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}",
244244
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
245245
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}",
246246
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}",
@@ -381,12 +381,12 @@ public void testSoftDelete() throws Exception {
381381
mysqlInventoryDatabase.getDatabaseName()));
382382

383383
validateResult(
384-
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512),`op_type` STRING NOT NULL}, primaryKeys=id, comment=null, options=()}",
384+
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512),`op_type` STRING NOT NULL}, primaryKeys=id, options=()}",
385385
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234, +I], op=INSERT, meta=()}",
386386
"DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234, +I], op=INSERT, meta=()}",
387387
"DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234, +I], op=INSERT, meta=()}",
388388
"DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234, +I], op=INSERT, meta=()}",
389-
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING,`op_type` STRING NOT NULL}, primaryKeys=id, comment=null, options=()}",
389+
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING,`op_type` STRING NOT NULL}, primaryKeys=id, options=()}",
390390
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null, +I], op=INSERT, meta=()}",
391391
"DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null, +I], op=INSERT, meta=()}",
392392
"DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null, +I], op=INSERT, meta=()}",
@@ -546,8 +546,8 @@ public void testDanglingDropTableEventInBinlog() throws Exception {
546546
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
547547
waitUntilJobRunning(Duration.ofSeconds(30));
548548
validateResult(
549-
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, comment=null, options=()}",
550-
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, comment=null, options=()}");
549+
"CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
550+
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}");
551551
}
552552

553553
private void validateResult(String... expectedEvents) throws Exception {

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -173,19 +173,19 @@ public void testDefaultRoute() throws Exception {
173173

174174
waitUntilSpecificEvent(
175175
String.format(
176-
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
176+
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
177177
routeTestDatabase.getDatabaseName()));
178178
waitUntilSpecificEvent(
179179
String.format(
180-
"CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
180+
"CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
181181
routeTestDatabase.getDatabaseName()));
182182
waitUntilSpecificEvent(
183183
String.format(
184-
"CreateTableEvent{tableId=%s.TABLEGAMMA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
184+
"CreateTableEvent{tableId=%s.TABLEGAMMA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
185185
routeTestDatabase.getDatabaseName()));
186186
waitUntilSpecificEvent(
187187
String.format(
188-
"CreateTableEvent{tableId=%s.TABLEDELTA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
188+
"CreateTableEvent{tableId=%s.TABLEDELTA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
189189
routeTestDatabase.getDatabaseName()));
190190

191191
validateResult(
@@ -270,7 +270,7 @@ public void testMergeTableRoute() throws Exception {
270270

271271
waitUntilSpecificEvent(
272272
String.format(
273-
"CreateTableEvent{tableId=%s.ALL, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
273+
"CreateTableEvent{tableId=%s.ALL, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
274274
routeTestDatabase.getDatabaseName()));
275275

276276
validateResult(
@@ -354,17 +354,17 @@ public void testPartialRoute() throws Exception {
354354

355355
waitUntilSpecificEvent(
356356
String.format(
357-
"CreateTableEvent{tableId=NEW_%s.ALPHABET, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
357+
"CreateTableEvent{tableId=NEW_%s.ALPHABET, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
358358
routeTestDatabase.getDatabaseName()));
359359

360360
waitUntilSpecificEvent(
361361
String.format(
362-
"CreateTableEvent{tableId=%s.TABLEGAMMA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
362+
"CreateTableEvent{tableId=%s.TABLEGAMMA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
363363
routeTestDatabase.getDatabaseName()));
364364

365365
waitUntilSpecificEvent(
366366
String.format(
367-
"CreateTableEvent{tableId=%s.TABLEDELTA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
367+
"CreateTableEvent{tableId=%s.TABLEDELTA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
368368
routeTestDatabase.getDatabaseName()));
369369

370370
validateResult(
@@ -454,17 +454,17 @@ public void testMultipleRoute() throws Exception {
454454

455455
waitUntilSpecificEvent(
456456
String.format(
457-
"CreateTableEvent{tableId=NEW_%s.ALPHABET, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
457+
"CreateTableEvent{tableId=NEW_%s.ALPHABET, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
458458
routeTestDatabase.getDatabaseName()));
459459

460460
waitUntilSpecificEvent(
461461
String.format(
462-
"CreateTableEvent{tableId=NEW_%s.BETAGAMM, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
462+
"CreateTableEvent{tableId=NEW_%s.BETAGAMM, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
463463
routeTestDatabase.getDatabaseName()));
464464

465465
waitUntilSpecificEvent(
466466
String.format(
467-
"CreateTableEvent{tableId=%s.TABLEDELTA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
467+
"CreateTableEvent{tableId=%s.TABLEDELTA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
468468
routeTestDatabase.getDatabaseName()));
469469

470470
validateResult(
@@ -564,17 +564,17 @@ public void testOneToManyRoute() throws Exception {
564564

565565
waitUntilSpecificEvent(
566566
String.format(
567-
"CreateTableEvent{tableId=NEW_%s.TABLEA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
567+
"CreateTableEvent{tableId=NEW_%s.TABLEA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
568568
routeTestDatabase.getDatabaseName()));
569569

570570
waitUntilSpecificEvent(
571571
String.format(
572-
"CreateTableEvent{tableId=NEW_%s.TABLEB, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
572+
"CreateTableEvent{tableId=NEW_%s.TABLEB, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
573573
routeTestDatabase.getDatabaseName()));
574574

575575
waitUntilSpecificEvent(
576576
String.format(
577-
"CreateTableEvent{tableId=NEW_%s.TABLEC, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
577+
"CreateTableEvent{tableId=NEW_%s.TABLEC, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
578578
routeTestDatabase.getDatabaseName()));
579579

580580
validateResult(
@@ -674,7 +674,7 @@ public void testMergeTableRouteWithTransform() throws Exception {
674674

675675
waitUntilSpecificEvent(
676676
String.format(
677-
"CreateTableEvent{tableId=%s.ALL, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`EXTRAS` STRING}, primaryKeys=ID, comment=null, options=()}",
677+
"CreateTableEvent{tableId=%s.ALL, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`EXTRAS` STRING}, primaryKeys=ID, options=()}",
678678
routeTestDatabase.getDatabaseName()));
679679

680680
validateResult(
@@ -760,19 +760,19 @@ public void testReplacementSymbol() throws Exception {
760760

761761
waitUntilSpecificEvent(
762762
String.format(
763-
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
763+
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
764764
routeTestDatabase.getDatabaseName()));
765765
waitUntilSpecificEvent(
766766
String.format(
767-
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
767+
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
768768
routeTestDatabase.getDatabaseName()));
769769
waitUntilSpecificEvent(
770770
String.format(
771-
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEGAMMA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
771+
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEGAMMA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
772772
routeTestDatabase.getDatabaseName()));
773773
waitUntilSpecificEvent(
774774
String.format(
775-
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEDELTA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
775+
"CreateTableEvent{tableId=NEW_%s.NEW_TABLEDELTA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
776776
routeTestDatabase.getDatabaseName()));
777777

778778
validateResult(
@@ -880,7 +880,7 @@ public void testExtremeMergeTableRoute() throws Exception {
880880
i ->
881881
String.format(
882882
prefix
883-
+ "CreateTableEvent{tableId=%s.TABLE%d, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, comment=null, options=()}",
883+
+ "CreateTableEvent{tableId=%s.TABLE%d, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}",
884884
databaseName,
885885
i))
886886
.toArray(String[]::new));

0 commit comments

Comments
 (0)