@@ -20,6 +20,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.JobAutoScaler;
+import org.apache.flink.autoscaler.utils.DateTimeUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
@@ -45,6 +46,7 @@
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 import org.apache.flink.util.Preconditions;

 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -55,6 +57,8 @@
 import org.slf4j.LoggerFactory;

 import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;

@@ -299,9 +303,92 @@ public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx)
             return true;
         }

+        // Check for JobManager exceptions if the REST API server is still up.
+        if (!ReconciliationUtils.isJobInTerminalState(deployment.getStatus())) {
+            observeJobManagerExceptions(ctx, deployment, observeConfig);
+        }
+
         return cleanupTerminalJmAfterTtl(ctx.getFlinkService(), deployment, observeConfig);
     }
+
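+    /**
+     * Queries the exception history of the currently deployed job and emits a Kubernetes event
+     * for each root exception found. REST errors are logged and swallowed so that observation
+     * never fails the reconciliation.
+     */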
+    private void observeJobManagerExceptions(
+            FlinkResourceContext<FlinkDeployment> ctx,
+            FlinkDeployment deployment,
+            Configuration observeConfig) {
+        try {
+            var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+            var history =
+                    ctx.getFlinkService().getJobExceptions(deployment, jobId, observeConfig);
+            if (history == null || history.getExceptionHistory() == null) {
+                return;
+            }
+            var exceptionHistory = history.getExceptionHistory();
+            var exceptions = exceptionHistory.getEntries();
+            if (exceptions.isEmpty()) {
+                LOG.info("No exceptions found in job exception history for jobId '{}'.", jobId);
+                return;
+            }
+            if (exceptionHistory.isTruncated()) {
+                LOG.warn(
+                        "Job exception history is truncated for jobId '{}'. Some exceptions are not shown.",
+                        jobId);
+            }
+            for (var exception : exceptions) {
+                emitJobManagerExceptionEvent(ctx, deployment, exception);
+            }
+        } catch (Exception e) {
+            LOG.warn("Could not fetch JobManager exception info.", e);
+        }
+    }
+
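+    /**
+     * Formats a single root exception (timestamp, task, endpoint, TaskManager ID, failure labels
+     * and stacktrace) into a readable message and triggers a deduplicated warning event for it.
+     */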
+    private void emitJobManagerExceptionEvent(
+            FlinkResourceContext<FlinkDeployment> ctx,
+            FlinkDeployment deployment,
+            JobExceptionsInfoWithHistory.RootExceptionInfo exception) {
+
+        String message = exception.getExceptionName();
+        if (message == null || message.isBlank()) {
+            return;
+        }
+
+        String stacktrace = exception.getStacktrace();
+        String taskName = exception.getTaskName();
+        String endpoint = exception.getEndpoint();
+        String tmId = exception.getTaskManagerId();
+        Map<String, String> labels = exception.getFailureLabels();
+        String time =
+                DateTimeUtils.readable(
+                        Instant.ofEpochMilli(exception.getTimestamp()), ZoneId.systemDefault());
+
+        StringBuilder combined = new StringBuilder();
+        combined.append("JobManager Exception at ").append(time).append(":\n");
+        combined.append(message).append("\n\n");
+
+        if (taskName != null) {
+            combined.append("Task: ").append(taskName).append("\n");
+        }
+        if (endpoint != null) {
+            combined.append("Endpoint: ").append(endpoint).append("\n");
+        }
+        if (tmId != null) {
+            combined.append("TaskManager ID: ").append(tmId).append("\n");
+        }
+
+        if (labels != null && !labels.isEmpty()) {
+            combined.append("Failure Labels:\n");
+            labels.forEach(
+                    (k, v) -> combined.append("- ").append(k).append(": ").append(v).append("\n"));
+        }
+
+        if (stacktrace != null && !stacktrace.isBlank()) {
+            combined.append("\nStacktrace:\n").append(stacktrace);
+        }
+
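+        // The event key makes triggerEventOnce idempotent: each distinct exception message is
+        // reported as a Kubernetes event at most once per resource.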
+        eventRecorder.triggerEventOnce(
+                deployment,
+                EventRecorder.Type.Warning,
+                EventRecorder.Reason.JobManagerException,
+                combined.toString(),
+                EventRecorder.Component.JobManagerDeployment,
+                "jobmanager-exception-" + message.hashCode(),
+                ctx.getKubernetesClient());
+    }

     private boolean shouldRestartJobBecauseUnhealthy(
             FlinkDeployment deployment, Configuration observeConfig) {
         boolean restartNeeded = false;
@@ -65,6 +65,8 @@
 import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
@@ -77,6 +79,7 @@
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
@@ -845,6 +848,34 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exception
                 (c, e) -> new StandaloneClientHAServices(restServerAddress));
     }

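+    /**
+     * Fetches the job's exception history through the REST API. Returns null instead of throwing
+     * if the request fails, so callers must tolerate missing data.
+     */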
+    @Override
+    public JobExceptionsInfoWithHistory getJobExceptions(
+            FlinkDeployment deployment, JobID jobId, Configuration conf) {
+        JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders.getInstance();
+        int port = conf.getInteger(RestOptions.PORT);
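+        // An operator-level service host override (if configured) takes precedence over the
+        // deployment's namespaced external REST service name.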
+        String host =
+                ObjectUtils.firstNonNull(
+                        operatorConfig.getFlinkServiceHostOverride(),
+                        ExternalServiceDecorator.getNamespacedExternalServiceName(
+                                deployment.getMetadata().getName(),
+                                deployment.getMetadata().getNamespace()));
+        JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+        try (var restClient = getRestClient(conf)) {
+            return restClient
+                    .sendRequest(
+                            host, port, jobExceptionsHeaders, params, EmptyRequestBody.getInstance())
+                    .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
+        } catch (Exception e) {
+            LOG.warn("Failed to fetch job exceptions from REST API for jobId {}", jobId, e);
+            return null;
+        }
+    }
+
 @VisibleForTesting
 protected void runJar(
         JobSpec job,
@@ -33,6 +33,7 @@
 import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;

 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.PodList;
@@ -127,6 +128,10 @@

     RestClusterClient<String> getClusterClient(Configuration conf) throws Exception;

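+    /** Fetches the exception history of the given job from the JobManager REST API. */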
+    JobExceptionsInfoWithHistory getJobExceptions(
+            FlinkDeployment deployment, JobID jobId, Configuration conf) throws Exception;
+
     /** Result of a cancel operation. */
     @AllArgsConstructor
     class CancelResult {
@@ -280,7 +280,8 @@ public KubernetesClient getKubernetesClient() {
     /** The type of the events. */
     public enum Type {
         Normal,
-        Warning
+        Warning,
+        Error
     }

     /** The component of events. */
@@ -315,6 +316,7 @@ public enum Reason {
         UnsupportedFlinkVersion,
         SnapshotError,
         SnapshotAbandoned,
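+        // Event reason for exceptions surfaced from the JobManager's exception history.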
+        JobManagerException,
         Error
     }
 }