Skip to content

Commit 23d4c68

Browse files
authored
[cdc] mysql cdc supports table comment changes. (#5258)
1 parent caa2730 commit 23d4c68

37 files changed

+639
-452
lines changed

docs/content/cdc-ingestion/mysql-cdc.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,8 +264,11 @@ to avoid potential name conflict.
264264
## FAQ
265265
266266
1. Chinese characters in records ingested from MySQL are garbled.
267+
267268
* Try to set `env.java.opts: -Dfile.encoding=UTF-8` in `flink-conf.yaml`(Flink version < 1.19) or `config.yaml`(Flink version >= 1.19)
268269
(the option is changed to `env.java.opts.all` since Flink-1.17).
269270
270-
2. Synchronize MySQL Table comment.
271+
2. Synchronize MySQL table and column comment.
272+
271273
* Synchronize MySQL create table comment to the paimon table, you need to configure `--mysql_conf jdbc.properties.useInformationSchema=true`.
274+
* Synchronize MySQL alter table or column comment to the paimon table, you need to configure `--mysql_conf debezium.include.schema.comments=true`.

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
2222
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2323
import org.apache.paimon.flink.action.cdc.TypeMapping;
24+
import org.apache.paimon.flink.sink.cdc.CdcSchema;
2425
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2526
import org.apache.paimon.types.DataTypes;
2627
import org.apache.paimon.types.RowKind;
27-
import org.apache.paimon.types.RowType;
2828
import org.apache.paimon.utils.TypeUtils;
2929

3030
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -76,13 +76,13 @@ protected void setRoot(CdcSourceRecord record) {
7676
protected abstract String dataField();
7777

7878
// use STRING type in default when we cannot get origin data types (most cases)
79-
protected void fillDefaultTypes(JsonNode record, RowType.Builder rowTypeBuilder) {
79+
protected void fillDefaultTypes(JsonNode record, CdcSchema.Builder schemaBuilder) {
8080
record.fieldNames()
81-
.forEachRemaining(name -> rowTypeBuilder.field(name, DataTypes.STRING()));
81+
.forEachRemaining(name -> schemaBuilder.column(name, DataTypes.STRING()));
8282
}
8383

84-
protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
85-
fillDefaultTypes(record, rowTypeBuilder);
84+
protected Map<String, String> extractRowData(JsonNode record, CdcSchema.Builder schemaBuilder) {
85+
fillDefaultTypes(record, schemaBuilder);
8686
Map<String, Object> recordMap =
8787
convertValue(record, new TypeReference<Map<String, Object>>() {});
8888
Map<String, String> rowData =
@@ -103,7 +103,7 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
103103
}
104104
return Objects.toString(entry.getValue());
105105
}));
106-
evalComputedColumns(rowData, rowTypeBuilder);
106+
evalComputedColumns(rowData, schemaBuilder);
107107
return rowData;
108108
}
109109

