Skip to content

Commit fb7da34

Browse files
author
huyuanfeng
committed
fix comment
1 parent 16b5d4a commit fb7da34

File tree

8 files changed

+33
-34
lines changed

8 files changed

+33
-34
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
5151
import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
5252
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
53-
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_PARTITIONS;
53+
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
5454
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
5555
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
5656
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
@@ -71,7 +71,9 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
7171

7272
@VisibleForTesting
7373
protected static final String SCALE_LIMITED_MESSAGE_FORMAT =
74-
"Scaling limited detected for %s (expected parallelism: %s, actual parallelism %s). Scaling limited due to %s";
74+
"Scaling limited detected for %s (expected parallelism: %s, actual parallelism %s). "
75+
+ "Scaling limited due to source partitions : %s,"
76+
+ "upperBoundForAlignment(maxParallelism or parallelismUpperLimit): %s, parallelismLowerLimit: %s.";
7577

7678
private Clock clock = Clock.system(ZoneId.systemDefault());
7779

@@ -203,7 +205,7 @@ public ParallelismChange computeScaleTargetParallelism(
203205
vertex,
204206
currentParallelism,
205207
inputShipStrategies,
206-
(int) evaluatedMetrics.get(NUM_PARTITIONS).getCurrent(),
208+
(int) evaluatedMetrics.get(NUM_SOURCE_PARTITIONS).getCurrent(),
207209
(int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
208210
scaleFactor,
209211
Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
@@ -358,14 +360,14 @@ private boolean detectIneffectiveScaleUp(
358360
* remainder.
359361
*
360362
* <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
361-
* number of partitions if a vertex has a known partition count.
363+
* number of source partitions if a vertex has a known source partition count.
362364
*/
363365
@VisibleForTesting
364366
protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
365367
JobVertexID vertex,
366368
int currentParallelism,
367369
Collection<ShipStrategy> inputShipStrategies,
368-
int numPartitions,
370+
int numSourcePartitions,
369371
int maxParallelism,
370372
double scaleFactor,
371373
int parallelismLowerLimit,
@@ -401,19 +403,19 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
401403
// Apply min/max parallelism
402404
newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
403405

404-
var adjustByMaxParallelism =
406+
var adjustByMaxParallelismOrPartitions =
405407
inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
406-
if (!adjustByMaxParallelism) {
408+
if (!adjustByMaxParallelismOrPartitions) {
407409
return newParallelism;
408410
}
409411

410412
int numKeyGroupsOrPartitions = maxParallelism;
411413
int upperBoundForAlignment;
412-
if (numPartitions <= 0) {
414+
if (numSourcePartitions <= 0) {
413415
upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
414416
} else {
415-
upperBoundForAlignment = Math.min(numPartitions, upperBound);
416-
numKeyGroupsOrPartitions = numPartitions;
417+
upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
418+
numKeyGroupsOrPartitions = numSourcePartitions;
417419
}
418420

419421
// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
@@ -425,33 +427,32 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
425427
}
426428
}
427429

428-
if (numPartitions > 0) {
430+
if (numSourcePartitions > 0) {
429431

430432
// When adjust the parallelism after rounding up cannot be evenly divided by source
431-
// numPartitions, Try to find the smallest parallelism that can satisfy the current
433+
// numSourcePartitions, Try to find the smallest parallelism that can satisfy the
434+
// current
432435
// consumption rate.
433436
int p = newParallelism;
434437
for (; p > 0; p--) {
435-
if (numPartitions / p > numPartitions / newParallelism) {
436-
if (numPartitions % p != 0) {
438+
if (numSourcePartitions / p > numSourcePartitions / newParallelism) {
439+
if (numSourcePartitions % p != 0) {
437440
p++;
438441
}
439442
break;
440443
}
441444
}
442445

443446
p = Math.max(p, parallelismLowerLimit);
444-
445447
var message =
446448
String.format(
447449
SCALE_LIMITED_MESSAGE_FORMAT,
448450
vertex,
449451
newParallelism,
450452
p,
451-
String.format(
452-
"numPartitions : %s,upperBoundForAlignment(maxParallelism or parallelismUpperLimit): %s, "
453-
+ "parallelismLowerLimit: %s.",
454-
numPartitions, upperBound, parallelismLowerLimit));
453+
numSourcePartitions,
454+
upperBound,
455+
parallelismLowerLimit);
455456
eventHandler.handleEvent(
456457
context,
457458
AutoScalerEventHandler.Type.Warning,

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ private void updateKafkaPulsarSourceNumPartitions(
284284
"Updating source {} max parallelism based on available partitions to {}",
285285
sourceVertex,
286286
numPartitions);
287-
topology.get(sourceVertex).setNumPartitions((int) numPartitions);
287+
topology.get(sourceVertex).setNumSourcePartitions((int) numPartitions);
288288
}
289289
}
290290
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MANAGED_MEMORY_USED;
5656
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
5757
import static org.apache.flink.autoscaler.metrics.ScalingMetric.METASPACE_MEMORY_USED;
58-
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_PARTITIONS;
58+
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
5959
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_TASK_SLOTS_USED;
6060
import static org.apache.flink.autoscaler.metrics.ScalingMetric.OBSERVED_TPR;
6161
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
@@ -169,7 +169,8 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
169169
MAX_PARALLELISM, EvaluatedScalingMetric.of(vertexInfo.getMaxParallelism()));
170170

171171
evaluatedMetrics.put(
172-
NUM_PARTITIONS, EvaluatedScalingMetric.of(vertexInfo.getNumPartitions()));
172+
NUM_SOURCE_PARTITIONS,
173+
EvaluatedScalingMetric.of(vertexInfo.getNumSourcePartitions()));
173174

174175
computeProcessingRateThresholds(evaluatedMetrics, conf, processingBacklog, restartTime);
175176
return evaluatedMetrics;

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public enum ScalingMetric {
5555
MAX_PARALLELISM(false),
5656

5757
/** Source vertex partition count. */
58-
NUM_PARTITIONS(false),
58+
NUM_SOURCE_PARTITIONS(false),
5959
/** Upper boundary of the target data rate range. */
6060
SCALE_UP_RATE_THRESHOLD(false),
6161

@@ -104,7 +104,7 @@ public enum ScalingMetric {
104104
PARALLELISM,
105105
RECOMMENDED_PARALLELISM,
106106
MAX_PARALLELISM,
107-
NUM_PARTITIONS,
107+
NUM_SOURCE_PARTITIONS,
108108
SCALE_UP_RATE_THRESHOLD,
109109
SCALE_DOWN_RATE_THRESHOLD,
110110
EXPECTED_PROCESSING_RATE);

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,7 @@ public class VertexInfo {
4646
@Setter(AccessLevel.NONE)
4747
private int maxParallelism;
4848

49-
@Setter private int numPartitions;
50-
51-
private final int originalMaxParallelism;
49+
@Setter private int numSourcePartitions;
5250

5351
private final boolean finished;
5452

@@ -67,7 +65,6 @@ public VertexInfo(
6765
this.inputs = inputs;
6866
this.parallelism = parallelism;
6967
this.maxParallelism = maxParallelism;
70-
this.originalMaxParallelism = maxParallelism;
7168
this.finished = finished;
7269
this.ioMetrics = ioMetrics;
7370
}

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,7 +1010,7 @@ public void testNumPartitionsAdjustment(Collection<ShipStrategy> inputShipStrate
10101010
eventCollector,
10111011
context));
10121012

1013-
// numPartition > upperBound
1013+
// numSourcePartition > upperBound
10141014
assertEquals(
10151015
100,
10161016
JobVertexScaler.scale(
@@ -1077,7 +1077,7 @@ public void testSendingScalingLimitedEvents(Collection<ShipStrategy> inputShipSt
10771077
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
10781078
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
10791079
var evaluated = evaluated(10, 200, 100);
1080-
evaluated.put(ScalingMetric.NUM_PARTITIONS, EvaluatedScalingMetric.of(15));
1080+
evaluated.put(ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(15));
10811081
var history = new TreeMap<Instant, ScalingSummary>();
10821082
var delayedScaleDown = new DelayedScaleDown();
10831083
// partition limited
@@ -1114,7 +1114,7 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
11141114
var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
11151115
metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
11161116
metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720));
1117-
metrics.put(ScalingMetric.NUM_PARTITIONS, EvaluatedScalingMetric.of(0));
1117+
metrics.put(ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(0));
11181118
metrics.put(
11191119
ScalingMetric.TARGET_DATA_RATE,
11201120
new EvaluatedScalingMetric(targetDataRate, targetDataRate));

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public void testKafkaPulsarNumPartitions() throws Exception {
255255
"1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.3.currentOffset")));
256256

257257
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
258-
assertEquals(5, collectedMetrics.getJobTopology().get(source1).getNumPartitions());
258+
assertEquals(5, collectedMetrics.getJobTopology().get(source1).getNumSourcePartitions());
259259

260260
metricsCollector.setMetricNames(
261261
Map.of(
@@ -274,7 +274,7 @@ public void testKafkaPulsarNumPartitions() throws Exception {
274274
"0.Source__pulsar_source[1].PulsarConsumer"
275275
+ ".persistent_//public/default/testTopic-partition-4.m962n.numMsgsReceived")));
276276
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
277-
assertEquals(5, collectedMetrics.getJobTopology().get(source2).getNumPartitions());
277+
assertEquals(5, collectedMetrics.getJobTopology().get(source2).getNumSourcePartitions());
278278
}
279279

280280
@Test

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -880,7 +880,7 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
880880
var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
881881
metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
882882
metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(MAX_PARALLELISM));
883-
metrics.put(ScalingMetric.NUM_PARTITIONS, EvaluatedScalingMetric.of(0));
883+
metrics.put(ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(0));
884884
metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target));
885885
metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchupRate));
886886
metrics.put(

0 commit comments

Comments
 (0)