Skip to content

Commit 31bd759

Browse files
committed
[FLINK-37455] Create error Event when job goes into FAILED state
1 parent 93e68f2 commit 31bd759

File tree

10 files changed

+99
-54
lines changed

10 files changed

+99
-54
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java

Lines changed: 12 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -155,17 +155,15 @@ public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Contex
155155
statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient());
156156
reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);
157157
} catch (UpgradeFailureException ufe) {
158-
handleUpgradeFailure(ctx, ufe);
158+
ReconciliationUtils.updateForReconciliationError(ctx, ufe);
159+
triggerErrorEvent(ctx, ufe, ufe.getReason());
159160
} catch (DeploymentFailedException dfe) {
160-
handleDeploymentFailed(ctx, dfe);
161+
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
162+
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
163+
ReconciliationUtils.updateForReconciliationError(ctx, dfe);
164+
triggerErrorEvent(ctx, dfe, dfe.getReason());
161165
} catch (Exception e) {
162-
eventRecorder.triggerEvent(
163-
flinkApp,
164-
EventRecorder.Type.Warning,
165-
"ClusterDeploymentException",
166-
ExceptionUtils.getExceptionMessage(e),
167-
EventRecorder.Component.JobManagerDeployment,
168-
josdkContext.getClient());
166+
triggerErrorEvent(ctx, e, EventRecorder.Reason.Error.name());
169167
throw new ReconciliationException(e);
170168
}
171169

@@ -175,32 +173,13 @@ public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Contex
175173
ctx.getOperatorConfig(), flinkApp, previousDeployment, true);
176174
}
177175

