[FLINK-36535][autoscaler] Optimize the scale down logic based on historical parallelism to reduce the rescale frequency #920
Conversation
1996fanrui
left a comment
Hey @gyfora @mxm @huyuanfeng2018, please take a look at this PR in your free time, thanks~
gyfora
left a comment
I think one other problem with this logic in general is that we are handling the scale down window/interval more or less independently for different vertices.
So if you have 10 vertices and they would be scaled down at different times, you can have 10 restarts within the scale down window, which does not feel right.
I think we should consider setting global scale down windows. For example, one possible implementation: if the scale down interval is 1 hour and one vertex requests a scale down at 12:10 and a second at 12:20, then we scale both down at 13:20 (instead of doing 2 scale downs). So we basically coalesce the intervals to some extent, to really have at most 1 scale down per hour.
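The coalescing idea described above could be sketched roughly as follows. This is a simplified, hypothetical sketch (the class and method names are invented, not from the actual autoscaler code): every scale-down request postpones one shared deadline, and all pending vertices are scaled down together once the window elapses.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a global scale-down window: all pending scale-down
// requests share one deadline, so at most one scale down happens per window.
public class GlobalScaleDownWindow {
    private final Duration interval;
    private final Map<String, Integer> pendingParallelism = new HashMap<>();
    private Instant lastTriggerTime;

    public GlobalScaleDownWindow(Duration interval) {
        this.interval = interval;
    }

    // Record a vertex's scale-down request; this postpones the shared deadline.
    public void trigger(String vertexId, int newParallelism, Instant now) {
        pendingParallelism.put(vertexId, newParallelism);
        lastTriggerTime = now;
    }

    // All pending vertices are scaled down together once the window elapses.
    public boolean shouldExecute(Instant now) {
        return lastTriggerTime != null
                && !now.isBefore(lastTriggerTime.plus(interval));
    }

    // Hand out (and clear) the coalesced scale-down targets for execution.
    public Map<String, Integer> drainPending() {
        Map<String, Integer> result = new HashMap<>(pendingParallelism);
        pendingParallelism.clear();
        lastTriggerTime = null;
        return result;
    }
}
```

With a 1 hour interval and triggers at 12:10 and 12:20, `shouldExecute` first returns true at 13:20, matching the example above.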
Exactly, I actually thought that's how the originally proposed "lazy" scale down technique should work.
Thanks @gyfora and @mxm for the quick comment and suggestion!
Actually, we only rescale twice (instead of 10 times) if we have 10 vertices and they would be scaled down at different times.
Vertex1 will be scaled down at 13:10, and the rest of them are not changed. After 13:10, if all of vertex2 to vertex10 need to scale down, all of them will trigger the scale down at 13:10 and will be scaled down at 14:10.
There may be an unexpected case, and it easily happens when the scale down interval is >= 24 hours.
Trigger times:
So this strategy hopes the scale down is executed at 2024-11-21 23:00. If all vertices still need to scale down after 24 hours, it works well. But the unexpected case is: vertex2 always wants to scale down, while vertex1 and vertex3 run at the parallelism of peak time. The scale down trigger is canceled for vertex1 at 2024-11-21 18:00, and at this time the trigger statuses are:
It hopes the scale down is executed at 2024-11-22 19:00, but vertex3 will cancel and re-trigger again, so the execution time will be postponed again. Please correct me if I misunderstand anything, and I'm happy to hear more suggestions from you.
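The per-vertex behaviour described above could be sketched like this. This is a simplified, hypothetical illustration, not the actual DelayedScaleDown implementation: the first trigger time is remembered per vertex, the scale down may execute once the interval has elapsed since that first trigger, and a trigger is cancelled when the vertex stops requesting a scale down.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical sketch of per-vertex delayed scale down:
// each vertex keeps its own first trigger time, and a scale down is only
// allowed once (now - firstTriggerTime) >= interval.
public class PerVertexDelayedScaleDown {
    private final Duration interval;
    private final Map<String, Instant> firstTriggerTime = new HashMap<>();

    public PerVertexDelayedScaleDown(Duration interval) {
        this.interval = interval;
    }

    // Returns true when the vertex may actually be scaled down now.
    public boolean triggerScaleDown(String vertexId, Instant now) {
        Instant first = firstTriggerTime.putIfAbsent(vertexId, now);
        if (first == null) {
            first = now; // first scale-down request for this vertex
        }
        return !now.isBefore(first.plus(interval));
    }

    // Called when the vertex no longer wants to scale down; the next
    // request will start a fresh interval.
    public void cancel(String vertexId) {
        firstTriggerTime.remove(vertexId);
    }
}
```

With a 1 hour interval, a vertex that first triggers at 12:10 is only allowed to scale down from 13:10 onwards, and a cancel in between resets its clock.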
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
1996fanrui
left a comment
I added some comments to make this approach easier to understand.
Thanks @1996fanrui, after some offline discussion I think this works as expected; I just slightly misunderstood the mechanism. We can later optimize this further by coalescing downscale triggers that are close to each other, but we don't need to do it right now.
mxm
left a comment
LGTM
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
var delayedScaleDownInfo = delayedScaleDown.triggerScaleDown(vertex, now, newParallelism);

// Never scale down within scale down interval
if (now.isBefore(delayedScaleDownInfo.getFirstTriggerTime().plus(scaleDownInterval))) {
Could we rename this method to getFirstScaledUpTime()?
Is it a typo?
Do you mean rename the firstTriggerTime to firstScaleDownTime inside of VertexDelayedScaleDownInfo?
I prefer to use firstTriggerTime because the class name includes DelayedScaleDown, so the field or method name doesn't need to include scale down again. WDYT?
I thought that getFirstTriggerTime() returns the first time we scaled up, but we are actually recording the time we first try to scale down.
I'm not sure this is correct. We want to delay scale down from the first time we scale up, not the first time we scaled down.
Thanks @mxm for the clarification!
Actually, we expect to use the first scale down time instead of scale up time as the first trigger time.
- job.autoscaler.scale-up.grace-period hopes to use the scale up time
- But it has been deprecated in FLINK-36018
- And job.autoscaler.scale-down.interval hopes to use the first scale down time: if scale-down.interval is 1 hour, we expect the scale down to be executed after 1 hour. It's a delayed scale down.
- And it can merge multiple scale down requests into one scale down execution.
- If scale-down.interval used the scale up time, then when a scale down request comes, the job would execute the scale down directly if the job was scaled up more than 1 hour ago. (It could not merge multiple scale down requests into one execution.)
As I understand it, the scale down interval aims to reduce the scale down frequency.
Please correct me if anything is wrong, thank you~
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
A small comment, PTAL.
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/DelayedScaleDown.java
1996fanrui
left a comment
Thanks @gyfora for the update!
And thanks @mxm and @huyuanfeng2018 for the quick review!
I have addressed or replied to all comments.
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
mxm
left a comment
Generally, the PR looks good.
I missed this in the original JIRA, even though you wrote it there: #920 (comment). I can open a separate JIRA issue if we don't want to resolve it here.
mxm
left a comment
LGTM. We can handle #920 (comment) independently of this PR.
Please rebase on main as there is a conflicting test change.
Thanks for the review. I will address all comments next Monday, as I'm at the Flink Forward Asia conference.
…orical parallelism to reduce the rescale frequency
1. Using the maximum parallelism within the window instead of the latest parallelism when scaling down
2. Never scale down when (currentTime - triggerTime) < scale-down.interval
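Change (1) above could be sketched like this: a minimal, hypothetical illustration (not the actual autoscaler implementation), where the scale-down target is the maximum of the parallelism recommendations collected while the scale down was delayed, so a brief dip in load does not produce an overly aggressive scale down.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: pick the scale-down target from the history of
// recommendations gathered during the scale-down window, not the latest one.
public class WindowMaxParallelism {
    // 'recommendedParallelisms' are the per-evaluation recommendations
    // collected while the scale down was delayed (hypothetical input).
    public static int scaleDownTarget(List<Integer> recommendedParallelisms) {
        // Using the maximum keeps enough capacity for the peak seen in the
        // window, reducing the chance of an immediate scale up afterwards.
        return Collections.max(recommendedParallelisms);
    }
}
```

For example, if the window saw recommendations of 8, 4 and 6, the vertex would be scaled down to 8 rather than the latest value 6.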
Thanks @gyfora for the reminder, rebased and fixed the conflict.
Thanks all for the patient review! We could discuss #920 (comment) asynchronously, so let me merge this PR first.
What is the purpose of the change
See FLINK-36535 for more details.
Brief change log
[FLINK-36535][autoscaler] Optimize the scale down logic based on historical parallelism
Verifying this change
This change added tests and can be verified as follows:
- Renamed testRequiredScaleDownAfterInterval to testScaleDownAfterInterval
- Added tests in AbstractAutoScalerStateStoreTest, JobVertexScalerTest and ScalingExecutorTest

Does this pull request potentially affect one of the following parts:
CustomResourceDescriptors: no
Documentation