[flink] Flink sink support hash by bucket id for PrimaryKey Table #579
69a01ee to cb28ec4
The test coverage check failed:
Resolved review threads (outdated):
...s-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java (2 threads)
...rs/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSink.java (2 threads)
...connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/RowDataKeySelector.java
...uss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java
fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java
9b95593 to de907f9
@wuchong comments addressed.
Resolved review threads:
...uss-flink-common/src/main/java/com/alibaba/fluss/flink/sink/FlinkRowDataChannelComputer.java (2 threads, 1 outdated)
    void testAppendLogWithBucketKeyWithSinkBucketShuffle() throws Exception {
        testAppendLogWithBucketKey(true);
    }

    @Test
    void testAppendLogWithBucketKeyWithoutSinkBucketShuffle() throws Exception {
        testAppendLogWithBucketKey(false);
    }

    private void testAppendLogWithBucketKey(boolean sinkBucketShuffle) throws Exception {
Did you try @ParameterizedTest? It would be better to use @ParameterizedTest if possible.
I tried, but I ran into the same problem as when I developed #351 (add parameterized test in FlinkTableSinkITCase): with @ParameterizedTest, the CI on GitHub hangs until it times out, while the tests pass in my local environment. I'll create an issue to track this problem and look into the root cause later: #659
Thanks. I think we need to investigate the reason; otherwise, we don't know whether to add @ParameterizedTest or not.
de907f9 to 21c27b3
Purpose
Linked issue: #570
This PR aims to support hashing by bucket key for PrimaryKey tables in the Flink sink. The approach is to first hash the data by bucket id before it is written to the Flink sink, so that data with the same bucket id is sent to the same parallel sink instance. This allows the Fluss writer to batch kv data more effectively, which reduces the pressure on the client and improves throughput. According to the testing results, this can greatly increase the throughput of writing Fluss PrimaryKey tables and reduce memory usage, while also reducing pressure on the server side (fewer putKv requests and larger CDC log batches).
Brief change log
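To make the shuffle described under Purpose concrete, here is a minimal sketch of the idea in plain Java. It is not the code from this PR: the class `BucketShuffleSketch`, its method names, the use of `Arrays.hashCode` as the bucketing hash, and the modulo mapping from bucket id to sink channel are all assumptions chosen for illustration. The actual BucketingFunction and FlinkRowDataChannelComputer in Fluss may compute both steps differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch only; every name here is hypothetical, not the Fluss API. */
final class BucketShuffleSketch {

    /** Assumed bucketing step: hash the encoded bucket key into [0, numBuckets). */
    static int bucketOf(byte[] bucketKey, int numBuckets) {
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(Arrays.hashCode(bucketKey), numBuckets);
    }

    /** Assumed channel step: map a bucket id onto one of the parallel sink instances. */
    static int channelOf(int bucketId, int numChannels) {
        return bucketId % numChannels;
    }

    /** Groups keys by sink channel, so each channel only sees a fixed subset of buckets. */
    static List<List<String>> shuffle(List<String> keys, int numBuckets, int numChannels) {
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        for (String key : keys) {
            int bucket = bucketOf(key.getBytes(StandardCharsets.UTF_8), numBuckets);
            channels.get(channelOf(bucket, numChannels)).add(key);
        }
        return channels;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a", "c", "b");
        // Rows with the same key always end up in the same channel.
        System.out.println(shuffle(keys, 8, 3));
    }
}
```

Because every row of a given bucket reaches the same sink instance, that instance's writer accumulates rows for fewer distinct buckets at a time, which is what lets it build larger batches per bucket.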
Tests
API and Format
Documentation