From 76e2d5a0fbc91a65328a7b559eecd59f4e078dc2 Mon Sep 17 00:00:00 2001 From: Daren Date: Fri, 16 May 2025 13:18:22 +0100 Subject: [PATCH] [FLINK-37769] Include cause in event when restarting unhealthy job --- .../operator/health/ClusterHealthInfo.java | 5 +- .../observer/ClusterHealthEvaluator.java | 48 ++++++++------- .../observer/ClusterHealthResult.java | 58 +++++++++++++++++++ .../deployment/ApplicationReconciler.java | 11 +++- .../health/ClusterHealthInfoTest.java | 10 +++- .../observer/ClusterHealthEvaluatorTest.java | 2 +- .../observer/ClusterHealthResultTest.java | 55 ++++++++++++++++++ .../deployment/ApplicationReconcilerTest.java | 7 +-- 8 files changed, 164 insertions(+), 32 deletions(-) create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthResult.java create mode 100644 flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthResultTest.java diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java index 8bfd1dec9e..d215e87c6a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfo.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.health; import org.apache.flink.annotation.Experimental; +import org.apache.flink.kubernetes.operator.observer.ClusterHealthResult; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -47,7 +48,7 @@ public class ClusterHealthInfo { private long numCompletedCheckpointsIncreasedTimeStamp; /** Calculated field whether the cluster is healthy or not. 
*/ - private boolean healthy; + private ClusterHealthResult healthResult; public ClusterHealthInfo() { this(Clock.systemDefaultZone()); @@ -55,7 +56,7 @@ public ClusterHealthInfo() { public ClusterHealthInfo(Clock clock) { timeStamp = clock.millis(); - healthy = true; + healthResult = ClusterHealthResult.healthy(); } public static boolean isValid(ClusterHealthInfo clusterHealthInfo) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java index d38a8f0fd2..e8cafdf1f7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.java @@ -97,25 +97,28 @@ public void evaluate( LOG.debug("Last valid health info: {}", lastValidClusterHealthInfo); LOG.debug("Observed health info: {}", observedClusterHealthInfo); - boolean isHealthy = + var jobHealthResult = evaluateRestarts( - configuration, - clusterInfo, - lastValidClusterHealthInfo, - observedClusterHealthInfo) - && evaluateCheckpoints( - configuration, - lastValidClusterHealthInfo, - observedClusterHealthInfo); + configuration, + clusterInfo, + lastValidClusterHealthInfo, + observedClusterHealthInfo); + + var checkpointHealthResult = + evaluateCheckpoints( + configuration, + lastValidClusterHealthInfo, + observedClusterHealthInfo); lastValidClusterHealthInfo.setTimeStamp(observedClusterHealthInfo.getTimeStamp()); - lastValidClusterHealthInfo.setHealthy(isHealthy); + lastValidClusterHealthInfo.setHealthResult( + jobHealthResult.join(checkpointHealthResult)); setLastValidClusterHealthInfo(clusterInfo, lastValidClusterHealthInfo); } } } - private boolean evaluateRestarts( + private ClusterHealthResult evaluateRestarts( Configuration configuration, Map 
clusterInfo, ClusterHealthInfo lastValidClusterHealthInfo, @@ -128,7 +131,7 @@ private boolean evaluateRestarts( lastValidClusterHealthInfo.setNumRestarts(observedClusterHealthInfo.getNumRestarts()); lastValidClusterHealthInfo.setNumRestartsEvaluationTimeStamp( observedClusterHealthInfo.getTimeStamp()); - return true; + return ClusterHealthResult.healthy(); } var timestampDiffMs = @@ -153,9 +156,15 @@ private boolean evaluateRestarts( LOG.debug("Calculated restart count for {} window: {}", restartCheckWindow, numRestarts); var restartThreshold = configuration.get(OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD); + + var healthResult = ClusterHealthResult.healthy(); + boolean isHealthy = numRestarts <= restartThreshold; if (!isHealthy) { LOG.info("Restart count hit threshold: {}", restartThreshold); + healthResult = + ClusterHealthResult.error( + String.format("Restart count hit threshold: %s", restartThreshold)); } if (lastValidClusterHealthInfo.getNumRestartsEvaluationTimeStamp() @@ -167,15 +176,15 @@ private boolean evaluateRestarts( observedClusterHealthInfo.getTimeStamp()); } - return isHealthy; + return healthResult; } - private boolean evaluateCheckpoints( + private ClusterHealthResult evaluateCheckpoints( Configuration configuration, ClusterHealthInfo lastValidClusterHealthInfo, ClusterHealthInfo observedClusterHealthInfo) { if (!configuration.getBoolean(OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED)) { - return true; + return ClusterHealthResult.healthy(); } var windowOpt = @@ -195,7 +204,7 @@ private boolean evaluateCheckpoints( if (windowOpt.isEmpty() && !checkpointConfig.isCheckpointingEnabled()) { // If no explicit checkpoint check window is specified and checkpointing is disabled // based on the config, we don't do anything - return true; + return ClusterHealthResult.healthy(); } var completedCheckpointsCheckWindow = @@ -226,7 +235,7 @@ private boolean evaluateCheckpoints( observedClusterHealthInfo.getNumCompletedCheckpoints()); 
lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp( observedClusterHealthInfo.getTimeStamp()); - return true; + return ClusterHealthResult.healthy(); } var timestampDiffMs = @@ -234,7 +243,6 @@ private boolean evaluateCheckpoints( - lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp(); LOG.debug("Time difference between health infos: {}", Duration.ofMillis(timestampDiffMs)); - boolean isHealthy = true; var completedCheckpointsCheckWindowMs = completedCheckpointsCheckWindow.toMillis(); if (observedClusterHealthInfo.getNumCompletedCheckpoints() @@ -248,9 +256,9 @@ private boolean evaluateCheckpoints( + completedCheckpointsCheckWindowMs < clock.millis()) { LOG.info("Cluster is not able to complete checkpoints"); - isHealthy = false; + return ClusterHealthResult.error("Cluster is not able to complete checkpoints"); } - return isHealthy; + return ClusterHealthResult.healthy(); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthResult.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthResult.java new file mode 100644 index 0000000000..bfead21e6d --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthResult.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.observer; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Value; + +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** Cluster Health Result. */ +@Value +public class ClusterHealthResult { + boolean healthy; + String error; + + @JsonCreator + public ClusterHealthResult( + @JsonProperty("healthy") boolean healthy, @JsonProperty("error") String error) { + this.healthy = healthy; + this.error = error; + } + + public static ClusterHealthResult error(String error) { + return new ClusterHealthResult(false, error); + } + + public static ClusterHealthResult healthy() { + return new ClusterHealthResult(true, null); + } + + public ClusterHealthResult join(ClusterHealthResult clusterHealthResult) { + boolean isHealthy = this.healthy && clusterHealthResult.healthy; + String error = + Stream.of(this.error, clusterHealthResult.getError()) + .filter(Objects::nonNull) + .collect(Collectors.joining("|")); + // joining() yields "" when neither side has an error; store null like healthy() does. + return new ClusterHealthResult(isHealthy, error.isEmpty() ? null : error); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index b5486e0ec7..7e022dbf25 100644 --- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -35,6 +35,7 @@ import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException; import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo; import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator; +import org.apache.flink.kubernetes.operator.observer.ClusterHealthResult; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.service.SuspendMode; @@ -268,6 +269,9 @@ public boolean reconcileOtherChanges(FlinkResourceContext ctx) var deployment = ctx.getResource(); var observeConfig = ctx.getObserveConfig(); + var clusterHealthInfo = + ClusterHealthEvaluator.getLastValidClusterHealthInfo( + deployment.getStatus().getClusterInfo()); boolean shouldRestartJobBecauseUnhealthy = shouldRestartJobBecauseUnhealthy(deployment, observeConfig); boolean shouldRecoverDeployment = shouldRecoverDeployment(observeConfig, deployment); @@ -288,7 +292,10 @@ public boolean reconcileOtherChanges(FlinkResourceContext ctx) EventRecorder.Type.Warning, EventRecorder.Reason.RestartUnhealthyJob, EventRecorder.Component.Job, - MSG_RESTART_UNHEALTHY, + Optional.ofNullable(clusterHealthInfo) + .map(ClusterHealthInfo::getHealthResult) + .map(ClusterHealthResult::getError) + .orElse(MSG_RESTART_UNHEALTHY), ctx.getKubernetesClient()); cleanupAfterFailedJob(ctx); } @@ -312,7 +319,7 @@ private boolean shouldRestartJobBecauseUnhealthy( ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo); if (clusterHealthInfo != null) { LOG.debug("Cluster info contains job health info"); - if (!clusterHealthInfo.isHealthy()) { + if 
(!clusterHealthInfo.getHealthResult().isHealthy()) { if (deployment.getSpec().getJob().getUpgradeMode() == UpgradeMode.STATELESS) { LOG.debug("Stateless job, recovering unhealthy jobmanager deployment"); restartNeeded = true; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java index 77f8c00935..44cfef2e56 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/ClusterHealthInfoTest.java @@ -17,6 +17,8 @@ package org.apache.flink.kubernetes.operator.health; +import org.apache.flink.kubernetes.operator.observer.ClusterHealthResult; + import org.junit.jupiter.api.Test; import java.time.Clock; @@ -43,11 +45,13 @@ public void isValidShouldReturnTrueWhenTimestampIsNonzero() { @Test public void deserializeWithOldVersionShouldDeserializeCorrectly() { - var clusterHealthInfoJson = "{\"timeStamp\":1,\"numRestarts\":2,\"healthy\":true}"; + var clusterHealthInfoJson = + "{\"timeStamp\":1,\"numRestarts\":2,\"healthResult\": {\"healthy\":false, \"error\":\"test-error\"}}"; var clusterHealthInfoFromJson = ClusterHealthInfo.deserialize(clusterHealthInfoJson); assertEquals(1, clusterHealthInfoFromJson.getTimeStamp()); assertEquals(2, clusterHealthInfoFromJson.getNumRestarts()); - assertTrue(clusterHealthInfoFromJson.isHealthy()); + assertFalse(clusterHealthInfoFromJson.getHealthResult().isHealthy()); + assertEquals("test-error", clusterHealthInfoFromJson.getHealthResult().getError()); } @Test @@ -58,7 +62,7 @@ public void serializationRoundTrip() { clusterHealthInfo.setNumRestartsEvaluationTimeStamp(3); clusterHealthInfo.setNumCompletedCheckpoints(4); clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(5); - 
clusterHealthInfo.setHealthy(false); + clusterHealthInfo.setHealthResult(ClusterHealthResult.error("error")); var clusterHealthInfoJson = ClusterHealthInfo.serialize(clusterHealthInfo); var clusterHealthInfoFromJson = ClusterHealthInfo.deserialize(clusterHealthInfoJson); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java index 83a0e7515f..ee7c184a26 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluatorTest.java @@ -358,6 +358,6 @@ private void assertClusterHealthIs(boolean healthy) { var lastValidClusterHealthInfo = ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo); assertNotNull(lastValidClusterHealthInfo); - assertEquals(healthy, lastValidClusterHealthInfo.isHealthy()); + assertEquals(healthy, lastValidClusterHealthInfo.getHealthResult().isHealthy()); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthResultTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthResultTest.java new file mode 100644 index 0000000000..113349bb48 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ClusterHealthResultTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.observer; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ClusterHealthResultTest { + + @Test + void error() { + ClusterHealthResult clusterHealthResult = ClusterHealthResult.error("test-error"); + + assertFalse(clusterHealthResult.isHealthy()); + assertEquals("test-error", clusterHealthResult.getError()); + } + + @Test + void healthy() { + ClusterHealthResult clusterHealthResult = ClusterHealthResult.healthy(); + + assertTrue(clusterHealthResult.isHealthy()); + assertNull(clusterHealthResult.getError()); + } + + @Test + void join() { + ClusterHealthResult clusterHealthResult = + ClusterHealthResult.healthy() + .join(ClusterHealthResult.error("test-error-1")) + .join(ClusterHealthResult.error("test-error-2")); + + assertFalse(clusterHealthResult.isHealthy()); + assertEquals("test-error-1|test-error-2", clusterHealthResult.getError()); + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index 19d5446113..b31361b2ae 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -65,6 +65,7 @@ import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException; import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo; import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator; +import org.apache.flink.kubernetes.operator.observer.ClusterHealthResult; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.reconciler.SnapshotType; import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter; @@ -128,7 +129,6 @@ import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.SAVEPOINT; import static org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.MSG_SUBMIT; import static org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.MSG_RECOVERY; -import static org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.MSG_RESTART_UNHEALTHY; import static org.apache.flink.kubernetes.operator.utils.SnapshotUtils.getLastSnapshotStatus; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -1223,12 +1223,11 @@ public void testRestartUnhealthyEvent() throws Exception { var clusterHealthInfo = new ClusterHealthInfo(); clusterHealthInfo.setTimeStamp(System.currentTimeMillis()); clusterHealthInfo.setNumRestarts(2); - clusterHealthInfo.setHealthy(false); + clusterHealthInfo.setHealthResult(ClusterHealthResult.error("error")); 
ClusterHealthEvaluator.setLastValidClusterHealthInfo( deployment.getStatus().getClusterInfo(), clusterHealthInfo); reconciler.reconcile(deployment, context); - Assertions.assertEquals( - MSG_RESTART_UNHEALTHY, flinkResourceEventCollector.events.remove().getMessage()); + Assertions.assertEquals("error", flinkResourceEventCollector.events.remove().getMessage()); } @Test