@@ -121,9 +121,9 @@ protected List<String> extractPrimaryKeys() {
121121

122122
protected void processRecord(
123123
JsonNode jsonNode, RowKind rowKind, List<RichCdcMultiplexRecord> records) {
124-
RowType.Builder rowTypeBuilder = RowType.builder();
125-
Map<String, String> rowData = this.extractRowData(jsonNode, rowTypeBuilder);
126-
records.add(createRecord(rowKind, rowData, rowTypeBuilder.build().getFields()));
124+
CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
125+
Map<String, String> rowData = this.extractRowData(jsonNode, schemaBuilder);
126+
records.add(createRecord(rowKind, rowData, schemaBuilder));
127127
}
128128

129129
protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@
2222
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2323
import org.apache.paimon.flink.action.cdc.TypeMapping;
2424
import org.apache.paimon.flink.sink.cdc.CdcRecord;
25+
import org.apache.paimon.flink.sink.cdc.CdcSchema;
2526
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2627
import org.apache.paimon.schema.Schema;
27-
import org.apache.paimon.types.DataField;
2828
import org.apache.paimon.types.RowKind;
29-
import org.apache.paimon.types.RowType;
3029

3130
import org.apache.flink.api.common.functions.FlatMapFunction;
3231
import org.apache.flink.util.Collector;
@@ -71,20 +70,7 @@ public Schema buildSchema(CdcSourceRecord record) {
7170
}
7271

7372
Optional<RichCdcMultiplexRecord> recordOpt = extractRecords().stream().findFirst();
74-
if (!recordOpt.isPresent()) {
75-
return null;
76-
}
77-
78-
Schema.Builder builder = Schema.newBuilder();
79-
recordOpt
80-
.get()
81-
.fields()
82-
.forEach(
83-
field ->
84-
builder.column(
85-
field.name(), field.type(), field.description()));
86-
builder.primaryKey(extractPrimaryKeys());
87-
return builder.build();
73+
return recordOpt.map(RichCdcMultiplexRecord::buildSchema).orElse(null);
8874
} catch (Exception e) {
8975
logInvalidSourceRecord(record);
9076
throw e;
@@ -114,24 +100,24 @@ protected boolean isDDL() {
114100

115101
/** generate values for computed columns. */
116102
protected void evalComputedColumns(
117-
Map<String, String> rowData, RowType.Builder rowTypeBuilder) {
103+
Map<String, String> rowData, CdcSchema.Builder schemaBuilder) {
118104
computedColumns.forEach(
119105
computedColumn -> {
120106
rowData.put(
121107
computedColumn.columnName(),
122108
computedColumn.eval(rowData.get(computedColumn.fieldReference())));
123-
rowTypeBuilder.field(computedColumn.columnName(), computedColumn.columnType());
109+
schemaBuilder.column(computedColumn.columnName(), computedColumn.columnType());
124110
});
125111
}
126112

127113
/** Handle case sensitivity here. */
128114
protected RichCdcMultiplexRecord createRecord(
129-
RowKind rowKind, Map<String, String> data, List<DataField> paimonFields) {
115+
RowKind rowKind, Map<String, String> data, CdcSchema.Builder schemaBuilder) {
116+
schemaBuilder.primaryKey(extractPrimaryKeys());
130117
return new RichCdcMultiplexRecord(
131118
getDatabaseName(),
132119
getTableName(),
133-
paimonFields,
134-
extractPrimaryKeys(),
120+
schemaBuilder.build(),
135121
new CdcRecord(rowKind, data));
136122
}
137123

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import org.apache.paimon.flink.action.cdc.TypeMapping;
2323
import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
2424
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
25+
import org.apache.paimon.flink.sink.cdc.CdcSchema;
2526
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2627
import org.apache.paimon.types.DataType;
2728
import org.apache.paimon.types.RowKind;
28-
import org.apache.paimon.types.RowType;
2929
import org.apache.paimon.utils.JsonSerdeUtil;
3030

3131
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
@@ -166,7 +166,7 @@ public List<RichCdcMultiplexRecord> extractRecords() {
166166
}
167167

168168
@Override
169-
protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
169+
protected Map<String, String> extractRowData(JsonNode record, CdcSchema.Builder schemaBuilder) {
170170

171171
Map<String, Object> recordMap =
172172
JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>() {});
@@ -184,14 +184,14 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
184184
Tuple3<String, Integer, Integer> typeInfo = MySqlTypeUtils.getTypeInfo(originalType);
185185
DataType paimonDataType =
186186
MySqlTypeUtils.toDataType(typeInfo.f0, typeInfo.f1, typeInfo.f2, typeMapping);
187-
rowTypeBuilder.field(originalName, paimonDataType);
187+
schemaBuilder.column(originalName, paimonDataType);
188188
}
189189

190190
for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
191191
rowData.put(entry.getKey(), Objects.toString(entry.getValue(), null));
192192
}
193193

194-
evalComputedColumns(rowData, rowTypeBuilder);
194+
evalComputedColumns(rowData, schemaBuilder);
195195
return rowData;
196196
}
197197

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import org.apache.paimon.flink.action.cdc.TypeMapping;
2323
import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
2424
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
25+
import org.apache.paimon.flink.sink.cdc.CdcSchema;
2526
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2627
import org.apache.paimon.types.DataType;
2728
import org.apache.paimon.types.RowKind;
28-
import org.apache.paimon.types.RowType;
2929
import org.apache.paimon.utils.JsonSerdeUtil;
3030

