@@ -77,6 +77,8 @@ public class FlinkOperatorConfiguration {
DeletionPropagation deletionPropagation;
boolean snapshotResourcesEnabled;
Duration slowRequestThreshold;
int reportedExceptionEventsMaxCount;
int reportedExceptionEventsMaxStackTraceLength;

public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
Duration reconcileInterval =
@@ -195,6 +197,12 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
Duration slowRequestThreshold =
operatorConfig.get(OPERATOR_KUBERNETES_SLOW_REQUEST_THRESHOLD);

int reportedExceptionEventsMaxCount =
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT);
int reportedExceptionEventsMaxStackTraceLength =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES);

return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
@@ -224,7 +232,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
getLeaderElectionConfig(operatorConfig),
deletionPropagation,
snapshotResourcesEnabled,
slowRequestThreshold);
slowRequestThreshold,
reportedExceptionEventsMaxCount,
reportedExceptionEventsMaxStackTraceLength);
}

private static GenericRetry getRetryConfig(Configuration conf) {
@@ -654,4 +654,20 @@ public static String operatorConfigKey(String key) {
.defaultValue(Duration.ofMinutes(-1))
.withDescription(
"How often to retrieve Kubernetes cluster resource usage information. This information is used to avoid running out of cluster resources when scaling up resources. Negative values disable the feature.");

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES =
operatorConfig("event.exception.stacktrace.lines")
.intType()
.defaultValue(5)
.withDescription(
"Maximum number of stack trace lines to include in exception-related Kubernetes event messages.");

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_LIMIT =
operatorConfig("event.exception.limit.per.reconciliation")
.intType()
.defaultValue(10)
.withDescription(
"Maximum number of exception-related Kubernetes events emitted per reconciliation cycle.");
}
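
The two new options above feed the reportedExceptionEventsMaxCount and reportedExceptionEventsMaxStackTraceLength fields added to FlinkOperatorConfiguration in the first hunk. A minimal sketch of overriding them programmatically, assuming the operatorConfig(...) helper applies the standard kubernetes.operator. key prefix:

    // Sketch: raise the new event-reporting limits before building the operator config.
    Configuration conf = new Configuration();
    conf.set(KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT, 20);
    conf.set(KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES, 10);
    FlinkOperatorConfiguration operatorConf = FlinkOperatorConfiguration.fromConfiguration(conf);
    // operatorConf.getReportedExceptionEventsMaxCount()            -> 20
    // operatorConf.getReportedExceptionEventsMaxStackTraceLength() -> 10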
@@ -74,6 +74,10 @@ public void observeInternal(FlinkResourceContext<FlinkDeployment> ctx) {
observeClusterInfo(ctx);
}

if (!ReconciliationUtils.isJobInTerminalState(flinkDep.getStatus())) {
observeJobManagerExceptions(ctx);
}

clearErrorsIfDeploymentIsHealthy(flinkDep);
}

@@ -297,4 +301,11 @@ protected boolean checkIfAlreadyUpgraded(FlinkResourceContext<FlinkDeployment> c
* @param ctx the context with which the operation is executed
*/
protected abstract void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> ctx);

/**
* Observe the exceptions raised in the job manager and take appropriate action.
*
* @param ctx the context with which the operation is executed
*/
protected abstract void observeJobManagerExceptions(FlinkResourceContext<FlinkDeployment> ctx);
}
@@ -17,22 +17,50 @@

package org.apache.flink.kubernetes.operator.observer.deployment;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.observer.ClusterHealthObserver;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.observer.SnapshotObserver;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;

/** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
public class ApplicationObserver extends AbstractFlinkDeploymentObserver {

/** Cache entry tracking the jobId and the timestamp of the last exception recorded for a resource. */
public static final class ExceptionCacheEntry {
final String jobId;
final long lastTimestamp;

ExceptionCacheEntry(String jobId, long lastTimestamp) {
this.jobId = jobId;
this.lastTimestamp = lastTimestamp;
}
}

@VisibleForTesting
final Map<String, ExceptionCacheEntry> lastRecordedExceptionCache = new ConcurrentHashMap<>();

private static final Logger LOG = LoggerFactory.getLogger(ApplicationObserver.class);

private final SnapshotObserver<FlinkDeployment, FlinkDeploymentStatus> savepointObserver;
private final JobStatusObserver<FlinkDeployment> jobStatusObserver;

private final ClusterHealthObserver clusterHealthObserver;

public ApplicationObserver(EventRecorder eventRecorder) {
@@ -56,6 +84,149 @@ protected void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> ctx) {
}
}

@Override
protected void observeJobManagerExceptions(FlinkResourceContext<FlinkDeployment> ctx) {
var deployment = ctx.getResource();
var operatorConfig = ctx.getOperatorConfig();
var jobStatus = deployment.getStatus().getJobStatus();
// Should not happen in practice; guard against a missing job status.
if (jobStatus == null || jobStatus.getJobId() == null) {
LOG.warn(
"No jobId found for deployment '{}', skipping exception observation.",
deployment.getMetadata().getName());
return;
}

try {
JobID jobId = JobID.fromHexString(jobStatus.getJobId());
// TODO: Ideally we would restrict the number of returned entries via the
// `maxExceptions` query parameter, but JobExceptionsMessageParameters neither
// exposes the parameter nor provides setters for it.
var history =
ctx.getFlinkService()
.getJobExceptions(
deployment, jobId, ctx.getDeployConfig(deployment.getSpec()));

if (history == null || history.getExceptionHistory() == null) {
return;
}

var exceptionHistory = history.getExceptionHistory();
var exceptions = exceptionHistory.getEntries();
if (exceptions.isEmpty()) {
return;
}

if (exceptionHistory.isTruncated()) {
LOG.warn(
"Job exception history is truncated for jobId '{}'. Some exceptions may be missing.",
jobId);
}

String resourceKey =
deployment.getMetadata().getNamespace()
+ "/"
+ deployment.getMetadata().getName();
String currentJobId = jobStatus.getJobId();
Instant lastRecorded = null; // remains null on the first reconciliation or after a jobId change

ExceptionCacheEntry cacheEntry = lastRecordedExceptionCache.get(resourceKey);
if (cacheEntry != null && cacheEntry.jobId.equals(currentJobId)) {
lastRecorded = Instant.ofEpochMilli(cacheEntry.lastTimestamp);
}

Instant now = Instant.now();
int maxEvents = operatorConfig.getReportedExceptionEventsMaxCount();
int maxStackTraceLines = operatorConfig.getReportedExceptionEventsMaxStackTraceLength();

int count = 0;
for (var exception : exceptions) {
Instant exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
if (lastRecorded != null && exceptionTime.isBefore(lastRecorded)) {
continue;
}

emitJobManagerExceptionEvent(ctx, exception, exceptionTime, maxStackTraceLines);
if (++count >= maxEvents) {
break;
}
}
lastRecordedExceptionCache.put(
resourceKey, new ExceptionCacheEntry(currentJobId, now.toEpochMilli()));

} catch (Exception e) {
LOG.warn(
"Failed to fetch JobManager exception info for deployment '{}'.",
deployment.getMetadata().getName(),
e);
}
}

private void emitJobManagerExceptionEvent(
FlinkResourceContext<FlinkDeployment> ctx,
JobExceptionsInfoWithHistory.RootExceptionInfo exception,
Instant exceptionTime,
int maxStackTraceLines) {

String exceptionName = exception.getExceptionName();
if (exceptionName == null || exceptionName.isBlank()) {
return;
}

Map<String, String> annotations = new HashMap<>();
annotations.put(
"event-time-readable",
DateTimeUtils.readable(exceptionTime, ZoneId.systemDefault()));
annotations.put("event-timestamp-millis", String.valueOf(exceptionTime.toEpochMilli()));

if (exception.getTaskName() != null) {
annotations.put("task-name", exception.getTaskName());
}
if (exception.getEndpoint() != null) {
annotations.put("endpoint", exception.getEndpoint());
}
if (exception.getTaskManagerId() != null) {
annotations.put("tm-id", exception.getTaskManagerId());
}

if (exception.getFailureLabels() != null) {
exception
.getFailureLabels()
.forEach((k, v) -> annotations.put("failure-label-" + k, v));
}

StringBuilder eventMessage = new StringBuilder(exceptionName);
String stacktrace = exception.getStacktrace();
if (stacktrace != null && !stacktrace.isBlank()) {
String[] lines = stacktrace.split("\n");
eventMessage.append("\n\nStacktrace (truncated):\n");
for (int i = 0; i < Math.min(maxStackTraceLines, lines.length); i++) {
eventMessage.append(lines[i]).append("\n");
}
if (lines.length > maxStackTraceLines) {
eventMessage
.append("... (")
.append(lines.length - maxStackTraceLines)
.append(" more lines)");
}
}

String keyMessage =
exceptionName.length() > 128 ? exceptionName.substring(0, 128) : exceptionName;

eventRecorder.triggerEventOnceWithAnnotations(
exceptionTime.toEpochMilli(),
ctx.getResource(),
EventRecorder.Type.Warning,
EventRecorder.Reason.JobException,
eventMessage.toString().trim(),
EventRecorder.Component.JobManagerDeployment,
"jobmanager-exception-" + keyMessage.hashCode(),
ctx.getKubernetesClient(),
annotations);
}

private class ApplicationJobObserver extends JobStatusObserver<FlinkDeployment> {
public ApplicationJobObserver(EventRecorder eventRecorder) {
super(eventRecorder);
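For illustration, an event message assembled by emitJobManagerExceptionEvent above would look roughly like the sketch below (made-up values, stack trace limit of 3 for brevity; the real event additionally carries the annotations set above, e.g. event-timestamp-millis, task-name, tm-id):

    java.lang.RuntimeException: Buffer pool is destroyed.

    Stacktrace (truncated):
    java.lang.RuntimeException: Buffer pool is destroyed.
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java)
    ... (41 more lines)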
@@ -49,4 +49,10 @@ public void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> ctx) {
}
}
}

@Override
protected void observeJobManagerExceptions(FlinkResourceContext<FlinkDeployment> ctx) {
logger.error("Observing JobManager exceptions is not supported for session clusters");
}
}
@@ -65,6 +65,8 @@
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
@@ -77,6 +79,7 @@
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
@@ -845,6 +848,37 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exc
(c, e) -> new StandaloneClientHAServices(restServerAddress));
}

@Override
public JobExceptionsInfoWithHistory getJobExceptions(
FlinkDeployment deployment, JobID jobId, Configuration deployConfig) {
JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders.getInstance();
int port = deployConfig.getInteger(RestOptions.PORT);
String host =
ObjectUtils.firstNonNull(
operatorConfig.getFlinkServiceHostOverride(),
ExternalServiceDecorator.getNamespacedExternalServiceName(
deployment.getMetadata().getName(),
deployment.getMetadata().getNamespace()));
JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
params.jobPathParameter.resolve(jobId);
try (var restClient = getRestClient(deployConfig)) {
return restClient
.sendRequest(
host,
port,
jobExceptionsHeaders,
params,
EmptyRequestBody.getInstance())
.get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn(
String.format(
"Failed to fetch job exceptions from REST API for jobId %s", jobId),
e);
return null;
}
}

@VisibleForTesting
protected void runJar(
JobSpec job,
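A caller-side sketch of the getJobExceptions implementation above (variable names assumed). Since REST failures are logged and swallowed with a null return, callers are expected to null-check, as ApplicationObserver does:

    // Hypothetical usage: fetch the exception history and log the first root exception entry.
    JobExceptionsInfoWithHistory history =
            flinkService.getJobExceptions(deployment, jobId, deployConfig);
    if (history != null && history.getExceptionHistory() != null) {
        history.getExceptionHistory().getEntries().stream()
                .findFirst()
                .ifPresent(e -> LOG.info("Root exception: {}", e.getExceptionName()));
    }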
@@ -33,6 +33,7 @@
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
@@ -127,6 +128,9 @@ Map<String, String> getMetrics(Configuration conf, String jobId, List<String> me

RestClusterClient<String> getClusterClient(Configuration conf) throws Exception;

JobExceptionsInfoWithHistory getJobExceptions(
FlinkDeployment deployment, JobID jobId, Configuration deployConfig) throws Exception;

/** Result of a cancel operation. */
@AllArgsConstructor
class CancelResult {