Skip to content

Commit 1ca6efb

Browse files
author
李鹏程
committed
[cdc] Support audit time during cdc by computed columns
1 parent b5ee79a commit 1ca6efb

File tree

12 files changed

+411
-8
lines changed

12 files changed

+411
-8
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ enum ExpressionFunction {
141141
referencedField.fieldType(),
142142
referencedField.literals());
143143
}),
144-
CAST((typeMapping, caseSensitive, args) -> cast(args));
144+
CAST((typeMapping, caseSensitive, args) -> cast(args)),
145+
NOW((typeMapping, caseSensitive, args) -> new NowExpression());
145146

146147
public final ExpressionCreator creator;
147148

@@ -608,4 +609,22 @@ public String eval(String input) {
608609
return value;
609610
}
610611
}
612+
613+
/** Get current timestamp. */
614+
final class NowExpression implements Expression {
615+
@Override
616+
public String fieldReference() {
617+
return null;
618+
}
619+
620+
@Override
621+
public DataType outputType() {
622+
return DataTypes.TIMESTAMP();
623+
}
624+
625+
@Override
626+
public String eval(String input) {
627+
return java.sql.Timestamp.valueOf(LocalDateTime.now()).toString();
628+
}
629+
}
611630
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2929
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
3030
import org.apache.paimon.table.FileStoreTable;
31+
import org.apache.paimon.table.SpecialFields;
3132

3233
import org.apache.flink.api.common.functions.FlatMapFunction;
3334
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -45,6 +46,7 @@
4546
import java.util.regex.Pattern;
4647

4748
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
49+
import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
4850

4951
/** Base {@link Action} for synchronizing into one Paimon database. */
5052
public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
@@ -60,6 +62,8 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
6062
protected String includingTables = ".*";
6163
protected List<String> partitionKeys = new ArrayList<>();
6264
protected List<String> primaryKeys = new ArrayList<>();
65+
protected List<String> computedColumnArgs = new ArrayList<>();
66+
protected List<ComputedColumn> computedColumns = new ArrayList<>();
6367
@Nullable protected String excludingTables;
6468
protected String includingDbs = ".*";
6569
@Nullable protected String excludingDbs;
@@ -172,10 +176,15 @@ public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) {
172176
return this;
173177
}
174178

179+
public SyncDatabaseActionBase withComputedColumnArgs(List<String> computedColumnArgs) {
180+
this.computedColumnArgs = computedColumnArgs;
181+
return this;
182+
}
183+
175184
@Override
176185
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
177186
return syncJobHandler.provideRecordParser(
178-
Collections.emptyList(), typeMapping, metadataConverters);
187+
this.computedColumns, typeMapping, metadataConverters);
179188
}
180189

181190
public SyncDatabaseActionBase withPartitionKeyMultiple(
@@ -261,4 +270,11 @@ protected void buildSink(
261270
excludingTables))
262271
.build();
263272
}
273+
274+
@Override
275+
protected void beforeBuildingSourceSink() throws Exception {
276+
computedColumns =
277+
buildComputedColumns(
278+
computedColumnArgs, Collections.singletonList(SpecialFields.VALUE_KIND));
279+
}
264280
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import org.apache.paimon.flink.action.ActionFactory;
2323
import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
2424

25+
import java.util.ArrayList;
2526
import java.util.Optional;
2627

28+
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
2729
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EAGER_INIT;
2830
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
2931
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
@@ -79,5 +81,10 @@ protected void withParams(MultipleParameterToolAdapter params, T action) {
7981
String[] options = params.get(TYPE_MAPPING).split(",");
8082
action.withTypeMapping(TypeMapping.parse(options));
8183
}
84+
85+
if (params.has(COMPUTED_COLUMN)) {
86+
action.withComputedColumnArgs(
87+
new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
88+
}
8289
}
8390
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
103103
}
104104
return Objects.toString(entry.getValue());
105105
}));
106+
106107
evalComputedColumns(rowData, rowTypeBuilder);
107108
return rowData;
108109
}

paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,16 @@ protected void assertTablePartitionKeys(Map<String, String> partitionKeyMultiple
130130
protected void waitForResult(
131131
List<String> expected, FileStoreTable table, RowType rowType, List<String> primaryKeys)
132132
throws Exception {
133+
waitForResult(false, expected, table, rowType, primaryKeys);
134+
}
135+
136+
protected void waitForResult(
137+
boolean withRegx,
138+
List<String> expected,
139+
FileStoreTable table,
140+
RowType rowType,
141+
List<String> primaryKeys)
142+
throws Exception {
133143
assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);
134144

135145
// wait for table schema to become our expected schema
@@ -143,6 +153,13 @@ protected void waitForResult(
143153
if (sameName && sameType) {
144154
cnt++;
145155
}
156+
157+
LOG.info("actual: field: " + field.name() + " " + field.type());
158+
LOG.info(
159+
"expected:rowType: "
160+
+ rowType.getFieldNames().get(i)
161+
+ " "
162+
+ rowType.getFieldTypes().get(i));
146163
}
147164
if (cnt == rowType.getFieldCount()) {
148165
break;
@@ -165,15 +182,40 @@ protected void waitForResult(
165182
rowType);
166183
List<String> sortedActual = new ArrayList<>(result);
167184
Collections.sort(sortedActual);
168-
if (sortedExpected.equals(sortedActual)) {
169-
break;
170-
}
185+
171186
LOG.info("actual: " + sortedActual);
172187
LOG.info("expected: " + sortedExpected);
188+
189+
if (withRegx) {
190+
if (isRegxMatchList(sortedActual, sortedExpected)) {
191+
break;
192+
}
193+
} else if (sortedExpected.equals(sortedActual)) {
194+
break;
195+
}
196+
LOG.warn("actual: " + sortedActual);
197+
LOG.warn("expected: " + sortedExpected);
173198
Thread.sleep(1000);
174199
}
175200
}
176201