178-
private void handleDeploymentFailed(
179-
FlinkResourceContext<FlinkDeployment> ctx, DeploymentFailedException dfe) {
180-
var flinkApp = ctx.getResource();
181-
LOG.error("Flink Deployment failed", dfe);
182-
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
183-
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
184-
ReconciliationUtils.updateForReconciliationError(ctx, dfe);
176+
private void triggerErrorEvent(
177+
FlinkResourceContext<FlinkDeployment> ctx, Exception e, String reason) {
185178
eventRecorder.triggerEvent(
186-
flinkApp,
187-
EventRecorder.Type.Warning,
188-
dfe.getReason(),
189-
dfe.getMessage(),
190-
EventRecorder.Component.JobManagerDeployment,
191-
ctx.getKubernetesClient());
192-
}
193-
194-
private void handleUpgradeFailure(
195-
FlinkResourceContext<FlinkDeployment> ctx, UpgradeFailureException ufe) {
196-
LOG.error("Error while upgrading Flink Deployment", ufe);
197-
var flinkApp = ctx.getResource();
198-
ReconciliationUtils.updateForReconciliationError(ctx, ufe);
199-
eventRecorder.triggerEvent(
200-
flinkApp,
179+
ctx.getResource(),
201180
EventRecorder.Type.Warning,
202-
ufe.getReason(),
203-
ufe.getMessage(),
181+
reason,
182+
ExceptionUtils.getExceptionMessage(e),
204183
EventRecorder.Component.JobManagerDeployment,
205184
ctx.getKubernetesClient());
206185
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -121,13 +121,7 @@ public UpdateControl<FlinkSessionJob> reconcile(
121121
statusRecorder.patchAndCacheStatus(flinkSessionJob, ctx.getKubernetesClient());
122122
reconciler.reconcile(ctx);
123123
} catch (Exception e) {
124-
eventRecorder.triggerEvent(
125-
flinkSessionJob,
126-
EventRecorder.Type.Warning,
127-
"SessionJobException",
128-
ExceptionUtils.getExceptionMessage(e),
129-
EventRecorder.Component.Job,
130-
josdkContext.getClient());
124+
triggerErrorEvent(ctx, e);
131125
throw new ReconciliationException(e);
132126
}
133127
statusRecorder.patchAndCacheStatus(flinkSessionJob, ctx.getKubernetesClient());
@@ -167,6 +161,16 @@ public DeleteControl cleanup(FlinkSessionJob sessionJob, Context josdkContext) {
167161
return deleteControl;
168162
}
169163

164+
private void triggerErrorEvent(FlinkResourceContext<?> ctx, Exception e) {
165+
eventRecorder.triggerEvent(
166+
ctx.getResource(),
167+
EventRecorder.Type.Warning,
168+
EventRecorder.Reason.Error.name(),
169+
ExceptionUtils.getExceptionMessage(e),
170+
EventRecorder.Component.Job,
171+
ctx.getKubernetesClient());
172+
}
173+
170174
@Override
171175
public ErrorStatusUpdateControl<FlinkSessionJob> updateErrorStatus(
172176
FlinkSessionJob sessionJob, Context<FlinkSessionJob> context, Exception e) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java

Lines changed: 12 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
2828
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
2929
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
30+
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
3031
import org.apache.flink.runtime.client.JobStatusMessage;
3132

3233
import org.slf4j.Logger;
@@ -182,7 +183,7 @@ private void updateJobStatus(FlinkResourceContext<R> ctx, JobStatusMessage clust
182183
markSuspended(resource);
183184
}
184185

185-
setErrorIfPresent(ctx, clusterJobStatus);
186+
recordJobErrorIfPresent(ctx, clusterJobStatus);
186187
eventRecorder.triggerEvent(
187188
resource,
188189
EventRecorder.Type.Normal,
@@ -203,7 +204,8 @@ private static void markSuspended(AbstractFlinkResource<?, ?> resource) {
203204
});
204205
}
205206

206-
private void setErrorIfPresent(FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
207+
private void recordJobErrorIfPresent(
208+
FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
207209
if (clusterJobStatus.getJobState() == JobStatus.FAILED) {
208210
try {
209211
var result =
@@ -215,10 +217,14 @@ private void setErrorIfPresent(FlinkResourceContext<R> ctx, JobStatusMessage clu
215217
t -> {
216218
updateFlinkResourceException(
217219
t, ctx.getResource(), ctx.getOperatorConfig());
218-
LOG.error(
219-
"Job {} failed with error: {}",
220-
clusterJobStatus.getJobId(),
221-
t.getFullStringifiedStackTrace());
220+
221+
eventRecorder.triggerEvent(
222+
ctx.getResource(),
223+
EventRecorder.Type.Warning,
224+
EventRecorder.Reason.Error,
225+
EventRecorder.Component.Job,
226+
ExceptionUtils.getExceptionMessage(t),
227+
ctx.getKubernetesClient());
222228
});
223229
} catch (Exception e) {
224230
LOG.warn("Failed to request the job result", e);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -314,6 +314,7 @@ public enum Reason {
314314
Scaling,
315315
UnsupportedFlinkVersion,
316316
SnapshotError,
317-
SnapshotAbandoned
317+
SnapshotAbandoned,
318+
Error
318319
}
319320
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -75,7 +75,7 @@ public static <T extends Throwable> Optional<T> findThrowableSerializedAware(
7575
* &rarr; cause3"
7676
*/
7777
public static String getExceptionMessage(Throwable throwable) {
78-
return getExceptionMessage(throwable, 0);
78+
return getExceptionMessage(throwable, 1);
7979
}
8080

8181
/**

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@
5252
import org.apache.flink.kubernetes.operator.service.SuspendMode;
5353
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
5454
import org.apache.flink.runtime.client.JobStatusMessage;
55+
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
5556
import org.apache.flink.runtime.execution.ExecutionState;
5657
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
5758
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -141,6 +142,7 @@ public class TestingFlinkService extends AbstractFlinkService {
141142
@Getter private final Map<String, Boolean> checkpointTriggers = new HashMap<>();
142143
private final Map<Long, String> checkpointStats = new HashMap<>();
143144
@Setter private boolean throwCheckpointingDisabledError = false;
145+
@Setter private Throwable jobFailedErr;
144146

145147
@Getter private int desiredReplicas = 0;
146148
@Getter private int cancelJobCallCount = 0;
@@ -301,9 +303,29 @@ public Optional<JobStatusMessage> getJobStatus(Configuration conf, JobID jobID)
301303
if (!isPortReady) {
302304
throw new TimeoutException("JM port is unavailable");
303305
}
306+
307+
if (jobFailedErr != null) {
308+
return Optional.of(new JobStatusMessage(jobID, "n", JobStatus.FAILED, 0));
309+
}
310+
304311
return super.getJobStatus(conf, jobID);
305312
}
306313

314+
@Override
315+
public JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception {
316+
if (jobFailedErr != null) {
317+
return new JobResult.Builder()
318+
.jobId(jobID)
319+
.serializedThrowable(new SerializedThrowable(jobFailedErr))
320+
.netRuntime(1)
321+
.accumulatorResults(new HashMap<>())
322+
.applicationStatus(ApplicationStatus.FAILED)
323+
.build();
324+
}
325+
326+
return super.requestJobResult(conf, jobID);
327+
}
328+
307329
public List<Tuple3<String, JobStatusMessage, Configuration>> listJobs() {
308330
return jobs;
309331
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -981,7 +981,7 @@ public void testEventOfNonDeploymentFailedException() throws Exception {
981981
var event = testController.flinkResourceEvents().remove();
982982
assertEquals("Submit", event.getReason());
983983
event = testController.flinkResourceEvents().remove();
984-
assertEquals("ClusterDeploymentException", event.getReason());
984+
assertEquals("Error", event.getReason());
985985
assertEquals("Deployment failure", event.getMessage());
986986
}
987987

@@ -1006,7 +1006,7 @@ public void testEventOfNonDeploymentFailedChainedException() {
10061006
var event = testController.flinkResourceEvents().remove();
10071007
assertEquals("Submit", event.getReason());
10081008
event = testController.flinkResourceEvents().remove();
1009-
assertEquals("ClusterDeploymentException", event.getReason());
1009+
assertEquals("Error", event.getReason());
10101010
assertEquals(
10111011
"Deployment Failure -> IllegalStateException -> actual failure reason",
10121012
event.getMessage());
@@ -1112,7 +1112,7 @@ public void testErrorOnReconcileWithChainedExceptions() throws Exception {
11121112
var event = testController.flinkResourceEvents().remove();
11131113
assertEquals("Submit", event.getReason());
11141114
event = testController.flinkResourceEvents().remove();
1115-
assertEquals("ClusterDeploymentException", event.getReason());
1115+
assertEquals("Error", event.getReason());
11161116
assertEquals(
11171117
"Deployment Failure -> IllegalStateException -> actual failure reason",
11181118
event.getMessage());

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -103,7 +103,7 @@ public void testSubmitJobButException() {
103103

104104
var event = testController.events().remove();
105105
Assertions.assertEquals(EventRecorder.Type.Warning.toString(), event.getType());
106-
Assertions.assertEquals("SessionJobException", event.getReason());
106+
Assertions.assertEquals("Error", event.getReason());
107107

108108
testController.cleanup(sessionJob, context);
109109
}
@@ -635,7 +635,7 @@ public void testErrorOnReconcileWithChainedExceptions() throws Exception {
635635
var event = testController.events().remove();
636636
assertEquals("Submit", event.getReason());
637637
event = testController.events().remove();
638-
assertEquals("SessionJobException", event.getReason());
638+
assertEquals("Error", event.getReason());
639639
assertEquals(
640640
"Deployment Failure -> IllegalStateException -> actual failure reason",
641641
event.getMessage());

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
3030
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
3131
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
32+
import org.apache.flink.util.SerializedThrowable;
3233

3334
import io.fabric8.kubernetes.client.KubernetesClient;
3435
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
3536
import lombok.Getter;
37+
import org.junit.jupiter.api.Test;
3638
import org.junit.jupiter.params.ParameterizedTest;
3739
import org.junit.jupiter.params.provider.Arguments;
3840
import org.junit.jupiter.params.provider.EnumSource;
@@ -42,6 +44,7 @@
4244
import java.util.stream.Stream;
4345

4446
import static org.junit.jupiter.api.Assertions.assertEquals;
47+
import static org.junit.jupiter.api.Assertions.assertTrue;
4548

4649
/** Tests for the {@link JobStatusObserver}. */
4750
@EnableKubernetesMockClient(crud = true)
@@ -114,6 +117,36 @@ void testCancellingToTerminal(JobStatus fromStatus) throws Exception {
114117
.getState());
115118
}
116119

120+
@Test
121+
void testFailed() throws Exception {
122+
var observer = new JobStatusObserver<>(eventRecorder);
123+
var deployment = initDeployment();
124+
var status = deployment.getStatus();
125+
var jobStatus = status.getJobStatus();
126+
jobStatus.setState(JobStatus.RUNNING);
127+
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = getResourceContext(deployment);
128+
flinkService.submitApplicationCluster(
129+
deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
130+
131+
// Mark failed
132+
flinkService.setJobFailedErr(
133+
new Exception("job err", new SerializedThrowable(new Exception("root"))));
134+
observer.observe(ctx);
135+
136+
// First event should be job error reported
137+
var jobErrorEvent = flinkResourceEventCollector.events.poll();
138+
assertEquals(EventRecorder.Reason.Error.name(), jobErrorEvent.getReason());
139+
assertEquals("job err -> root", jobErrorEvent.getMessage());
140+
141+
// Make sure job status still reported
142+
assertEquals(
143+
EventRecorder.Reason.JobStatusChanged.name(),
144+
flinkResourceEventCollector.events.poll().getReason());
145+
146+
observer.observe(ctx);
147+
assertTrue(flinkResourceEventCollector.events.isEmpty());
148+
}
149+
117150
private static Stream<Arguments> cancellingArgs() {
118151
var args = new ArrayList<Arguments>();
119152
for (var status : JobStatus.values()) {

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,7 @@ void testGetExceptionMessage_differentKindsOfExceptions() {
4747
var ex2 = new RuntimeException("Cause 2", new SerializedThrowable(ex3));
4848
var ex = new RuntimeException("Cause 1", ex2);
4949
assertThat(ExceptionUtils.getExceptionMessage(ex))
50-
.isEqualTo("Cause 1 -> Cause 2 -> Cause 3 -> Cause 4");
50+
.isEqualTo("Cause 1 -> Cause 2 -> Cause 3");
5151
}
5252

5353
@Test

0 commit comments

Comments
 (0)