Skip to content

Commit 7dc6496

Browse files
committed
[FLINK-37769] Include cause in event when restarting unhealthy job
1 parent 27daa5a commit 7dc6496

File tree

8 files changed

+147
-29
lines changed

8 files changed

+147
-29
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.kubernetes.operator.health;
1919

2020
import org.apache.flink.annotation.Experimental;
21+
import org.apache.flink.kubernetes.operator.observer.ClusterHealthResult;
2122

2223
import com.fasterxml.jackson.core.JsonProcessingException;
2324
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -47,15 +48,15 @@ public class ClusterHealthInfo {
4748
private long numCompletedCheckpointsIncreasedTimeStamp;
4849

4950
/** Calculated field whether the cluster is healthy or not. */
50-
private boolean healthy;
51+
private ClusterHealthResult healthResult;
5152

5253
public ClusterHealthInfo() {
5354
this(Clock.systemDefaultZone());
5455
}
5556

5657
public ClusterHealthInfo(Clock clock) {
5758
timeStamp = clock.millis();
58-
healthy = true;
59+
healthResult = ClusterHealthResult.healthy();
5960
}
6061

6162
public static boolean isValid(ClusterHealthInfo clusterHealthInfo) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -97,25 +97,28 @@ public void evaluate(
9797
LOG.debug("Last valid health info: {}", lastValidClusterHealthInfo);
9898
LOG.debug("Observed health info: {}", observedClusterHealthInfo);
9999

100-
boolean isHealthy =
100+
ClusterHealthResult jobHealthResult =
101101
evaluateRestarts(
102-
configuration,
103-
clusterInfo,
104-
lastValidClusterHealthInfo,
105-
observedClusterHealthInfo)
106-
&& evaluateCheckpoints(
107-
configuration,
108-
lastValidClusterHealthInfo,
109-
observedClusterHealthInfo);
102+
configuration,
103+
clusterInfo,
104+
lastValidClusterHealthInfo,
105+
observedClusterHealthInfo);
106+
107+
ClusterHealthResult checkpointHealthResult =
108+
evaluateCheckpoints(
109+
configuration,
110+
lastValidClusterHealthInfo,
111+
observedClusterHealthInfo);
110112

111113
lastValidClusterHealthInfo.setTimeStamp(observedClusterHealthInfo.getTimeStamp());
112-
lastValidClusterHealthInfo.setHealthy(isHealthy);
114+
lastValidClusterHealthInfo.setHealthResult(
115+
jobHealthResult.join(checkpointHealthResult));
113116
setLastValidClusterHealthInfo(clusterInfo, lastValidClusterHealthInfo);
114117
}
115118
}
116119
}
117120

