Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@
<td>Boolean</td>
<td>Enables dynamic change of watched/monitored namespaces.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.events.exceptions.limit-per-reconciliation</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>Maximum number of exception-related Kubernetes events emitted per reconciliation cycle.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.events.exceptions.stacktrace-lines</h5></td>
<td style="word-wrap: break-word;">5</td>
<td>Integer</td>
<td>Maximum number of stack trace lines to include in exception-related Kubernetes event messages.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.exception.field.max.length</h5></td>
<td style="word-wrap: break-word;">2048</td>
Expand Down
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/system_advanced_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@
<td>Boolean</td>
<td>Whether to enable on-the-fly config changes through the operator configmap.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.events.exceptions.limit-per-reconciliation</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>Maximum number of exception-related Kubernetes events emitted per reconciliation cycle.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.events.exceptions.stacktrace-lines</h5></td>
<td style="word-wrap: break-word;">5</td>
<td>Integer</td>
<td>Maximum number of stack trace lines to include in exception-related Kubernetes event messages.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.health.canary.resource.timeout</h5></td>
<td style="word-wrap: break-word;">1 min</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class FlinkOperatorConfiguration {
DeletionPropagation deletionPropagation;
boolean snapshotResourcesEnabled;
Duration slowRequestThreshold;
int reportedExceptionEventsMaxCount;
int reportedExceptionEventsMaxStackTraceLength;

public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
Duration reconcileInterval =
Expand Down Expand Up @@ -195,6 +197,12 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
Duration slowRequestThreshold =
operatorConfig.get(OPERATOR_KUBERNETES_SLOW_REQUEST_THRESHOLD);

int reportedExceptionEventsMaxCount =
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT);
int reportedExceptionEventsMaxStackTraceLength =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES);

return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
Expand Down Expand Up @@ -224,7 +232,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
getLeaderElectionConfig(operatorConfig),
deletionPropagation,
snapshotResourcesEnabled,
slowRequestThreshold);
slowRequestThreshold,
reportedExceptionEventsMaxCount,
reportedExceptionEventsMaxStackTraceLength);
}

