Skip to content

Commit 384674c

Browse files
authored
[cdc] Support computed expression of now (#5138)
1 parent 417109d commit 384674c

File tree

10 files changed

+300
-9
lines changed

10 files changed

+300
-9
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ enum ExpressionFunction {
141141
referencedField.fieldType(),
142142
referencedField.literals());
143143
}),
144-
CAST((typeMapping, caseSensitive, args) -> cast(args));
144+
CAST((typeMapping, caseSensitive, args) -> cast(args)),
145+
NOW((typeMapping, caseSensitive, args) -> new NowExpression());
145146

146147
public final ExpressionCreator creator;
147148

@@ -608,4 +609,22 @@ public String eval(String input) {
608609
return value;
609610
}
610611
}
612+
613+
/**
 * Expression that produces the current timestamp as a formatted string.
 *
 * <p>Instances are created by the {@code NOW} constant of {@code ExpressionFunction}. The
 * expression declares a {@code TIMESTAMP(3)} output type and ignores the input value passed to
 * {@link #eval(String)}.
 */
614+
final class NowExpression implements Expression {
615+
@Override
616+
public String fieldReference() {
617+
// NOTE(review): always null - presumably because now() reads no source field; confirm
// that callers of fieldReference() tolerate a null result.
return null;
618+
}
619+
620+
@Override
621+
public DataType outputType() {
622+
// Precision 3 here matches the precision argument passed to the formatter in eval().
return DataTypes.TIMESTAMP(3);
623+
}
624+
625+
@Override
626+
public String eval(String input) {
627+
// The input argument is not used. LocalDateTime.now() reads the system clock in the
// JVM's default time zone, so the value is taken at evaluation time.
return DateTimeUtils.formatLocalDateTime(LocalDateTime.now(), 3);
628+
}
629+
}
611630
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.regex.Pattern;
4646

4747
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
48+
import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
4849

4950
/** Base {@link Action} for synchronizing into one Paimon database. */
5051
public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
@@ -60,6 +61,7 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
6061
protected String includingTables = ".*";
6162
protected List<String> partitionKeys = new ArrayList<>();
6263
protected List<String> primaryKeys = new ArrayList<>();
64+
protected List<ComputedColumn> computedColumns = new ArrayList<>();
6365
@Nullable protected String excludingTables;
6466
protected String includingDbs = ".*";
6567
@Nullable protected String excludingDbs;
@@ -172,10 +174,15 @@ public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) {
172174
return this;
173175
}
174176

177+
/**
 * Builds the computed columns for this action from their argument strings and stores them in
 * {@code computedColumns}, replacing any previously stored value.
 *
 * @param computedColumnArgs computed column definitions, passed through unchanged to {@code
 *     buildComputedColumns}
 * @return this action, to allow call chaining
 */
public SyncDatabaseActionBase withComputedColumnArgs(List<String> computedColumnArgs) {
178+
// An empty list is passed as the second argument of buildComputedColumns.
// NOTE(review): presumably that argument carries the known source fields, which are not
// available at database-sync level - confirm against ComputedColumnUtils.
this.computedColumns = buildComputedColumns(computedColumnArgs, Collections.emptyList());
179+
return this;
180+
}
181+
175182
@Override
176183
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
177184
return syncJobHandler.provideRecordParser(
178-
Collections.emptyList(), typeMapping, metadataConverters);
185+
this.computedColumns, typeMapping, metadataConverters);
179186
}
180187

181188
public SyncDatabaseActionBase withPartitionKeyMultiple(

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import org.apache.paimon.flink.action.ActionFactory;
2323
import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
2424

25+
import java.util.ArrayList;
2526
import java.util.Optional;
2627

28+
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
2729
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EAGER_INIT;
2830
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
2931
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
@@ -79,5 +81,10 @@ protected void withParams(MultipleParameterToolAdapter params, T action) {
7981
String[] options = params.get(TYPE_MAPPING).split(",");
8082
action.withTypeMapping(TypeMapping.parse(options));
8183
}
84+
85+
if (params.has(COMPUTED_COLUMN)) {
86+
action.withComputedColumnArgs(
87+
new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
88+
}
8289
}
8390
}

paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,16 @@ protected void assertTablePartitionKeys(Map<String, String> partitionKeyMultiple
130130
/**
 * Convenience overload that delegates to the five-argument {@code waitForResult} with {@code
 * withRegx} set to {@code false}.
 *
 * @param expected the expected rows
 * @param table the table to read from
 * @param rowType the row type passed through to the delegate
 * @param primaryKeys the primary keys the table schema is expected to have
 * @throws Exception propagated from the delegate
 */
protected void waitForResult(
131131
List<String> expected, FileStoreTable table, RowType rowType, List<String> primaryKeys)
132132
throws Exception {
133+
waitForResult(false, expected, table, rowType, primaryKeys);
134+
}
135+
136+
protected void waitForResult(
137+
boolean withRegx,
138+
List<String> expected,
139+
FileStoreTable table,
140+
RowType rowType,
141+
List<String> primaryKeys)
142+
throws Exception {
133143
assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);
134144

135145
// wait for table schema to become our expected schema
@@ -165,7 +175,8 @@ protected void waitForResult(
165175
rowType);
166176
List<String> sortedActual = new ArrayList<>(result);
167177
Collections.sort(sortedActual);
168-
if (sortedExpected.equals(sortedActual)) {
178+
if (withRegx && isRegxMatchList(sortedActual, sortedExpected)
179+
|| sortedExpected.equals(sortedActual)) {
169180
break;
170181
}
171182
LOG.info("actual: " + sortedActual);
@@ -174,6 +185,20 @@ protected void waitForResult(
174185
}
175186
}
176187

188+
/**
 * Checks whether every element of {@code actual} matches the regular expression at the same
 * index of {@code expected}.
 *
 * @param actual the actual strings
 * @param expected the regular expressions, compared position by position with {@code actual}
 * @return {@code false} if the lists differ in size or any element fails to match; {@code true}
 *     otherwise (including when both lists are empty)
 */
private boolean isRegxMatchList(List<String> actual, List<String> expected) {
189+
if (actual.size() != expected.size()) {
190+
return false;
191+
}
192+
193+
for (int i = 0; i < actual.size(); i++) {
194+
// String.matches requires the whole string to match the pattern, not just a substring.
if (!actual.get(i).matches(expected.get(i))) {
195+
return false;
196+
}
197+
}
198+
199+
return true;
200+
}
201+
177202
protected Map<String, String> getBasicTableConfig() {
178203
Map<String, String> config = new HashMap<>();
179204
ThreadLocalRandom random = ThreadLocalRandom.current();
@@ -392,6 +417,7 @@ protected abstract class SyncDatabaseActionBuilder<T extends SynchronizationActi
392417
private final List<String> partitionKeys = new ArrayList<>();
393418
private final List<String> primaryKeys = new ArrayList<>();
394419
private final List<String> metadataColumn = new ArrayList<>();
420+
private final List<String> computedColumnArgs = new ArrayList<>();
395421
protected Map<String, String> partitionKeyMultiple = new HashMap<>();
396422

397423
public SyncDatabaseActionBuilder(Class<T> clazz, Map<String, String> sourceConfig) {
@@ -464,6 +490,12 @@ public SyncDatabaseActionBuilder<T> withMetadataColumn(List<String> metadataColu
464490
return this;
465491
}
466492

493+
/**
 * Appends the given computed column arguments to the ones already collected by this builder.
 *
 * @param computedColumnArgs computed column definitions to add
 * @return this builder, to allow call chaining
 */
public SyncDatabaseActionBuilder<T> withComputedColumnArgs(
494+
List<String> computedColumnArgs) {
495+
this.computedColumnArgs.addAll(computedColumnArgs);
496+
return this;
497+
}
498+
467499
public SyncDatabaseActionBuilder<T> withPartitionKeyMultiple(
468500
Map<String, String> partitionKeyMultiple) {
469501
if (partitionKeyMultiple != null) {
@@ -500,6 +532,8 @@ public T build() {
500532
args.addAll(listToArgs("--primary-keys", primaryKeys));
501533
args.addAll(listToArgs("--metadata-column", metadataColumn));
502534

535+
args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
536+
503537
return createAction(clazz, args);
504538
}
505539
}

paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818

1919
package org.apache.paimon.flink.action.cdc.format.aliyun;
2020

21+
import org.apache.paimon.data.BinaryString;
2122
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
23+
import org.apache.paimon.flink.action.cdc.ComputedColumn;
24+
import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
2225
import org.apache.paimon.flink.action.cdc.TypeMapping;
2326
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
2427
import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
2528
import org.apache.paimon.flink.sink.cdc.CdcRecord;
2629
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
2730
import org.apache.paimon.schema.Schema;
2831
import org.apache.paimon.types.RowKind;
32+
import org.apache.paimon.utils.BinaryStringUtils;
2933

3034
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
3135
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,6 +47,7 @@
4347
import java.util.Arrays;
4448
import java.util.Collections;
4549
import java.util.List;
50+
import java.util.Map;
4651

4752
/** Test for AliyunJsonRecordParser. */
4853
public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
@@ -51,14 +56,20 @@ public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
5156
private static List<String> insertList = new ArrayList<>();
5257
private static List<String> updateList = new ArrayList<>();
5358
private static List<String> deleteList = new ArrayList<>();
59+
private static List<ComputedColumn> computedColumns = new ArrayList<>();
5460

5561
private static ObjectMapper objMapper = new ObjectMapper();
5662

63+
String dateTimeRegex = "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}";
64+
5765
@Before
5866
public void setup() {
5967
String insertRes = "kafka/aliyun/table/event/event-insert.txt";
6068
String updateRes = "kafka/aliyun/table/event/event-update-in-one.txt";
6169
String deleteRes = "kafka/aliyun/table/event/event-delete.txt";
70+
71+
String[] computedColumnArgs = {"etl_create_time=now()", "etl_update_time=now()"};
72+
6273
URL url;
6374
try {
6475
url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(insertRes);
@@ -76,17 +87,22 @@ public void setup() {
7687
.filter(this::isRecordLine)
7788
.forEach(e -> deleteList.add(e));
7889

90+
computedColumns =
91+
ComputedColumnUtils.buildComputedColumns(
92+
Arrays.asList(computedColumnArgs), Collections.emptyList());
93+
7994
} catch (Exception e) {
8095
log.error("Fail to init aliyun-json cases", e);
8196
}
8297
}
8398

8499
@Test
85100
public void extractInsertRecord() throws Exception {
101+
86102
AliyunRecordParser parser =
87-
new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
103+
new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
88104
for (String json : insertList) {
89-
// 将json解析为JsonNode对象
105+
90106
JsonNode rootNode = objMapper.readValue(json, JsonNode.class);
91107
CdcSourceRecord cdcRecord = new CdcSourceRecord(rootNode);
92108
Schema schema = parser.buildSchema(cdcRecord);
@@ -106,15 +122,31 @@ public void extractInsertRecord() throws Exception {
106122

107123
MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
108124
Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
125+
126+
Map<String, String> data = records.get(0).toRichCdcRecord().toCdcRecord().data();
127+
String createTime = data.get("etl_create_time");
128+
String updateTime = data.get("etl_update_time");
129+
130+
// Mock the real timestamp string as it is retrieved from the store, by converting it through
131+
// Paimon's Timestamp
132+
createTime =
133+
BinaryStringUtils.toTimestamp(BinaryString.fromString(createTime), 6)
134+
.toString();
135+
updateTime =
136+
BinaryStringUtils.toTimestamp(BinaryString.fromString(updateTime), 6)
137+
.toString();
138+
139+
Assert.assertTrue(createTime.matches(dateTimeRegex));
140+
Assert.assertTrue(updateTime.matches(dateTimeRegex));
109141
}
110142
}
111143

112144
@Test
113145
public void extractUpdateRecord() throws Exception {
114146
AliyunRecordParser parser =
115-
new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
147+
new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
116148
for (String json : updateList) {
117-
// 将json解析为JsonNode对象
149+
118150
JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
119151
CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
120152
Schema schema = parser.buildSchema(cdcRecord);
@@ -134,15 +166,26 @@ public void extractUpdateRecord() throws Exception {
134166

135167
MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
136168
Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
169+
170+
Map<String, String> data = records.get(0).toRichCdcRecord().toCdcRecord().data();
171+
String createTime = data.get("etl_create_time");
172+
String updateTime = data.get("etl_update_time");
173+
Assert.assertNotNull(createTime);
174+
175+
updateTime =
176+
BinaryStringUtils.toTimestamp(BinaryString.fromString(updateTime), 6)
177+
.toString();
178+
179+
Assert.assertTrue(updateTime.matches(dateTimeRegex));
137180
}
138181
}
139182

140183
@Test
141184
public void extractDeleteRecord() throws Exception {
142185
AliyunRecordParser parser =
143-
new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
186+
new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
144187
for (String json : deleteList) {
145-
// 将json解析为JsonNode对象
188+
146189
JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
147190
CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
148191
Schema schema = parser.buildSchema(cdcRecord);
@@ -162,6 +205,17 @@ public void extractDeleteRecord() throws Exception {
162205

163206
MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
164207
Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
208+
209+
Map<String, String> data = records.get(0).toRichCdcRecord().toCdcRecord().data();
210+
String createTime = data.get("etl_create_time");
211+
String updateTime = data.get("etl_update_time");
212+
Assert.assertNotNull(createTime);
213+
214+
updateTime =
215+
BinaryStringUtils.toTimestamp(BinaryString.fromString(updateTime), 6)
216+
.toString();
217+
218+
Assert.assertTrue(updateTime.matches(dateTimeRegex));
165219
}
166220
}
167221
}

0 commit comments

Comments
 (0)