Skip to content

Commit 7d90ce8

Browse files
[FLINK-35896][autoscaler] Handle exception events in RescaleApiScalingRealizer (#856)
1 parent 5296d63 commit 7d90ce8

File tree

4 files changed

+55
-10
lines changed

4 files changed

+55
-10
lines changed

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public RescaleApiScalingRealizer(AutoScalerEventHandler<KEY, Context> eventHandl
6666

6767
@Override
6868
public void realizeParallelismOverrides(
69-
Context context, Map<String, String> parallelismOverrides) {
69+
Context context, Map<String, String> parallelismOverrides) throws Exception {
7070
Configuration conf = context.getConfiguration();
7171
if (!conf.get(JobManagerOptions.SCHEDULER)
7272
.equals(JobManagerOptions.SchedulerType.Adaptive)) {
@@ -121,8 +121,6 @@ public void realizeParallelismOverrides(
121121
} else {
122122
LOG.info("Vertex resources requirements already match target, nothing to do...");
123123
}
124-
} catch (Exception e) {
125-
LOG.warn("Failed to apply parallelism overrides.", e);
126124
}
127125
}
128126

flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class RescaleApiScalingRealizerTest {
5959
*/
6060
@ParameterizedTest
6161
@ValueSource(booleans = {true, false})
62-
void testUpdateResourceRequirements(boolean resourceIsChanged) {
62+
void testUpdateResourceRequirements(boolean resourceIsChanged) throws Exception {
6363
var jobID = new JobID();
6464
var jobVertex1 = new JobVertexID().toHexString();
6565
var jobVertex2 = new JobVertexID().toHexString();
@@ -110,7 +110,7 @@ void testUpdateResourceRequirements(boolean resourceIsChanged) {
110110
}
111111

112112
@Test
113-
void testDisableAdaptiveScheduler() {
113+
void testDisableAdaptiveScheduler() throws Exception {
114114
var jobID = new JobID();
115115
var jobVertex1 = new JobVertexID().toHexString();
116116
var jobVertex2 = new JobVertexID().toHexString();
@@ -132,7 +132,7 @@ void testDisableAdaptiveScheduler() {
132132
}
133133

134134
@Test
135-
void testJobNotRunning() {
135+
void testJobNotRunning() throws Exception {
136136
var jobID = new JobID();
137137
var jobVertex1 = new JobVertexID().toHexString();
138138
var jobVertex2 = new JobVertexID().toHexString();

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,18 @@
3333
@Experimental
3434
public interface ScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>> {
3535

36-
/** Update job's parallelism to parallelismOverrides. */
37-
void realizeParallelismOverrides(Context context, Map<String, String> parallelismOverrides);
36+
/**
37+
* Update job's parallelism to parallelismOverrides.
38+
*
39+
* @throws Exception if realizing the parallelism overrides fails.
40+
*/
41+
void realizeParallelismOverrides(Context context, Map<String, String> parallelismOverrides)
42+
throws Exception;
3843

39-
/** Updates the TaskManager memory configuration. */
40-
void realizeConfigOverrides(Context context, ConfigChanges configChanges);
44+
/**
45+
* Updates the TaskManager memory configuration.
46+
*
47+
* @throws Exception if realizing the config overrides fails.
48+
*/
49+
void realizeConfigOverrides(Context context, ConfigChanges configChanges) throws Exception;
4150
}

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
2626
import org.apache.flink.autoscaler.metrics.ScalingMetric;
2727
import org.apache.flink.autoscaler.metrics.TestMetrics;
28+
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
2829
import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
2930
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
3031
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
@@ -190,6 +191,43 @@ protected Collection<String> queryAggregatedMetricNames(
190191
0, autoscaler.flinkMetrics.get(context.getJobKey()).getNumErrorsCount());
191192
}
192193

194+
@Test
195+
public void testRealizeParallelismOverridesExceptions() throws Exception {
196+
JobVertexID jobVertexID = new JobVertexID();
197+
JobTopology jobTopology = new JobTopology(new VertexInfo(jobVertexID, Map.of(), 1, 20));
198+
var metricsCollector =
199+
new TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>>(jobTopology);
200+
ScalingRealizer<JobID, JobAutoScalerContext<JobID>>
201+
realizeParallelismOverridesWithExceptionsScalingRealizer =
202+
new ScalingRealizer<>() {
203+
@Override
204+
public void realizeConfigOverrides(
205+
JobAutoScalerContext context, ConfigChanges configChanges) {}
206+
207+
@Override
208+
public void realizeParallelismOverrides(
209+
JobAutoScalerContext context, Map parallelismOverrides) {
210+
throw new RuntimeException(
211+
"Test Realize Parallelism Overrides Exceptions.");
212+
}
213+
};
214+
stateStore.storeParallelismOverrides(context, Map.of(jobVertexID.toHexString(), "2"));
215+
216+
var autoscaler =
217+
new JobAutoScalerImpl<>(
218+
metricsCollector,
219+
null,
220+
null,
221+
eventCollector,
222+
realizeParallelismOverridesWithExceptionsScalingRealizer,
223+
stateStore);
224+
225+
// Should produce an error
226+
autoscaler.scale(context);
227+
Assertions.assertEquals(
228+
1, autoscaler.flinkMetrics.get(context.getJobKey()).getNumErrorsCount());
229+
}
230+
193231
@Test
194232
void testParallelismOverrides() throws Exception {
195233
var autoscaler =

0 commit comments

Comments
 (0)