[FLINK-36527][autoscaler] Introduce a parameter to support autoscaler adopt a more radical strategy when source vertex or upstream shuffle is keyBy #904
Conversation
SamBarker
left a comment
I'm lacking context for the change, so my comments could be missing the mark, but I think that means I represent a lot of people trying to consume the proposed config: if the naming confuses me, it's likely to confuse others.
<td>Time interval to resend the identical event</td>
</tr>
<tr>
<td><h5>job.autoscaler.scaling.radical.enabled</h5></td>
Coming at this cold, it's not at all clear to me what radical means. While the description goes some way towards clarifying the intent, it doesn't feel like a great term; additionally, following through the JIRA links, radical feels like a very odd term for a default (assuming I'm following properly). I wonder if job.autoscaler.scaling.maximizeUtilisation.enabled would make things more explicit?
(scalingRadical
        && numKeyGroupsOrPartitions / p
                < numKeyGroupsOrPartitions / newParallelism)) {
I think extracting this as a method, canMaximiseUtilisation, would make the intent of the condition easier to understand when working through the code.
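A minimal sketch of what that extraction might look like. The method name follows the suggestion above; the class name, signature, and the 199-partition numbers in main are illustrative, not taken from the PR.

```java
// Hypothetical sketch of the suggested extraction; names are illustrative.
public class CanMaximiseUtilisationSketch {

    // True when picking parallelism p (even though it is not a clean divisor)
    // still lowers numKeyGroupsOrPartitions / parallelism compared to the
    // originally computed newParallelism.
    static boolean canMaximiseUtilisation(
            boolean scalingRadical,
            int numKeyGroupsOrPartitions,
            int p,
            int newParallelism) {
        return scalingRadical
                && numKeyGroupsOrPartitions / p
                        < numKeyGroupsOrPartitions / newParallelism;
    }

    public static void main(String[] args) {
        // 199 partitions, computed parallelism 96 (199 / 96 = 2):
        System.out.println(canMaximiseUtilisation(true, 199, 100, 96)); // true  (199 / 100 = 1)
        System.out.println(canMaximiseUtilisation(true, 199, 99, 96));  // false (199 / 99 = 2)
    }
}
```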
// When adjust the parallelism after rounding up cannot be
// find the right degree of parallelism to meet requirements,
// Try to find the smallest parallelism that can satisfy the current consumption rate.
nits
Suggested change:
- // When adjust the parallelism after rounding up cannot be
- // find the right degree of parallelism to meet requirements,
- // Try to find the smallest parallelism that can satisfy the current consumption rate.
+ // When adjusting the parallelism after rounding up cannot
+ // find the right degree of parallelism to meet requirements.
+ // Try to find the smallest parallelism that can satisfy the current consumption rate.
.withFallbackKeys(oldOperatorConfigKey("scaling.radical.enabled"))
.withDescription(
        "If this option is enabled, The determination of parallelism will be more radical, which"
                + " will maximize resource utilization, but may also cause data skew in some vertex.");
I think it might be helpful to give consumers/users some more context as to how/why it would potentially cause skew.
@SamBarker Thank you very much for your review.
context));

assertEquals(
        32,
It would be nice to tie this answer to something so it was clear why it was picked.
e.g.
Suggested change:
- 32,
+ EXPECTED_KEY_GROUPS,
context));

assertEquals(
        199,
Suggested change:
- 199,
+ PARTITION_COUNT,
AutoScalerOptions.SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE,
NumKeyGroupsOrPartitionsParallelismAdjuster.Mode.MAXIMIZE_UTILISATION);
assertEquals(
        100,
Suggested change:
- 100,
+ MAXIMUM_UTILISATION_PARALLELISM,
It's not the number that matters but what it means.
SamBarker
left a comment
LGTM
Thanks for listening @huyuanfeng2018
Hey @1996fanrui @mxm, would you help review this PR?
1996fanrui
left a comment
Thanks @huyuanfeng2018 for the ping, I will review it this week.
mxm
left a comment
Thanks for the PR! This is an interesting idea. See the comments inline. I would prefer to do the refactoring separately, as it distracts from the actual changes at hand (which are relatively small). There is also some value in preserving the Git history of the changes.
(mode == MAXIMIZE_UTILISATION
        && numKeyGroupsOrPartitions / p
                < numKeyGroupsOrPartitions / newParallelism)) {
From what I can tell, this is the only change in this PR, apart from the refactoring. The assumption here is that for cases where a parallelism such that numKeyGroupsOrPartitions % parallelism == 0 cannot be found, we at least pick a parallelism which leads to fewer state/partition imbalance.
I understand the idea behind this, but I wonder whether it has the desired effect. The autoscaling algorithm isn't aware of any state/partition imbalance; it assumes linear scaling through state/partition balance can be achieved. A slight adjustment to the parallelism won't drastically improve the situation.
It looks like this change could help in situations where the number of partitions / key groups does not have many divisors, but it's also kind of hard to reason about.
Sorry for replying so late.
I understand the idea behind this, but I wonder whether it has the desired effect. The autoscaling algorithm isn't aware of any state/partition imbalance; it assumes linear scaling through state/partition balance can be achieved. A slight adjustment to the parallelism won't drastically improve the situation.

When the partitions or key groups are evenly distributed and the count has relatively many divisors, such as 60, 120, or 720, we can indeed easily achieve linear scaling through state/partition balancing, which is the most ideal situation.
If the number of partitions is awkward, such as 35, we still hope that adjusting the parallelism will linearly increase the consumption rate. Although it is not possible to consume evenly, it is still meaningful.
Imagine a situation: when running with a parallelism of 7, the busy value of the operator is high. No matter what the scaleFactor is, the autoscaler can be expected to expand the parallelism to 35, which may leave the operator very idle. But we know the linear assumption may not hold completely when changing 7->35; we may not get a 5x (35/7) increase in processing speed (this depends on many factors), so the next cycle may trigger a scale down again, and then the cycle repeats.
In addition, due to the limitations of the scale-down.max-factor parameter, we may never be able to reduce the degree of parallelism.
If we use the MAXIMIZE_UTILISATION mode, this phenomenon can be significantly improved.

It looks like this change could help in situations where the number of partitions / key groups does not have many divisors, but it's also kind of hard to reason about.
Any suggestions?
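The two modes discussed above can be modeled with a small standalone sketch. This is a simplified, hypothetical version of the search loop quoted in the diff: the class and method names are illustrative, and the real JobVertexScaler handles the not-found fallback and bounds differently.

```java
// Simplified, hypothetical model of the two adjustment modes discussed above.
public class AdjustModeSketch {

    enum Mode { EVENLY_SPREAD, MAXIMIZE_UTILISATION }

    // Search upwards from newParallelism for a parallelism that either divides
    // the key group / partition count evenly, or (in MAXIMIZE_UTILISATION mode)
    // at least lowers numKeyGroupsOrPartitions / parallelism.
    static int adjust(int numKeyGroupsOrPartitions, int newParallelism, int upperBound, Mode mode) {
        int upper = Math.min(numKeyGroupsOrPartitions, upperBound);
        for (int p = newParallelism; p <= upper; p++) {
            if (numKeyGroupsOrPartitions % p == 0
                    || (mode == Mode.MAXIMIZE_UTILISATION
                            && numKeyGroupsOrPartitions / p
                                    < numKeyGroupsOrPartitions / newParallelism)) {
                return p;
            }
        }
        return newParallelism; // fall back if nothing better is found
    }

    public static void main(String[] args) {
        // 35 partitions, required parallelism 9:
        System.out.println(adjust(35, 9, 720, Mode.EVENLY_SPREAD));        // 35
        System.out.println(adjust(35, 9, 720, Mode.MAXIMIZE_UTILISATION)); // 12
    }
}
```

With 35 partitions and a required parallelism of 9, EVENLY_SPREAD jumps all the way to 35 (the only divisor of 35 that is >= 9), while MAXIMIZE_UTILISATION stops at 12, where each subtask handles at most 3 partitions instead of 4.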
1996fanrui
left a comment
I would prefer to do the refactoring separately
I agree with @mxm, refactoring should at least be a separate commit.
OK, I will revert the changes.
a9b7555 to 5570270
5570270 to 0aa6795
mxm
left a comment
Looks good to me overall, but I would like to ask you to drop the refactoring commit. The reason is that this code won't be reused via an extra class. It isn't more easily testable. It also doesn't make the code easier to read, and it removes the valuable Git history from the JobVertexScaler file. I hope that makes sense.
/** The mode of the key group or parallelism adjustment. */
public enum KeyGroupOrPartitionsAdjustMode implements DescribedEnum {
    ABSOLUTELY_EVENLY_DISTRIBUTION(
The name is confusing because this doesn't guarantee absolutely even distribution.
How about EVENLY_SPREAD?
The name is confusing because this doesn't guarantee absolutely even distribution. How about EVENLY_SPREAD?

Looks like EVENLY_SPREAD is more reasonable; I'm fine with that.
@1996fanrui What do you think about this?
Sounds good to me.
Thanks for the review. Regarding this part of the refactoring, I originally thought there might be some other logic to modify the parallelism in the future, just like the fine-tuning based on numpartitionOrKeygroup here, so I refactored it. But I think your concerns are valid (this would change the git history), so I have no problem with cancelling this part of the refactoring.
Thanks! Let's get the PR ready to be merged then.
… adopt a more radical strategy when source vertex or upstream shuffle is keyBy
0aa6795 to
a339203
Compare
var upperBoundForAlignment =
        Math.min(
                // Optimize the case where newParallelism <= maxParallelism / 2
                newParallelism > numKeyGroupsOrPartitions / 2
                        ? numKeyGroupsOrPartitions
                        : numKeyGroupsOrPartitions / 2,
                upperBound);

KeyGroupOrPartitionsAdjustMode mode =
        context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);

var upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, upperBound);
Why is the upperBoundForAlignment logic updated? Would you mind sharing one case?
The optimization to use half of the keyspace when the parallelism is less than or equal to half the keyspace doesn't work for the new parallelism adjustment mode. It was anyway just a shortcut to avoid checking all divisors up to the maximum. No harm removing it.
Why is the upperBoundForAlignment logic updated? Would you mind sharing one case?

When I ran the test cases, I found this logic is wrong when the key groups or partition count are not even numbers.
Let me give you an example:
partition=199, newParallelism=96
With the original logic, upperBoundForAlignment will be calculated as 99,
but under EVENLY_SPREAD I think the expected result is 199,
and under MAXIMIZE_UTILISATION the expected result is 100.
Due to this wrong logic, we cannot get the expected results. This is a bug. If it is not appropriate to fix it in this PR, should it be fixed in another PR?
It seems the logic is wrong when numKeyGroupsOrPartitions is an odd number.
Thanks for the clarification! This change makes sense to me.
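The 199-partition example above can be checked with a small standalone sketch contrasting the old and new upperBoundForAlignment computations quoted in the diff. The class and method names are illustrative, and upperBound = 200 is an assumed vertex max parallelism, not a value from the PR.

```java
// Hypothetical sketch of the old vs. new upper-bound computation.
public class UpperBoundSketch {

    static int oldUpper(int numKeyGroupsOrPartitions, int newParallelism, int upperBound) {
        return Math.min(
                // Old shortcut: only search half the key space for small parallelisms
                newParallelism > numKeyGroupsOrPartitions / 2
                        ? numKeyGroupsOrPartitions
                        : numKeyGroupsOrPartitions / 2,
                upperBound);
    }

    static int newUpper(int numKeyGroupsOrPartitions, int upperBound) {
        return Math.min(numKeyGroupsOrPartitions, upperBound);
    }

    public static void main(String[] args) {
        // partition = 199, newParallelism = 96: since 96 <= 199 / 2 = 99, the old
        // logic caps the search at 99, so neither 100 (MAXIMIZE_UTILISATION)
        // nor 199 (EVENLY_SPREAD) can ever be reached.
        System.out.println(oldUpper(199, 96, 200)); // 99
        System.out.println(newUpper(199, 200));     // 199
    }
}
```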
…ition remains unchanged
5c543f1 to f18634a
1996fanrui
left a comment
Thanks @huyuanfeng2018 for the update!
LGTM
Thanks @huyuanfeng2018!
Thanks for the review @mxm @1996fanrui @SamBarker |
What is the purpose of the change
Introduce a parameter to let the autoscaler adopt a more radical strategy when the source vertex or upstream shuffle is keyBy.
Brief change log
Introduce scaling.key-group.partitions.adjust.mode
Verifying this change
In org.apache.flink.autoscaler.JobVertexScalerTest, testParallelismComputationWithAdjustment and testNumPartitionsAdjustment add logic to test.
Does this pull request potentially affect one of the following parts:
CustomResourceDescriptors: (yes / no)
Documentation