
Commit 09d0227

Author: 李鹏程 (committed)
[cdc] Support audit time during cdc by computed columns
1 parent a2ed191, commit 09d0227

File tree

18 files changed: +476 -28 lines


paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java

Lines changed: 44 additions & 1 deletion
@@ -19,10 +19,12 @@
 package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeFamily;
 import org.apache.paimon.types.DataTypeJsonParser;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.SerializableSupplier;
 import org.apache.paimon.utils.StringUtils;
@@ -141,7 +143,15 @@ enum ExpressionFunction {
                                 referencedField.fieldType(),
                                 referencedField.literals());
                     }),
-        CAST((typeMapping, caseSensitive, args) -> cast(args));
+        CAST((typeMapping, caseSensitive, args) -> cast(args)),
+        CREATE_TIME(
+                (typeMapping, caseSensitive, args) -> {
+                    return new AuditTimeExpression(true);
+                }),
+        UPDATE_TIME(
+                (typeMapping, caseSensitive, args) -> {
+                    return new AuditTimeExpression(false);
+                });
 
         public final ExpressionCreator creator;
 
@@ -608,4 +618,37 @@ public String eval(String input) {
             return value;
         }
     }
+
+    /** Get current timestamp. */
+    final class AuditTimeExpression implements Expression {
+
+        private final boolean insertOnly;
+        private final String fieldReference;
+
+        public AuditTimeExpression(boolean insertOnly) {
+            this.insertOnly = insertOnly;
+            this.fieldReference = SpecialFields.VALUE_KIND.name();
+        }
+
+        @Override
+        public String fieldReference() {
+            return this.fieldReference;
+        }
+
+        @Override
+        public DataType outputType() {
+            return DataTypes.TIMESTAMP();
+        }
+
+        @Override
+        public String eval(String input) {
+            RowKind rowKind = RowKind.fromShortString(input);
+            String now = java.sql.Timestamp.valueOf(LocalDateTime.now()).toString();
+            if (rowKind == RowKind.INSERT) {
+                return now;
+            }
+
+            return insertOnly ? null : now;
+        }
+    }
 }
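
For context, a minimal sketch (not part of the commit; the class and variable names are illustrative, and it assumes the nested class is reachable as Expression.AuditTimeExpression) of how the two audit expressions behave for different row kinds:

import org.apache.paimon.flink.action.cdc.Expression;
import org.apache.paimon.types.RowKind;

public class AuditTimeExpressionSketch {
    public static void main(String[] args) {
        // insertOnly = true: value only for INSERT rows; false: value for every row kind.
        Expression createTime = new Expression.AuditTimeExpression(true);
        Expression updateTime = new Expression.AuditTimeExpression(false);

        // eval(...) receives the row kind's short string (see AbstractRecordParser below).
        System.out.println(createTime.eval(RowKind.INSERT.shortString()));       // current timestamp
        System.out.println(updateTime.eval(RowKind.INSERT.shortString()));       // current timestamp
        System.out.println(createTime.eval(RowKind.UPDATE_AFTER.shortString())); // null
        System.out.println(updateTime.eval(RowKind.UPDATE_AFTER.shortString())); // current timestamp
    }
}

CREATE_TIME therefore produces a value only for INSERT rows, while UPDATE_TIME is refreshed for every row kind, which is what makes the pair usable as created-at / last-updated audit columns.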

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java

Lines changed: 17 additions & 1 deletion
@@ -27,6 +27,7 @@
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -44,6 +45,7 @@
 import java.util.regex.Pattern;
 
 import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
+import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
 
 /** Base {@link Action} for synchronizing into one Paimon database. */
 public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
@@ -58,6 +60,8 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
     protected String includingTables = ".*";
     protected List<String> partitionKeys = new ArrayList<>();
     protected List<String> primaryKeys = new ArrayList<>();
+    protected List<String> computedColumnArgs = new ArrayList<>();
+    protected List<ComputedColumn> computedColumns = new ArrayList<>();
     @Nullable protected String excludingTables;
     protected String includingDbs = ".*";
     @Nullable protected String excludingDbs;
@@ -165,10 +169,15 @@ public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) {
         return this;
     }
 
+    public SyncDatabaseActionBase withComputedColumnArgs(List<String> computedColumnArgs) {
+        this.computedColumnArgs = computedColumnArgs;
+        return this;
+    }
+
     @Override
     protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
         return syncJobHandler.provideRecordParser(
-                Collections.emptyList(), typeMapping, metadataConverters);
+                this.computedColumns, typeMapping, metadataConverters);
     }
 
     public SyncDatabaseActionBase withPartitionKeyMultiple(
@@ -237,4 +246,11 @@ protected void buildSink(
                         .withTableOptions(tableConfig)
                         .build();
     }
+
+    @Override
+    protected void beforeBuildingSourceSink() throws Exception {
+        computedColumns =
+                buildComputedColumns(
+                        computedColumnArgs, Collections.singletonList(SpecialFields.VALUE_KIND));
+    }
 }
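
Before this change the database-sync action passed an empty computed-column list to the record parser (see the recordParse() change above); now it can carry computed-column definitions of its own. A hedged sketch of wiring audit columns in code, where the helper name is hypothetical and the column=expression(...) strings follow the existing computed-column convention (exact spellings are assumptions):

import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;

import java.util.Arrays;

class AuditColumnsWiring {
    // Hypothetical helper: attaches audit columns to a database-sync action.
    static SyncDatabaseActionBase withAuditColumns(SyncDatabaseActionBase action) {
        return action.withComputedColumnArgs(
                Arrays.asList(
                        "create_time=create_time()",    // filled only for INSERT rows
                        "update_time=update_time()"));  // refreshed for every row kind
    }
}

The definitions are turned into ComputedColumn instances in beforeBuildingSourceSink(), with SpecialFields.VALUE_KIND offered as an extra referenceable field.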

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java

Lines changed: 7 additions & 0 deletions
@@ -22,8 +22,10 @@
 import org.apache.paimon.flink.action.ActionFactory;
 import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
 
+import java.util.ArrayList;
 import java.util.Optional;
 
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS;
@@ -77,5 +79,10 @@ protected void withParams(MultipleParameterToolAdapter params, T action) {
             String[] options = params.get(TYPE_MAPPING).split(",");
             action.withTypeMapping(TypeMapping.parse(options));
         }
+
+        if (params.has(COMPUTED_COLUMN)) {
+            action.withComputedColumnArgs(
+                    new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
+        }
     }
 }

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java

Lines changed: 10 additions & 2 deletions
@@ -81,7 +81,8 @@ protected void fillDefaultTypes(JsonNode record, RowType.Builder rowTypeBuilder)
                 .forEachRemaining(name -> rowTypeBuilder.field(name, DataTypes.STRING()));
     }
 
-    protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
+    protected Map<String, String> extractRowData(
+            RowKind rowKind, JsonNode record, RowType.Builder rowTypeBuilder) {
         fillDefaultTypes(record, rowTypeBuilder);
         Map<String, Object> recordMap =
                 convertValue(record, new TypeReference<Map<String, Object>>() {});
@@ -103,7 +104,8 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
                                     }
                                     return Objects.toString(entry.getValue());
                                 }));
-        evalComputedColumns(rowData, rowTypeBuilder);
+
+        evalComputedColumns(rowKind, rowData, rowTypeBuilder);
         return rowData;
     }
 
@@ -122,7 +124,7 @@ protected List<String> extractPrimaryKeys() {
     protected void processRecord(
             JsonNode jsonNode, RowKind rowKind, List<RichCdcMultiplexRecord> records) {
         RowType.Builder rowTypeBuilder = RowType.builder();
-        Map<String, String> rowData = this.extractRowData(jsonNode, rowTypeBuilder);
+        Map<String, String> rowData = this.extractRowData(rowKind, jsonNode, rowTypeBuilder);
         records.add(createRecord(rowKind, rowData, rowTypeBuilder.build().getFields()));
     }
 

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java

Lines changed: 11 additions & 4 deletions
@@ -24,6 +24,7 @@
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -37,6 +38,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -114,12 +116,17 @@ protected boolean isDDL() {
 
     /** generate values for computed columns. */
     protected void evalComputedColumns(
-            Map<String, String> rowData, RowType.Builder rowTypeBuilder) {
+            RowKind rowKind, Map<String, String> rowData, RowType.Builder rowTypeBuilder) {
         computedColumns.forEach(
                 computedColumn -> {
-                    rowData.put(
-                            computedColumn.columnName(),
-                            computedColumn.eval(rowData.get(computedColumn.fieldReference())));
+                    String argVal;
+                    if (Objects.equals(
+                            computedColumn.fieldReference(), SpecialFields.VALUE_KIND.name())) {
+                        argVal = rowKind.shortString();
+                    } else {
+                        argVal = rowData.get(computedColumn.fieldReference());
+                    }
+                    rowData.put(computedColumn.columnName(), computedColumn.eval(argVal));
                     rowTypeBuilder.field(computedColumn.columnName(), computedColumn.columnType());
                 });
     }
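
The effect of the branch above: a computed column whose field reference is the SpecialFields.VALUE_KIND system field is fed the row kind's short string instead of a payload value. A small illustrative sketch of those inputs, assuming the Paimon classes used above are on the classpath:

import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.RowKind;

public class ValueKindRoutingSketch {
    public static void main(String[] args) {
        // The reserved field name that audit-time computed columns reference.
        System.out.println(SpecialFields.VALUE_KIND.name());

        // The short strings handed to Expression#eval for such columns.
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind + " -> " + kind.shortString()); // e.g. INSERT -> +I, DELETE -> -D
        }
    }
}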

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java

Lines changed: 3 additions & 2 deletions
@@ -168,7 +168,8 @@ public List<RichCdcMultiplexRecord> extractRecords() {
     }
 
     @Override
-    protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
+    protected Map<String, String> extractRowData(
+            RowKind rowKind, JsonNode record, RowType.Builder rowTypeBuilder) {
 
         Map<String, Object> recordMap =
                 JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>() {});
@@ -179,7 +180,7 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
             rowData.put(entry.getKey(), Objects.toString(entry.getValue(), null));
         }
 
-        evalComputedColumns(rowData, rowTypeBuilder);
+        evalComputedColumns(rowKind, rowData, rowTypeBuilder);
         return rowData;
     }
 

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java

Lines changed: 3 additions & 2 deletions
@@ -152,7 +152,8 @@ protected String dataField() {
     }
 
     @Override
-    protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
+    protected Map<String, String> extractRowData(
+            RowKind rowKind, JsonNode record, RowType.Builder rowTypeBuilder) {
         LinkedHashMap<String, String> originalFieldTypes = tryExtractOriginalFieldTypes();
         Map<String, Object> recordMap =
                 JsonSerdeUtil.convertValue(record, new TypeReference<Map<String, Object>>() {});
@@ -180,7 +181,7 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
             }
         }
 
-        evalComputedColumns(rowData, rowTypeBuilder);
+        evalComputedColumns(rowKind, rowData, rowTypeBuilder);
         return rowData;
     }
 

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java

Lines changed: 6 additions & 3 deletions
@@ -114,7 +114,7 @@ protected List<RichCdcMultiplexRecord> extractRecords() {
     private void processRecord(
             GenericRecord record, RowKind rowKind, List<RichCdcMultiplexRecord> records) {
         RowType.Builder rowTypeBuilder = RowType.builder();
-        Map<String, String> rowData = this.extractRowData(record, rowTypeBuilder);
+        Map<String, String> rowData = this.extractRowData(rowKind, record, rowTypeBuilder);
         records.add(createRecord(rowKind, rowData, rowTypeBuilder.build().getFields()));
     }
 
@@ -128,7 +128,7 @@ protected List<String> extractPrimaryKeys() {
     }
 
     private Map<String, String> extractRowData(
-            GenericRecord record, RowType.Builder rowTypeBuilder) {
+            RowKind rowKind, GenericRecord record, RowType.Builder rowTypeBuilder) {
         Schema payloadSchema = sanitizedSchema(record.getSchema());
 
         LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
@@ -158,7 +158,7 @@ private Map<String, String> extractRowData(
             rowTypeBuilder.field(fieldName, avroToPaimonDataType(schema));
         }
 
-        evalComputedColumns(resultMap, rowTypeBuilder);
+        evalComputedColumns(rowKind, resultMap, rowTypeBuilder);
         return resultMap;
     }
 

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java

Lines changed: 4 additions & 3 deletions
@@ -23,6 +23,7 @@
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.TypeUtils;
@@ -80,7 +81,8 @@ protected void setRoot(CdcSourceRecord record) {
     }
 
     @Override
-    protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
+    protected Map<String, String> extractRowData(
+            RowKind rowKind, JsonNode record, RowType.Builder rowTypeBuilder) {
         // bson record should be a string
         Preconditions.checkArgument(
                 record.isTextual(),
@@ -95,8 +97,7 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
             rowTypeBuilder.field(fieldName, DataTypes.STRING());
         }
 
-        evalComputedColumns(resultMap, rowTypeBuilder);
-
+        evalComputedColumns(rowKind, resultMap, rowTypeBuilder);
         return resultMap;
     }
 

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java

Lines changed: 3 additions & 4 deletions
@@ -181,9 +181,10 @@ private String getString(JsonNode node, String fieldName) {
     }
 
     @Override
-    protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
+    protected Map<String, String> extractRowData(
+            RowKind rowKind, JsonNode record, RowType.Builder rowTypeBuilder) {
         if (!hasSchema) {
-            return super.extractRowData(record, rowTypeBuilder);
+            return super.extractRowData(rowKind, record, rowTypeBuilder);
         }
 
         Map<String, Object> recordMap =
@@ -211,8 +212,6 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
                             debeziumType, className, parameters.get(fieldName)));
         }
 
-        evalComputedColumns(resultMap, rowTypeBuilder);
-
         return resultMap;
     }
 
