Skip to content

Commit b0bc3a3

Browse files
darenwktgyfora
authored andcommitted
[FLINK-37769] Include cause in event when restarting unhealthy job
1 parent b7c429a commit b0bc3a3

File tree

8 files changed

+164
-32
lines changed

8 files changed

+164
-32
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.kubernetes.operator.health;
1919

2020
import org.apache.flink.annotation.Experimental;
21+
import org.apache.flink.kubernetes.operator.observer.ClusterHealthResult;
2122

2223
import com.fasterxml.jackson.core.JsonProcessingException;
2324
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -47,15 +48,15 @@ public class ClusterHealthInfo {
4748
private long numCompletedCheckpointsIncreasedTimeStamp;
4849

4950
/** Calculated field whether the cluster is healthy or not. */
50-
private boolean healthy;
51+
private ClusterHealthResult healthResult;
5152

5253
public ClusterHealthInfo() {
5354
this(Clock.systemDefaultZone());
5455
}
5556

5657
public ClusterHealthInfo(Clock clock) {
5758
timeStamp = clock.millis();
58-
healthy = true;
59+
healthResult = ClusterHealthResult.healthy();
5960
}
6061

6162
public static boolean isValid(ClusterHealthInfo clusterHealthInfo) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -97,25 +97,28 @@ public void evaluate(
9797
LOG.debug("Last valid health info: {}", lastValidClusterHealthInfo);
9898
LOG.debug("Observed health info: {}", observedClusterHealthInfo);
9999

100-
boolean isHealthy =
100+
var jobHealthResult =
101101
evaluateRestarts(
102-
configuration,
103-
clusterInfo,
104-
lastValidClusterHealthInfo,
105-
observedClusterHealthInfo)
106-
&& evaluateCheckpoints(
107-
configuration,
108-
lastValidClusterHealthInfo,
109-
observedClusterHealthInfo);
102+
configuration,
103+
clusterInfo,
104+
lastValidClusterHealthInfo,
105+
observedClusterHealthInfo);
106+
107+
var checkpointHealthResult =
108+
evaluateCheckpoints(
109+
configuration,
110+
lastValidClusterHealthInfo,
111+
observedClusterHealthInfo);
110112

111113
lastValidClusterHealthInfo.setTimeStamp(observedClusterHealthInfo.getTimeStamp());
112-
lastValidClusterHealthInfo.setHealthy(isHealthy);
114+
lastValidClusterHealthInfo.setHealthResult(
115+
jobHealthResult.join(checkpointHealthResult));
113116
setLastValidClusterHealthInfo(clusterInfo, lastValidClusterHealthInfo);
114117
}
115118
}
116119
}
117120

