Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,15 @@ public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Contex
statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient());
reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx);
} catch (UpgradeFailureException ufe) {
handleUpgradeFailure(ctx, ufe);
ReconciliationUtils.updateForReconciliationError(ctx, ufe);
triggerErrorEvent(ctx, ufe, ufe.getReason());
} catch (DeploymentFailedException dfe) {
handleDeploymentFailed(ctx, dfe);
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
ReconciliationUtils.updateForReconciliationError(ctx, dfe);
triggerErrorEvent(ctx, dfe, dfe.getReason());
} catch (Exception e) {
eventRecorder.triggerEvent(
flinkApp,
EventRecorder.Type.Warning,
"ClusterDeploymentException",
ExceptionUtils.getExceptionMessage(e),
EventRecorder.Component.JobManagerDeployment,
josdkContext.getClient());
triggerErrorEvent(ctx, e, EventRecorder.Reason.Error.name());
throw new ReconciliationException(e);
}

Expand All @@ -175,32 +173,13 @@ public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Contex
ctx.getOperatorConfig(), flinkApp, previousDeployment, true);
}

private void handleDeploymentFailed(
FlinkResourceContext<FlinkDeployment> ctx, DeploymentFailedException dfe) {
var flinkApp = ctx.getResource();
LOG.error("Flink Deployment failed", dfe);
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
ReconciliationUtils.updateForReconciliationError(ctx, dfe);
private void triggerErrorEvent(
FlinkResourceContext<FlinkDeployment> ctx, Exception e, String reason) {
eventRecorder.triggerEvent(
flinkApp,
EventRecorder.Type.Warning,
dfe.getReason(),
dfe.getMessage(),
EventRecorder.Component.JobManagerDeployment,
ctx.getKubernetesClient());
}

private void handleUpgradeFailure(
FlinkResourceContext<FlinkDeployment> ctx, UpgradeFailureException ufe) {
LOG.error("Error while upgrading Flink Deployment", ufe);
var flinkApp = ctx.getResource();
ReconciliationUtils.updateForReconciliationError(ctx, ufe);
eventRecorder.triggerEvent(
flinkApp,
ctx.getResource(),
EventRecorder.Type.Warning,
ufe.getReason(),
ufe.getMessage(),
reason,
ExceptionUtils.getExceptionMessage(e),
EventRecorder.Component.JobManagerDeployment,
ctx.getKubernetesClient());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,7 @@ public UpdateControl<FlinkSessionJob> reconcile(
statusRecorder.patchAndCacheStatus(flinkSessionJob, ctx.getKubernetesClient());
reconciler.reconcile(ctx);
} catch (Exception e) {
eventRecorder.triggerEvent(
flinkSessionJob,
EventRecorder.Type.Warning,
"SessionJobException",
ExceptionUtils.getExceptionMessage(e),
EventRecorder.Component.Job,
josdkContext.getClient());
triggerErrorEvent(ctx, e);
throw new ReconciliationException(e);
}
statusRecorder.patchAndCacheStatus(flinkSessionJob, ctx.getKubernetesClient());
Expand Down Expand Up @@ -167,6 +161,16 @@ public DeleteControl cleanup(FlinkSessionJob sessionJob, Context josdkContext) {
return deleteControl;
}

private void triggerErrorEvent(FlinkResourceContext<?> ctx, Exception e) {
eventRecorder.triggerEvent(
ctx.getResource(),
EventRecorder.Type.Warning,
EventRecorder.Reason.Error.name(),
ExceptionUtils.getExceptionMessage(e),
EventRecorder.Component.Job,
ctx.getKubernetesClient());
}

@Override
public ErrorStatusUpdateControl<FlinkSessionJob> updateErrorStatus(
FlinkSessionJob sessionJob, Context<FlinkSessionJob> context, Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
import org.apache.flink.runtime.client.JobStatusMessage;

import org.slf4j.Logger;
Expand Down Expand Up @@ -182,7 +183,7 @@ private void updateJobStatus(FlinkResourceContext<R> ctx, JobStatusMessage clust
markSuspended(resource);
}

setErrorIfPresent(ctx, clusterJobStatus);
recordJobErrorIfPresent(ctx, clusterJobStatus);
eventRecorder.triggerEvent(
resource,
EventRecorder.Type.Normal,
Expand All @@ -203,7 +204,8 @@ private static void markSuspended(AbstractFlinkResource<?, ?> resource) {
});
}

private void setErrorIfPresent(FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
private void recordJobErrorIfPresent(
FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
if (clusterJobStatus.getJobState() == JobStatus.FAILED) {
try {
var result =
Expand All @@ -215,10 +217,14 @@ private void setErrorIfPresent(FlinkResourceContext<R> ctx, JobStatusMessage clu
t -> {
updateFlinkResourceException(
t, ctx.getResource(), ctx.getOperatorConfig());
LOG.error(
"Job {} failed with error: {}",
clusterJobStatus.getJobId(),
t.getFullStringifiedStackTrace());

eventRecorder.triggerEvent(
ctx.getResource(),
EventRecorder.Type.Warning,
EventRecorder.Reason.Error,
EventRecorder.Component.Job,
ExceptionUtils.getExceptionMessage(t),
ctx.getKubernetesClient());
});
} catch (Exception e) {
LOG.warn("Failed to request the job result", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ public enum Reason {
Scaling,
UnsupportedFlinkVersion,
SnapshotError,
SnapshotAbandoned
SnapshotAbandoned,
Error
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static <T extends Throwable> Optional<T> findThrowableSerializedAware(
* &rarr; cause3"
*/
public static String getExceptionMessage(Throwable throwable) {
return getExceptionMessage(throwable, 0);
return getExceptionMessage(throwable, 1);
}

/**
Expand All @@ -93,11 +93,12 @@ private static String getExceptionMessage(Throwable throwable, int level) {
}

if (throwable instanceof SerializedThrowable) {
var serialized = ((SerializedThrowable) throwable);
var deserialized =
((SerializedThrowable) throwable)
.deserializeError(Thread.currentThread().getContextClassLoader());
serialized.deserializeError(Thread.currentThread().getContextClassLoader());
if (deserialized == throwable) {
return "Unknown Error (SerializedThrowable)";
var msg = serialized.getMessage();
return msg != null ? msg : serialized.getOriginalErrorClassName();
} else {
return getExceptionMessage(deserialized, level);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.flink.kubernetes.operator.service.SuspendMode;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
Expand Down Expand Up @@ -141,6 +142,7 @@ public class TestingFlinkService extends AbstractFlinkService {
@Getter private final Map<String, Boolean> checkpointTriggers = new HashMap<>();
private final Map<Long, String> checkpointStats = new HashMap<>();
@Setter private boolean throwCheckpointingDisabledError = false;
@Setter private Throwable jobFailedErr;

@Getter private int desiredReplicas = 0;
@Getter private int cancelJobCallCount = 0;
Expand Down Expand Up @@ -301,9 +303,29 @@ public Optional<JobStatusMessage> getJobStatus(Configuration conf, JobID jobID)
if (!isPortReady) {
throw new TimeoutException("JM port is unavailable");
}

if (jobFailedErr != null) {
return Optional.of(new JobStatusMessage(jobID, "n", JobStatus.FAILED, 0));
}

return super.getJobStatus(conf, jobID);
}

@Override
public JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception {
if (jobFailedErr != null) {
return new JobResult.Builder()
.jobId(jobID)
.serializedThrowable(new SerializedThrowable(jobFailedErr))
.netRuntime(1)
.accumulatorResults(new HashMap<>())
.applicationStatus(ApplicationStatus.FAILED)
.build();
}

return super.requestJobResult(conf, jobID);
}

public List<Tuple3<String, JobStatusMessage, Configuration>> listJobs() {
return jobs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ public void testEventOfNonDeploymentFailedException() throws Exception {
var event = testController.flinkResourceEvents().remove();
assertEquals("Submit", event.getReason());
event = testController.flinkResourceEvents().remove();
assertEquals("ClusterDeploymentException", event.getReason());
assertEquals("Error", event.getReason());
assertEquals("Deployment failure", event.getMessage());
}

Expand All @@ -1006,7 +1006,7 @@ public void testEventOfNonDeploymentFailedChainedException() {
var event = testController.flinkResourceEvents().remove();
assertEquals("Submit", event.getReason());
event = testController.flinkResourceEvents().remove();
assertEquals("ClusterDeploymentException", event.getReason());
assertEquals("Error", event.getReason());
assertEquals(
"Deployment Failure -> IllegalStateException -> actual failure reason",
event.getMessage());
Expand Down Expand Up @@ -1112,7 +1112,7 @@ public void testErrorOnReconcileWithChainedExceptions() throws Exception {
var event = testController.flinkResourceEvents().remove();
assertEquals("Submit", event.getReason());
event = testController.flinkResourceEvents().remove();
assertEquals("ClusterDeploymentException", event.getReason());
assertEquals("Error", event.getReason());
assertEquals(
"Deployment Failure -> IllegalStateException -> actual failure reason",
event.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void testSubmitJobButException() {

var event = testController.events().remove();
Assertions.assertEquals(EventRecorder.Type.Warning.toString(), event.getType());
Assertions.assertEquals("SessionJobException", event.getReason());
Assertions.assertEquals("Error", event.getReason());

testController.cleanup(sessionJob, context);
}
Expand Down Expand Up @@ -635,7 +635,7 @@ public void testErrorOnReconcileWithChainedExceptions() throws Exception {
var event = testController.events().remove();
assertEquals("Submit", event.getReason());
event = testController.events().remove();
assertEquals("SessionJobException", event.getReason());
assertEquals("Error", event.getReason());
assertEquals(
"Deployment Failure -> IllegalStateException -> actual failure reason",
event.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.util.SerializedThrowable;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import lombok.Getter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
Expand All @@ -42,6 +44,7 @@
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Tests for the {@link JobStatusObserver}. */
@EnableKubernetesMockClient(crud = true)
Expand Down Expand Up @@ -114,6 +117,36 @@ void testCancellingToTerminal(JobStatus fromStatus) throws Exception {
.getState());
}

@Test
void testFailed() throws Exception {
var observer = new JobStatusObserver<>(eventRecorder);
var deployment = initDeployment();
var status = deployment.getStatus();
var jobStatus = status.getJobStatus();
jobStatus.setState(JobStatus.RUNNING);
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = getResourceContext(deployment);
flinkService.submitApplicationCluster(
deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);

// Mark failed
flinkService.setJobFailedErr(
new Exception("job err", new SerializedThrowable(new Exception("root"))));
observer.observe(ctx);

// First event should be job error reported
var jobErrorEvent = flinkResourceEventCollector.events.poll();
assertEquals(EventRecorder.Reason.Error.name(), jobErrorEvent.getReason());
assertEquals("job err -> root", jobErrorEvent.getMessage());

// Make sure job status still reported
assertEquals(
EventRecorder.Reason.JobStatusChanged.name(),
flinkResourceEventCollector.events.poll().getReason());

observer.observe(ctx);
assertTrue(flinkResourceEventCollector.events.isEmpty());
}

private static Stream<Arguments> cancellingArgs() {
var args = new ArrayList<Arguments>();
for (var status : JobStatus.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,30 @@ void testGetExceptionMessage_differentKindsOfExceptions() {
var ex2 = new RuntimeException("Cause 2", new SerializedThrowable(ex3));
var ex = new RuntimeException("Cause 1", ex2);
assertThat(ExceptionUtils.getExceptionMessage(ex))
.isEqualTo("Cause 1 -> Cause 2 -> Cause 3 -> Cause 4");
.isEqualTo("Cause 1 -> Cause 2 -> Cause 3");
}

@Test
void testSerializedThrowableError() {
var serializedException = new SerializedThrowable(new NonSerializableException());
assertThat(ExceptionUtils.getExceptionMessage(serializedException))
.isEqualTo("Unknown Error (SerializedThrowable)");
assertThat(
ExceptionUtils.getExceptionMessage(
new SerializedThrowable(new NonSerializableException("Message"))))
.isEqualTo(String.format("%s: Message", NonSerializableException.class.getName()));

assertThat(
ExceptionUtils.getExceptionMessage(
new SerializedThrowable(new NonSerializableException())))
.isEqualTo(NonSerializableException.class.getName());
}

private static class NonSerializableException extends Exception {

public NonSerializableException(String message) {
super(message);
}

public NonSerializableException() {}

private void writeObject(java.io.ObjectOutputStream stream) throws IOException {
throw new IOException();
}
Expand Down