Skip to content

Commit 4683333

Browse files
huyuanfenggyfora
authored andcommitted
[hotfix] Fix incorrect messageKey passed in ScalingLimited event
1 parent cfcee02 commit 4683333

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
464464
AutoScalerEventHandler.Type.Warning,
465465
SCALING_LIMITED,
466466
message,
467-
SCALING_LIMITED + vertex + (scaleFactor * currentParallelism),
467+
SCALING_LIMITED + vertex + newParallelism,
468468
context.getConfiguration().get(SCALING_EVENT_INTERVAL));
469469
return p;
470470
}

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,6 +1114,30 @@ public void testSendingScalingLimitedEvents() {
11141114
.isEqualTo(
11151115
String.format(
11161116
SCALE_LIMITED_MESSAGE_FORMAT, jobVertexID, 20, 15, 15, 200, 1));
1117+
// small changes for scaleFactor, verify that the event messageKey is the same.
1118+
var smallChangesForScaleFactor = evaluated(10, 199, 100);
1119+
smallChangesForScaleFactor.put(
1120+
ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(15));
1121+
assertEquals(
1122+
ParallelismChange.required(15),
1123+
vertexScaler.computeScaleTargetParallelism(
1124+
context,
1125+
jobVertexID,
1126+
List.of(),
1127+
smallChangesForScaleFactor,
1128+
history,
1129+
restartTime,
1130+
delayedScaleDown));
1131+
assertEquals(1, eventCollector.events.size());
1132+
TestingEventCollector.Event<JobID, JobAutoScalerContext<JobID>>
1133+
smallChangesForScaleFactorLimitedEvent = eventCollector.events.poll();
1134+
assertThat(partitionLimitedEvent.getMessage())
1135+
.isEqualTo(
1136+
String.format(
1137+
SCALE_LIMITED_MESSAGE_FORMAT, jobVertexID, 20, 15, 15, 200, 1));
1138+
assertThat(smallChangesForScaleFactorLimitedEvent).isNotNull();
1139+
assertThat(partitionLimitedEvent.getMessageKey())
1140+
.isEqualTo(smallChangesForScaleFactorLimitedEvent.getMessageKey());
11171141
}
11181142

11191143
private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(

0 commit comments

Comments
 (0)