Skip to content

Commit 3c3a1b5

Browse files
committed
[FLINK-37730] Moves exception emitter to JobStatusObserver
1 parent c434722 commit 3c3a1b5

21 files changed

+399
-345
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -657,15 +657,15 @@ public static String operatorConfigKey(String key) {
657657

658658
@Documentation.Section(SECTION_ADVANCED)
659659
public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES =
660-
operatorConfig("event.exception.stacktrace.lines")
660+
operatorConfig("events.exceptions.stacktrace-lines")
661661
.intType()
662662
.defaultValue(5)
663663
.withDescription(
664664
"Maximum number of stack trace lines to include in exception-related Kubernetes event messages.");
665665

666666
@Documentation.Section(SECTION_ADVANCED)
667667
public static final ConfigOption<Integer> OPERATOR_EVENT_EXCEPTION_LIMIT =
668-
operatorConfig("event.exception.limit.per.reconciliation")
668+
operatorConfig("events.exceptions.limit-per-reconciliation")
669669
.intType()
670670
.defaultValue(10)
671671
.withDescription(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@
2525
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
2626
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
2727
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
28+
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
2829
import org.apache.flink.kubernetes.operator.service.FlinkService;
2930

3031
import io.javaoperatorsdk.operator.api.reconciler.Context;
32+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3133

34+
import java.util.Map;
3235
import java.util.function.Function;
3336

3437
/** Context for reconciling a Flink resource. */
@@ -39,8 +42,16 @@ public FlinkDeploymentContext(
3942
Context<?> josdkContext,
4043
KubernetesResourceMetricGroup resourceMetricGroup,
4144
FlinkConfigManager configManager,
42-
Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory) {
43-
super(resource, josdkContext, resourceMetricGroup, configManager, flinkServiceFactory);
45+
Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory,
46+
Map<ResourceID, FlinkResourceContextFactory.ExceptionCacheEntry>
47+
lastRecordedExceptionCache) {
48+
super(
49+
resource,
50+
josdkContext,
51+
resourceMetricGroup,
52+
configManager,
53+
flinkServiceFactory,
54+
lastRecordedExceptionCache);
4455
}
4556

4657
@Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,18 @@
3030
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
3131
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
3232
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
33+
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
3334
import org.apache.flink.kubernetes.operator.service.FlinkService;
3435

3536
import io.fabric8.kubernetes.client.KubernetesClient;
3637
import io.javaoperatorsdk.operator.api.reconciler.Context;
38+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3739
import lombok.Getter;
3840
import lombok.RequiredArgsConstructor;
3941

4042
import javax.annotation.Nullable;
4143

44+
import java.util.Map;
4245
import java.util.function.Function;
4346

4447
/** Context for reconciling a Flink resource. */
@@ -51,6 +54,10 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
5154
protected final FlinkConfigManager configManager;
5255
private final Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory;
5356

57+
@Getter
58+
private final Map<ResourceID, FlinkResourceContextFactory.ExceptionCacheEntry>
59+
lastRecordedExceptionCache;
60+
5461
private FlinkOperatorConfiguration operatorConfig;
5562
private Configuration observeConfig;
5663
private FlinkService flinkService;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,13 @@
2626
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
2727
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
2828
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
29+
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
2930
import org.apache.flink.kubernetes.operator.service.FlinkService;
3031

3132
import io.javaoperatorsdk.operator.api.reconciler.Context;
33+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3234

35+
import java.util.Map;
3336
import java.util.function.Function;
3437

3538
import static org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.sessionClusterReady;
@@ -44,8 +47,16 @@ public FlinkSessionJobContext(
4447
Context<?> josdkContext,
4548
KubernetesResourceMetricGroup resourceMetricGroup,
4649
FlinkConfigManager configManager,
47-
Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory) {
48-
super(resource, josdkContext, resourceMetricGroup, configManager, flinkServiceFactory);
50+
Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory,
51+
Map<ResourceID, FlinkResourceContextFactory.ExceptionCacheEntry>
52+
lastRecordedExceptionCache) {
53+
super(
54+
resource,
55+
josdkContext,
56+
resourceMetricGroup,
57+
configManager,
58+
flinkServiceFactory,
59+
lastRecordedExceptionCache);
4960
}
5061

5162
@Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java

Lines changed: 166 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,28 @@
1919

2020
import org.apache.flink.api.common.JobID;
2121
import org.apache.flink.api.common.JobStatus;
22+
import org.apache.flink.autoscaler.utils.DateTimeUtils;
2223
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
2324
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
2425
import org.apache.flink.kubernetes.operator.api.spec.JobState;
2526
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
2627
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
2728
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
2829
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
30+
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
2931
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3032
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
3133
import org.apache.flink.runtime.client.JobStatusMessage;
34+
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
3235

36+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3337
import org.slf4j.Logger;
3438
import org.slf4j.LoggerFactory;
3539

40+
import java.time.Instant;
41+
import java.time.ZoneId;
42+
import java.util.HashMap;
43+
import java.util.Map;
3644
import java.util.concurrent.TimeoutException;
3745

3846
import static org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkResourceException;
@@ -69,17 +77,23 @@ public boolean observe(FlinkResourceContext<R> ctx) {
6977
var jobStatus = resource.getStatus().getJobStatus();
7078
LOG.debug("Observing job status");
7179
var previousJobStatus = jobStatus.getState();
72-
80+
var jobId = jobStatus.getJobId();
7381
try {
7482
var newJobStatusOpt =
7583
ctx.getFlinkService()
76-
.getJobStatus(
77-
ctx.getObserveConfig(),
78-
JobID.fromHexString(jobStatus.getJobId()));
84+
.getJobStatus(ctx.getObserveConfig(), JobID.fromHexString(jobId));
7985

8086
if (newJobStatusOpt.isPresent()) {
81-
updateJobStatus(ctx, newJobStatusOpt.get());
87+
var newJobStatus = newJobStatusOpt.get();
88+
updateJobStatus(ctx, newJobStatus);
8289
ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
90+
// Only poll for JobManager exceptions while the job is NOT in a globally
// terminal state: terminal jobs produce no new exceptions, and the REST API
// availability has already been verified by the status fetch above.
94+
if (!newJobStatus.getJobState().isGloballyTerminalState()) {
95+
observeJobManagerExceptions(ctx);
96+
}
8397
return true;
8498
} else {
8599
onTargetJobNotFound(ctx);
@@ -95,6 +109,153 @@ public boolean observe(FlinkResourceContext<R> ctx) {
95109
return false;
96110
}
97111

112+
/**
113+
* Observe the exceptions raised in the job manager and take appropriate action.
114+
*
115+
* @param ctx the context with which the operation is executed
116+
*/
117+
protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
118+
var resource = ctx.getResource();
119+
var operatorConfig = ctx.getOperatorConfig();
120+
var jobStatus = resource.getStatus().getJobStatus();
121+
// Ideally should not happen
122+
if (jobStatus == null || jobStatus.getJobId() == null) {
123+
LOG.warn(
124+
"No jobId found for deployment '{}', skipping exception observation.",
125+
resource.getMetadata().getName());
126+
return;
127+
}
128+
129+
try {
130+
JobID jobId = JobID.fromHexString(jobStatus.getJobId());
131+
// TODO: Ideally the best way to restrict the number of events is to use the query param
132+
// `maxExceptions`
133+
// but the JobExceptionsMessageParameters does not expose the parameters and nor does
134+
// it have setters.
135+
var history =
136+
ctx.getFlinkService()
137+
.getJobExceptions(
138+
resource, jobId, ctx.getDeployConfig(resource.getSpec()));
139+
140+
if (history == null || history.getExceptionHistory() == null) {
141+
return;
142+
}
143+
144+
var exceptionHistory = history.getExceptionHistory();
145+
var exceptions = exceptionHistory.getEntries();
146+
if (exceptions.isEmpty()) {
147+
return;
148+
}
149+
150+
if (exceptionHistory.isTruncated()) {
151+
LOG.warn(
152+
"Job exception history is truncated for jobId '{}'. Some exceptions may be missing.",
153+
jobId);
154+
}
155+
156+
ResourceID resourceID = ResourceID.fromResource(resource);
157+
String currentJobId = jobStatus.getJobId();
158+
Instant lastRecorded = null; // first reconciliation
159+
160+
FlinkResourceContextFactory.ExceptionCacheEntry cacheEntry =
161+
ctx.getLastRecordedExceptionCache().get(resourceID);
162+
if (cacheEntry != null && cacheEntry.getJobId().equals(currentJobId)) {
163+
lastRecorded = Instant.ofEpochMilli(cacheEntry.getLastTimestamp());
164+
}
165+
166+
Instant now = Instant.now();
167+
int maxEvents = operatorConfig.getReportedExceptionEventsMaxCount();
168+
int maxStackTraceLines = operatorConfig.getReportedExceptionEventsMaxStackTraceLength();
169+
170+
int count = 0;
171+
for (var exception : exceptions) {
172+
Instant exceptionTime = Instant.ofEpochMilli(exception.getTimestamp());
173+
if (lastRecorded != null && exceptionTime.isBefore(lastRecorded)) {
174+
continue;
175+
}
176+
177+
emitJobManagerExceptionEvent(ctx, exception, exceptionTime, maxStackTraceLines);
178+
if (++count >= maxEvents) {
179+
break;
180+
}
181+
}
182+
ctx.getLastRecordedExceptionCache()
183+
.put(
184+
resourceID,
185+
new FlinkResourceContextFactory.ExceptionCacheEntry(
186+
currentJobId, now.toEpochMilli()));
187+
} catch (Exception e) {
188+
LOG.warn(
189+
"Failed to fetch JobManager exception info for deployment '{}'.",
190+
resource.getMetadata().getName(),
191+
e);
192+
}
193+
}
194+
195+
private void emitJobManagerExceptionEvent(
196+
FlinkResourceContext<R> ctx,
197+
JobExceptionsInfoWithHistory.RootExceptionInfo exception,
198+
Instant exceptionTime,
199+
int maxStackTraceLines) {
200+
201+
String exceptionName = exception.getExceptionName();
202+
if (exceptionName == null || exceptionName.isBlank()) {
203+
return;
204+
}
205+
206+
Map<String, String> annotations = new HashMap<>();
207+
annotations.put(
208+
"event-time-readable",
209+
DateTimeUtils.readable(exceptionTime, ZoneId.systemDefault()));
210+
annotations.put("event-timestamp-millis", String.valueOf(exceptionTime.toEpochMilli()));
211+
212+
if (exception.getTaskName() != null) {
213+
annotations.put("task-name", exception.getTaskName());
214+
}
215+
if (exception.getEndpoint() != null) {
216+
annotations.put("endpoint", exception.getEndpoint());
217+
}
218+
if (exception.getTaskManagerId() != null) {
219+
annotations.put("tm-id", exception.getTaskManagerId());
220+
}
221+
222+
if (exception.getFailureLabels() != null) {
223+
exception
224+
.getFailureLabels()
225+
.forEach((k, v) -> annotations.put("failure-label-" + k, v));
226+
}
227+
228+
StringBuilder eventMessage = new StringBuilder(exceptionName);
229+
String stacktrace = exception.getStacktrace();
230+
if (stacktrace != null && !stacktrace.isBlank()) {
231+
String[] lines = stacktrace.split("\n");
232+
eventMessage.append("\n\nStacktrace (truncated):\n");
233+
for (int i = 0; i < Math.min(maxStackTraceLines, lines.length); i++) {
234+
eventMessage.append(lines[i]).append("\n");
235+
}
236+
if (lines.length > maxStackTraceLines) {
237+
eventMessage
238+
.append("... (")
239+
.append(lines.length - maxStackTraceLines)
240+
.append(" more lines)");
241+
}
242+
}
243+
244+
String keyMessage =
245+
exceptionName.length() > 128 ? exceptionName.substring(0, 128) : exceptionName;
246+
247+
eventRecorder.triggerEventOnceWithAnnotations(
248+
exceptionTime.toEpochMilli(),
249+
ctx.getResource(),
250+
EventRecorder.Type.Warning,
251+
EventRecorder.Reason.JobException,
252+
eventMessage.toString().trim(),
253+
EventRecorder.Component.JobManagerDeployment,
254+
"jobmanager-exception-" + keyMessage.hashCode(),
255+
ctx.getKubernetesClient(),
256+
annotations);
257+
}
258+
98259
/**
99260
* Callback when no matching target job was found on a cluster where jobs were found.
100261
*

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,6 @@ public void observeInternal(FlinkResourceContext<FlinkDeployment> ctx) {
7474
observeClusterInfo(ctx);
7575
}
7676

77-
if (!ReconciliationUtils.isJobInTerminalState(flinkDep.getStatus())) {
78-
observeJobManagerExceptions(ctx);
79-
}
80-
8177
clearErrorsIfDeploymentIsHealthy(flinkDep);
8278
}
8379

@@ -301,11 +297,4 @@ protected boolean checkIfAlreadyUpgraded(FlinkResourceContext<FlinkDeployment> c
301297
* @param ctx the context with which the operation is executed
302298
*/
303299
protected abstract void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> ctx);
304-
305-
/**
306-
* Observe the exceptions raised in the job manager and take appropriate action.
307-
*
308-
* @param ctx the context with which the operation is executed
309-
*/
310-
protected abstract void observeJobManagerExceptions(FlinkResourceContext<FlinkDeployment> ctx);
311300
}

0 commit comments

Comments
 (0)