Skip to content

[cdc] Support computed expression of now#5138

Merged
yuzelin merged 8 commits intoapache:masterfrom
JackeyLee007:cdc-audit-time
Mar 15, 2025
Merged

[cdc] Support computed expression of now#5138
yuzelin merged 8 commits intoapache:masterfrom
JackeyLee007:cdc-audit-time

Conversation

@JackeyLee007
Copy link
Contributor

Purpose

Linked issue: close #5068

Support special audition computed columns, especially for cdc sync database

Tests

org.apache.paimon.flink.action.cdc.kafka.KafkaCanalSyncDatabaseActionITCase#testAuditTime

API and Format

Documentation

@JackeyLee007
Copy link
Contributor Author

JackeyLee007 commented Feb 24, 2025

@JingsongLi Have a code review again, please. I adpated the code according to your comments in the previous PR 5086.

  1. Add the rowkind parameter to extractRowData method directly, instead of moving evalComputedColumns out.
  2. Refine the UT case, to test UPDATE and check the etl_create_time value which is not updated bu NULL value.

return now;
}

return insertOnly ? null : now;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy from #5086 :
So if a new input update the record, the create_time will become null?

@JackeyLee007 JackeyLee007 changed the title Cdc audit time [cdc] Support computed expression of now Mar 15, 2025

@Override
public DataType outputType() {
return DataTypes.TIMESTAMP();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIMESTAMP(3)


@Override
public String eval(String input) {
return java.sql.Timestamp.valueOf(LocalDateTime.now()).toString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use DateTimeUtils.formatLocalDateTime

protected void beforeBuildingSourceSink() throws Exception {
computedColumns =
buildComputedColumns(
computedColumnArgs, Collections.singletonList(SpecialFields.VALUE_KIND));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to withComputedColumnArgs

}
return Objects.toString(entry.getValue());
}));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this.

Comment on lines 196 to 197
LOG.warn("actual: " + sortedActual);
LOG.warn("expected: " + sortedExpected);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this.

cnt++;
}

LOG.info("actual: field: " + field.name() + " " + field.type());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this

new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
for (String json : insertList) {
// 将json解析为JsonNode对象
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use english.

Assert.assertTrue(createTime.matches(dateTimeRegex));
Assert.assertTrue(updateTime.matches(dateTimeRegex));

log.info("createTime: {}, updateTime: {}", createTime, updateTime);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this.

new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
for (String json : updateList) {
// 将json解析为JsonNode对象
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use english.

new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
new AliyunRecordParser(TypeMapping.defaultMapping(), computedColumns);
for (String json : deleteList) {
// 将json解析为JsonNode对象
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use english.


/** IT cases for {@link KafkaSyncDatabaseAction}. */
public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
private static final Logger LOG =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this is not used.

@yuzelin
Copy link
Contributor

yuzelin commented Mar 15, 2025

+1

@yuzelin yuzelin merged commit 384674c into apache:master Mar 15, 2025
18 checks passed
danzhewuju pushed a commit to danzhewuju/paimon that referenced this pull request Mar 31, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] Support special audition computed columns, especially for cdc sync database

3 participants