
Commit 52e0e8a

huyuanfeng committed
[FLINK-36192][autoscaler] Autoscaler supports adjusting the parallelism of the Source to the number of partitions in Kafka or Pulsar
1 parent 968a578 · commit 52e0e8a

File tree

6 files changed: +380 -55 lines

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 84 additions & 15 deletions
@@ -18,6 +18,7 @@
 package org.apache.flink.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
@@ -39,6 +40,7 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.SortedMap;
 
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
@@ -50,6 +52,7 @@
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_PARTITIONS;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
@@ -66,6 +69,12 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
     protected static final String INEFFECTIVE_MESSAGE_FORMAT =
             "Ineffective scaling detected for %s (expected increase: %s, actual increase %s). Blocking of ineffective scaling decisions is %s";
 
+    @VisibleForTesting protected static final String SCALE_LIMITED = "ScalingLimited";
+
+    @VisibleForTesting
+    protected static final String SCALE_LIMITED_MESSAGE_FORMAT =
+            "Scaling limited detected for %s (expected parallelism: %s, actual parallelism %s). Scaling limited due to %s";
+
     private Clock clock = Clock.system(ZoneId.systemDefault());
 
     private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;
@@ -191,16 +200,29 @@ public ParallelismChange computeScaleTargetParallelism(
         double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
         LOG.debug("Capped target processing capacity for {} is {}", vertex, cappedTargetCapacity);
 
-        int newParallelism =
+        Tuple2<Integer, Optional<String>> newParallelism =
                 scale(
+                        vertex,
                         currentParallelism,
                         inputShipStrategies,
+                        (int) evaluatedMetrics.get(NUM_PARTITIONS).getCurrent(),
                         (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
                         scaleFactor,
                         Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
                         Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
 
-        if (newParallelism == currentParallelism) {
+        newParallelism.f1.ifPresent(
+                message -> {
+                    autoScalerEventHandler.handleEvent(
+                            context,
+                            AutoScalerEventHandler.Type.Warning,
+                            SCALE_LIMITED,
+                            message,
+                            SCALE_LIMITED + vertex + cappedTargetCapacity,
+                            conf.get(SCALING_EVENT_INTERVAL));
+                });
+
+        if (newParallelism.f0 == currentParallelism) {
             // Clear delayed scale down request if the new parallelism is equal to
             // currentParallelism.
             delayedScaleDown.clearVertex(vertex);
@@ -219,7 +241,7 @@ public ParallelismChange computeScaleTargetParallelism(
                 evaluatedMetrics,
                 history,
                 currentParallelism,
-                newParallelism,
+                newParallelism.f0,
                 delayedScaleDown);
     }
 
@@ -345,11 +367,16 @@ private boolean detectIneffectiveScaleUp(
      * <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
      * parallelism for source and keyed vertex such that it divides the maxParallelism without a
     * remainder.
+     *
+     * <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
+     * number of partitions if a vertex has a known partition count.
      */
     @VisibleForTesting
-    protected static int scale(
+    protected static Tuple2<Integer, Optional<String>> scale(
+            JobVertexID vertex,
             int currentParallelism,
             Collection<ShipStrategy> inputShipStrategies,
+            int numPartitions,
             int maxParallelism,
             double scaleFactor,
             int parallelismLowerLimit,
@@ -378,28 +405,70 @@ protected static int scale(
 
         // Cap parallelism at either maxParallelism(number of key groups or source partitions) or
         // parallelism upper limit
-        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
+        int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
 
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
 
         var adjustByMaxParallelism =
                 inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
         if (!adjustByMaxParallelism) {
-            return newParallelism;
+            return Tuple2.of(newParallelism, Optional.empty());
        }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
-                return p;
-            }
-        }
-
-        // If parallelism adjustment fails, use originally computed parallelism
-        return newParallelism;
+        if (numPartitions <= 0) {
+            // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+            // we try to adjust the parallelism such that it divides the maxParallelism without a
+            // remainder => data is evenly spread across subtasks
+            for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
+                if (maxParallelism % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
+            }
+            // If parallelism adjustment fails, use originally computed parallelism
+            return Tuple2.of(newParallelism, Optional.empty());
+        } else {
+
+            // When we know the numPartitions of a vertex, adjust the parallelism such that it
+            // divides the numPartitions without a remainder => data is evenly distributed
+            // among subtasks
+            for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
+                if (numPartitions % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
+            }
+
+            // When the parallelism after rounding up cannot evenly divide the source partition
+            // count, try to find the smallest parallelism that can satisfy the current
+            // consumption rate.
+            for (int p = newParallelism; p > parallelismLowerLimit; p--) {
+                if (numPartitions / p > numPartitions / newParallelism) {
+                    if (numPartitions % p != 0) {
+                        p += 1;
+                    }
+                    var message =
+                            String.format(
+                                    SCALE_LIMITED_MESSAGE_FORMAT,
+                                    vertex,
+                                    newParallelism,
+                                    p,
+                                    String.format(
+                                            "numPartitions : %s,upperBound(maxParallelism or "
+                                                    + "parallelismUpperLimit): %s",
+                                            numPartitions, upperBound));
+                    return Tuple2.of(p, Optional.of(message));
+                }
+            }
+            // If a suitable parallelism cannot be found, return parallelismLowerLimit
+            var message =
+                    String.format(
+                            SCALE_LIMITED_MESSAGE_FORMAT,
+                            vertex,
+                            newParallelism,
+                            parallelismLowerLimit,
+                            String.format("parallelismLowerLimit : %s", parallelismLowerLimit));
+            return Tuple2.of(parallelismLowerLimit, Optional.of(message));
+        }
    }
 
     @VisibleForTesting
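
To make the new adjustment concrete, here is a minimal, self-contained sketch of the partition-aware rounding that `scale` now performs. The class name, the simplified `int`-returning signature, and the sample numbers are illustrative assumptions; the committed method additionally threads the vertex id through and returns a `Tuple2` whose second field carries the optional ScalingLimited message. Note that `numPartitions` defaults to 0 for vertices without a known partition count, which routes them into the pre-existing maxParallelism-divisor branch.

```java
/** A sketch of the partition-aware rounding in JobVertexScaler#scale (illustrative names). */
public class PartitionAwareScaleSketch {

    static int adjust(int newParallelism, int numPartitions, int upperBound, int lowerLimit) {
        // Prefer the smallest parallelism >= the computed target that divides numPartitions
        // evenly, so every subtask consumes the same number of partitions.
        for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
            if (numPartitions % p == 0) {
                return p;
            }
        }
        // No even divisor within bounds: mirror the fallback loop and find the smallest
        // parallelism whose partitions-per-subtask quota would exceed the computed target's,
        // stepping back up by one when it does not divide numPartitions evenly. This is the
        // point where the committed code attaches the ScalingLimited warning message.
        for (int p = newParallelism; p > lowerLimit; p--) {
            if (numPartitions / p > numPartitions / newParallelism) {
                return (numPartitions % p != 0) ? p + 1 : p;
            }
        }
        // Nothing better within bounds: fall back to the lower limit (also reported).
        return lowerLimit;
    }

    public static void main(String[] args) {
        // 32 partitions, computed target parallelism 12, bounds [1, 20]:
        // p = 12..15 all leave a remainder; p = 16 divides 32 -> 2 partitions per subtask.
        System.out.println(adjust(12, 32, 20, 1)); // 16
    }
}
```

Capping the divisor search at `numPartitions` also prevents scaling a source beyond its partition count, where the extra subtasks would sit idle.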

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java

Lines changed: 3 additions & 3 deletions
@@ -204,7 +204,7 @@ protected JobTopology getJobTopology(
 
         Set<JobVertexID> vertexSet = Set.copyOf(t.getVerticesInTopologicalOrder());
         updateVertexList(stateStore, ctx, clock.instant(), vertexSet);
-        updateKafkaPulsarSourceMaxParallelisms(ctx, jobDetailsInfo.getJobId(), t);
+        updateKafkaPulsarSourceNumPartitions(ctx, jobDetailsInfo.getJobId(), t);
         excludeVerticesFromScaling(ctx.getConfiguration(), t.getFinishedVertices());
         return t;
     }
@@ -249,7 +249,7 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
                 json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished);
     }
 
-    private void updateKafkaPulsarSourceMaxParallelisms(
+    private void updateKafkaPulsarSourceNumPartitions(
             Context ctx, JobID jobId, JobTopology topology) throws Exception {
         try (var restClient = ctx.getRestClusterClient()) {
             Pattern partitionRegex =
@@ -284,7 +284,7 @@ private void updateKafkaPulsarSourceMaxParallelisms(
                         "Updating source {} max parallelism based on available partitions to {}",
                         sourceVertex,
                         numPartitions);
-                topology.get(sourceVertex).updateMaxParallelism((int) numPartitions);
+                topology.get(sourceVertex).setNumPartitions((int) numPartitions);
             }
         }
     }
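
For context, the collector derives the partition count by matching per-partition metric names exposed by the Kafka and Pulsar sources (the `Pattern partitionRegex` visible above). The sketch below illustrates the idea only; the regex and sample metric names are assumptions, not the collector's actual values.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative partition counting over source metric names (hypothetical regex and names). */
public class PartitionCountSketch {

    // Hypothetical pattern for per-partition source metrics such as
    // "KafkaSourceReader.topic.<topic>.partition.<n>.currentOffset".
    private static final Pattern PARTITION_PATTERN = Pattern.compile("partition\\.(\\d+)\\.");

    static long countPartitions(List<String> metricNames) {
        return metricNames.stream()
                .map(PARTITION_PATTERN::matcher)
                .filter(Matcher::find) // keep only metrics that carry a partition index
                .map(m -> m.group(1)) // the captured partition number
                .distinct() // several metrics may exist per partition
                .count();
    }

    public static void main(String[] args) {
        var names =
                List.of(
                        "KafkaSourceReader.topic.orders.partition.0.currentOffset",
                        "KafkaSourceReader.topic.orders.partition.1.currentOffset",
                        "KafkaSourceReader.topic.orders.partition.1.committedOffset");
        System.out.println(countPartitions(names)); // 2
    }
}
```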

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java

Lines changed: 5 additions & 0 deletions
@@ -55,6 +55,7 @@
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.MANAGED_MEMORY_USED;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.METASPACE_MEMORY_USED;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_PARTITIONS;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_TASK_SLOTS_USED;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.OBSERVED_TPR;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
@@ -166,6 +167,10 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
 
         evaluatedMetrics.put(
                 MAX_PARALLELISM, EvaluatedScalingMetric.of(vertexInfo.getMaxParallelism()));
+
+        evaluatedMetrics.put(
+                NUM_PARTITIONS, EvaluatedScalingMetric.of(vertexInfo.getNumPartitions()));
+
         computeProcessingRateThresholds(evaluatedMetrics, conf, processingBacklog, restartTime);
         return evaluatedMetrics;
     }

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java

Lines changed: 3 additions & 0 deletions
@@ -53,6 +53,9 @@ public enum ScalingMetric {
 
     /** Job vertex max parallelism. */
     MAX_PARALLELISM(false),
+
+    /** Source vertex partition count. */
+    NUM_PARTITIONS(false),
     /** Upper boundary of the target data rate range. */
     SCALE_UP_RATE_THRESHOLD(false),

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java

Lines changed: 2 additions & 4 deletions
@@ -46,6 +46,8 @@ public class VertexInfo {
     @Setter(AccessLevel.NONE)
     private int maxParallelism;
 
+    @Setter private int numPartitions;
+
     private final int originalMaxParallelism;
 
     private final boolean finished;
@@ -99,8 +101,4 @@ public VertexInfo(
             int maxParallelism) {
         this(id, inputs, parallelism, maxParallelism, null);
     }
-
-    public void updateMaxParallelism(int maxParallelism) {
-        this.maxParallelism = Math.min(originalMaxParallelism, maxParallelism);
-    }
 }
