Skip to content

Commit a3ef048

Browse files
committed
[FLINK-34947] Only scale down JM in Foreground deletion propagation and reduce timeout
1 parent 8125456 commit a3ef048

File tree

2 files changed

+98
-71
lines changed

2 files changed

+98
-71
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import io.fabric8.kubernetes.client.dsl.EditReplacePatchable;
6161
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
6262
import io.fabric8.kubernetes.client.dsl.base.PatchType;
63+
import org.apache.commons.lang3.ObjectUtils;
6364
import org.slf4j.Logger;
6465
import org.slf4j.LoggerFactory;
6566

@@ -80,6 +81,7 @@ public class NativeFlinkService extends AbstractFlinkService {
8081
private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkService.class);
8182
private static final Deployment SCALE_TO_ZERO =
8283
new DeploymentBuilder().editOrNewSpec().withReplicas(0).endSpec().build();
84+
private static final Duration JM_SHUTDOWN_MAX_WAIT = Duration.ofMinutes(1);
8385
private final EventRecorder eventRecorder;
8486

8587
public NativeFlinkService(
@@ -164,12 +166,15 @@ protected void deleteClusterInternal(
164166
.inNamespace(namespace)
165167
.withName(KubernetesUtils.getDeploymentName(clusterId));
166168

167-
var remainingTimeout =
168-
scaleJmToZeroBlocking(
169-
jmDeployment,
170-
namespace,
171-
clusterId,
172-
operatorConfig.getFlinkShutdownClusterTimeout());
169+
var remainingTimeout = operatorConfig.getFlinkShutdownClusterTimeout();
170+
171+
// We shut down the JobManager first in the (default) Foreground propagation case to have a
172+
// cleaner exit
173+
if (deletionPropagation == DeletionPropagation.FOREGROUND) {
174+
remainingTimeout =
175+
shutdownJobManagersBlocking(
176+
jmDeployment, namespace, clusterId, remainingTimeout);
177+
}
173178
deleteDeploymentBlocking("JobManager", jmDeployment, deletionPropagation, remainingTimeout);
174179
}
175180

@@ -306,34 +311,42 @@ protected Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(
306311
}
307312

308313
/**
309-
* Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped.
310-
* This avoids race conditions between JM shutdown and TM shutdown / failure handling.
314+
* Shut down JobManagers gracefully by scaling JM deployment to zero. This avoids race
315+
* conditions between JM shutdown and TM shutdown / failure handling.
311316
*
312317
* @param jmDeployment
313318
* @param namespace
314319
* @param clusterId
315-
* @param timeout
320+
* @param remainingTimeout
316321
* @return Remaining timeout after the operation.
317322
*/
318-
private Duration scaleJmToZeroBlocking(
323+
private Duration shutdownJobManagersBlocking(
319324
EditReplacePatchable<Deployment> jmDeployment,
320325
String namespace,
321326
String clusterId,
322-
Duration timeout) {
323-
return deleteBlocking(
324-
"Scaling JobManager Deployment to zero",
325-
() -> {
326-
try {
327-
jmDeployment.patch(PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
328-
} catch (Exception ignore) {
329-
// Ignore all errors here as this is an optional step
330-
return null;
331-
}
332-
return kubernetesClient
333-
.pods()
334-
.inNamespace(namespace)
335-
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
336-
},
337-
timeout);
327+
Duration remainingTimeout) {
328+
329+
// We use only half of the shutdown timeout, but at most one minute, as the main point
330+
// here is to initiate JM shutdown before the TMs
331+
var jmShutdownTimeout =
332+
ObjectUtils.min(JM_SHUTDOWN_MAX_WAIT, remainingTimeout.dividedBy(2));
333+
var remaining =
334+
deleteBlocking(
335+
"Scaling JobManager Deployment to zero",
336+
() -> {
337+
try {
338+
jmDeployment.patch(
339+
PatchContext.of(PatchType.JSON_MERGE), SCALE_TO_ZERO);
340+
} catch (Exception ignore) {
341+
// Ignore all errors here as this is an optional step
342+
return null;
343+
}
344+
return kubernetesClient
345+
.pods()
346+
.inNamespace(namespace)
347+
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId));
348+
},
349+
jmShutdownTimeout);
350+
return remainingTimeout.minus(jmShutdownTimeout).plus(remaining);
338351
}
339352
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java

Lines changed: 59 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
3939
import org.apache.flink.kubernetes.operator.utils.EventCollector;
4040
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
41-
import org.apache.flink.runtime.execution.ExecutionState;
41+
import org.apache.flink.kubernetes.utils.KubernetesUtils;
4242
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
4343
import org.apache.flink.runtime.jobgraph.JobVertexID;
4444
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
@@ -47,22 +47,22 @@
4747
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
4848
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
4949
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
50-
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
5150
import org.apache.flink.util.concurrent.Executors;
5251

5352
import io.fabric8.kubernetes.api.model.DeletionPropagation;
54-
import io.fabric8.kubernetes.api.model.apps.Deployment;
53+
import io.fabric8.kubernetes.api.model.PodBuilder;
5554
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
5655
import io.fabric8.kubernetes.client.KubernetesClient;
57-
import io.fabric8.kubernetes.client.dsl.Resource;
5856
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
5957
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
6058
import org.junit.jupiter.api.BeforeEach;
6159
import org.junit.jupiter.api.Test;
6260
import org.junit.jupiter.params.ParameterizedTest;
61+
import org.junit.jupiter.params.provider.EnumSource;
6362
import org.junit.jupiter.params.provider.MethodSource;
6463

6564
import java.time.Duration;
65+
import java.time.Instant;
6666
import java.util.List;
6767
import java.util.Map;
6868
import java.util.Optional;
@@ -76,7 +76,6 @@
7676
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT;
7777
import static org.junit.jupiter.api.Assertions.assertEquals;
7878
import static org.junit.jupiter.api.Assertions.assertFalse;
79-
import static org.junit.jupiter.api.Assertions.assertNotNull;
8079
import static org.junit.jupiter.api.Assertions.assertNull;
8180
import static org.junit.jupiter.api.Assertions.assertTrue;
8281
import static org.junit.jupiter.api.Assertions.fail;
@@ -107,25 +106,19 @@ public void setup() {
107106
executorService = Executors.newDirectExecutorService();
108107
}
109108

110-
@Test
111-
public void testDeleteClusterInternal() {
112-
109+
@ParameterizedTest
110+
@EnumSource(DeletionPropagation.class)
111+
public void testDeleteClusterInternal(DeletionPropagation propagation) {
112+
var timeout = Duration.ofSeconds(4);
113+
configuration.set(
114+
KubernetesOperatorConfigOptions.OPERATOR_RESOURCE_CLEANUP_TIMEOUT, timeout);
113115
var flinkService =
114116
new NativeFlinkService(
115-
client, null, executorService, operatorConfig, eventRecorder) {
116-
117-
@Override
118-
protected Duration deleteDeploymentBlocking(
119-
String name,
120-
Resource<Deployment> deployment,
121-
DeletionPropagation propagation,
122-
Duration timeout) {
123-
// Ensure deployment is scaled down before deletion
124-
assertEquals(0, deployment.get().getSpec().getReplicas());
125-
return super.deleteDeploymentBlocking(
126-
name, deployment, propagation, timeout);
127-
}
128-
};
117+
client,
118+
null,
119+
executorService,
120+
FlinkOperatorConfiguration.fromConfiguration(configuration),
121+
eventRecorder);
129122

130123
var deployment = TestUtils.buildApplicationCluster();
131124
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
@@ -141,18 +134,55 @@ protected Duration deleteDeploymentBlocking(
141134
.endSpec()
142135
.build();
143136
client.resource(dep).create();
144-
assertNotNull(
145-
client.apps()
146-
.deployments()
147-
.inNamespace(TestUtils.TEST_NAMESPACE)
148-
.withName(TestUtils.TEST_DEPLOYMENT_NAME)
149-
.get());
150137

138+
var patched = new AtomicBoolean(false);
139+
mockServer
140+
.expect()
141+
.patch()
142+
.withPath(
143+
String.format(
144+
"/apis/apps/v1/namespaces/%s/deployments/%s",
145+
TestUtils.TEST_NAMESPACE, TestUtils.TEST_DEPLOYMENT_NAME))
146+
.andReply(
147+
200,
148+
req -> {
149+
patched.set(true);
150+
return deployment;
151+
})
152+
.always();
153+
154+
// We create the JM pod explicitly here. This blocks the JM scale-down action
155+
// indefinitely, which we use to verify that the timeout is enforced correctly
156+
var jmPod =
157+
new PodBuilder()
158+
.withNewMetadata()
159+
.withName("JM")
160+
.withLabels(
161+
KubernetesUtils.getJobManagerSelectors(
162+
TestUtils.TEST_DEPLOYMENT_NAME))
163+
.withNamespace(TestUtils.TEST_NAMESPACE)
164+
.endMetadata()
165+
.build();
166+
client.resource(jmPod).create();
167+
168+
var start = Instant.now();
151169
flinkService.deleteClusterInternal(
152170
deployment.getMetadata().getNamespace(),
153171
deployment.getMetadata().getName(),
154172
configManager.getObserveConfig(deployment),
155-
DeletionPropagation.FOREGROUND);
173+
propagation);
174+
var measured = Duration.between(start, Instant.now());
175+
176+
// The JM deployment is only scaled down for Foreground deletion propagation
177+
if (propagation == DeletionPropagation.FOREGROUND) {
178+
assertTrue(patched.get());
179+
// We make sure that we don't use up the entire timeout for JM deletion
180+
assertTrue(timeout.minus(measured).toSeconds() > 0);
181+
// Validate that we actually waited 2 seconds
182+
assertTrue(measured.toSeconds() > 1);
183+
} else {
184+
assertFalse(patched.get());
185+
}
156186

157187
assertNull(
158188
client.apps()
@@ -476,22 +506,6 @@ private void testScaleConditionLastSpec(
476506
scaled);
477507
}
478508

479-
private JobDetailsInfo.JobVertexDetailsInfo jobVertexDetailsInfo(
480-
JobVertexID jvi, int parallelism) {
481-
var ioMetricsInfo = new IOMetricsInfo(0, false, 0, false, 0, false, 0, false, 0L, 0L, 0.);
482-
return new JobDetailsInfo.JobVertexDetailsInfo(
483-
jvi,
484-
"",
485-
900,
486-
parallelism,
487-
ExecutionState.RUNNING,
488-
0,
489-
0,
490-
0,
491-
Map.of(),
492-
ioMetricsInfo);
493-
}
494-
495509
@Test
496510
public void resourceRestApiTest() throws Exception {
497511
var testingClusterClient = new TestingClusterClient<String>(configuration);

0 commit comments

Comments
 (0)