3131
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
@@ -152,7 +152,7 @@ protected String dataField() {
152152
}
153153

154154
@Override
155-
protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
155+
protected Map<String, String> extractRowData(JsonNode record, CdcSchema.Builder schemaBuilder) {
156156
LinkedHashMap<String, String> originalFieldTypes = tryExtractOriginalFieldTypes();
157157
Map<String, Object> recordMap =
158158
JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>() {});
@@ -167,20 +167,20 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
167167
DataType paimonDataType =
168168
MySqlTypeUtils.toDataType(
169169
typeInfo.f0, typeInfo.f1, typeInfo.f2, typeMapping);
170-
rowTypeBuilder.field(originalName, paimonDataType);
170+
schemaBuilder.column(originalName, paimonDataType);
171171

172172
String filedValue = Objects.toString(recordMap.get(originalName), null);
173173
String newValue = transformValue(filedValue, typeInfo.f0, originalType);
174174
rowData.put(originalName, newValue);
175175
}
176176
} else {
177-
fillDefaultTypes(record, rowTypeBuilder);
177+
fillDefaultTypes(record, schemaBuilder);
178178
for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
179179
rowData.put(entry.getKey(), Objects.toString(entry.getValue(), null));
180180
}
181181
}
182182

183-
evalComputedColumns(rowData, rowTypeBuilder);
183+
evalComputedColumns(rowData, schemaBuilder);
184184
return rowData;
185185
}
186186

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2323
import org.apache.paimon.flink.action.cdc.TypeMapping;
2424
import org.apache.paimon.flink.action.cdc.format.AbstractRecordParser;
25+
import org.apache.paimon.flink.sink.cdc.CdcSchema;
2526
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2627
import org.apache.paimon.types.RowKind;
27-
import org.apache.paimon.types.RowType;
2828

2929
import org.apache.avro.Schema;
3030
import org.apache.avro.generic.GenericRecord;
@@ -113,9 +113,9 @@ protected List<RichCdcMultiplexRecord> extractRecords() {
113113

114114
private void processRecord(
115115
GenericRecord record, RowKind rowKind, List<RichCdcMultiplexRecord> records) {
116-
RowType.Builder rowTypeBuilder = RowType.builder();
117-
Map<String, String> rowData = this.extractRowData(record, rowTypeBuilder);
118-
records.add(createRecord(rowKind, rowData, rowTypeBuilder.build().getFields()));
116+
CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
117+
Map<String, String> rowData = this.extractRowData(record, schemaBuilder);
118+
records.add(createRecord(rowKind, rowData, schemaBuilder));
119119
}
120120

121121
@Override
@@ -128,7 +128,7 @@ protected List<String> extractPrimaryKeys() {
128128
}
129129

130130
private Map<String, String> extractRowData(
131-
GenericRecord record, RowType.Builder rowTypeBuilder) {
131+
GenericRecord record, CdcSchema.Builder schemaBuilder) {
132132
Schema payloadSchema = sanitizedSchema(record.getSchema());
133133

134134
LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
@@ -155,10 +155,10 @@ private Map<String, String> extractRowData(
155155
record.get(fieldName),
156156
ZoneOffset.UTC);
157157
resultMap.put(fieldName, transformed);
158-
rowTypeBuilder.field(fieldName, avroToPaimonDataType(schema));
158+
schemaBuilder.column(fieldName, avroToPaimonDataType(schema));
159159
}
160160

161-
evalComputedColumns(resultMap, rowTypeBuilder);
161+
evalComputedColumns(resultMap, schemaBuilder);
162162
return resultMap;
163163
}
164164

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2323
import org.apache.paimon.flink.action.cdc.TypeMapping;
2424
import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
25+
import org.apache.paimon.flink.sink.cdc.CdcSchema;
2526
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2627
import org.apache.paimon.types.DataTypes;
2728
import org.apache.paimon.types.RowKind;
28-
import org.apache.paimon.types.RowType;
2929
import org.apache.paimon.utils.Preconditions;
3030
import org.apache.paimon.utils.TypeUtils;
3131

