diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 164c9bbd3d..4a72d8b766 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -98,6 +98,18 @@
Boolean |
Enables dynamic change of watched/monitored namespaces. |
+
+ kubernetes.operator.events.exceptions.limit-per-reconciliation |
+ 10 |
+ Integer |
+ Maximum number of exception-related Kubernetes events emitted per reconciliation cycle. |
+
+
+ kubernetes.operator.events.exceptions.stacktrace-lines |
+ 5 |
+ Integer |
+ Maximum number of stack trace lines to include in exception-related Kubernetes event messages. |
+
kubernetes.operator.exception.field.max.length |
2048 |
diff --git a/docs/layouts/shortcodes/generated/system_advanced_section.html b/docs/layouts/shortcodes/generated/system_advanced_section.html
index 092604b493..7752c15d5f 100644
--- a/docs/layouts/shortcodes/generated/system_advanced_section.html
+++ b/docs/layouts/shortcodes/generated/system_advanced_section.html
@@ -38,6 +38,18 @@
Boolean |
Whether to enable on-the-fly config changes through the operator configmap. |
+
+ kubernetes.operator.events.exceptions.limit-per-reconciliation |
+ 10 |
+ Integer |
+ Maximum number of exception-related Kubernetes events emitted per reconciliation cycle. |
+
+
+ kubernetes.operator.events.exceptions.stacktrace-lines |
+ 5 |
+ Integer |
+ Maximum number of stack trace lines to include in exception-related Kubernetes event messages. |
+
kubernetes.operator.health.canary.resource.timeout |
1 min |
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index ad044242af..f65385ecc4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -77,6 +77,8 @@ public class FlinkOperatorConfiguration {
DeletionPropagation deletionPropagation;
boolean snapshotResourcesEnabled;
Duration slowRequestThreshold;
+ int reportedExceptionEventsMaxCount;
+ int reportedExceptionEventsMaxStackTraceLength;
public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
Duration reconcileInterval =
@@ -195,6 +197,12 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
Duration slowRequestThreshold =
operatorConfig.get(OPERATOR_KUBERNETES_SLOW_REQUEST_THRESHOLD);
+ int reportedExceptionEventsMaxCount =
+ operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT);
+ int reportedExceptionEventsMaxStackTraceLength =
+ operatorConfig.get(
+ KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES);
+
return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
@@ -224,7 +232,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
getLeaderElectionConfig(operatorConfig),
deletionPropagation,
snapshotResourcesEnabled,
- slowRequestThreshold);
+ slowRequestThreshold,
+ reportedExceptionEventsMaxCount,
+ reportedExceptionEventsMaxStackTraceLength);
}
private static GenericRetry getRetryConfig(Configuration conf) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index f4203fb7be..72821cb01d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -654,4 +654,20 @@ public static String operatorConfigKey(String key) {
.defaultValue(Duration.ofMinutes(-1))
.withDescription(
"How often to retrieve Kubernetes cluster resource usage information. This information is used to avoid running out of cluster resources when scaling up resources. Negative values disable the feature.");
+
+ /**
+ * Integer option (default 5) limiting how many stack trace lines are copied into the message
+ * of an exception-related Kubernetes event.
+ */
+ @Documentation.Section(SECTION_ADVANCED)
+ public static final ConfigOption OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES =
+ operatorConfig("events.exceptions.stacktrace-lines")
+ .intType()
+ .defaultValue(5)
+ .withDescription(
+ "Maximum number of stack trace lines to include in exception-related Kubernetes event messages.");
+
+ /**
+ * Integer option (default 10) limiting how many exception-related Kubernetes events are
+ * emitted during a single reconciliation cycle.
+ */
+ @Documentation.Section(SECTION_ADVANCED)
+ public static final ConfigOption OPERATOR_EVENT_EXCEPTION_LIMIT =
+ operatorConfig("events.exceptions.limit-per-reconciliation")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ "Maximum number of exception-related Kubernetes events emitted per reconciliation cycle.");
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java
index 20cd5f9557..2944217836 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java
@@ -25,6 +25,7 @@
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
+import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -39,8 +40,15 @@ public FlinkDeploymentContext(
Context> josdkContext,
KubernetesResourceMetricGroup resourceMetricGroup,
FlinkConfigManager configManager,
- Function, FlinkService> flinkServiceFactory) {
- super(resource, josdkContext, resourceMetricGroup, configManager, flinkServiceFactory);
+ Function, FlinkService> flinkServiceFactory,
+ FlinkResourceContextFactory.ExceptionCacheEntry exceptionCacheEntry) {
+ super(
+ resource,
+ josdkContext,
+ resourceMetricGroup,
+ configManager,
+ flinkServiceFactory,
+ exceptionCacheEntry);
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
index 3010f478c9..f62df74f79 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
@@ -30,6 +30,7 @@
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
+import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -51,6 +52,8 @@ public abstract class FlinkResourceContext, FlinkService> flinkServiceFactory;
+ @Getter private final FlinkResourceContextFactory.ExceptionCacheEntry exceptionCacheEntry;
+
private FlinkOperatorConfiguration operatorConfig;
private Configuration observeConfig;
private FlinkService flinkService;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java
index 5d18f56f29..f0256b31f4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java
@@ -26,6 +26,7 @@
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
+import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -44,8 +45,15 @@ public FlinkSessionJobContext(
Context> josdkContext,
KubernetesResourceMetricGroup resourceMetricGroup,
FlinkConfigManager configManager,
- Function, FlinkService> flinkServiceFactory) {
- super(resource, josdkContext, resourceMetricGroup, configManager, flinkServiceFactory);
+ Function, FlinkService> flinkServiceFactory,
+ FlinkResourceContextFactory.ExceptionCacheEntry exceptionCacheEntry) {
+ super(
+ resource,
+ josdkContext,
+ resourceMetricGroup,
+ configManager,
+ flinkServiceFactory,
+ exceptionCacheEntry);
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 444d0a0ad5..caf4a32a23 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -28,11 +28,21 @@
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
+import org.apache.flink.kubernetes.operator.utils.K8sAnnotationsSanitizer;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkResourceException;
@@ -69,17 +79,20 @@ public boolean observe(FlinkResourceContext ctx) {
var jobStatus = resource.getStatus().getJobStatus();
LOG.debug("Observing job status");
var previousJobStatus = jobStatus.getState();
-
+ var jobId = jobStatus.getJobId();
try {
var newJobStatusOpt =
ctx.getFlinkService()
- .getJobStatus(
- ctx.getObserveConfig(),
- JobID.fromHexString(jobStatus.getJobId()));
+ .getJobStatus(ctx.getObserveConfig(), JobID.fromHexString(jobId));
if (newJobStatusOpt.isPresent()) {
- updateJobStatus(ctx, newJobStatusOpt.get());
+ var newJobStatus = newJobStatusOpt.get();
+ updateJobStatus(ctx, newJobStatus);
ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
+ // The JobManager is reachable; unless the job was already terminal, also fetch its exceptions
+ if (!previousJobStatus.isGloballyTerminalState()) {
+ observeJobManagerExceptions(ctx);
+ }
return true;
} else {
onTargetJobNotFound(ctx);
@@ -95,6 +108,149 @@ public boolean observe(FlinkResourceContext ctx) {
return false;
}
+    /**
+     * Fetches the exception history of the current job from the JobManager and emits one
+     * Kubernetes event per exception that has not been reported yet.
+     *
+     * <p>Exceptions are processed newest first and at most the configured number of events is
+     * emitted per call; a limit of zero or less disables the reporting. The timestamp of the
+     * newest emitted exception is stored in the exception cache entry of the context, so that
+     * subsequent calls for the same job only report newer exceptions.
+     *
+     * <p>Failures are logged and swallowed on purpose so that exception reporting never fails the
+     * surrounding observation.
+     *
+     * @param ctx the context with which the operation is executed
+     */
+    protected void observeJobManagerExceptions(FlinkResourceContext ctx) {
+        var resource = ctx.getResource();
+        var operatorConfig = ctx.getOperatorConfig();
+        var jobStatus = resource.getStatus().getJobStatus();
+
+        int maxEvents = operatorConfig.getReportedExceptionEventsMaxCount();
+        if (maxEvents <= 0) {
+            // No event may be emitted, so there is no point in querying the JobManager.
+            return;
+        }
+        int maxStackTraceLines = operatorConfig.getReportedExceptionEventsMaxStackTraceLength();
+
+        try {
+            String currentJobId = jobStatus.getJobId();
+            var jobId = JobID.fromHexString(currentJobId);
+            // TODO: Ideally the number of returned exceptions would be restricted with the
+            // `maxExceptions` query parameter, but JobExceptionsMessageParameters neither exposes
+            // that parameter nor offers a setter for it.
+            var history =
+                    ctx.getFlinkService().getJobExceptions(resource, jobId, ctx.getObserveConfig());
+
+            if (history == null || history.getExceptionHistory() == null) {
+                return;
+            }
+
+            var exceptionHistory = history.getExceptionHistory();
+            List<JobExceptionsInfoWithHistory.RootExceptionInfo> exceptions =
+                    exceptionHistory.getEntries();
+            if (exceptions == null || exceptions.isEmpty()) {
+                return;
+            }
+
+            if (exceptionHistory.isTruncated()) {
+                LOG.warn(
+                        "Job exception history is truncated for jobId '{}'. Some exceptions may be missing.",
+                        jobId);
+            }
+
+            // The cached timestamp is only meaningful for the job it was recorded for. When the
+            // cached job id differs from the current one (or none was cached), no lower bound is
+            // applied and every exception in the history counts as new.
+            var cacheEntry = ctx.getExceptionCacheEntry();
+            Instant lastRecorded = null;
+            if (cacheEntry.getJobId() != null && cacheEntry.getJobId().equals(currentJobId)) {
+                lastRecorded = Instant.ofEpochMilli(cacheEntry.getLastTimestamp());
+            }
+
+            // Sort newest first so that the event limit drops the oldest exceptions.
+            var sortedExceptions = new ArrayList<>(exceptions);
+            sortedExceptions.sort(
+                    Comparator.comparingLong(
+                                    JobExceptionsInfoWithHistory.RootExceptionInfo::getTimestamp)
+                            .reversed());
+
+            int count = 0;
+            Instant latestSeen = null;
+            for (var exception : sortedExceptions) {
+                Instant exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
+                // The list is sorted newest first: once an already recorded exception is
+                // reached, all remaining ones have been recorded as well.
+                if (lastRecorded != null && !exceptionTime.isAfter(lastRecorded)) {
+                    break;
+                }
+                emitJobManagerExceptionEvent(ctx, exception, exceptionTime, maxStackTraceLines);
+                if (latestSeen == null) {
+                    // The first emitted exception is the newest one.
+                    latestSeen = exceptionTime;
+                }
+                if (++count >= maxEvents) {
+                    break;
+                }
+            }
+
+            cacheEntry.setJobId(currentJobId);
+            // Only advance the timestamp when something was emitted; otherwise the previously
+            // cached value is kept.
+            if (latestSeen != null) {
+                cacheEntry.setLastTimestamp(latestSeen.toEpochMilli());
+            }
+
+        } catch (Exception e) {
+            LOG.warn("Failed to fetch JobManager exception info.", e);
+        }
+    }
+
+    /**
+     * Emits a warning event describing a single JobManager exception.
+     *
+     * <p>The event message consists of the first {@code maxStackTraceLines} lines of the stack
+     * trace, followed by a note with the number of omitted lines. The available exception details
+     * (timestamp, task name, endpoint, TaskManager id and failure labels) are attached as
+     * sanitized annotations. The event identity key is derived from a hash of the message.
+     *
+     * @param ctx the context of the resource the event is emitted for
+     * @param exception the exception reported by the JobManager
+     * @param exceptionTime the time at which the exception occurred, may be {@code null}
+     * @param maxStackTraceLines maximum number of stack trace lines to include; negative values
+     *     are treated as zero
+     */
+    private void emitJobManagerExceptionEvent(
+            FlinkResourceContext ctx,
+            JobExceptionsInfoWithHistory.RootExceptionInfo exception,
+            Instant exceptionTime,
+            int maxStackTraceLines) {
+        Map<String, String> annotations = new HashMap<>();
+        if (exceptionTime != null) {
+            annotations.put(
+                    "exception-timestamp",
+                    exceptionTime.atZone(ZoneId.systemDefault()).toOffsetDateTime().toString());
+        }
+        if (exception.getTaskName() != null) {
+            annotations.put("task-name", exception.getTaskName());
+        }
+        if (exception.getEndpoint() != null) {
+            annotations.put("endpoint", exception.getEndpoint());
+        }
+        if (exception.getTaskManagerId() != null) {
+            annotations.put("tm-id", exception.getTaskManagerId());
+        }
+        if (exception.getFailureLabels() != null) {
+            exception
+                    .getFailureLabels()
+                    .forEach((k, v) -> annotations.put("failure-label-" + k, v));
+        }
+
+        // A negative limit is treated as zero so that the "more lines" count stays correct.
+        int lineLimit = Math.max(0, maxStackTraceLines);
+        StringBuilder eventMessage = new StringBuilder();
+        String stacktrace = exception.getStacktrace();
+        if (stacktrace != null && !stacktrace.isBlank()) {
+            String[] lines = stacktrace.split("\n");
+            int shownLines = Math.min(lineLimit, lines.length);
+            for (int i = 0; i < shownLines; i++) {
+                eventMessage.append(lines[i]).append("\n");
+            }
+            if (lines.length > shownLines) {
+                eventMessage
+                        .append("... (")
+                        .append(lines.length - shownLines)
+                        .append(" more lines)");
+            }
+        }
+
+        // The key is hashed from the untrimmed message, the event itself carries the trimmed one.
+        String fullMessage = eventMessage.toString();
+        String identityKey =
+                "jobmanager-exception-" + Integer.toHexString(Objects.hash(fullMessage));
+        eventRecorder.triggerEventWithAnnotations(
+                ctx.getResource(),
+                EventRecorder.Type.Warning,
+                EventRecorder.Reason.JobException,
+                fullMessage.trim(),
+                EventRecorder.Component.Job,
+                identityKey,
+                ctx.getKubernetesClient(),
+                K8sAnnotationsSanitizer.sanitizeAnnotations(annotations));
+    }
+
/**
* Callback when no matching target job was found on a cluster where jobs were found.
*
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 733ebd3ae2..3d6da4f5bd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -31,6 +31,7 @@
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
@@ -65,6 +66,8 @@
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
@@ -77,6 +80,7 @@
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
@@ -845,6 +849,37 @@ public RestClusterClient getClusterClient(Configuration conf) throws Exc
(c, e) -> new StandaloneClientHAServices(restServerAddress));
}
+    /**
+     * Fetches the exception history of the given job from the JobManager REST endpoint.
+     *
+     * <p>The request is sent to the configured service host override if present, otherwise to the
+     * namespaced external service of the resource, and waits up to the Flink client timeout.
+     *
+     * @param resource the resource whose REST endpoint is queried
+     * @param jobId the id of the job to fetch the exceptions for
+     * @param observeConfig the configuration providing the REST port and client settings
+     * @return the exception history, or {@code null} if it could not be fetched
+     */
+    @Override
+    public JobExceptionsInfoWithHistory getJobExceptions(
+            AbstractFlinkResource resource, JobID jobId, Configuration observeConfig) {
+        JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders.getInstance();
+        int port = observeConfig.getInteger(RestOptions.PORT);
+        String host =
+                ObjectUtils.firstNonNull(
+                        operatorConfig.getFlinkServiceHostOverride(),
+                        ExternalServiceDecorator.getNamespacedExternalServiceName(
+                                resource.getMetadata().getName(),
+                                resource.getMetadata().getNamespace()));
+        JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+        try (var restClient = getRestClient(observeConfig)) {
+            return restClient
+                    .sendRequest(
+                            host,
+                            port,
+                            jobExceptionsHeaders,
+                            params,
+                            EmptyRequestBody.getInstance())
+                    .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Restore the interrupt flag so that callers can still react to the interruption.
+                Thread.currentThread().interrupt();
+            }
+            // Best effort: the failure is logged and reported to the caller as null.
+            LOG.warn("Failed to fetch job exceptions from REST API for jobId {}", jobId, e);
+            return null;
+        }
+    }
+
@VisibleForTesting
protected void runJar(
JobSpec job,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
index d9d3688b20..8228ffbf78 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
@@ -39,6 +39,7 @@
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +53,22 @@ public class FlinkResourceContextFactory {
private static final Logger LOG = LoggerFactory.getLogger(FlinkResourceContextFactory.class);
+ /**
+ * The cache entry for the last recorded exception timestamp for a JobID. It pairs a job id
+ * with the timestamp of the newest exception already reported for that job; both values are
+ * updated through the Lombok-generated setters.
+ */
+ @Data
+ public static final class ExceptionCacheEntry {
+ /** Id of the job the timestamp belongs to; may be null when no job id was known yet. */
+ private String jobId;
+ /** Timestamp of the last recorded exception, in milliseconds since the epoch. */
+ private long lastTimestamp;
+
+ public ExceptionCacheEntry(String jobId, long lastTimestamp) {
+ this.jobId = jobId;
+ this.lastTimestamp = lastTimestamp;
+ }
+ }
+
+ /**
+ * Exception cache entries keyed by the id of the resource they belong to. Entries are created
+ * when a resource context is built and removed again in {@code cleanup}.
+ */
+ @VisibleForTesting
+ final Map lastRecordedExceptionCache =
+ new ConcurrentHashMap<>();
+
private final FlinkConfigManager configManager;
private final ArtifactManager artifactManager;
private final ExecutorService clientExecutorService;
@@ -93,20 +110,44 @@ public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
r ->
OperatorMetricUtils.createResourceMetricGroup(
operatorMetricGroup, configManager, resource));
-
+ String jobId = null;
+ if (resource.getStatus() != null) {
+ if (resource.getStatus().getJobStatus() != null) {
+ jobId = resource.getStatus().getJobStatus().getJobId();
+ }
+ }
if (resource instanceof FlinkDeployment) {
var flinkDep = (FlinkDeployment) resource;
+ var resourceId = ResourceID.fromResource(flinkDep);
+ var flinkDepJobId = jobId;
return (FlinkResourceContext)
new FlinkDeploymentContext(
- flinkDep, josdkContext, resMg, configManager, this::getFlinkService);
+ flinkDep,
+ josdkContext,
+ resMg,
+ configManager,
+ this::getFlinkService,
+ lastRecordedExceptionCache.computeIfAbsent(
+ resourceId,
+ id ->
+ new ExceptionCacheEntry(
+ flinkDepJobId, System.currentTimeMillis())));
} else if (resource instanceof FlinkSessionJob) {
+ var resourceId = ResourceID.fromResource(resource);
+ var flinkSessionJobId = jobId;
return (FlinkResourceContext)
new FlinkSessionJobContext(
(FlinkSessionJob) resource,
josdkContext,
resMg,
configManager,
- this::getFlinkService);
+ this::getFlinkService,
+ lastRecordedExceptionCache.computeIfAbsent(
+ resourceId,
+ id ->
+ new ExceptionCacheEntry(
+ flinkSessionJobId,
+ System.currentTimeMillis())));
} else {
throw new IllegalArgumentException(
"Unknown resource type " + resource.getClass().getSimpleName());
@@ -137,13 +178,16 @@ protected FlinkService getFlinkService(FlinkResourceContext> ctx) {
}
public > void cleanup(CR flinkApp) {
+ ResourceID resourceId = ResourceID.fromResource(flinkApp);
var resourceMetricGroup =
- resourceMetricGroups.remove(
- Tuple2.of(flinkApp.getClass(), ResourceID.fromResource(flinkApp)));
+ resourceMetricGroups.remove(Tuple2.of(flinkApp.getClass(), resourceId));
if (resourceMetricGroup != null) {
resourceMetricGroup.close();
} else {
LOG.warn("Unknown resource metric group for {}", flinkApp);
}
+ // remove the resource from the cache
+ lastRecordedExceptionCache.remove(resourceId);
+ LOG.debug("Removed resource {} from last recorded exception cache", resourceId);
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 168ea4c1f4..b1a078fbb2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
@@ -33,6 +34,7 @@
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
@@ -127,6 +129,10 @@ Map getMetrics(Configuration conf, String jobId, List me
RestClusterClient getClusterClient(Configuration conf) throws Exception;
+ JobExceptionsInfoWithHistory getJobExceptions(
+ AbstractFlinkResource resource, JobID jobId, Configuration observeConfig)
+ throws Exception;
+
/** Result of a cancel operation. */
@AllArgsConstructor
class CancelResult {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index 1989de0e14..cae00694ac 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -171,6 +171,27 @@ public boolean triggerEventOnce(
messageKey);
}
+ /**
+ * Creates or updates an event for the given resource and attaches the given annotations to
+ * it. Delegates to {@code EventUtils.createWithAnnotations} and notifies the resource event
+ * listener about the resulting event.
+ *
+ * @param resource The resource
+ * @param type The type
+ * @param reason the reason
+ * @param message the message
+ * @param component the component
+ * @param messageKey key used instead of the message when generating the event name
+ * @param client the Kubernetes client
+ * @param annotations annotations to set on the event
+ * @return the result of {@code EventUtils.createWithAnnotations}
+ */
+ public boolean triggerEventWithAnnotations(
+ AbstractFlinkResource, ?> resource,
+ Type type,
+ Reason reason,
+ String message,
+ Component component,
+ String messageKey,
+ KubernetesClient client,
+ Map annotations) {
+ return EventUtils.createWithAnnotations(
+ client,
+ resource,
+ type,
+ reason.toString(),
+ message,
+ component,
+ e -> eventListenerFlinkResource.accept(resource, e),
+ messageKey,
+ annotations);
+ }
+
/**
* @param resource The resource
* @param type The type
@@ -315,6 +336,7 @@ public enum Reason {
UnsupportedFlinkVersion,
SnapshotError,
SnapshotAbandoned,
+ JobException,
Error
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index fb0dbfc663..df6232d46f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -104,6 +104,40 @@ public static Event findExistingEvent(
.get();
}
+    /**
+     * Creates a new event for the target resource, or refreshes the one that already exists under
+     * the generated event name.
+     *
+     * <p>An existing event gets a new last timestamp, an incremented count and the new message. In
+     * both cases the given annotations are applied (when not null) before the event is written,
+     * and the listener is invoked with the written event if there is one.
+     *
+     * @return {@code true} if a new event was created, {@code false} if an existing one was updated
+     */
+    public static boolean createWithAnnotations(
+            KubernetesClient client,
+            HasMetadata target,
+            EventRecorder.Type type,
+            String reason,
+            String message,
+            EventRecorder.Component component,
+            Consumer eventListener,
+            @Nullable String messageKey,
+            @Nullable Map annotations) {
+        // The message key, when given, replaces the message as input for the event name.
+        String nameSource = messageKey == null ? message : messageKey;
+        String eventName = generateEventName(target, type, reason, nameSource, component);
+
+        Event event = findExistingEvent(client, target, eventName);
+        boolean created = event == null;
+        if (created) {
+            event = buildEvent(target, type, reason, message, component, eventName);
+        } else {
+            event.setLastTimestamp(Instant.now().toString());
+            event.setCount(event.getCount() + 1);
+            event.setMessage(message);
+        }
+
+        setAnnotations(event, annotations);
+        createOrReplaceEvent(client, event).ifPresent(eventListener);
+        return created;
+    }
+
public static boolean createIfNotExists(
KubernetesClient client,
HasMetadata target,
@@ -182,6 +216,16 @@ private static void setLabels(Event existing, @Nullable Map labe
existing.getMetadata().setLabels(labels);
}
+    /**
+     * Replaces the annotations of the event with the given ones, creating the event metadata
+     * first when it is missing. A {@code null} map leaves the event untouched.
+     */
+    private static void setAnnotations(Event existing, @Nullable Map annotations) {
+        if (annotations != null) {
+            if (existing.getMetadata() == null) {
+                existing.setMetadata(new ObjectMeta());
+            }
+            existing.getMetadata().setAnnotations(annotations);
+        }
+    }
+
private static Event buildEvent(
HasMetadata target,
EventRecorder.Type type,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/K8sAnnotationsSanitizer.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/K8sAnnotationsSanitizer.java
new file mode 100644
index 0000000000..086c1995ee
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/K8sAnnotationsSanitizer.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Utility class for sanitizing Kubernetes annotations as per k8s rules. See <a
+ * href="https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set">Kubernetes
+ * Annotations Syntax and Character Set</a>
+ */
+public final class K8sAnnotationsSanitizer {
+
+    private static final int MAX_PREFIX_LENGTH = 253;
+    private static final int MAX_LABEL_LENGTH = 63;
+    private static final int MAX_NAME_LENGTH = 63;
+
+    // Name: starts and ends with alphanumeric, allows alphanum, '-', '_', '.' in middle
+    private static final Pattern NAME_PATTERN =
+            Pattern.compile("^[a-zA-Z0-9]([a-zA-Z0-9_.-]*[a-zA-Z0-9])?$");
+
+    // DNS label: alphanumeric, may have hyphens inside, length ≤ 63
+    private static final Pattern DNS_LABEL_PATTERN =
+            Pattern.compile("^[a-zA-Z0-9]([-a-zA-Z0-9]*[a-zA-Z0-9])?$");
+
+    // Control characters other than carriage return, line feed and tab
+    private static final Pattern NON_WHITESPACE_CONTROL_CHARS =
+            Pattern.compile("[\\p{Cntrl}&&[^\r\n\t]]");
+
+    // Runs of carriage returns, line feeds and tabs
+    private static final Pattern LINE_BREAKS_AND_TABS = Pattern.compile("[\\r\\n\\t]+");
+
+    /** Utility class, not meant to be instantiated. */
+    private K8sAnnotationsSanitizer() {}
+
+    /**
+     * Sanitizes the input annotations by validating keys and cleaning values. Only includes entries
+     * with valid keys and non-null values.
+     *
+     * @param annotations the annotations to sanitize, may be {@code null}
+     * @return a new map with the sanitized annotations, empty if the input is {@code null}
+     */
+    public static Map<String, String> sanitizeAnnotations(Map<String, String> annotations) {
+        Map<String, String> sanitized = new HashMap<>();
+        if (annotations == null) {
+            return sanitized;
+        }
+
+        for (Map.Entry<String, String> entry : annotations.entrySet()) {
+            String key = entry.getKey();
+            if (!isValidAnnotationKey(key)) {
+                continue;
+            }
+            String sanitizedValue = sanitizeAnnotationValue(entry.getValue());
+            if (sanitizedValue != null) {
+                sanitized.put(key, sanitizedValue);
+            }
+        }
+        return sanitized;
+    }
+
+    /** Validates the annotation key according to Kubernetes rules: Optional prefix + "/" + name. */
+    @VisibleForTesting
+    static boolean isValidAnnotationKey(String key) {
+        if (key == null || key.isEmpty()) {
+            return false;
+        }
+
+        // Split on the first slash only; a second slash ends up in the name and is rejected there.
+        String[] parts = key.split("/", 2);
+        if (parts.length == 2) {
+            return isValidPrefix(parts[0]) && isValidName(parts[1]);
+        }
+        return isValidName(parts[0]);
+    }
+
+    /**
+     * Validates the prefix as a DNS subdomain: series of DNS labels separated by dots, total length
+     * ≤ 253.
+     */
+    private static boolean isValidPrefix(String prefix) {
+        if (prefix.length() > MAX_PREFIX_LENGTH) {
+            return false;
+        }
+        if (prefix.endsWith(".")) {
+            return false; // no trailing dot allowed
+        }
+        for (String label : prefix.split("\\.")) {
+            if (label.isEmpty() || label.length() > MAX_LABEL_LENGTH) {
+                return false;
+            }
+            if (!DNS_LABEL_PATTERN.matcher(label).matches()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Validates the name part of the key. */
+    private static boolean isValidName(String name) {
+        return name != null
+                && name.length() <= MAX_NAME_LENGTH
+                && NAME_PATTERN.matcher(name).matches();
+    }
+
+    /**
+     * Sanitizes the annotation value by removing control characters, replacing newlines and tabs
+     * with spaces and trimming the result. No length limit.
+     *
+     * @param value the value to sanitize, may be {@code null}
+     * @return the sanitized value, or {@code null} if the input is {@code null}
+     */
+    @VisibleForTesting
+    static String sanitizeAnnotationValue(String value) {
+        if (value == null) {
+            return null;
+        }
+
+        String withoutControlChars = NON_WHITESPACE_CONTROL_CHARS.matcher(value).replaceAll("");
+        // Trim last: removing a control character at either end may expose whitespace.
+        return LINE_BREAKS_AND_TABS.matcher(withoutControlChars).replaceAll(" ").trim();
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
index 8a6af931e4..2692ba4935 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/OperatorTestBase.java
@@ -31,7 +31,7 @@
import org.junit.jupiter.api.BeforeEach;
/**
- * @link JobStatusObserver unit tests
+ * Base class for unit tests.
*/
public abstract class OperatorTestBase {
@@ -69,6 +69,11 @@ public void prepare() {
return getResourceContext(cr, context);
}
+ public > FlinkResourceContext getResourceContext(
+ CR cr, Configuration configuration) {
+ return getResourceContext(cr, context, configuration);
+ }
+
public > FlinkResourceContext getResourceContext(
CR cr, Context josdkContext) {
var ctxFactory =
@@ -76,4 +81,13 @@ public void prepare() {
configManager, operatorMetricGroup, flinkService, eventRecorder);
return ctxFactory.getResourceContext(cr, josdkContext);
}
+
+ public > FlinkResourceContext getResourceContext(
+ CR cr, Context josdkContext, Configuration configuration) {
+ configManager.updateDefaultConfig(configuration);
+ var ctxFactory =
+ new TestingFlinkResourceContextFactory(
+ configManager, operatorMetricGroup, flinkService, eventRecorder);
+ return ctxFactory.getResourceContext(cr, josdkContext);
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index fa893e7c49..9b7bc2e2ca 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -64,6 +64,7 @@
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
@@ -113,6 +114,7 @@ public class TestingFlinkService extends AbstractFlinkService {
"15.0.0",
DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
"1234567 @ 1970-01-01T00:00:00+00:00");
+ private final Map jobExceptionsMap = new HashMap<>();
public static final String SNAPSHOT_ERROR_MESSAGE = "Failed";
private int savepointCounter = 0;
@@ -311,6 +313,12 @@ public Optional getJobStatus(Configuration conf, JobID jobID)
return super.getJobStatus(conf, jobID);
}
+ @Override
+ public JobExceptionsInfoWithHistory getJobExceptions(
+         AbstractFlinkResource resource, JobID jobId, Configuration observeConfig) {
+     return jobExceptionsMap.get(jobId);
+ }
+
@Override
public JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception {
if (jobFailedErr != null) {
@@ -494,11 +502,13 @@ public RestClusterClient getClusterClient(Configuration config) throws E
}
private MultipleJobsDetails getMultipleJobsDetails() {
- return new MultipleJobsDetails(
- jobs.stream()
- .map(tuple -> tuple.f1)
- .map(TestingFlinkService::toJobDetails)
- .collect(Collectors.toList()));
+ MultipleJobsDetails multipleJobsDetails =
+ new MultipleJobsDetails(
+ jobs.stream()
+ .map(tuple -> tuple.f1)
+ .map(TestingFlinkService::toJobDetails)
+ .collect(Collectors.toList()));
+ return multipleJobsDetails;
}
private AggregatedMetricsResponseBody getSubtaskMetrics() {
@@ -722,4 +732,42 @@ public Map getMetrics(
Configuration conf, String jobId, List metricNames) {
return metricsValues;
}
+
+ /**
+ * Appends a mocked root exception to the exception history stored for the given job.
+ *
+ * @param jobId id of the job whose mocked exception history is extended
+ * @param exceptionName exception name recorded in the new entry
+ * @param stackTrace stack trace recorded in the new entry
+ * @param timestamp timestamp recorded in the new entry
+ */
+ public void addExceptionHistory(
+ JobID jobId, String exceptionName, String stackTrace, long timestamp) {
+ JobExceptionsInfoWithHistory.RootExceptionInfo exception =
+ new JobExceptionsInfoWithHistory.RootExceptionInfo(
+ exceptionName,
+ stackTrace,
+ timestamp,
+ Map.of("label-key", "label-value"),
+ "task-name-1",
+ "location-1",
+ "endpoint-1",
+ "tm-id-1",
+ List.of() // concurrentExceptions (none)
+ );
+
+ JobExceptionsInfoWithHistory existing = jobExceptionsMap.get(jobId);
+ List entries =
+ existing != null && existing.getExceptionHistory() != null
+ ? new ArrayList<>(existing.getExceptionHistory().getEntries()) // copy, then append
+ : new ArrayList<>();
+ entries.add(exception);
+
+ JobExceptionsInfoWithHistory.JobExceptionHistory exceptionHistory =
+ new JobExceptionsInfoWithHistory.JobExceptionHistory(entries, false); // NOTE(review): false is presumably the "truncated" flag - confirm
+
+ JobExceptionsInfoWithHistory newExceptionHistory =
+ new JobExceptionsInfoWithHistory(exceptionHistory);
+ jobExceptionsMap.put(jobId, newExceptionHistory); // replaces any previous history for this job
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
index a1416385d0..87df93669b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContextTest.java
@@ -63,6 +63,7 @@ public KubernetesClient getClient() {
},
null,
new FlinkConfigManager(new Configuration()),
+ null,
null));
assertThat(context.getTaskManagerCpu()).isEqualTo(Optional.of(23.));
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java
index dbf0c3bcbf..6b0849be82 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java
@@ -54,6 +54,7 @@ public KubernetesClient getClient() {
},
null,
new FlinkConfigManager(new Configuration()),
+ null,
null));
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContextTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContextTest.java
index 75c9841a98..1f734274a3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContextTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContextTest.java
@@ -153,10 +153,10 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
FlinkResourceContext getContext(AbstractFlinkResource, ?> cr) {
if (cr instanceof FlinkDeployment) {
return new FlinkDeploymentContext(
- (FlinkDeployment) cr, josdkCtx, null, configManager, serviceFactory);
+ (FlinkDeployment) cr, josdkCtx, null, configManager, serviceFactory, null);
} else {
return new FlinkSessionJobContext(
- (FlinkSessionJob) cr, josdkCtx, null, configManager, serviceFactory);
+ (FlinkSessionJob) cr, josdkCtx, null, configManager, serviceFactory, null);
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index aee49e3c3c..e40ae74e8e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -19,6 +19,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
@@ -27,7 +28,9 @@
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.util.SerializedThrowable;
@@ -41,9 +44,12 @@
import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for the {@link JobStatusObserver}. */
@@ -147,6 +153,218 @@ void testFailed() throws Exception {
assertTrue(flinkResourceEventCollector.events.isEmpty());
}
+ @Test
+ public void testExceptionObservedEvenWhenNewStateIsTerminal() throws Exception {
+ var deployment = initDeployment();
+ var status = deployment.getStatus();
+ var jobStatus = status.getJobStatus();
+ jobStatus.setState(JobStatus.RUNNING); // previously recorded state is non-terminal
+ Map configuration = new HashMap<>();
+ configuration.put(
+ KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT.key(), "2");
+ Configuration operatorConfig = Configuration.fromMap(configuration);
+ FlinkResourceContext> ctx =
+ getResourceContext(deployment, operatorConfig);
+
+ var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+ ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
+ ctx.getExceptionCacheEntry().setLastTimestamp(500L);
+ flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1", 1000L);
+
+ // Submit and then cancel the job, and make sure jobFailedErr is null before observing
+ flinkService.submitApplicationCluster(
+ deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
+ flinkService.cancelJob(JobID.fromHexString(jobStatus.getJobId()), false);
+ flinkService.setJobFailedErr(null);
+
+ observer.observe(ctx);
+
+ var events =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(deployment.getMetadata().getNamespace())
+ .list()
+ .getItems();
+ assertEquals(2, events.size()); // one of the two is the job status change
+ // none of the events may contain JOB_NOT_FOUND_ERR
+ assertFalse(
+ events.stream()
+ .anyMatch(
+ event ->
+ event.getMessage()
+ .contains(JobStatusObserver.JOB_NOT_FOUND_ERR)));
+ }
+
+ @Test
+ public void testExceptionNotObservedWhenOldStateIsTerminal() throws Exception {
+ var deployment = initDeployment();
+ var status = deployment.getStatus();
+ var jobStatus = status.getJobStatus();
+ jobStatus.setState(JobStatus.CANCELED); // previously recorded state is already terminal
+ Map configuration = new HashMap<>();
+ configuration.put(
+ KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT.key(), "2");
+ Configuration operatorConfig = Configuration.fromMap(configuration);
+ FlinkResourceContext> ctx =
+ getResourceContext(deployment, operatorConfig);
+
+ var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+ ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
+ ctx.getExceptionCacheEntry().setLastTimestamp(500L);
+ flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1", 1000L);
+
+ // Submit the job and make sure jobFailedErr is null before the observe call
+ flinkService.submitApplicationCluster(
+ deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
+ flinkService.setJobFailedErr(null);
+
+ observer.observe(ctx);
+
+ var events =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(deployment.getMetadata().getNamespace())
+ .list()
+ .getItems();
+ assertEquals(1, events.size()); // only the job status change, no event for ExceptionOne
+ assertEquals(EventRecorder.Reason.JobStatusChanged.name(), events.get(0).getReason());
+ }
+
+ @Test
+ public void testExceptionLimitConfig() throws Exception {
+ var observer = new JobStatusObserver<>(eventRecorder);
+ var deployment = initDeployment();
+ var status = deployment.getStatus();
+ var jobStatus = status.getJobStatus();
+ jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
+ Map configuration = new HashMap<>();
+ configuration.put(
+ KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT.key(), "2");
+ Configuration operatorConfig = Configuration.fromMap(configuration);
+ FlinkResourceContext> ctx =
+ getResourceContext(deployment, operatorConfig);
+
+ var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+ ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
+ ctx.getExceptionCacheEntry().setLastTimestamp(500L);
+
+ flinkService.submitApplicationCluster(
+ deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
+ flinkService.addExceptionHistory(jobId, "ExceptionOne", "trace1", 1000L);
+ flinkService.addExceptionHistory(jobId, "ExceptionTwo", "trace2", 2000L);
+ flinkService.addExceptionHistory(jobId, "ExceptionThree", "trace3", 3000L);
+
+ // Ensure jobFailedErr is null before the observe call
+ flinkService.setJobFailedErr(null);
+
+ observer.observe(ctx);
+
+ var events =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(deployment.getMetadata().getNamespace())
+ .list()
+ .getItems();
+ assertEquals(2, events.size()); // three exceptions were added; limit configured above is 2
+ }
+
+ @Test
+ public void testStackTraceTruncationConfig() throws Exception {
+ var deployment = initDeployment();
+ var status = deployment.getStatus();
+ var jobStatus = status.getJobStatus();
+ jobStatus.setState(JobStatus.RUNNING);
+ Map configuration = new HashMap<>();
+ configuration.put(
+ KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES.key(),
+ "2");
+ Configuration operatorConfig = Configuration.fromMap(configuration);
+ FlinkResourceContext> ctx =
+ getResourceContext(deployment, operatorConfig);
+
+ var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+ flinkService.submitApplicationCluster(
+ deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
+ ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
+ ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
+ ctx.getExceptionCacheEntry().setLastTimestamp(3000L);
+
+ long exceptionTime = 4000L; // later than the cached last timestamp of 3000L
+ String longTrace = "line1\nline2\nline3\nline4"; // four lines; configured limit is 2
+ flinkService.addExceptionHistory(jobId, "StackTraceCheck", longTrace, exceptionTime);
+
+ // Ensure jobFailedErr is null before the observe call
+ flinkService.setJobFailedErr(null);
+ observer.observe(ctx);
+
+ var events =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(deployment.getMetadata().getNamespace())
+ .list()
+ .getItems();
+ assertEquals(1, events.size());
+ String msg = events.get(0).getMessage();
+ assertTrue(msg.contains("line1"));
+ assertTrue(msg.contains("line2"));
+ assertFalse(msg.contains("line3"));
+ assertTrue(msg.contains("... (2 more lines)")); // the two cut lines are summarized
+ }
+
+ @Test
+ public void testIgnoreOldExceptions() throws Exception {
+ var deployment = initDeployment();
+ var status = deployment.getStatus();
+ var jobStatus = status.getJobStatus();
+ jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
+
+ FlinkResourceContext> ctx = getResourceContext(deployment);
+ ctx.getExceptionCacheEntry().setJobId(deployment.getStatus().getJobStatus().getJobId());
+ ctx.getExceptionCacheEntry().setLastTimestamp(2500L);
+
+ var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
+ flinkService.submitApplicationCluster(
+ deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
+ // Exception names mapped to timestamps; only NewException is later than the cached 2500L
+ Map exceptionHistory =
+ Map.of(
+ "OldException", 1000L,
+ "MidException", 2000L,
+ "NewException", 3000L);
+ String dummyStackTrace =
+ "org.apache.%s\n"
+ + "\tat org.apache.flink.kubernetes.operator.observer.JobStatusObserverTest.testIgnoreOldExceptions(JobStatusObserverTest.java:1)\n"
+ + "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n"
+ + "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n"
+ + "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n"
+ + "\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n";
+ // Add the mapped exceptions; Map.of has no defined iteration order, so insertion order varies
+ exceptionHistory.forEach(
+ (exceptionName, timestamp) -> {
+ String fullStackTrace = String.format(dummyStackTrace, exceptionName);
+ flinkService.addExceptionHistory(
+ jobId, "org.apache." + exceptionName, fullStackTrace, timestamp);
+ });
+
+ // Ensure jobFailedErr is null before the observe call
+ flinkService.setJobFailedErr(null);
+ observer.observe(ctx);
+
+ var events =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(deployment.getMetadata().getNamespace())
+ .list()
+ .getItems();
+ assertEquals(1, events.size()); // a single event, and it is the one for NewException
+ assertTrue(events.get(0).getMessage().contains("org.apache.NewException"));
+ }
+
private static Stream cancellingArgs() {
var args = new ArrayList();
for (var status : JobStatus.values()) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index 352c726cec..64b216324c 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -306,7 +306,8 @@ protected void updateVertexResources(
TestUtils.createEmptyContextWithClient(client),
null,
configManager,
- ctx -> service),
+ ctx -> service,
+ null),
configManager.getDeployConfig(flinkDep.getMetadata(), flinkDep.getSpec())));
assertEquals(
Map.of(
@@ -494,7 +495,8 @@ private void testScaleConditionDep(
TestUtils.createEmptyContextWithClient(client),
null,
configManager,
- c -> service),
+ c -> service,
+ null),
configManager.getDeployConfig(depCopy.getMetadata(), depCopy.getSpec())));
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
index c1e758f34d..4e1c6b82ca 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
@@ -572,4 +572,82 @@ public void accept(Event event) {
null));
Assertions.assertNull(eventConsumed);
}
+
+ @Test
+ public void testCreateWithAnnotations() {
+ var consumer =
+ new Consumer() {
+ @Override
+ public void accept(Event event) {
+ eventConsumed = event;
+ }
+ };
+ var flinkApp = TestUtils.buildApplicationCluster();
+ var reason = "TestReason";
+ var message = "Test message";
+ var messageKey = "testKey";
+ var annotations = Map.of("annot1", "value1", "annot2", "value2");
+ var eventName =
+ EventUtils.generateEventName(
+ flinkApp,
+ EventRecorder.Type.Normal,
+ reason,
+ messageKey,
+ EventRecorder.Component.Operator);
+
+ // First call is expected to create the event and return true
+ boolean created =
+ EventUtils.createWithAnnotations(
+ kubernetesClient,
+ flinkApp,
+ EventRecorder.Type.Normal,
+ reason,
+ message,
+ EventRecorder.Component.Operator,
+ consumer,
+ messageKey,
+ annotations);
+ Assertions.assertTrue(created);
+
+ var event =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(flinkApp.getMetadata().getNamespace())
+ .withName(eventName)
+ .get();
+
+ Assertions.assertNotNull(event);
+ Assertions.assertEquals(reason, event.getReason());
+ Assertions.assertEquals(message, event.getMessage());
+ Assertions.assertEquals("value1", event.getMetadata().getAnnotations().get("annot1"));
+ Assertions.assertEquals("value2", event.getMetadata().getAnnotations().get("annot2"));
+ Assertions.assertEquals(eventConsumed, event);
+
+ // Second call with the same key is expected to return false (no new event created)
+ eventConsumed = null;
+ boolean createdAgain =
+ EventUtils.createWithAnnotations(
+ kubernetesClient,
+ flinkApp,
+ EventRecorder.Type.Normal,
+ reason,
+ message,
+ EventRecorder.Component.Operator,
+ consumer,
+ messageKey,
+ annotations);
+ Assertions.assertFalse(createdAgain);
+
+ var eventUnchanged =
+ kubernetesClient
+ .v1()
+ .events()
+ .inNamespace(flinkApp.getMetadata().getNamespace())
+ .withName(eventName)
+ .get();
+ // ...but the existing event's count goes up and the consumer is invoked again
+ Assertions.assertEquals(2, eventUnchanged.getCount());
+ Assertions.assertNotNull(eventConsumed);
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/K8sAnnotationsSanitizerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/K8sAnnotationsSanitizerTest.java
new file mode 100644
index 0000000000..0f84b45aa9
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/K8sAnnotationsSanitizerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for annotation key validation and value sanitization in {@link K8sAnnotationsSanitizer}. */
+public class K8sAnnotationsSanitizerTest {
+
+ @Test
+ public void testValidKeysWithoutPrefix() {
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo")).isTrue();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo-bar")).isTrue();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo.bar")).isTrue();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo_bar")).isTrue();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("f")).isTrue();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("f1")).isTrue();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("1f")).isTrue();
+ }
+
+ @Test
+ public void testInvalidKeysWithoutPrefix() {
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey(null)).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("-foo")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo-")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey(".foo")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo.")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("_foo")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo_")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo@bar")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo/bar/baz"))
+ .isFalse(); // more than one slash is invalid
+ }
+
+ @Test
+ public void testValidKeysWithPrefix() {
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/foo")).isTrue();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("a.b.c/foo-bar")).isTrue();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("abc/foo_bar")).isTrue();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("abc/foo.bar")).isTrue();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("a/foo")).isTrue();
+ }
+
+ @Test
+ public void testInvalidPrefix() {
+ String longPrefix = "a".repeat(254); // 254 characters without any dot
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey(longPrefix + "/foo")).isFalse();
+
+ String longLabel = "a".repeat(64); // a single label of 64 characters
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey(longLabel + ".com/foo")).isFalse();
+
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("ex_ample.com/foo")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("-example.com/foo")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example-.com/foo")).isFalse();
+
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey(".example.com/foo")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example..com/foo")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com./foo")).isFalse();
+ }
+
+ @Test
+ public void testInvalidNameWithPrefix() {
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/-foo")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/foo-")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/.foo")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/foo.")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/foo@bar")).isFalse();
+ assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/")).isFalse();
+ }
+
+ @Test
+ public void testSanitizeAnnotationValue() {
+ assertThat(K8sAnnotationsSanitizer.sanitizeAnnotationValue(null)).isNull();
+
+ assertThat(K8sAnnotationsSanitizer.sanitizeAnnotationValue(" value ")).isEqualTo("value");
+
+ String rawValue = "line1\nline2\r\nline3\tend\u0007"; // ends with the BEL control character
+ String expected = "line1 line2 line3 end";
+ assertThat(K8sAnnotationsSanitizer.sanitizeAnnotationValue(rawValue)).isEqualTo(expected);
+
+ String unicode = "café résumé – test"; // non-ASCII text is expected to pass through unchanged
+ assertThat(K8sAnnotationsSanitizer.sanitizeAnnotationValue(unicode)).isEqualTo(unicode);
+ }
+
+ @Test
+ public void testSanitizeAnnotations() {
+ Map input =
+ Map.of(
+ "valid-key", " some value ",
+ "example.com/valid-name", "value\nwith\nnewlines",
+ "invalid key", "value",
+ "prefix_with_underscore/foo", "value",
+ "validprefix/foo-", "value",
+ "example.com/invalid_name@", "value");
+
+ Map sanitized = K8sAnnotationsSanitizer.sanitizeAnnotations(input);
+
+ assertThat(sanitized).hasSize(2).containsKeys("valid-key", "example.com/valid-name"); // other four keys dropped
+
+ assertThat(sanitized.get("valid-key")).isEqualTo("some value");
+ assertThat(sanitized.get("example.com/valid-name")).isEqualTo("value with newlines");
+ }
+}