118-
private boolean evaluateRestarts(
121+
private ClusterHealthResult evaluateRestarts(
119122
Configuration configuration,
120123
Map<String, String> clusterInfo,
121124
ClusterHealthInfo lastValidClusterHealthInfo,
@@ -128,7 +131,7 @@ private boolean evaluateRestarts(
128131
lastValidClusterHealthInfo.setNumRestarts(observedClusterHealthInfo.getNumRestarts());
129132
lastValidClusterHealthInfo.setNumRestartsEvaluationTimeStamp(
130133
observedClusterHealthInfo.getTimeStamp());
131-
return true;
134+
return ClusterHealthResult.healthy();
132135
}
133136

134137
var timestampDiffMs =
@@ -153,9 +156,15 @@ private boolean evaluateRestarts(
153156
LOG.debug("Calculated restart count for {} window: {}", restartCheckWindow, numRestarts);
154157

155158
var restartThreshold = configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD);
159+
160+
ClusterHealthResult healthResult = ClusterHealthResult.healthy();
161+
156162
boolean isHealthy = numRestarts <= restartThreshold;
157163
if (!isHealthy) {
158164
LOG.info("Restart count hit threshold: {}", restartThreshold);
165+
healthResult =
166+
ClusterHealthResult.error(
167+
String.format("Restart count hit threshold: %s", restartThreshold));
159168
}
160169

161170
if (lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp()
@@ -167,15 +176,15 @@ private boolean evaluateRestarts(
167176
observedClusterHealthInfo.getTimeStamp());
168177
}
169178

170-
return isHealthy;
179+
return healthResult;
171180
}
172181

173-
private boolean evaluateCheckpoints(
182+
private ClusterHealthResult evaluateCheckpoints(
174183
Configuration configuration,
175184
ClusterHealthInfo lastValidClusterHealthInfo,
176185
ClusterHealthInfo observedClusterHealthInfo) {
177186
if (!configuration.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED)) {
178-
return true;
187+
return ClusterHealthResult.healthy();
179188
}
180189

181190
var windowOpt =
@@ -195,7 +204,7 @@ private boolean evaluateCheckpoints(
195204
if (windowOpt.isEmpty() && !checkpointConfig.isCheckpointingEnabled()) {
196205
// If no explicit checkpoint check window is specified and checkpointing is disabled
197206
// based on the config, we don't do anything
198-
return true;
207+
return ClusterHealthResult.healthy();
199208
}
200209

201210
var completedCheckpointsCheckWindow =
@@ -226,15 +235,14 @@ private boolean evaluateCheckpoints(
226235
observedClusterHealthInfo.getNumCompletedCheckpoints());
227236
lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(
228237
observedClusterHealthInfo.getTimeStamp());
229-
return true;
238+
return ClusterHealthResult.healthy();
230239
}
231240

232241
var timestampDiffMs =
233242
observedClusterHealthInfo.getTimeStamp()
234243
- lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp();
235244
LOG.debug("Time difference between health infos: {}", Duration.ofMillis(timestampDiffMs));
236245

237-
boolean isHealthy = true;
238246
var completedCheckpointsCheckWindowMs = completedCheckpointsCheckWindow.toMillis();
239247

240248
if (observedClusterHealthInfo.getNumCompletedCheckpoints()
@@ -248,9 +256,9 @@ private boolean evaluateCheckpoints(
248256
+ completedCheckpointsCheckWindowMs
249257
< clock.millis()) {
250258
LOG.info("Cluster is not able to complete checkpoints");
251-
isHealthy = false;
259+
return ClusterHealthResult.error("Cluster is not able to complete checkpoints");
252260
}
253261

254-
return isHealthy;
262+
return ClusterHealthResult.healthy();
255263
}
256264
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.observer;
19+
20+
import org.apache.flink.annotation.Experimental;
21+
22+
import lombok.AllArgsConstructor;
23+
import lombok.Data;
24+
import lombok.NoArgsConstructor;
25+
26+
import java.util.Objects;
27+
import java.util.stream.Collectors;
28+
import java.util.stream.Stream;
29+
30+
/** Cluster Health Result. */
31+
@Experimental
32+
@Data
33+
@AllArgsConstructor
34+
@NoArgsConstructor
35+
public class ClusterHealthResult {
36+
boolean healthy;
37+
String error;
38+
39+
public static ClusterHealthResult error(String error) {
40+
return new ClusterHealthResult(false, error);
41+
}
42+
43+
public static ClusterHealthResult healthy() {
44+
return new ClusterHealthResult(true, null);
45+
}
46+
47+
public ClusterHealthResult join(ClusterHealthResult clusterHealthResult) {
48+
boolean isHealthy = this.healthy && clusterHealthResult.healthy;
49+
String error =
50+
Stream.of(this.error, clusterHealthResult.getError())
51+
.filter(Objects::nonNull)
52+
.collect(Collectors.joining("|"));
53+
54+
return new ClusterHealthResult(isHealthy, error);
55+
}
56+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx)
268268

269269
var deployment = ctx.getResource();
270270
var observeConfig = ctx.getObserveConfig();
271+
var clusterHealthInfo =
272+
ClusterHealthEvaluator.getLastValidClusterHealthInfo(deployment.getStatus().getClusterInfo());
271273
boolean shouldRestartJobBecauseUnhealthy =
272274
shouldRestartJobBecauseUnhealthy(deployment, observeConfig);
273275
boolean shouldRecoverDeployment = shouldRecoverDeployment(observeConfig, deployment);
@@ -288,7 +290,7 @@ public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx)
288290
EventRecorder.Type.Warning,
289291
EventRecorder.Reason.RestartUnhealthyJob,
290292
EventRecorder.Component.Job,
291-
MSG_RESTART_UNHEALTHY,
293+
getMsgRestartUnhealthy(clusterHealthInfo),
292294
ctx.getKubernetesClient());
293295
cleanupAfterFailedJob(ctx);
294296
}
@@ -302,6 +304,14 @@ public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx)
302304
return cleanupTerminalJmAfterTtl(ctx.getFlinkService(), deployment, observeConfig);
303305
}
304306

