
Commit 0679d63

[FLINK-37730] Expose JM exception as K8s exceptions
1 parent b0bc3a3 commit 0679d63

23 files changed (+1017 −25 lines)

docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html

Lines changed: 12 additions & 0 deletions
@@ -98,6 +98,18 @@
             <td>Boolean</td>
             <td>Enables dynamic change of watched/monitored namespaces.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.events.exceptions.limit-per-reconciliation</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>Maximum number of exception-related Kubernetes events emitted per reconciliation cycle.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.events.exceptions.stacktrace-lines</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>Maximum number of stack trace lines to include in exception-related Kubernetes event messages.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.exception.field.max.length</h5></td>
             <td style="word-wrap: break-word;">2048</td>
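
Both options are ordinary operator configuration keys. As a minimal sketch (not part of this commit), they could be overridden on a Flink Configuration using the key names documented above; the defaults are 10 events and 5 stack trace lines as shown in the table:

import org.apache.flink.configuration.Configuration;

public class ExceptionEventOptionsExample {
    public static void main(String[] args) {
        // Illustrative only: override the documented defaults (10 events, 5 stack trace lines).
        Configuration operatorConf = new Configuration();
        operatorConf.setString("kubernetes.operator.events.exceptions.limit-per-reconciliation", "20");
        operatorConf.setString("kubernetes.operator.events.exceptions.stacktrace-lines", "10");
        System.out.println(operatorConf);
    }
}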

docs/layouts/shortcodes/generated/system_advanced_section.html

Lines changed: 12 additions & 0 deletions
@@ -38,6 +38,18 @@
             <td>Boolean</td>
             <td>Whether to enable on-the-fly config changes through the operator configmap.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.events.exceptions.limit-per-reconciliation</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>Maximum number of exception-related Kubernetes events emitted per reconciliation cycle.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.events.exceptions.stacktrace-lines</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>Maximum number of stack trace lines to include in exception-related Kubernetes event messages.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.health.canary.resource.timeout</h5></td>
             <td style="word-wrap: break-word;">1 min</td>

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java

Lines changed: 11 additions & 1 deletion
@@ -77,6 +77,8 @@ public class FlinkOperatorConfiguration {
     DeletionPropagation deletionPropagation;
     boolean snapshotResourcesEnabled;
     Duration slowRequestThreshold;
+    int reportedExceptionEventsMaxCount;
+    int reportedExceptionEventsMaxStackTraceLength;
 
     public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
         Duration reconcileInterval =
@@ -195,6 +197,12 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
         Duration slowRequestThreshold =
                 operatorConfig.get(OPERATOR_KUBERNETES_SLOW_REQUEST_THRESHOLD);
 
+        int reportedExceptionEventsMaxCount =
+                operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT);
+        int reportedExceptionEventsMaxStackTraceLength =
+                operatorConfig.get(
+                        KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES);
+
         return new FlinkOperatorConfiguration(
                 reconcileInterval,
                 reconcilerMaxParallelism,
@@ -224,7 +232,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
                 getLeaderElectionConfig(operatorConfig),
                 deletionPropagation,
                 snapshotResourcesEnabled,
-                slowRequestThreshold);
+                slowRequestThreshold,
+                reportedExceptionEventsMaxCount,
+                reportedExceptionEventsMaxStackTraceLength);
     }
 
     private static GenericRetry getRetryConfig(Configuration conf) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 16 additions & 0 deletions
@@ -654,4 +654,20 @@ public static String operatorConfigKey(String key) {
                     .defaultValue(Duration.ofMinutes(-1))
                     .withDescription(
                             "How often to retrieve Kubernetes cluster resource usage information. This information is used to avoid running out of cluster resources when scaling up resources. Negative values disable the feature.");
+
+    @Documentation.Section(SECTION_ADVANCED)
+    public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES =
+            operatorConfig("events.exceptions.stacktrace-lines")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Maximum number of stack trace lines to include in exception-related Kubernetes event messages.");
+
+    @Documentation.Section(SECTION_ADVANCED)
+    public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_LIMIT =
+            operatorConfig("events.exceptions.limit-per-reconciliation")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "Maximum number of exception-related Kubernetes events emitted per reconciliation cycle.");
 }
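
A rough sketch of how these options flow into the operator configuration (assuming FlinkOperatorConfiguration.fromConfiguration accepts an otherwise default Configuration, and using the getters referenced elsewhere in this commit; not part of the change itself):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;

public class ExceptionEventLimitsExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT, 20);
        conf.set(KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES, 3);

        // The observer later reads these values through the generated getters.
        FlinkOperatorConfiguration operatorConfig = FlinkOperatorConfiguration.fromConfiguration(conf);
        System.out.println(operatorConfig.getReportedExceptionEventsMaxCount());            // 20
        System.out.println(operatorConfig.getReportedExceptionEventsMaxStackTraceLength()); // 3
    }
}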

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java

Lines changed: 10 additions & 2 deletions
@@ -25,6 +25,7 @@
 import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
+import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -39,8 +40,15 @@ public FlinkDeploymentContext(
             Context<?> josdkContext,
             KubernetesResourceMetricGroup resourceMetricGroup,
             FlinkConfigManager configManager,
-            Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory) {
-        super(resource, josdkContext, resourceMetricGroup, configManager, flinkServiceFactory);
+            Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory,
+            FlinkResourceContextFactory.ExceptionCacheEntry exceptionCacheEntry) {
+        super(
+                resource,
+                josdkContext,
+                resourceMetricGroup,
+                configManager,
+                flinkServiceFactory,
+                exceptionCacheEntry);
     }
 
     @Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java

Lines changed: 3 additions & 0 deletions
@@ -30,6 +30,7 @@
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
+import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -51,6 +52,8 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
     protected final FlinkConfigManager configManager;
     private final Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory;
 
+    @Getter private final FlinkResourceContextFactory.ExceptionCacheEntry exceptionCacheEntry;
+
     private FlinkOperatorConfiguration operatorConfig;
     private Configuration observeConfig;
     private FlinkService flinkService;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java

Lines changed: 10 additions & 2 deletions
@@ -26,6 +26,7 @@
 import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
+import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -44,8 +45,15 @@ public FlinkSessionJobContext(
             Context<?> josdkContext,
             KubernetesResourceMetricGroup resourceMetricGroup,
             FlinkConfigManager configManager,
-            Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory) {
-        super(resource, josdkContext, resourceMetricGroup, configManager, flinkServiceFactory);
+            Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory,
+            FlinkResourceContextFactory.ExceptionCacheEntry exceptionCacheEntry) {
+        super(
+                resource,
+                josdkContext,
+                resourceMetricGroup,
+                configManager,
+                flinkServiceFactory,
+                exceptionCacheEntry);
     }
 
     @Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java

Lines changed: 161 additions & 5 deletions
@@ -28,11 +28,21 @@
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
+import org.apache.flink.kubernetes.operator.utils.K8sAnnotationsSanitizer;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkResourceException;
@@ -69,17 +79,20 @@ public boolean observe(FlinkResourceContext<R> ctx) {
         var jobStatus = resource.getStatus().getJobStatus();
         LOG.debug("Observing job status");
         var previousJobStatus = jobStatus.getState();
-
+        var jobId = jobStatus.getJobId();
         try {
             var newJobStatusOpt =
                     ctx.getFlinkService()
-                            .getJobStatus(
-                                    ctx.getObserveConfig(),
-                                    JobID.fromHexString(jobStatus.getJobId()));
+                            .getJobStatus(ctx.getObserveConfig(), JobID.fromHexString(jobId));
 
             if (newJobStatusOpt.isPresent()) {
-                updateJobStatus(ctx, newJobStatusOpt.get());
+                var newJobStatus = newJobStatusOpt.get();
+                updateJobStatus(ctx, newJobStatus);
                 ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
+                // See if the JM server is up and try to get the exceptions
+                if (!previousJobStatus.isGloballyTerminalState()) {
+                    observeJobManagerExceptions(ctx);
+                }
                 return true;
             } else {
                 onTargetJobNotFound(ctx);
@@ -95,6 +108,149 @@ public boolean observe(FlinkResourceContext<R> ctx) {
         return false;
     }
 
+    /**
+     * Observe the exceptions raised in the job manager and take appropriate action.
+     *
+     * @param ctx the context with which the operation is executed
+     */
+    protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
+        var resource = ctx.getResource();
+        var operatorConfig = ctx.getOperatorConfig();
+        var jobStatus = resource.getStatus().getJobStatus();
+
+        try {
+            var jobId = JobID.fromHexString(jobStatus.getJobId());
+            // TODO: Ideally the number of events would be restricted via the query param
+            // `maxExceptions`, but JobExceptionsMessageParameters does not expose the
+            // parameters, nor does it have setters.
+            var history =
+                    ctx.getFlinkService().getJobExceptions(resource, jobId, ctx.getObserveConfig());
+
+            if (history == null || history.getExceptionHistory() == null) {
+                return;
+            }
+
+            var exceptionHistory = history.getExceptionHistory();
+            List<JobExceptionsInfoWithHistory.RootExceptionInfo> exceptions =
+                    exceptionHistory.getEntries();
+            if (exceptions == null || exceptions.isEmpty()) {
+                return;
+            }
+
+            if (exceptionHistory.isTruncated()) {
+                LOG.warn(
+                        "Job exception history is truncated for jobId '{}'. Some exceptions may be missing.",
+                        jobId);
+            }
+
+            String currentJobId = jobStatus.getJobId();
+            Instant lastRecorded = null; // first reconciliation
+
+            var cacheEntry = ctx.getExceptionCacheEntry();
+            // A cache entry should always be present; on the first reconciliation its timestamp
+            // is the job creation time. This check is still necessary because even though there
+            // might be an entry, the jobId could have changed since the job was first created.
+            if (cacheEntry.getJobId() != null && cacheEntry.getJobId().equals(currentJobId)) {
+                lastRecorded = Instant.ofEpochMilli(cacheEntry.getLastTimestamp());
+            }
+
+            int maxEvents = operatorConfig.getReportedExceptionEventsMaxCount();
+            int maxStackTraceLines = operatorConfig.getReportedExceptionEventsMaxStackTraceLength();
+
+            // Sort and reverse to prioritize the newest exceptions
+            var sortedExceptions = new ArrayList<>(exceptions);
+            sortedExceptions.sort(
+                    Comparator.comparingLong(
+                                    JobExceptionsInfoWithHistory.RootExceptionInfo::getTimestamp)
+                            .reversed());
+            int count = 0;
+            Instant latestSeen = null;
+
+            for (var exception : sortedExceptions) {
+                Instant exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
+                // Skip already recorded exceptions
+                if (lastRecorded != null && !exceptionTime.isAfter(lastRecorded)) {
+                    break;
+                }
+                emitJobManagerExceptionEvent(ctx, exception, exceptionTime, maxStackTraceLines);
+                if (latestSeen == null) {
+                    latestSeen = exceptionTime;
+                }
+                if (++count >= maxEvents) {
+                    break;
+                }
+            }
+
+            ctx.getExceptionCacheEntry().setJobId(currentJobId);
+            // Set to the timestamp of the latest emitted exception, if any were emitted;
+            // the alternative would be to set this to now when nothing was emitted.
+            if (latestSeen != null) {
+                ctx.getExceptionCacheEntry().setLastTimestamp(latestSeen.toEpochMilli());
+            }
+
+        } catch (Exception e) {
+            LOG.warn("Failed to fetch JobManager exception info.", e);
+        }
+    }
+
+    private void emitJobManagerExceptionEvent(
+            FlinkResourceContext<R> ctx,
+            JobExceptionsInfoWithHistory.RootExceptionInfo exception,
+            Instant exceptionTime,
+            int maxStackTraceLines) {
+        Map<String, String> annotations = new HashMap<>();
+        if (exceptionTime != null) {
+            annotations.put(
+                    "exception-timestamp",
+                    exceptionTime.atZone(ZoneId.systemDefault()).toOffsetDateTime().toString());
+        }
+        if (exception.getTaskName() != null) {
+            annotations.put("task-name", exception.getTaskName());
+        }
+        if (exception.getEndpoint() != null) {
+            annotations.put("endpoint", exception.getEndpoint());
+        }
+        if (exception.getTaskManagerId() != null) {
+            annotations.put("tm-id", exception.getTaskManagerId());
+        }
+        if (exception.getFailureLabels() != null) {
+            exception
+                    .getFailureLabels()
+                    .forEach((k, v) -> annotations.put("failure-label-" + k, v));
+        }
+
+        StringBuilder eventMessage = new StringBuilder();
+        String stacktrace = exception.getStacktrace();
+        if (stacktrace != null && !stacktrace.isBlank()) {
+            String[] lines = stacktrace.split("\n");
+            for (int i = 0; i < Math.min(maxStackTraceLines, lines.length); i++) {
+                eventMessage.append(lines[i]).append("\n");
+            }
+            if (lines.length > maxStackTraceLines) {
+                eventMessage
+                        .append("... (")
+                        .append(lines.length - maxStackTraceLines)
+                        .append(" more lines)");
+            }
+        }
+
+        String identityKey =
+                "jobmanager-exception-"
+                        + Integer.toHexString(Objects.hash(eventMessage.toString()));
+        eventRecorder.triggerEventWithAnnotations(
+                ctx.getResource(),
+                EventRecorder.Type.Warning,
+                EventRecorder.Reason.JobException,
+                eventMessage.toString().trim(),
+                EventRecorder.Component.Job,
+                identityKey,
+                ctx.getKubernetesClient(),
+                K8sAnnotationsSanitizer.sanitizeAnnotations(annotations));
+    }
+
     /**
      * Callback when no matching target job was found on a cluster where jobs were found.
      *
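
For readability, the stack-trace truncation performed inside emitJobManagerExceptionEvent can be viewed as the following standalone sketch (hypothetical helper name, not part of the commit): keep at most maxStackTraceLines lines of the trace and note how many were omitted.

// Hypothetical helper mirroring the truncation rule above: keep at most
// maxStackTraceLines lines, then append a "... (N more lines)" marker.
static String truncateStackTrace(String stacktrace, int maxStackTraceLines) {
    if (stacktrace == null || stacktrace.isBlank()) {
        return "";
    }
    String[] lines = stacktrace.split("\n");
    StringBuilder message = new StringBuilder();
    for (int i = 0; i < Math.min(maxStackTraceLines, lines.length); i++) {
        message.append(lines[i]).append("\n");
    }
    if (lines.length > maxStackTraceLines) {
        message.append("... (").append(lines.length - maxStackTraceLines).append(" more lines)");
    }
    return message.toString().trim();
}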
