Add ignore delete parameter#405
Conversation
Signed-off-by: rico <bugbubug@gmail.com>
There was a problem hiding this comment.
Pull Request Overview
This PR adds a new ignore-delete parameter to the StarRocks Flink connector, allowing users to skip processing of delete records during CDC synchronization. This is particularly useful when users want to retain full data from Flink CDC without modifying the source configuration.
Key changes:
- Added
SINK_IGNORE_DELETEconfiguration option with default valuefalse - Implemented delete record filtering logic in both sink function implementations
- Registered the new option in the dynamic table sink factory
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| StarRocksSinkOptions.java | Adds the new SINK_IGNORE_DELETE configuration option and getter method |
| StarRocksDynamicTableSinkFactory.java | Registers the new option as an optional configuration parameter |
| StarRocksDynamicSinkFunctionV2.java | Implements delete record filtering logic using the new parameter |
| StarRocksDynamicSinkFunction.java | Implements delete record filtering logic using the new parameter |
| (!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreDelete())) { | ||
| return; | ||
| } | ||
| if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData)value).getRowKind())) { |
There was a problem hiding this comment.
This condition is redundant. The previous condition already handles the case where !sinkOptions.supportUpsertDelete() and the row kind is DELETE. This duplicate logic should be removed to avoid confusion and maintain cleaner code.
There was a problem hiding this comment.
@bugbubug as copilot said, should merge these two if
| (!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreDelete())) { | ||
| return; | ||
| } | ||
| if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData)value).getRowKind())) { |
There was a problem hiding this comment.
This condition is redundant. The previous condition already handles the case where !sinkOptions.supportUpsertDelete() and the row kind is DELETE. This duplicate logic should be removed to avoid confusion and maintain cleaner code.
There was a problem hiding this comment.
@bugbubug as copilot said, should merge these two if
banmoy
left a comment
There was a problem hiding this comment.
should add a test in StarRocksSinkITTest
| (!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreDelete())) { | ||
| return; | ||
| } | ||
| if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData)value).getRowKind())) { |
There was a problem hiding this comment.
@bugbubug as copilot said, should merge these two if
| (!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreDelete())) { | ||
| return; | ||
| } | ||
| if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData)value).getRowKind())) { |
There was a problem hiding this comment.
@bugbubug as copilot said, should merge these two if
What type of PR is this:
Which issues of this PR fixes :
Fixes #392
Problem Summary(Required) :
This PR introduces a new parameter to control whether to ignore delete records. By default, this parameter is set to
false, meaning delete records will be processed normally. However, when users choose to set it totrue, the system will ignore the delete records. This feature is particularly useful in scenarios where users want to retain full data when syncing with Flink CDC, without altering the source Flink CDC message sending configuration.Checklist: