
Conversation

@huyuanfeng2018
Contributor

What is the purpose of the change

The autoscaler adjusts the parallelism of the corresponding vertex according to the number of partitions in Kafka or Pulsar, so that the parallelism is a divisor of the partition count.
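
For illustration, a minimal sketch of the idea; the helper name and signature below are made up for this description and are not part of the actual change:

// Round the computed parallelism up to the next divisor of the partition count,
// bounded by an upper limit, so that every subtask reads the same number of partitions.
static int alignToPartitionCount(int computedParallelism, int numPartitions, int upperBound) {
    for (int p = computedParallelism; p <= Math.min(numPartitions, upperBound); p++) {
        if (numPartitions % p == 0) {
            return p; // e.g. 128 partitions and a computed parallelism of 30 -> 32
        }
    }
    return computedParallelism; // no divisor within bounds, keep the computed parallelism
}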

Brief change log

  • Add a new ScalingMetric.NUM_PARTITIONS to record the partition count of Kafka or Pulsar sources
  • Adjust org.apache.flink.autoscaler.JobVertexScaler.scale: if a vertex has a known partition count, this method also attempts to adjust the parallelism so that it aligns well with the number of partitions
  • Adjust org.apache.flink.autoscaler.JobVertexScaler.scale to also return information about issues that occur during the adjustment process
  • The event handler emits events when the final parallelism does not meet expectations due to the number of partitions or the maxParallelism limit

Verifying this change

  • Added test cases org.apache.flink.autoscaler.JobVertexScalerTest#testNumPartitionsAdjustment and org.apache.flink.autoscaler.JobVertexScalerTest#testSendingScalingLimitedEvents

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changes to the CustomResourceDescriptors: (no)
  • Core observer or reconciler logic that is regularly executed: (no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Contributor

@mxm mxm left a comment

Thanks @huyuanfeng2018 for this PR! A couple suggestions and a comment for my understanding.

Comment on lines 419 to 428
if (numPartitions <= 0) {
    // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
    // we try to adjust the parallelism such that it divides the maxParallelism without a
    // remainder => data is evenly spread across subtasks
    for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
        if (maxParallelism % p == 0) {
            return Tuple2.of(p, Optional.empty());
        }
    }
    // If parallelism adjustment fails, use originally computed parallelism
    return Tuple2.of(newParallelism, Optional.empty());
} else {

    // When we know the numPartitions at a vertex,
    // adjust the parallelism such that it divides the numPartitions without a remainder
    // => Data is evenly distributed among subtasks
    for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
        if (numPartitions % p == 0) {
            return Tuple2.of(p, Optional.empty());
        }
    }
Contributor

This looks like code duplication to me. Only the for loop termination condition changed. We should be able to pass an argument to the for loop which we set based on the number of partitions.

We may even completely simplify like this:

Suggested change
- if (numPartitions <= 0) {
-     // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
-     // we try to adjust the parallelism such that it divides the maxParallelism without a
-     // remainder => data is evenly spread across subtasks
-     for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-         if (maxParallelism % p == 0) {
-             return Tuple2.of(p, Optional.empty());
-         }
-     }
-     // If parallelism adjustment fails, use originally computed parallelism
-     return Tuple2.of(newParallelism, Optional.empty());
- } else {
-     // When we know the numPartitions at a vertex,
-     // adjust the parallelism such that it divides the numPartitions without a remainder
-     // => Data is evenly distributed among subtasks
-     for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
-         if (numPartitions % p == 0) {
-             return Tuple2.of(p, Optional.empty());
-         }
-     }
+ if (numPartitions <= 0) {
+     // No partition information is available, assume numPartitions equals the number of key groups
+     numPartitions = maxParallelism;
+ }
+ for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
+     if (maxParallelism % p == 0) {
+         return Tuple2.of(p, Optional.empty());
+     }
+ }
+ // If parallelism adjustment fails, use originally computed parallelism
+ return Tuple2.of(newParallelism, Optional.empty());

Contributor Author

With respect to p <= maxParallelism / 2:

When dealing with inputShipStrategies = hash, maxParallelism = 128, newParallelism = 78, I think newParallelism = 78 is acceptable, because not all tasks have a large state after keyBy.

But for a vertex that consumes Kafka this becomes unacceptable.
Imagine Kafka with 128 partitions being consumed concurrently by 78 tasks :)
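
(To make the skew concrete: with 128 partitions and 78 subtasks, 50 subtasks read 2 partitions each and 28 read only 1, so throughput is capped by the 2-partition subtasks, the same ceiling as a parallelism of 64, while the extra 14 subtasks add cost without adding throughput.)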

Contributor

Right, I missed that. I was trying to generalize the two code blocks. How about the following?

Suggested change
- if (numPartitions <= 0) {
-     // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
-     // we try to adjust the parallelism such that it divides the maxParallelism without a
-     // remainder => data is evenly spread across subtasks
-     for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-         if (maxParallelism % p == 0) {
-             return Tuple2.of(p, Optional.empty());
-         }
-     }
-     // If parallelism adjustment fails, use originally computed parallelism
-     return Tuple2.of(newParallelism, Optional.empty());
- } else {
-     // When we know the numPartitions at a vertex,
-     // adjust the parallelism such that it divides the numPartitions without a remainder
-     // => Data is evenly distributed among subtasks
-     for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
-         if (numPartitions % p == 0) {
-             return Tuple2.of(p, Optional.empty());
-         }
-     }
+ if (numPartitions <= 0) {
+     upperBound = Math.min(maxParallelism / 2, upperBound);
+ } else {
+     upperBound = Math.min(num_partitions, upperBound);
+     maxParallelism = num_partitions;
+ }
+ for (int p = newParallelism; p <= upperBound; p++) {
+     if (maxParallelism % p == 0) {
+         return Tuple2.of(p, Optional.empty());
+     }
+ }
+ ...
+ // Resource optimization logic follows (if we can't achieve optimal partitioning)
+ // (See review comment below)
+ ...
+ // If parallelism adjustment fails, use originally computed parallelism
+ return Tuple2.of(newParallelism, Optional.empty());

Contributor Author

@huyuanfeng2018 huyuanfeng2018 Sep 12, 2024

Right, I missed that. I was trying to generalize the two code blocks. How about the following?

Thanks, fine with me. However, I do not recommend overriding the values of maxParallelism and upperBound, so I added two new variables instead:

  1. adjustableMaxParallelism (indicates the maxParallelism used during the adjustment process)
  2. adjustableUpperBound (indicates the upperBound used during the adjustment process)

Contributor

Can we call them like this?

  1. adjustableMaxParallelism => numKeyGroupsOrPartitions
  2. adjustableUpperBound => upperBoundForAlignment

Adjustable just doesn't tell someone who is unfamiliar with the code very much.

Comment on lines 462 to 470
// If a suitable degree of parallelism cannot be found, return parallelismLowerLimit
var message =
        String.format(
                SCALE_LIMITED_MESSAGE_FORMAT,
                vertex,
                newParallelism,
                parallelismLowerLimit,
                String.format("parallelismLowerLimit : %s", parallelismLowerLimit));
return Tuple2.of(parallelismLowerLimit, Optional.of(message));
Contributor

I'm not sure about this behavior. Why return the lower limit, instead of the originally computed target parallelism? I think we should retain this logic:

           // If parallelism adjustment fails, use originally computed parallelism
           return newParallelism;

Contributor Author

I responded with the specific logic, but I think it's still worth discussing

if (numPartitions / p > numPartitions / newParallelism) {
    if (numPartitions % p != 0) {
        p += 1;
    }
Contributor

Why p++ here? Maybe I'm overlooking something but p already divides numPartitions without a remainder.

Contributor Author

@huyuanfeng2018 huyuanfeng2018 Sep 11, 2024

Let me expand on my thoughts on this point:

  1. The first thing we try is to find a p that divides the number of partitions without a remainder while p <= upperBound is satisfied.

  2. If step 1 does not find such a p, it means we cannot guarantee even consumption of the partitions by rounding up (because of the uneven consumption, some tasks become bottlenecks, so theoretically even taking the largest upperBound would not increase the processing rate). In that case we consider taking the smallest value whose processing rate is comparable to the current one, i.e. one that satisfies numPartitions / p = numPartitions / newParallelism.

Here is an example:
numPartitions = 35, newParallelism = 20, upperBound = 30;

Step 1: We start from 20 and go up. Since upperBound = 30, we cannot find a parallelism that consumes the partitions evenly; p = 30 and p = 20 give the same consumption rate.

Step 2:
Since 35 / 20 = 1 remainder 15, a parallelism of 20 cannot consume Kafka evenly: 15 tasks consume two partitions and 5 tasks consume one partition. So our final result only needs to guarantee that no task consumes more than two partitions; then our processing rate won't slow down.

That is, (numPartitions / p == 2 && numPartitions % p == 0) || (numPartitions / (p - 1) == 2 && numPartitions % (p - 1) != 0).

So the p += 1 here is caused by scanning down to a p that does not divide the partitions evenly; it needs +1 to meet the condition. 35 / 17 = 2, but with 17 tasks one task would end up consuming three partitions, so we use 17 + 1, and 18 is our final result (17 tasks consume 2 partitions each, 1 task consumes one partition).

Step 3:
If p already falls below parallelismLowerLimit during this downward scan, we should directly use parallelismLowerLimit as the final parallelism.

However, the above is all theoretical logic. I am not sure whether it could cause negative effects; for example, if the data distribution across the partitions is itself uneven, step 2 may aggravate that. So I have reservations about this part of the logic; another approach is to directly return newParallelism.
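
A rough sketch of steps 1 to 3 above, using the same variable names as in this discussion (an illustration only, not the exact code in the PR):

for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
    if (numPartitions % p == 0) {
        return p; // step 1: p divides the partitions evenly
    }
}
int p = newParallelism;
for (; p > 0; p--) {
    // step 2: stop at the tipping point where partitions-per-task would increase
    if (numPartitions / p > numPartitions / newParallelism) {
        if (numPartitions % p != 0) {
            p += 1; // e.g. 35 partitions: 17 does not divide 35, so use 18
        }
        break;
    }
}
return Math.max(p, parallelismLowerLimit); // step 3: never go below the configured lower limit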

Contributor

Thanks for explaining in detail. I misread some of the code. It is correct that we need to add +1 when we have found a parallelism which yields a greater value for num_partitions / p than the initial num_partitions / new_parallelism because we have found the tipping point where we achieve the most utilization in terms of partitions per task.

I think we should return new_parallelism if all adaptation logic fails because using a potentially very small configured lower parallelism could make things a lot worse due to resource constraints.

Contributor Author

Thanks for explaining in detail. I misread some of the code. It is correct that we need to add +1 when we have found a parallelism which yields a greater value for num_partitions / p than the initial num_partitions / new_parallelism because we have found the tipping point where we achieve the most utilization in terms of partitions per task.

I think we should return new_parallelism if all adaptation logic fails because using a potentially very small configured lower parallelism could make things a lot worse due to resource constraints.

I want to explain the reason for using parallelismLowerLimit with an example:

numPartitions = 35, newParallelism = 24, upperBound = 30, parallelismLowerLimit = 19

Step 1 cannot find a result, so it goes to step 2, but step 2 also cannot produce a result because its expected value is 18 while parallelismLowerLimit = 19, so it will eventually return 19.

Contributor

On second thought, it's probably ok to use the lower limit. As you said, we would already be approaching the limit. Most users will never run into this because they haven't configured a minimum parallelism.

@1996fanrui 1996fanrui self-requested a review September 11, 2024 01:43
Contributor Author

@huyuanfeng2018 huyuanfeng2018 left a comment

@mxm I explained some logic. You can review again when you have time. Thank you very much !

@huyuanfeng2018 huyuanfeng2018 force-pushed the FLINK-36192 branch 2 times, most recently from af88f74 to 745f499 on September 12, 2024 06:47
Member

@1996fanrui 1996fanrui left a comment

Thanks @huyuanfeng2018 for the contribution, and @mxm for the review!

I left some comments, please take a look in your free time, thanks~

int numKeyGroupsOrPartitions = maxParallelism;
int upperBoundForAlignment;
if (numPartitions <= 0) {
    upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
Member

I'm curious why maxParallelism / 2?

Assuming the upstream edge of vertex is keyBy(hash):

  • The maxParallelism is 100
  • newParallelism is 80

We will use 80 as the result, right? If so, it will hit the same issue as the source partition case.

Contributor Author

Yes, but I think not all tasks after a hash have large Flink keyed state. In this scenario, the misalignment has almost no impact on the task. I think this is the trade-off between performance and resources that the previous logic took into consideration.

But this becomes much less acceptable for consuming Kafka or Pulsar.

@mxm @1996fanrui Maybe we can discuss this logic further.

Member

As I understand it, key groups are quite similar to source partitions (Kafka or Pulsar): they determine how many partitions or groups each parallel instance of a Flink vertex consumes.

The performance is unbalanced even without large state. For example, the maxParallelism (number of keyGroups) is 100, and the actual parallelism is 70.

  • It means that 30 instances process 2 keyGroups each, and the remaining 40 instances process 1 keyGroup each.
  • Assuming that the data of each keyGroup is balanced, the 30 instances processing 2 keyGroups will become the bottleneck of the job.

For this scenario, there is no difference when the parallelism is set to 50 or 99.

IIUC, this situation is exactly the source partition problem you want to solve, and it applies to keyGroups in exactly the same way.

Please correct me if anything is wrong.

Contributor Author

Well, you are right, the specific entry point is KeyGroupStreamPartitioner. In this case, we can unify our logic.

Contributor

@1996fanrui The code path you are pointing to hasn't been changed by this PR. It has merely been refactored. In the scenario of maxParallelism=100 and newParallelism=80, the resulting parallelism would always be 80.

Are you asking to expand the source logic introduced here to hash keyed state?

For this scenario, there is no difference when the parallelism is set to 50 and 99.

That depends on the amount of state in the key groups and other factors like hot keys. 50 could be the same as 99, it could also be much worse. For sources, the problem is much more amplified because they usually have pretty evenly balanced partitions and there is extra overhead to fetch the partition data.

In this scenario though, we should not be going down to 50, we probably should be going up to 100. I think the maxParallelism / 2 stems from the idea that the maximum parallelism won't be reached anyway because the parallelism is set to a number <= maxParallelism / 2, which would mean it doesn't make sense to continue beyond maxParallelism / 2 because there are no more possible divisors. However, when parallelism > maxParallelism / 2, this logic is flawed because maxParallelism itself could be a possible divisor. We should really be going up to maxParallelism for the initial parallelism > maxParallelism / 2. We could just skip this (premature) optimization entirely.
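
(For example, with maxParallelism = 128 and newParallelism = 78, the old loop condition p <= maxParallelism / 2 = 64 never holds, so we fall back to 78, even though p = 128 would divide the key groups without a remainder, assuming the upper bound allows it.)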

I agree that we should replace the current key alignment logic with the generalized source logic introduced here. Something like this:

final int numKeyGroupsOrPartitions;
final int upperBoundForAlignment;
if (numSourcePartitions <= 0) {
    numKeyGroupsOrPartitions = maxParallelism;
    upperBoundForAlignment = Math.min(
        // Optimize the case where newParallelism <= maxParallelism / 2
        newParallelism > maxParallelism / 2 ? maxParallelism : maxParallelism / 2, 
        upperBound
    );
} else {
    numKeyGroupsOrPartitions = numSourcePartitions;
    upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
}

// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
// we try to adjust the parallelism such that it divides
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
    if (numKeyGroupsOrPartitions % p == 0) {
        return p;
    }
}

// When the parallelism found by rounding up cannot evenly divide numSourcePartitions
// (or key groups), try to find the smallest parallelism that still satisfies the
// current consumption rate.
int p = newParallelism;
for (; p > 0; p--) {
    if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
        if (numKeyGroupsOrPartitions % p != 0) {
            p++;
        }
        break;
    }
}

p = Math.max(p, parallelismLowerLimit);
return p;
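
(For the examples discussed above this would yield: with numSourcePartitions = 35, newParallelism = 20 and upperBound = 30 the result is 18, assuming the lower limit allows it; with no source partitions, maxParallelism = 128 and newParallelism = 78 the result is 128, assuming upperBound >= 128.)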

Member

Hey @mxm , sorry for the late reply. Because there are too many comments, I missed this one.

@1996fanrui The code path you are pointing to hasn't been changed by this PR. It has merely been refactored. In the scenario of maxParallelism=100 and newParallelism=80, the resulting parallelism would always be 80.

Are you asking to expand the source logic introduced here to hash keyed state?

I'm asking about hash keyed state; I don't know why we would recommend 80 instead of 100 for the hash keyed state case. But I think you have answered my question in this comment.

For this scenario, there is no difference when the parallelism is set to 50 and 99.

That depends on the amount of state in the key groups and other factors like hot keys. 50 could be the same as 99, it could also be much worse. For sources, the problem is much more amplified because they usually have pretty evenly balanced partitions and there is extra overhead to fetch the partition data.

Good point! Hot keys may occur in a Flink job. In general, source partitions do not have data skew.

However, when parallelism > maxParallelism / 2, this logic is flawed because maxParallelism itself could be a possible divisor. We should really be going up to maxParallelism for the initial parallelism > maxParallelism / 2. We could just skip this (premature) optimization entirely.

This is exactly my question, I think we should use maxParallelism as the final parallelism. (For the above example, 100 instead of 80).

    upperBoundForAlignment = Math.min(
        // Optimize the case where newParallelism <= maxParallelism / 2
        newParallelism > maxParallelism / 2 ? maxParallelism : maxParallelism / 2, 
        upperBound
    );

Great, this part solved my concern. thank you~

Contributor Author

@huyuanfeng2018 huyuanfeng2018 left a comment

Thanks @1996fanrui for your review, I have made some changes to the code, PTAL.

Contributor

@mxm mxm left a comment

Great discussion @huyuanfeng2018 @1996fanrui! I think this is a good opportunity to unify the logic further and to address a gap with the current key alignment logic for hash partitioning.

@huyuanfeng2018
Contributor Author

Great discussion @huyuanfeng2018 @1996fanrui! I think this is a good opportunity to unify the logic further and to address a gap with the current key alignment logic for hash partitioning.

Thanks @mxm for the review, LGTM for these suggestions, I fixed the code.

Comment on lines 412 to 426
final int numKeyGroupsOrPartitions;
final int upperBoundForAlignment;
if (numSourcePartitions <= 0) {
    numKeyGroupsOrPartitions = maxParallelism;
    upperBoundForAlignment =
            Math.min(
                    // Optimize the case where newParallelism <= maxParallelism / 2
                    newParallelism > maxParallelism / 2
                            ? maxParallelism
                            : maxParallelism / 2,
                    upperBound);
} else {
    numKeyGroupsOrPartitions = numSourcePartitions;
    upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
}
Member

Optimize the case where newParallelism <= maxParallelism / 2

Why do we need this optimization? To reduce the number of for-loop iterations?

I'm curious why the source partition case doesn't use this optimization. If both the source and key group cases could use it, does the following code work?

Suggested change
- final int numKeyGroupsOrPartitions;
- final int upperBoundForAlignment;
- if (numSourcePartitions <= 0) {
-     numKeyGroupsOrPartitions = maxParallelism;
-     upperBoundForAlignment =
-             Math.min(
-                     // Optimize the case where newParallelism <= maxParallelism / 2
-                     newParallelism > maxParallelism / 2
-                             ? maxParallelism
-                             : maxParallelism / 2,
-                     upperBound);
- } else {
-     numKeyGroupsOrPartitions = numSourcePartitions;
-     upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
- }
+ var numKeyGroupsOrPartitions = numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
+ var upperBoundForAlignment =
+         Math.min(
+                 // Optimize the case where newParallelism <= maxParallelism / 2
+                 newParallelism > numKeyGroupsOrPartitions / 2
+                         ? numKeyGroupsOrPartitions
+                         : numKeyGroupsOrPartitions / 2,
+                 upperBound);

Contributor

Why do we need this optimization? To reduce the number of for-loop iterations?

Yes, precisely. We had this optimization in place before, but it is only valid when newParallelism <= maxParallelism / 2.


// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
// we try to adjust the parallelism such that it divides
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
Member

Suggested change
- // the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
+ // the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks

// we try to adjust the parallelism such that it divides
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
    if (numKeyGroupsOrPartitions % p == 0) {
Member

About this comment #879 (comment), I'm wondering whether the following change is more reasonable.

Note: numKeyGroupsOrPartitions / p means how many source partitions or key groups each subtask consumes.

Suggested change
- if (numKeyGroupsOrPartitions % p == 0) {
+ if (numKeyGroupsOrPartitions % p == 0 || numKeyGroupsOrPartitions / p < numKeyGroupsOrPartitions / newParallelism) {

For example: maxParallelism is 200, and the new parallelism is 60. (Some subtasks consume 4 keyGroups, the rest consume 3 keyGroups.)

  • The final parallelism is 100 based on the main branch code, because we only return p when maxParallelism % p == 0.
  • But I think 67 is more reasonable here. (One subtask consumes 2 key groups, and each of the remaining 66 subtasks consumes 3 key groups.)

@mxm @gyfora , WDYT?

Also, it's a bit beyond the scope of this PR. I could file a separate PR if you think it makes sense. Of course, it's acceptable to be done at this PR.
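
(A quick check of the suggested condition with these numbers: 200 / 60 = 3, and p = 67 is the first value where 200 / p drops to 2, so the loop would stop at 67 before ever reaching the divisor 100.)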

Contributor

I think that makes sense, but it makes the scaling more aggressive and less balanced. If we want to be more conservative, maybe 100 is ok in this scenario, where there is actually a divisor without a remainder. When there isn't, I think what you propose is way better than just using the initially provided parallelism.

In summary, I'm proposing to do a two-step process, similarly as for the partitions, where we first try to find a parallelism that divides the key groups without a remainder, and if that fails we do what you propose.

Contributor Author

I think that makes sense, but it makes the scaling more aggressive and less balanced. If we want to be more conservative, maybe 100 is ok in this scenario, where there is actually a divisor without a remainder. When there isn't, I think what you propose is way better than just using the initially provided parallelism.

In summary, I'm proposing to do a two-step process, similarly as for the partitions, where we first try to find a parallelism that divides the key groups without a remainder, and if that fails we do what you propose.

Thanks for the review, I agree with this. We can introduce an additional parameter to enable a more aggressive strategy: #879 (comment). But by default, a divisor of the number of partitions is still used to ensure balanced consumption.
WDYT? @mxm @1996fanrui

Member

That makes sense to me, we could introduce an additional parameter to enable a more aggressive strategy.

I wanna check with @huyuanfeng2018 and @mxm : The strategy will work for both source partition and key group, right? As I understand, we could unify the strategy for these 2 cases.

Contributor Author

I wanna check with @huyuanfeng2018 and @mxm : The strategy will work for both source partition and key group, right? As I understand, we could unify the strategy for these 2 cases.

I think we can unify strategies

Contributor

Sounds good!

@huyuanfeng2018
Contributor Author

@mxm @1996fanrui This has been updated according to the latest discussion. You can take a look again when you have time.

Member

@1996fanrui 1996fanrui left a comment

Hi @huyuanfeng2018, I saw this comment was not addressed in your latest change.

For example, introducing an additional option and trying to round up based on if (numKeyGroupsOrPartitions / p < numKeyGroupsOrPartitions / newParallelism). Do you mean to introduce it in a separate PR?

Also, the CI failed, and it seems unrelated to your change. Would you mind rebasing onto the main branch?

@huyuanfeng2018
Contributor Author

For example, introducing an additional option and trying to round up based on if (numKeyGroupsOrPartitions / p < numKeyGroupsOrPartitions / newParallelism). Do you mean to introduce it in a separate PR?

Yes, I think a separate PR is required.

Also, the CI failed, and it seems unrelated to your change. Would you mind rebasing onto the main branch?

done.

Member

@1996fanrui 1996fanrui left a comment

Thanks @huyuanfeng2018 for the quick update!

Overall LGTM, I left 2 minor comments, please take a look in your free time, thanks~

@huyuanfeng2018
Contributor Author

Thanks @huyuanfeng2018 for the quick update!

Overall LGTM, I left 2 minor comments, please take a look in your free time, thanks~

Thanks for your suggestion, fixed!

Contributor

@mxm mxm left a comment

LGTM. Thank you for your patience @huyuanfeng2018!

Member

@1996fanrui 1996fanrui left a comment

Thanks @huyuanfeng2018 for the hard work and @mxm for the detailed discussion!

LGTM assuming the CI is green.

@huyuanfeng2018
Contributor Author

@mxm @1996fanrui
Thank you both for your patient review

@1996fanrui 1996fanrui merged commit 5bce141 into apache:main Oct 11, 2024
233 checks passed
@1996fanrui 1996fanrui changed the title [FLINK-36192][autocaler] Autocaler supports adjusting the parallelism of source vertex based on the number of partitions in Kafka or pulsars [FLINK-36192][autocaler] Optimize the logic when partitions or key groups cannot be evenly distributed to subtasks Oct 17, 2024