Skip to content

Commit 5bce141

Browse files
[FLINK-36192][autocaler] Autocaler supports adjusting the parallelism of source vertex based on the number of partitions in Kafka or pulsars (#879)
1 parent 4f87bc2 commit 5bce141

File tree

8 files changed

+379
-55
lines changed

8 files changed

+379
-55
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
5151
import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
5252
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
53+
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
5354
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
5455
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
5556
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
@@ -66,6 +67,14 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
6667
protected static final String INEFFECTIVE_MESSAGE_FORMAT =
6768
"Ineffective scaling detected for %s (expected increase: %s, actual increase %s). Blocking of ineffective scaling decisions is %s";
6869

70+
@VisibleForTesting protected static final String SCALING_LIMITED = "ScalingLimited";
71+
72+
@VisibleForTesting
73+
protected static final String SCALE_LIMITED_MESSAGE_FORMAT =
74+
"Scaling limited detected for %s (expected parallelism: %s, actual parallelism %s). "
75+
+ "Scaling limited due to numKeyGroupsOrPartitions : %s,"
76+
+ "upperBoundForAlignment(maxParallelism or parallelismUpperLimit): %s, parallelismLowerLimit: %s.";
77+
6978
private Clock clock = Clock.system(ZoneId.systemDefault());
7079

7180
private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;
@@ -193,12 +202,16 @@ public ParallelismChange computeScaleTargetParallelism(
193202

194203
int newParallelism =
195204
scale(
205+
vertex,
196206
currentParallelism,
197207
inputShipStrategies,
208+
(int) evaluatedMetrics.get(NUM_SOURCE_PARTITIONS).getCurrent(),
198209
(int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
199210
scaleFactor,
200211
Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
201-
Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
212+
Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)),
213+
autoScalerEventHandler,
214+
context);
202215

203216
if (newParallelism == currentParallelism) {
204217
// Clear delayed scale down request if the new parallelism is equal to
@@ -345,15 +358,22 @@ private boolean detectIneffectiveScaleUp(
345358
* <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
346359
* parallelism for source and keyed vertex such that it divides the maxParallelism without a
347360
* remainder.
361+
*
362+
* <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
363+
* number of source partitions if a vertex has a known source partition count.
348364
*/
349365
@VisibleForTesting
350-
protected static int scale(
366+
protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
367+
JobVertexID vertex,
351368
int currentParallelism,
352369
Collection<ShipStrategy> inputShipStrategies,
370+
int numSourcePartitions,
353371
int maxParallelism,
354372
double scaleFactor,
355373
int parallelismLowerLimit,
356-
int parallelismUpperLimit) {
374+
int parallelismUpperLimit,
375+
AutoScalerEventHandler<KEY, Context> eventHandler,
376+
Context context) {
357377
checkArgument(
358378
parallelismLowerLimit <= parallelismUpperLimit,
359379
"The parallelism lower limitation must not be greater than the parallelism upper limitation.");
@@ -383,23 +403,62 @@ protected static int scale(
383403
// Apply min/max parallelism
384404
newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
385405

386-
var adjustByMaxParallelism =
387-
inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
388-
if (!adjustByMaxParallelism) {
406+
var adjustByMaxParallelismOrPartitions =
407+
numSourcePartitions > 0 || inputShipStrategies.contains(HASH);
408+
if (!adjustByMaxParallelismOrPartitions) {
389409
return newParallelism;
390410
}
391411

392-
// When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
393-
// adjust the parallelism such that it divides the maxParallelism without a remainder
394-
// => data is evenly spread across subtasks
395-
for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
396-
if (maxParallelism % p == 0) {
412+
var numKeyGroupsOrPartitions =
413+
numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
414+
var upperBoundForAlignment =
415+
Math.min(
416+
// Optimize the case where newParallelism <= maxParallelism / 2
417+
newParallelism > numKeyGroupsOrPartitions / 2
418+
? numKeyGroupsOrPartitions
419+
: numKeyGroupsOrPartitions / 2,
420+
upperBound);
421+
422+
// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
423+
// we try to adjust the parallelism such that it divides
424+
// the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
425+
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
426+
if (numKeyGroupsOrPartitions % p == 0) {
397427
return p;
398428
}
399429
}
400430

401-
// If parallelism adjustment fails, use originally computed parallelism
402-
return newParallelism;
431+
// When adjust the parallelism after rounding up cannot be evenly divided by
432+
// numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the
433+
// current consumption rate.
434+
int p = newParallelism;
435+
for (; p > 0; p--) {
436+
if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
437+
if (numKeyGroupsOrPartitions % p != 0) {
438+
p++;
439+
}
440+
break;
441+
}
442+
}
443+
444+
p = Math.max(p, parallelismLowerLimit);
445+
var message =
446+
String.format(
447+
SCALE_LIMITED_MESSAGE_FORMAT,
448+
vertex,
449+
newParallelism,
450+
p,
451+
numKeyGroupsOrPartitions,
452+
upperBound,
453+
parallelismLowerLimit);
454+
eventHandler.handleEvent(
455+
context,
456+
AutoScalerEventHandler.Type.Warning,
457+
SCALING_LIMITED,
458+
message,
459+
SCALING_LIMITED + vertex + (scaleFactor * currentParallelism),
460+
context.getConfiguration().get(SCALING_EVENT_INTERVAL));
461+
return p;
403462
}
404463

405464
@VisibleForTesting

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ protected JobTopology getJobTopology(
204204

205205
Set<JobVertexID> vertexSet = Set.copyOf(t.getVerticesInTopologicalOrder());
206206
updateVertexList(stateStore, ctx, clock.instant(), vertexSet);
207-
updateKafkaPulsarSourceMaxParallelisms(ctx, jobDetailsInfo.getJobId(), t);
207+
updateKafkaPulsarSourceNumPartitions(ctx, jobDetailsInfo.getJobId(), t);
208208
excludeVerticesFromScaling(ctx.getConfiguration(), t.getFinishedVertices());
209209
return t;
210210
}
@@ -249,7 +249,7 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
249249
json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished);
250250
}
251251

252-
private void updateKafkaPulsarSourceMaxParallelisms(
252+
private void updateKafkaPulsarSourceNumPartitions(
253253
Context ctx, JobID jobId, JobTopology topology) throws Exception {
254254
try (var restClient = ctx.getRestClusterClient()) {
255255
Pattern partitionRegex =
@@ -284,7 +284,7 @@ private void updateKafkaPulsarSourceMaxParallelisms(
284284
"Updating source {} max parallelism based on available partitions to {}",
285285
sourceVertex,
286286
numPartitions);
287-
topology.get(sourceVertex).updateMaxParallelism((int) numPartitions);
287+
topology.get(sourceVertex).setNumSourcePartitions((int) numPartitions);
288288
}
289289
}
290290
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MANAGED_MEMORY_USED;
5656
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
5757
import static org.apache.flink.autoscaler.metrics.ScalingMetric.METASPACE_MEMORY_USED;
58+
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
5859
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_TASK_SLOTS_USED;
5960
import static org.apache.flink.autoscaler.metrics.ScalingMetric.OBSERVED_TPR;
6061
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
@@ -166,6 +167,11 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
166167

167168
evaluatedMetrics.put(
168169
MAX_PARALLELISM, EvaluatedScalingMetric.of(vertexInfo.getMaxParallelism()));
170+
171+
evaluatedMetrics.put(
172+
NUM_SOURCE_PARTITIONS,
173+
EvaluatedScalingMetric.of(vertexInfo.getNumSourcePartitions()));
174+
169175
computeProcessingRateThresholds(evaluatedMetrics, conf, processingBacklog, restartTime);
170176
return evaluatedMetrics;
171177
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ public enum ScalingMetric {
5353

5454
/** Job vertex max parallelism. */
5555
MAX_PARALLELISM(false),
56+
57+
/** Source vertex partition count. */
58+
NUM_SOURCE_PARTITIONS(false),
59+
5660
/** Upper boundary of the target data rate range. */
5761
SCALE_UP_RATE_THRESHOLD(false),
5862

@@ -101,6 +105,7 @@ public enum ScalingMetric {
101105
PARALLELISM,
102106
RECOMMENDED_PARALLELISM,
103107
MAX_PARALLELISM,
108+
NUM_SOURCE_PARTITIONS,
104109
SCALE_UP_RATE_THRESHOLD,
105110
SCALE_DOWN_RATE_THRESHOLD,
106111
EXPECTED_PROCESSING_RATE);

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class VertexInfo {
4646
@Setter(AccessLevel.NONE)
4747
private int maxParallelism;
4848

49-
private final int originalMaxParallelism;
49+
@Setter private int numSourcePartitions;
5050

5151
private final boolean finished;
5252

@@ -65,7 +65,6 @@ public VertexInfo(
6565
this.inputs = inputs;
6666
this.parallelism = parallelism;
6767
this.maxParallelism = maxParallelism;
68-
this.originalMaxParallelism = maxParallelism;
6968
this.finished = finished;
7069
this.ioMetrics = ioMetrics;
7170
}
@@ -99,8 +98,4 @@ public VertexInfo(
9998
int maxParallelism) {
10099
this(id, inputs, parallelism, maxParallelism, null);
101100
}
102-
103-
public void updateMaxParallelism(int maxParallelism) {
104-
this.maxParallelism = Math.min(originalMaxParallelism, maxParallelism);
105-
}
106101
}

0 commit comments

Comments
 (0)