118-
private boolean evaluateRestarts(
121+
private ClusterHealthResult evaluateRestarts(
119122
Configuration configuration,
120123
Map<String, String> clusterInfo,
121124
ClusterHealthInfo lastValidClusterHealthInfo,
@@ -128,7 +131,7 @@ private boolean evaluateRestarts(
128131
lastValidClusterHealthInfo.setNumRestarts(observedClusterHealthInfo.getNumRestarts());
129132
lastValidClusterHealthInfo.setNumRestartsEvaluationTimeStamp(
130133
observedClusterHealthInfo.getTimeStamp());
131-
return true;
134+
return ClusterHealthResult.healthy();
132135
}
133136

134137
var timestampDiffMs =
@@ -153,9 +156,15 @@ private boolean evaluateRestarts(
153156
LOG.debug("Calculated restart count for {} window: {}", restartCheckWindow, numRestarts);
154157

155158
var restartThreshold = configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD);
159+
160+
var healthResult = ClusterHealthResult.healthy();
161+
156162
boolean isHealthy = numRestarts <= restartThreshold;
157163
if (!isHealthy) {
158164
LOG.info("Restart count hit threshold: {}", restartThreshold);
165+
healthResult =
166+
ClusterHealthResult.error(
167+
String.format("Restart count hit threshold: %s", restartThreshold));
159168
}
160169

161170
if (lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp()
@@ -167,15 +176,15 @@ private boolean evaluateRestarts(
167176
observedClusterHealthInfo.getTimeStamp());
168177
}
169178

170-
return isHealthy;
179+
return healthResult;
171180
}
172181

173-
private boolean evaluateCheckpoints(
182+
private ClusterHealthResult evaluateCheckpoints(
174183
Configuration configuration,
175184
ClusterHealthInfo lastValidClusterHealthInfo,
176185
ClusterHealthInfo observedClusterHealthInfo) {
177186
if (!configuration.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED)) {
178-
return true;
187+
return ClusterHealthResult.healthy();
179188
}
180189

181190
var windowOpt =
@@ -195,7 +204,7 @@ private boolean evaluateCheckpoints(
195204
if (windowOpt.isEmpty() && !checkpointConfig.isCheckpointingEnabled()) {
196205
// If no explicit checkpoint check window is specified and checkpointing is disabled
197206
// based on the config, we don't do anything
198-
return true;
207+
return ClusterHealthResult.healthy();
199208
}
200209

201210
var completedCheckpointsCheckWindow =
@@ -226,15 +235,14 @@ private boolean evaluateCheckpoints(
226235
observedClusterHealthInfo.getNumCompletedCheckpoints());
227236
lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
228237
observedClusterHealthInfo.getTimeStamp());
229-
return true;
238+
return ClusterHealthResult.healthy();
230239
}
231240

232241
var timestampDiffMs =
233242
observedClusterHealthInfo.getTimeStamp()
234243
- lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp();
235244
LOG.debug("Time difference between health infos: {}", Duration.ofMillis(timestampDiffMs));
236245

237-
boolean isHealthy = true;
238246
var completedCheckpointsCheckWindowMs = completedCheckpointsCheckWindow.toMillis();
239247

240248
if (observedClusterHealthInfo.getNumCompletedCheckpoints()
@@ -248,9 +256,9 @@ private boolean evaluateCheckpoints(
248256
+ completedCheckpointsCheckWindowMs
249257
< clock.millis()) {
250258
LOG.info("Cluster is not able to complete checkpoints");
251-
isHealthy = false;
259+
return ClusterHealthResult.error("Cluster is not able to complete checkpoints");
252260
}
253261

254-
return isHealthy;
262+
return ClusterHealthResult.healthy();
255263
}
256264
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.observer;
19+
20+
import com.fasterxml.jackson.annotation.JsonCreator;
21+
import com.fasterxml.jackson.annotation.JsonProperty;
22+
import lombok.Value;
23+
24+
import java.util.Objects;
25+
import java.util.stream.Collectors;
26+
import java.util.stream.Stream;
27+
28+
/** Cluster Health Result. */
29+
@Value
30+
public class ClusterHealthResult {
31+
boolean healthy;
32+
String error;
33+
34+
@JsonCreator
35+
public ClusterHealthResult(
36+
@JsonProperty("healthy") boolean healthy, @JsonProperty("error") String error) {
37+
this.healthy = healthy;
38+
this.error = error;
39+
}
40+
41+
public static ClusterHealthResult error(String error) {
42+
return new ClusterHealthResult(false, error);
43+
}
44+
45+
public static ClusterHealthResult healthy() {
46+
return new ClusterHealthResult(true, null);
47+
}
48+
49+
public ClusterHealthResult join(ClusterHealthResult clusterHealthResult) {
50+
boolean isHealthy = this.healthy && clusterHealthResult.healthy;
51+
String error =
52+
Stream.of(this.error, clusterHealthResult.getError())
53+
.filter(Objects::nonNull)
54+
.collect(Collectors.joining("|"));
55+
56+
return new ClusterHealthResult(isHealthy, error);
57+
}
58+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
3636
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
3737
import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
38+
import org.apache.flink.kubernetes.operator.observer.ClusterHealthResult;
3839
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
3940
import org.apache.flink.kubernetes.operator.service.FlinkService;
4041
import org.apache.flink.kubernetes.operator.service.SuspendMode;
@@ -268,6 +269,9 @@ public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx)
268269

269270
var deployment = ctx.getResource();
270271
var observeConfig = ctx.getObserveConfig();
272+
var clusterHealthInfo =
273+
ClusterHealthEvaluator.getLastValidClusterHealthInfo(
274+
deployment.getStatus().getClusterInfo());
271275
boolean shouldRestartJobBecauseUnhealthy =
272276
shouldRestartJobBecauseUnhealthy(deployment, observeConfig);
273277
boolean shouldRecoverDeployment = shouldRecoverDeployment(observeConfig, deployment);
@@ -288,7 +292,10 @@ public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx)
288292
EventRecorder.Type.Warning,
289293
EventRecorder.Reason.RestartUnhealthyJob,
290294
EventRecorder.Component.Job,
291-
MSG_RESTART_UNHEALTHY,
295+
Optional.ofNullable(clusterHealthInfo)
296+
.map(ClusterHealthInfo::getHealthResult)
297+
.map(ClusterHealthResult::getError)
298+
.orElse(MSG_RESTART_UNHEALTHY),
292299
ctx.getKubernetesClient());
293300
cleanupAfterFailedJob(ctx);
294301
}
@@ -312,7 +319,7 @@ private boolean shouldRestartJobBecauseUnhealthy(
312319
ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
313320
if (clusterHealthInfo != null) {
314321
LOG.debug("Cluster info contains job health info");
315-
if (!clusterHealthInfo.isHealthy()) {
322+
if (!clusterHealthInfo.getHealthResult().isHealthy()) {
316323
if (deployment.getSpec().getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
317324
LOG.debug("Stateless job, recovering unhealthy jobmanager deployment");
318325
restartNeeded = true;

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.flink.kubernetes.operator.health;
1919

20+
import org.apache.flink.kubernetes.operator.observer.ClusterHealthResult;
21+
2022
import org.junit.jupiter.api.Test;
2123

2224
import java.time.Clock;
@@ -43,11 +45,13 @@ public void isValidShouldReturnTrueWhenTimestampIsNonzero() {
4345

4446
@Test
4547
public void deserializeWithOldVersionShouldDeserializeCorrectly() {
46-
var clusterHealthInfoJson = "{\"timeStamp\":1,\"numRestarts\":2,\"healthy\":true}";
48+
var clusterHealthInfoJson =
49+
"{\"timeStamp\":1,\"numRestarts\":2,\"healthResult\": {\"healthy\":false, \"error\":\"test-error\"}}}}";
4750
var clusterHealthInfoFromJson = ClusterHealthInfo.deserialize(clusterHealthInfoJson);
4851
assertEquals(1, clusterHealthInfoFromJson.getTimeStamp());
4952
assertEquals(2, clusterHealthInfoFromJson.getNumRestarts());
50-
assertTrue(clusterHealthInfoFromJson.isHealthy());
53+
assertFalse(clusterHealthInfoFromJson.getHealthResult().isHealthy());
54+
assertEquals("test-error", clusterHealthInfoFromJson.getHealthResult().getError());
5155
}
5256

5357
@Test
@@ -58,7 +62,7 @@ public void serializationRoundTrip() {
5862
clusterHealthInfo.setNumRestartsEvaluationTimeStamp(3);
5963
clusterHealthInfo.setNumCompletedCheckpoints(4);
6064
clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(5);
61-
clusterHealthInfo.setHealthy(false);
65+
clusterHealthInfo.setHealthResult(ClusterHealthResult.error("error"));
6266
var clusterHealthInfoJson = ClusterHealthInfo.serialize(clusterHealthInfo);
6367

6468
var clusterHealthInfoFromJson = ClusterHealthInfo.deserialize(clusterHealthInfoJson);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,6 @@ private void assertClusterHealthIs(boolean healthy) {
358358
var lastValidClusterHealthInfo =
359359
ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
360360
assertNotNull(lastValidClusterHealthInfo);
361-
assertEquals(healthy, lastValidClusterHealthInfo.isHealthy());
361+
assertEquals(healthy, lastValidClusterHealthInfo.getHealthResult().isHealthy());
362362
}
363363
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.observer;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import static org.junit.jupiter.api.Assertions.assertEquals;
23+
import static org.junit.jupiter.api.Assertions.assertFalse;
24+
import static org.junit.jupiter.api.Assertions.assertNull;
25+
import static org.junit.jupiter.api.Assertions.assertTrue;
26+
27+
class ClusterHealthResultTest {
28+
29+
@Test
30+
void error() {
31+
ClusterHealthResult clusterHealthResult = ClusterHealthResult.error("test-error");
32+
33+
assertFalse(clusterHealthResult.isHealthy());
34+
assertEquals("test-error", clusterHealthResult.getError());
35+
}
36+
37+
@Test
38+
void healthy() {
39+
ClusterHealthResult clusterHealthResult = ClusterHealthResult.healthy();
40+
41+
assertTrue(clusterHealthResult.isHealthy());
42+
assertNull(clusterHealthResult.getError());
43+
}
44+
45+
@Test
46+
void join() {
47+
ClusterHealthResult clusterHealthResult =
48+
ClusterHealthResult.healthy()
49+
.join(ClusterHealthResult.error("test-error-1"))
50+
.join(ClusterHealthResult.error("test-error-2"));
51+
52+
assertFalse(clusterHealthResult.isHealthy());
53+
assertEquals("test-error-1|test-error-2", clusterHealthResult.getError());
54+
}
55+
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
6666
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
6767
import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
68+
import org.apache.flink.kubernetes.operator.observer.ClusterHealthResult;
6869
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
6970
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
7071
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
@@ -128,7 +129,6 @@
128129
import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT;
129130
import static org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.MSG_SUBMIT;
130131
import static org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.MSG_RECOVERY;
131-
import static org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.MSG_RESTART_UNHEALTHY;
132132
import static org.apache.flink.kubernetes.operator.utils.SnapshotUtils.getLastSnapshotStatus;
133133
import static org.assertj.core.api.Assertions.assertThat;
134134
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -1223,12 +1223,11 @@ public void testRestartUnhealthyEvent() throws Exception {
12231223
var clusterHealthInfo = new ClusterHealthInfo();
12241224
clusterHealthInfo.setTimeStamp(System.currentTimeMillis());
12251225
clusterHealthInfo.setNumRestarts(2);
1226-
clusterHealthInfo.setHealthy(false);
1226+
clusterHealthInfo.setHealthResult(ClusterHealthResult.error("error"));
12271227
ClusterHealthEvaluator.setLastValidClusterHealthInfo(
12281228
deployment.getStatus().getClusterInfo(), clusterHealthInfo);
12291229
reconciler.reconcile(deployment, context);
1230-
Assertions.assertEquals(
1231-
MSG_RESTART_UNHEALTHY, flinkResourceEventCollector.events.remove().getMessage());
1230+
Assertions.assertEquals("error", flinkResourceEventCollector.events.remove().getMessage());
12321231
}
12331232

12341233
@Test

0 commit comments

Comments
 (0)