private static GenericRetry getRetryConfig(Configuration conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,4 +654,20 @@ public static String operatorConfigKey(String key) {
.defaultValue(Duration.ofMinutes(-1))
.withDescription(
"How often to retrieve Kubernetes cluster resource usage information. This information is used to avoid running out of cluster resources when scaling up resources. Negative values disable the feature.");

/**
 * Caps how many stack trace lines are included in the message of an exception-related
 * Kubernetes event. Defaults to 5.
 */
@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES =
operatorConfig("events.exceptions.stacktrace-lines")
.intType()
.defaultValue(5)
.withDescription(
"Maximum number of stack trace lines to include in exception-related Kubernetes event messages.");

/**
 * Caps how many exception-related Kubernetes events are emitted per reconciliation cycle.
 * Defaults to 10.
 */
@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_LIMIT =
operatorConfig("events.exceptions.limit-per-reconciliation")
.intType()
.defaultValue(10)
.withDescription(
"Maximum number of exception-related Kubernetes events emitted per reconciliation cycle.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.service.FlinkService;

import io.javaoperatorsdk.operator.api.reconciler.Context;
Expand All @@ -39,8 +40,15 @@ public FlinkDeploymentContext(
Context<?> josdkContext,
KubernetesResourceMetricGroup resourceMetricGroup,
FlinkConfigManager configManager,
Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory) {
super(resource, josdkContext, resourceMetricGroup, configManager, flinkServiceFactory);
Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory,
FlinkResourceContextFactory.ExceptionCacheEntry exceptionCacheEntry) {
super(
resource,
josdkContext,
resourceMetricGroup,
configManager,
flinkServiceFactory,
exceptionCacheEntry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.service.FlinkService;

import io.fabric8.kubernetes.client.KubernetesClient;
Expand All @@ -51,6 +52,8 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
protected final FlinkConfigManager configManager;
private final Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory;

@Getter private final FlinkResourceContextFactory.ExceptionCacheEntry exceptionCacheEntry;

private FlinkOperatorConfiguration operatorConfig;
private Configuration observeConfig;
private FlinkService flinkService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.service.FlinkService;

import io.javaoperatorsdk.operator.api.reconciler.Context;
Expand All @@ -44,8 +45,15 @@ public FlinkSessionJobContext(
Context<?> josdkContext,
KubernetesResourceMetricGroup resourceMetricGroup,
FlinkConfigManager configManager,
Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory) {
super(resource, josdkContext, resourceMetricGroup, configManager, flinkServiceFactory);
Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory,
FlinkResourceContextFactory.ExceptionCacheEntry exceptionCacheEntry) {
super(
resource,
josdkContext,
resourceMetricGroup,
configManager,
flinkServiceFactory,
exceptionCacheEntry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,21 @@
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
import org.apache.flink.kubernetes.operator.utils.K8sAnnotationsSanitizer;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

import static org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkResourceException;
Expand Down Expand Up @@ -69,17 +79,20 @@ public boolean observe(FlinkResourceContext<R> ctx) {
var jobStatus = resource.getStatus().getJobStatus();
LOG.debug("Observing job status");
var previousJobStatus = jobStatus.getState();

var jobId = jobStatus.getJobId();
try {
var newJobStatusOpt =
ctx.getFlinkService()
.getJobStatus(
ctx.getObserveConfig(),
JobID.fromHexString(jobStatus.getJobId()));
.getJobStatus(ctx.getObserveConfig(), JobID.fromHexString(jobId));

if (newJobStatusOpt.isPresent()) {
updateJobStatus(ctx, newJobStatusOpt.get());
var newJobStatus = newJobStatusOpt.get();
updateJobStatus(ctx, newJobStatus);
ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
// see if the JM server is up, try to get the exceptions
if (!previousJobStatus.isGloballyTerminalState()) {
observeJobManagerExceptions(ctx);
}
return true;
} else {
onTargetJobNotFound(ctx);
Expand All @@ -95,6 +108,155 @@ public boolean observe(FlinkResourceContext<R> ctx) {
return false;
}

/**
 * Observe the exceptions raised in the job manager and emit a Kubernetes event for each one
 * that has not been reported yet.
 *
 * <p>The exception history is fetched from the Flink service and walked newest-first. An
 * exception is reported only if it is strictly newer than the timestamp stored in the exception
 * cache entry for the current job id; when the cached job id is unset or differs from the
 * current one, every exception is treated as new. At most the configured maximum number of
 * events is emitted per call. A non-positive maximum disables reporting: nothing is fetched,
 * nothing is emitted and the cache entry is left untouched.
 *
 * <p>When reporting is enabled and the history is non-empty, the cache entry is updated with
 * the current job id and, if at least one event was emitted, with the timestamp of the newest
 * emitted exception.
 *
 * <p>This is best effort: any exception thrown while fetching or reporting is logged at WARN
 * level and swallowed.
 *
 * @param ctx the context with which the operation is executed
 */
protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
    var resource = ctx.getResource();
    var operatorConfig = ctx.getOperatorConfig();
    var jobStatus = resource.getStatus().getJobStatus();

    try {
        int maxEvents = operatorConfig.getReportedExceptionEventsMaxCount();
        if (maxEvents <= 0) {
            // A limit of zero (or less) means "emit nothing". Without this guard the loop
            // below would still emit one event, because its limit check only runs after the
            // first emit. Returning here also avoids a pointless REST call.
            return;
        }
        int maxStackTraceLines = operatorConfig.getReportedExceptionEventsMaxStackTraceLength();

        String currentJobId = jobStatus.getJobId();
        var jobId = JobID.fromHexString(currentJobId);
        // TODO: Ideally the best way to restrict the number of events is to use the query
        // param `maxExceptions`, but JobExceptionsMessageParameters neither exposes that
        // parameter nor has setters for it.
        var history =
                ctx.getFlinkService().getJobExceptions(resource, jobId, ctx.getObserveConfig());

        if (history == null || history.getExceptionHistory() == null) {
            return;
        }

        var exceptionHistory = history.getExceptionHistory();
        List<JobExceptionsInfoWithHistory.RootExceptionInfo> exceptions =
                exceptionHistory.getEntries();
        if (exceptions == null || exceptions.isEmpty()) {
            return;
        }

        if (exceptionHistory.isTruncated()) {
            LOG.warn(
                    "Job exception history is truncated for jobId '{}'. Some exceptions may be missing.",
                    jobId);
        }

        // A cache entry is expected to always be present. Its recorded timestamp is only
        // trusted when it belongs to the current job: the job id may have changed since the
        // entry was written, in which case every exception counts as new.
        var cacheEntry = ctx.getExceptionCacheEntry();
        Instant lastRecorded = null;
        if (cacheEntry.getJobId() != null && cacheEntry.getJobId().equals(currentJobId)) {
            lastRecorded = Instant.ofEpochMilli(cacheEntry.getLastTimestamp());
        }

        // Sort newest-first so that, when the limit is hit, the most recent exceptions are
        // the ones that get reported.
        var sortedExceptions = new ArrayList<>(exceptions);
        sortedExceptions.sort(
                Comparator.comparingLong(
                                JobExceptionsInfoWithHistory.RootExceptionInfo::getTimestamp)
                        .reversed());

        int count = 0;
        Instant latestSeen = null;
        for (var exception : sortedExceptions) {
            Instant exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
            // Everything from here on is at least as old as what was already recorded.
            if (lastRecorded != null && !exceptionTime.isAfter(lastRecorded)) {
                break;
            }
            emitJobManagerExceptionEvent(ctx, exception, exceptionTime, maxStackTraceLines);
            // Descending order: the first emitted exception is the newest one.
            if (latestSeen == null) {
                latestSeen = exceptionTime;
            }
            if (++count >= maxEvents) {
                break;
            }
        }

        cacheEntry.setJobId(currentJobId);
        // Only advance the recorded timestamp when something was actually emitted.
        if (latestSeen != null) {
            cacheEntry.setLastTimestamp(latestSeen.toEpochMilli());
        }

    } catch (Exception e) {
        LOG.warn("Failed to fetch JobManager exception info.", e);
    }
}

private void emitJobManagerExceptionEvent(
FlinkResourceContext<R> ctx,
JobExceptionsInfoWithHistory.RootExceptionInfo exception,
Instant exceptionTime,
int maxStackTraceLines) {

String exceptionName = exception.getExceptionName();
if (exceptionName == null || exceptionName.isBlank()) {
return;
}
Map<String, String> annotations = new HashMap<>();
if (exceptionTime != null) {
annotations.put(
"exception-timestamp",
exceptionTime.atZone(ZoneId.systemDefault()).toOffsetDateTime().toString());
}
if (exception.getTaskName() != null) {
annotations.put("task-name", exception.getTaskName());
}
if (exception.getEndpoint() != null) {
annotations.put("endpoint", exception.getEndpoint());
}
if (exception.getTaskManagerId() != null) {
annotations.put("tm-id", exception.getTaskManagerId());
}
if (exception.getFailureLabels() != null) {
exception
.getFailureLabels()
.forEach((k, v) -> annotations.put("failure-label-" + k, v));
}

StringBuilder eventMessage = new StringBuilder(exceptionName);
String stacktrace = exception.getStacktrace();
if (stacktrace != null && !stacktrace.isBlank()) {
String[] lines = stacktrace.split("\n");
eventMessage.append("\n\nStacktrace (truncated):\n");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we maybe remove this line completely? It seems to just make the messages longer and add an empty line:

image

Also, it seems like the first line (the exception name) and the start of the stack trace are basically duplicated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

for (int i = 0; i < Math.min(maxStackTraceLines, lines.length); i++) {
eventMessage.append(lines[i]).append("\n");
}
if (lines.length > maxStackTraceLines) {
eventMessage
.append("... (")
.append(lines.length - maxStackTraceLines)
.append(" more lines)");
}
}

String identityKey =
"jobmanager-exception-"
+ Integer.toHexString(Objects.hash(eventMessage.toString()));
eventRecorder.triggerEventWithAnnotations(
ctx.getResource(),
EventRecorder.Type.Warning,
EventRecorder.Reason.JobException,
eventMessage.toString().trim(),
EventRecorder.Component.Job,
identityKey,
ctx.getKubernetesClient(),
K8sAnnotationsSanitizer.sanitizeAnnotations(annotations));
}

/**
* Callback when no matching target job was found on a cluster where jobs were found.
*
Expand Down
Loading
Loading