307+
private String getMsgRestartUnhealthy(ClusterHealthInfo clusterHealthInfo) {
308+
if (clusterHealthInfo != null) {
309+
return MSG_RESTART_UNHEALTHY + " : " + clusterHealthInfo.getHealthResult().getError();
310+
} else {
311+
return MSG_RESTART_UNHEALTHY;
312+
}
313+
}
314+
305315
private boolean shouldRestartJobBecauseUnhealthy(
306316
FlinkDeployment deployment, Configuration observeConfig) {
307317
boolean restartNeeded = false;
@@ -312,7 +322,7 @@ private boolean shouldRestartJobBecauseUnhealthy(
312322
ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
313323
if (clusterHealthInfo != null) {
314324
LOG.debug("Cluster info contains job health info");
315-
if (!clusterHealthInfo.isHealthy()) {
325+
if (!clusterHealthInfo.getHealthResult().isHealthy()) {
316326
if (deployment.getSpec().getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
317327
LOG.debug("Stateless job, recovering unhealthy jobmanager deployment");
318328
restartNeeded = true;

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.flink.kubernetes.operator.health;
1919

20+
import org.apache.flink.kubernetes.operator.observer.ClusterHealthResult;
21+
2022
import org.junit.jupiter.api.Test;
2123

2224
import java.time.Clock;
@@ -43,11 +45,13 @@ public void isValidShouldReturnTrueWhenTimestampIsNonzero() {
4345

4446
@Test
4547
public void deserializeWithOldVersionShouldDeserializeCorrectly() {
46-
var clusterHealthInfoJson = "{\"timeStamp\":1,\"numRestarts\":2,\"healthy\":true}";
48+
var clusterHealthInfoJson =
49+
"{\"timeStamp\":1,\"numRestarts\":2,\"healthResult\": {\"healthy\":false, \"error\":\"test-error\"}}}}";
4750
var clusterHealthInfoFromJson = ClusterHealthInfo.deserialize(clusterHealthInfoJson);
4851
assertEquals(1, clusterHealthInfoFromJson.getTimeStamp());
4952
assertEquals(2, clusterHealthInfoFromJson.getNumRestarts());
50-
assertTrue(clusterHealthInfoFromJson.isHealthy());
53+
assertFalse(clusterHealthInfoFromJson.getHealthResult().isHealthy());
54+
assertEquals("test-error", clusterHealthInfoFromJson.getHealthResult().getError());
5155
}
5256

5357
@Test
@@ -58,7 +62,7 @@ public void serializationRoundTrip() {
5862
clusterHealthInfo.setNumRestartsEvaluationTimeStamp(3);
5963
clusterHealthInfo.setNumCompletedCheckpoints(4);
6064
clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(5);
61-
clusterHealthInfo.setHealthy(false);
65+
clusterHealthInfo.setHealthResult(ClusterHealthResult.error("error"));
6266
var clusterHealthInfoJson = ClusterHealthInfo.serialize(clusterHealthInfo);
6367

6468
var clusterHealthInfoFromJson = ClusterHealthInfo.deserialize(clusterHealthInfoJson);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,6 @@ private void assertClusterHealthIs(boolean healthy) {
358358
var lastValidClusterHealthInfo =
359359
ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
360360
assertNotNull(lastValidClusterHealthInfo);
361-
assertEquals(healthy, lastValidClusterHealthInfo.isHealthy());
361+
assertEquals(healthy, lastValidClusterHealthInfo.getHealthResult().isHealthy());
362362
}
363363
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package org.apache.flink.kubernetes.operator.observer;
2+
3+
import org.junit.jupiter.api.Test;
4+
5+
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
import static org.junit.jupiter.api.Assertions.assertFalse;
7+
import static org.junit.jupiter.api.Assertions.assertNull;
8+
import static org.junit.jupiter.api.Assertions.assertTrue;
9+
10+
class ClusterHealthResultTest {
11+
12+
@Test
13+
void error() {
14+
ClusterHealthResult clusterHealthResult = ClusterHealthResult.error("test-error");
15+
16+
assertFalse(clusterHealthResult.isHealthy());
17+
assertEquals("test-error", clusterHealthResult.getError());
18+
}
19+
20+
@Test
21+
void healthy() {
22+
ClusterHealthResult clusterHealthResult = ClusterHealthResult.healthy();
23+
24+
assertTrue(clusterHealthResult.isHealthy());
25+
assertNull(clusterHealthResult.getError());
26+
}
27+
28+
@Test
29+
void join() {
30+
ClusterHealthResult clusterHealthResult =
31+
ClusterHealthResult.healthy()
32+
.join(ClusterHealthResult.error("test-error-1"))
33+
.join(ClusterHealthResult.error("test-error-2"));
34+
35+
assertFalse(clusterHealthResult.isHealthy());
36+
assertEquals("test-error-1|test-error-2", clusterHealthResult.getError());
37+
}
38+
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
6666
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
6767
import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
68+
import org.apache.flink.kubernetes.operator.observer.ClusterHealthResult;
6869
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
6970
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
7071
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
@@ -1223,7 +1224,7 @@ public void testRestartUnhealthyEvent() throws Exception {
12231224
var clusterHealthInfo = new ClusterHealthInfo();
12241225
clusterHealthInfo.setTimeStamp(System.currentTimeMillis());
12251226
clusterHealthInfo.setNumRestarts(2);
1226-
clusterHealthInfo.setHealthy(false);
1227+
clusterHealthInfo.setHealthResult(ClusterHealthResult.error("error"));
12271228
ClusterHealthEvaluator.setLastValidClusterHealthInfo(
12281229
deployment.getStatus().getClusterInfo(), clusterHealthInfo);
12291230
reconciler.reconcile(deployment, context);

0 commit comments

Comments
 (0)