Skip to content

[cdc] Support audit time computed clumns during cdc sync database#5069

Closed
JackeyLee007 wants to merge 115 commits intoapache:masterfrom
JackeyLee007:audit-time-clean
Closed

[cdc] Support audit time computed clumns during cdc sync database#5069
JackeyLee007 wants to merge 115 commits intoapache:masterfrom
JackeyLee007:audit-time-clean

Conversation

@JackeyLee007
Copy link
Copy Markdown
Contributor

Purpose

Linked issue: close #5068

Support special audition computed columns, especially for cdc sync database

Tests

org.apache.paimon.flink.action.cdc.kafka.KafkaCanalSyncDatabaseActionITCase#testAuditTime

API and Format

Documentation

@Override
protected void beforeBuildingSourceSink() throws Exception {
computedColumns =
buildComputedColumns(computedColumnArgs, Arrays.asList(SpecialFields.VALUE_KIND));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collections.singletonList(SpecialFields.VALUE_KIND)

protected String includingTables = ".*";
protected List<String> partitionKeys = new ArrayList<>();
protected List<String> primaryKeys = new ArrayList<>();
protected boolean auditTimeEnabled = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where auditTimeEnabled used?

protected List<String> partitionKeys = new ArrayList<>();
protected List<String> primaryKeys = new ArrayList<>();
protected boolean auditTimeEnabled = false;
protected List<String> computedColumnArgs = new ArrayList<>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think computedColumns is better?

computedColumn.eval(rowData.get(computedColumn.fieldReference())));
String argVal;
if (computedColumn.fieldReference().equals(SpecialFields.VALUE_KIND.name())) {
argVal = rowKind.shortString();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (Objects.equals(computedColumn.fieldReference(), SpecialFields.VALUE_KIND.name()))

LONGTEXT_TO_BYTES,
BIGINT_UNSIGNED_TO_BIGINT;
BIGINT_UNSIGNED_TO_BIGINT,
DECIMAL_NO_CHANGE,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the note of DECIMAL_NO_CHANGE and NO_CHANGE before this enum.

yangjf2019 and others added 28 commits February 14, 2025 10:36
JackeyLee007 and others added 27 commits February 14, 2025 10:36
…ions create_time() and update_time() to compute according to rowkind
@JackeyLee007
Copy link
Copy Markdown
Contributor Author

@wwj6591812 Thanks for your review yesterday. I re-compose a new clean PR #5086.
If possible, have a look please.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] Support special audition computed columns, especially for cdc sync database