[FLINK-30571] Estimate scalability coefficient from past scaling history using linear regression #966
Conversation
Hi @gyfora, please help review the PR! Thanks!
    double timeDiff =
            Duration.between(timestamp, latestTimestamp).getSeconds()
                    + 1; // Avoid division by zero
    double weight = parallelism / timeDiff;
Why did you decide on this particular weighting approach? To be specific, what's the benefit compared to:
- Not weighting
- Using weights based on the difference with the current parallelism (locally weighted regression)
I think overall weighting makes sense, but maybe weighting based on the parallelism difference (and time) makes more sense than weighting by parallelism alone.
We could also add an enum configuration with some strategies here if we feel that would be required, but maybe that's overkill initially.
I'm also wondering whether applying too much recency bias could hurt the model. Simply weighing by the parallelism should already produce good results. I see how scalability of a pipeline might change over time due to factors like growing state, so maybe using recency bias is smart, as long as the recency influence isn't too strong.
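For illustration, the two weighting options under discussion could be sketched as follows. This is a hypothetical sketch, not the PR's code: the class and method names and the Gaussian bandwidth parameter are assumptions made for the example.

```java
import java.time.Duration;
import java.time.Instant;

public class WeightingStrategies {

    // Approach from the diff above: weight each data point by its
    // parallelism, decayed by how long ago it was recorded (recency bias).
    static double recencyAndParallelismWeight(
            int parallelism, Instant timestamp, Instant latestTimestamp) {
        double timeDiff =
                Duration.between(timestamp, latestTimestamp).getSeconds()
                        + 1; // avoid division by zero
        return parallelism / timeDiff;
    }

    // Alternative raised in review: weight by closeness to the current
    // parallelism, as in locally weighted regression. The bandwidth is a
    // hypothetical smoothing parameter chosen for illustration.
    static double localityWeight(int parallelism, int currentParallelism, double bandwidth) {
        double d = parallelism - currentParallelism;
        return Math.exp(-(d * d) / (2 * bandwidth * bandwidth));
    }
}
```

With the locality variant, a data point taken at the current parallelism gets weight 1.0 and points further away decay smoothly, which is the "parallelism difference" weighting suggested above.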
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
mxm
left a comment
Thank you @pchoudhury22, this is a great addition! Code looks very clean.
@mxm We discussed this with @pchoudhury22 offline a little and came to the conclusion that, due to the limited number of data points (scaling history is generally very limited, especially with the default Kubernetes state store), it is probably best to initially remove the weighting completely. In the future, if we have a better way to accumulate history and training data points, we can introduce different weighting strategies. What do you think?
+1 Let's remove the weighting completely for now.
… 2. Check scaling coefficient with threshold before returning. 3. Refactored tests for point [1] and [2].
Hi @gyfora @mxm Thanks so much for the review and the notes! Have updated the PR with the below changes.
Please help review the PR! Thanks!
Thanks for the update!
Why 0.5? Should this be configurable?
Primarily, the thought process was to avoid a very small scaling coefficient. And honestly, I just hesitated a bit to add another config :). Along the lines of what @gyfora suggested above, but as a threshold approach, we could probably use the same lowerBound as the scaling effectiveness threshold?
I wouldn't reuse the same config exactly here, as it would have a different meaning. So if you and @mxm feel that this should be adjustable, we can add a new config for sure.
I'm ok with keeping it at 0.5.
That makes perfect sense! Will add a config for the lower limit 😊.
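A minimal sketch of the lower-limit config discussed above. The config key and class names here are illustrative assumptions, not the PR's actual names; only the 0.5 default comes from the discussion.

```java
public class ScalingCoefficientClamp {

    // Hypothetical config key and default value, for illustration only.
    static final String MIN_SCALING_COEFFICIENT_KEY =
            "job.autoscaler.scaling.coefficient.min"; // assumed name
    static final double DEFAULT_MIN_SCALING_COEFFICIENT = 0.5;

    /**
     * Guard against a noisy regression fit on sparse scaling history
     * driving the coefficient (and with it the target parallelism) to
     * extreme values: never go below the configured lower limit.
     */
    static double clamp(double estimatedCoefficient, double minCoefficient) {
        return Math.max(minCoefficient, estimatedCoefficient);
    }
}
```

Making the lower limit a config (rather than a hardcoded 0.5) lets operators tune how aggressively a poor fit can shrink the coefficient.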
…idator to validate the min scaling coefficient config.
Hello, good morning!
Please help review! Thanks!
Hello @gyfora @mxm! Hope you are doing great! Have updated the PR with the below changes.
Whenever you get a chance, I'd really appreciate your review and any feedback you might have. Thanks so much again for your time and support!
Checking on the failed checks... :)
Have updated the PR with the autoscaler configuration doc updates. Hopefully the checks pass now! But I don't think I am able to trigger the checks. @mxm, could you please help with it? Thanks! Really sorry for the inconvenience 😔
The failed check of the latest run for
All green now.
@gyfora Any further comments?
Thanks so much for all the notes and guidance 😊!
What is the purpose of the change
Currently, target parallelism computation assumes perfect linear scaling. However, real-time workloads often exhibit nonlinear scalability due to factors like network overhead and coordination costs.
This change introduces an observed scalability coefficient, estimated using linear regression on past (parallelism, processing rate) data, to improve the accuracy of scaling decisions.
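As a rough illustration of how such a coefficient changes the computation, assuming the linear model R = α β P described in the change log below; the class and method names are hypothetical, not the PR's actual API:

```java
public class TargetParallelismSketch {

    /**
     * Invert the model R = alpha * beta * P to pick a parallelism that
     * should achieve the desired target rate. With alpha = 1.0 this reduces
     * to the perfect-linear-scaling assumption; alpha < 1.0 compensates for
     * observed sublinear scaling by choosing a higher parallelism.
     */
    static int targetParallelism(double targetRate, double alpha, double beta) {
        return (int) Math.ceil(targetRate / (alpha * beta));
    }
}
```

For example, with a baseline of 100 records/s per unit of parallelism and a target of 400 records/s, perfect scaling (α = 1.0) suggests parallelism 4, while α = 0.8 suggests 5.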
Brief change log
Implemented a dynamic scaling coefficient to compute target parallelism based on observed scalability. The system estimates the scalability coefficient using a least squares linear regression approach, leveraging historical (parallelism, processing rate) data.
The regression model minimises the sum of squared errors. The baseline processing rate is computed using the smallest observed parallelism in the history. Model details:
The Linear Model
We define a linear relationship between parallelism (P) and processing rate (R):
R̂ = (β α) P
where:
- P is the parallelism
- R̂ is the predicted processing rate
- β is the baseline processing rate per unit of parallelism, computed from the smallest observed parallelism in the history
- α is the scalability coefficient to be estimated
Squared Error
The loss function to minimise is the sum of squared errors (SSE):
E = Σ_i (R_i - R̂_i)^2
Substituting ( R̂_i = (β α) P_i ):
E = Σ_i (R_i - β α P_i)^2
Minimising the Error
Expanding ( (R_i - β α P_i)^2 ):
R_i^2 - 2 β α P_i R_i + β^2 α^2 P_i^2
Summing over all data points:
E = Σ_i R_i^2 - 2 β α Σ_i P_i R_i + β^2 α^2 Σ_i P_i^2
Solving for α
To minimise with respect to α, take the derivative and set it to zero:
∂E/∂α = -2 β Σ_i P_i R_i + 2 β^2 α Σ_i P_i^2 = 0
which gives:
α = (Σ_i P_i R_i) / (β Σ_i P_i^2)
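The closed-form solution above can be sketched as a small self-contained method; the class and method names are illustrative, not the PR's actual code.

```java
public class ScalabilityEstimator {

    /**
     * Least-squares estimate of the scalability coefficient alpha for the
     * model R = alpha * beta * P, following the derivation above:
     * alpha = sum(P_i * R_i) / (beta * sum(P_i^2)).
     */
    static double estimateAlpha(double[] parallelism, double[] rate, double beta) {
        double sumPR = 0.0;
        double sumPP = 0.0;
        for (int i = 0; i < parallelism.length; i++) {
            sumPR += parallelism[i] * rate[i];
            sumPP += parallelism[i] * parallelism[i];
        }
        return sumPR / (beta * sumPP);
    }

    /** Baseline rate per unit of parallelism, taken from the smallest observed parallelism. */
    static double baseline(double smallestParallelism, double rateAtSmallest) {
        return rateAtSmallest / smallestParallelism;
    }
}
```

A perfectly linear history (e.g. rates 100/200/400 at parallelism 1/2/4 with β = 100) yields α = 1.0, while a sublinear history yields α < 1.0, shrinking the assumed benefit of scaling up.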
Verifying this change
New unit tests were added to cover this change.
Does this pull request potentially affect one of the following parts:
Dependencies (does it add or upgrade a dependency): no
The public API, i.e., any changes to the CustomResourceDescriptors: no
Core observer or reconciler logic that is regularly executed: no