202+
private boolean isRegxMatchList(List<String> actual, List<String> expected) {
203+
if (actual == null && expected == null) {
204+
return true;
205+
}
206+
if (actual.size() != expected.size()) {
207+
return false;
208+
}
209+
210+
for (int i = 0; i < actual.size(); i++) {
211+
if (!actual.get(i).matches(expected.get(i))) {
212+
return false;
213+
}
214+
}
215+
216+
return true;
217+
}
218+
177219
protected Map<String, String> getBasicTableConfig() {
178220
Map<String, String> config = new HashMap<>();
179221
ThreadLocalRandom random = ThreadLocalRandom.current();
@@ -392,6 +434,7 @@ protected abstract class SyncDatabaseActionBuilder<T extends SynchronizationActi
392434
private final List<String> partitionKeys = new ArrayList<>();
393435
private final List<String> primaryKeys = new ArrayList<>();
394436
private final List<String> metadataColumn = new ArrayList<>();
437+
private final List<String> computedColumnArgs = new ArrayList<>();
395438
protected Map<String, String> partitionKeyMultiple = new HashMap<>();
396439

397440
public SyncDatabaseActionBuilder(Class<T> clazz, Map<String, String> sourceConfig) {
@@ -464,6 +507,12 @@ public SyncDatabaseActionBuilder<T> withMetadataColumn(List<String> metadataColu
464507
return this;
465508
}
466509

510+
public SyncDatabaseActionBuilder<T> withComputedColumnArgs(
511+
List<String> computedColumnArgs) {
512+
this.computedColumnArgs.addAll(computedColumnArgs);
513+
return this;
514+
}
515+
467516
public SyncDatabaseActionBuilder<T> withPartitionKeyMultiple(
468517
Map<String, String> partitionKeyMultiple) {
469518
if (partitionKeyMultiple != null) {
@@ -500,6 +549,8 @@ public T build() {
500549
args.addAll(listToArgs("--primary-keys", primaryKeys));
501550
args.addAll(listToArgs("--metadata-column", metadataColumn));
502551

552+
args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
553+
503554
return createAction(clazz, args);
504555
}
505556
}

paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818

1919
package org.apache.paimon.flink.action.cdc.format.aliyun;
2020

21+
import org.apache.paimon.data.BinaryString;
2122
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
23+
import org.apache.paimon.flink.action.cdc.ComputedColumn;
24+
import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
2225
import org.apache.paimon.flink.action.cdc.TypeMapping;
2326
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
2427
import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
2528
import org.apache.paimon.flink.sink.cdc.CdcRecord;
2629
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2730
import org.apache.paimon.schema.Schema;
2831
import org.apache.paimon.types.RowKind;
32+
import org.apache.paimon.utils.BinaryStringUtils;
2933

3034
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
3135
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,6 +47,7 @@
4347
import java.util.Arrays;
4448
import java.util.Collections;
4549
import java.util.List;
50+
import java.util.Map;
4651

4752
/** Test for AliyunJsonRecordParser. */
4853
public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
@@ -51,14 +56,22 @@ public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
5156
private static List<String> insertList = new ArrayList<>();
5257
private static List<String> updateList = new ArrayList<>();
5358
private static List<String> deleteList = new ArrayList<>();
59+
private static List<ComputedColumn> computedColumns = new ArrayList<>();
5460

5561
private static ObjectMapper objMapper = new ObjectMapper();
5662

63+
String dateTimeRegex = "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}";
64+
5765
@Before
5866
public void setup() {
5967
String insertRes = "kafka/aliyun/table/event/event-insert.txt";
6068
String updateRes = "kafka/aliyun/table/event/event-update-in-one.txt";
6169
String deleteRes = "kafka/aliyun/table/event/event-delete.txt";
70+
71+
String[] computedColumnArgs = {
72+
"etl_create_time=create_time()", "etl_update_time=update_time()"
73+
};
74+
6275
URL url;
6376
try {
6477
url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(insertRes);
@@ -76,15 +89,20 @@ public void setup() {
7689
.filter(this::isRecordLine)
7790
.forEach(e -> deleteList.add(e));
7891

92+
computedColumns =
93+
ComputedColumnUtils.buildComputedColumns(
94+
Arrays.asList(computedColumnArgs), Collections.emptyList());
95+
7996
} catch (Exception e) {
8097
log.error("Fail to init aliyun-json cases", e);
8198
}
8299
}
83100

84101
@Test
85102
public void extractInsertRecord() throws Exception {
103+
86104
AliyunRecordParser parser =
87-
new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
105+
new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
88106
for (String json : insertList) {
89107
// 将json解析为JsonNode对象
90108
JsonNode rootNode = objMapper.readValue(json, JsonNode.class);
@@ -106,13 +124,31 @@ public void extractInsertRecord() throws Exception {
106124

107125
MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
108126
Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
127+
128+
Map<String, String> data = records.get(0).toRichCdcRecord().toCdcRecord().data();
129+
String createTime = data.get("etl_create_time");
130+
String updateTime = data.get("etl_update_time");
131+
132+
// Mock the real timestamp string which retrieved from store and convert through paimon
133+
// Timestamp
134+
createTime =
135+
BinaryStringUtils.toTimestamp(BinaryString.fromString(createTime), 6)
136+
.toString();
137+
updateTime =
138+
BinaryStringUtils.toTimestamp(BinaryString.fromString(updateTime), 6)
139+
.toString();
140+
141+
Assert.assertTrue(createTime.matches(dateTimeRegex));
142+
Assert.assertTrue(updateTime.matches(dateTimeRegex));
143+
144+
log.info("createTime: {}, updateTime: {}", createTime, updateTime);
109145
}
110146
}
111147

112148
@Test
113149
public void extractUpdateRecord() throws Exception {
114150
AliyunRecordParser parser =
115-
new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
151+
new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
116152
for (String json : updateList) {
117153
// 将json解析为JsonNode对象
118154
JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
@@ -134,13 +170,24 @@ public void extractUpdateRecord() throws Exception {
134170

135171
MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
136172
Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
173+
174+
Map<String, String> data = records.get(0).toRichCdcRecord().toCdcRecord().data();
175+
String createTime = data.get("etl_create_time");
176+
String updateTime = data.get("etl_update_time");
177+
Assert.assertNull(createTime);
178+
179+
updateTime =
180+
BinaryStringUtils.toTimestamp(BinaryString.fromString(updateTime), 6)
181+
.toString();
182+
183+
Assert.assertTrue(updateTime.matches(dateTimeRegex));
137184
}
138185
}
139186

140187
@Test
141188
public void extractDeleteRecord() throws Exception {
142189
AliyunRecordParser parser =
143-
new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
190+
new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
144191
for (String json : deleteList) {
145192
// 将json解析为JsonNode对象
146193
JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
@@ -162,6 +209,17 @@ public void extractDeleteRecord() throws Exception {
162209

163210
MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
164211
Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
212+
213+
Map<String, String> data = records.get(0).toRichCdcRecord().toCdcRecord().data();
214+
String createTime = data.get("etl_create_time");
215+
String updateTime = data.get("etl_update_time");
216+
Assert.assertNull(createTime);
217+
218+
updateTime =
219+
BinaryStringUtils.toTimestamp(BinaryString.fromString(updateTime), 6)
220+
.toString();
221+
222+
Assert.assertTrue(updateTime.matches(dateTimeRegex));
165223
}
166224
}
167225
}

0 commit comments

Comments
 (0)