
[core] Support watermark partition markdone mode #5284

Merged
JingsongLi merged 1 commit into apache:master from xiangyuf:partition-watermark
Mar 25, 2025


@xiangyuf
Contributor

@xiangyuf xiangyuf commented Mar 14, 2025

Purpose

Linked issue: close #4963

Use the watermark to trigger partition mark done.

Tests

E2E Test in WatermarkPartitionMarkDoneTest#testWaterMarkPartitionMarkDone

API and Format

Documentation

@xiangyuf xiangyuf force-pushed the partition-watermark branch from c7b04f8 to baac615 Compare March 14, 2025 04:45
@LinMingQiang
Contributor

You should ignore partition.time-interval; otherwise the watermark will not be able to properly determine when the partition should be marked done.

@xiangyuf
Contributor Author

@LinMingQiang Thanks for the advice. I don't think partition.time-interval affects the watermark partition mark-done behavior here. Can you explain further?

@LinMingQiang
Contributor

LinMingQiang commented Mar 18, 2025

For example, with partition.time-interval = 2 d, a partition value of '2025-03-10', and a watermark of '2025-03-11', should partition '2025-03-10' be marked as done? Under the current logic, it is not.

Related code:

Map.Entry<String, Long> entry = iter.next();
String partition = entry.getKey();

long lastUpdateTime = entry.getValue();
long partitionStartTime =
        extractDateTime(partition)
                .atZone(ZoneId.systemDefault())
                .toInstant()
                .toEpochMilli();
long partitionEndTime = partitionStartTime + timeInterval;
lastUpdateTime = Math.max(lastUpdateTime, partitionEndTime);

if (currentTimeMillis - lastUpdateTime > idleTime) {
    needDone.add(partition);
    iter.remove();
}
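
To make the scenario concrete, here is a minimal, self-contained sketch of the loop quoted above. It is not the code from this PR. It assumes the watermark is passed in where the quoted snippet uses currentTimeMillis, that partition values are plain 'yyyy-MM-dd' strings, and that UTC is used instead of the system zone so the result is reproducible. The class name, the helper partitionStartMillis, and the 15-minute idle time are hypothetical and chosen only for illustration.

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class WatermarkMarkDoneSketch {

    // Hypothetical stand-in for extractDateTime(partition):
    // parses 'yyyy-MM-dd' and returns midnight UTC in epoch millis.
    static long partitionStartMillis(String partition) {
        return LocalDate.parse(partition)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
    }

    // Same shape as the quoted loop, with the watermark used as "now" (assumption).
    static List<String> collectDone(
            Map<String, Long> pending, long watermark, long timeInterval, long idleTime) {
        List<String> needDone = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> iter = pending.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Long> entry = iter.next();
            String partition = entry.getKey();
            long partitionEndTime = partitionStartMillis(partition) + timeInterval;
            // A partition is never treated as idle before its interval has ended.
            long lastUpdateTime = Math.max(entry.getValue(), partitionEndTime);
            if (watermark - lastUpdateTime > idleTime) {
                needDone.add(partition);
                iter.remove();
            }
        }
        return needDone;
    }

    public static void main(String[] args) {
        Map<String, Long> pending = new HashMap<>();
        pending.put("2025-03-10", partitionStartMillis("2025-03-10"));

        long twoDays = Duration.ofDays(2).toMillis();  // partition.time-interval = 2 d
        long idle = Duration.ofMinutes(15).toMillis(); // illustrative idle time
        long watermark = partitionStartMillis("2025-03-11");

        // The partition end is 2025-03-12, which is after the watermark, so nothing is done yet.
        System.out.println(collectDone(pending, watermark, twoDays, idle)); // prints []
    }
}
```

With these inputs the partition stays pending, which matches the behavior described in the question.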

@xiangyuf
Contributor Author

In your case, I think partition '2025-03-10' should be marked done after the watermark passes '2025-03-12'.
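
The arithmetic behind this answer can be sketched as follows. This is only an illustration under stated assumptions: the partition receives no writes after its interval ends, the idle time is zero, and UTC is used for the date conversion. The class and method names are hypothetical and do not come from the PR.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

public class MarkDoneThreshold {

    // Partition end = partition start + partition.time-interval (UTC assumed).
    static Instant partitionEnd(String partition, Duration timeInterval) {
        return LocalDate.parse(partition)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .plus(timeInterval);
    }

    // True once the watermark is strictly more than idleTime past the partition end.
    static boolean isDone(
            String partition, Duration timeInterval, Duration idleTime, Instant watermark) {
        Instant end = partitionEnd(partition, timeInterval);
        return Duration.between(end, watermark).compareTo(idleTime) > 0;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofDays(2);
        Duration idle = Duration.ZERO; // illustrative assumption

        System.out.println(partitionEnd("2025-03-10", interval)); // 2025-03-12T00:00:00Z
        System.out.println(
                isDone("2025-03-10", interval, idle, Instant.parse("2025-03-11T00:00:00Z"))); // false
        System.out.println(
                isDone("2025-03-10", interval, idle, Instant.parse("2025-03-12T00:00:01Z"))); // true
    }
}
```

A non-zero idle time would push the threshold later by that amount.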

@LinMingQiang
Contributor

👌.

@xiangyuf xiangyuf force-pushed the partition-watermark branch 2 times, most recently from 0948526 to 21ee681 Compare March 21, 2025 02:35
@xiangyuf xiangyuf force-pushed the partition-watermark branch from 21ee681 to 2db2b69 Compare March 21, 2025 02:40
@xiangyuf
Contributor Author

@JingsongLi Ready for review

@xiangyuf xiangyuf force-pushed the partition-watermark branch from 2db2b69 to 24bec1c Compare March 25, 2025 07:04
@JingsongLi
Contributor

+1

@JingsongLi JingsongLi merged commit 0ba4b95 into apache:master Mar 25, 2025
19 checks passed
danzhewuju pushed a commit to danzhewuju/paimon that referenced this pull request Mar 31, 2025

Development

Successfully merging this pull request may close these issues.

[Feature] Support watermark in partition.idle-time-to-done

4 participants