@@ -121,7 +121,7 @@ protected void setRoot(CdcSourceRecord record) {
121121
}
122122

123123
@Override
124-
protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
124+
protected Map<String, String> extractRowData(JsonNode record, CdcSchema.Builder schemaBuilder) {
125125
// bson record should be a string
126126
Preconditions.checkArgument(
127127
record.isTextual(),
@@ -133,10 +133,10 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
133133
for (Map.Entry<String, BsonValue> entry : document.entrySet()) {
134134
String fieldName = entry.getKey();
135135
resultMap.put(fieldName, toJsonString(BsonValueConvertor.convert(entry.getValue())));
136-
rowTypeBuilder.field(fieldName, DataTypes.STRING());
136+
schemaBuilder.column(fieldName, DataTypes.STRING());
137137
}
138138

139-
evalComputedColumns(resultMap, rowTypeBuilder);
139+
evalComputedColumns(resultMap, schemaBuilder);
140140

141141
return resultMap;
142142
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2323
import org.apache.paimon.flink.action.cdc.TypeMapping;
2424
import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
25+
import org.apache.paimon.flink.sink.cdc.CdcSchema;
2526
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2627
import org.apache.paimon.types.RowKind;
27-
import org.apache.paimon.types.RowType;
2828
import org.apache.paimon.utils.JsonSerdeUtil;
2929
import org.apache.paimon.utils.Preconditions;
3030

@@ -181,9 +181,9 @@ private String getString(JsonNode node, String fieldName) {
181181
}
182182

183183
@Override
184-
protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
184+
protected Map<String, String> extractRowData(JsonNode record, CdcSchema.Builder schemaBuilder) {
185185
if (!hasSchema) {
186-
return super.extractRowData(record, rowTypeBuilder);
186+
return super.extractRowData(record, schemaBuilder);
187187
}
188188

189189
Map<String, Object> recordMap =
@@ -205,13 +205,13 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
205205
ZoneOffset.UTC);
206206
resultMap.put(fieldName, transformed);
207207

208-
rowTypeBuilder.field(
208+
schemaBuilder.column(
209209
fieldName,
210210
DebeziumSchemaUtils.toDataType(
211211
debeziumType, className, parameters.get(fieldName)));
212212
}
213213

214-
evalComputedColumns(resultMap, rowTypeBuilder);
214+
evalComputedColumns(resultMap, schemaBuilder);
215215

216216
return resultMap;
217217
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@
2020

2121
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2222
import org.apache.paimon.flink.sink.cdc.CdcRecord;
23+
import org.apache.paimon.flink.sink.cdc.CdcSchema;
2324
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
24-
import org.apache.paimon.types.DataField;
2525
import org.apache.paimon.types.RowKind;
26-
import org.apache.paimon.types.RowType;
2726

2827
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
2928
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -123,16 +122,11 @@ private List<RichCdcMultiplexRecord> handleOperation(
123122
*/
124123
private RichCdcMultiplexRecord processRecord(JsonNode fullDocument, RowKind rowKind)
125124
throws JsonProcessingException {
126-
RowType.Builder rowTypeBuilder = RowType.builder();
125+
CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
127126
Map<String, String> record =
128-
getExtractRow(fullDocument, rowTypeBuilder, computedColumns, mongodbConfig);
129-
List<DataField> fields = rowTypeBuilder.build().getFields();
130-
127+
getExtractRow(fullDocument, schemaBuilder, computedColumns, mongodbConfig);
128+
schemaBuilder.primaryKey(extractPrimaryKeys());
131129
return new RichCdcMultiplexRecord(
132-
databaseName,
133-
collection,
134-
fields,
135-
extractPrimaryKeys(),
136-
new CdcRecord(rowKind, record));
130+
databaseName, collection, schemaBuilder.build(), new CdcRecord(rowKind, record));
137131
}
138132
}

0 commit comments

Comments
 (0)