Skip to content

Commit 2ee8c55

Browse files
committed
[FLINK-37730][Job Manager] Expose JM exceptions as K8s events
1 parent 1ae2f33 commit 2ee8c55

File tree

5 files changed

+132
-1
lines changed

5 files changed

+132
-1
lines changed

flink-kubernetes-operator/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,12 @@ under the License.
236236
<artifactId>junit-jupiter-params</artifactId>
237237
<scope>test</scope>
238238
</dependency>
239+
<dependency>
240+
<groupId>joda-time</groupId>
241+
<artifactId>joda-time</artifactId>
242+
<version>2.12.7</version>
243+
<scope>compile</scope>
244+
</dependency>
239245
</dependencies>
240246

241247
<build>

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.api.common.JobID;
2121
import org.apache.flink.api.common.JobStatus;
2222
import org.apache.flink.autoscaler.JobAutoScaler;
23+
import org.apache.flink.autoscaler.utils.DateTimeUtils;
2324
import org.apache.flink.configuration.Configuration;
2425
import org.apache.flink.configuration.HighAvailabilityOptions;
2526
import org.apache.flink.configuration.PipelineOptionsInternal;
@@ -45,6 +46,7 @@
4546
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
4647
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
4748
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
49+
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
4850
import org.apache.flink.util.Preconditions;
4951

5052
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -55,6 +57,8 @@
5557
import org.slf4j.LoggerFactory;
5658

5759
import java.time.Instant;
60+
import java.time.ZoneId;
61+
import java.util.Map;
5862
import java.util.Optional;
5963
import java.util.UUID;
6064

@@ -299,9 +303,92 @@ public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> ctx)
299303
return true;
300304
}
301305

306+
// check for JobManager exceptions if the REST API server is still up.
307+
if (!ReconciliationUtils.isJobInTerminalState(deployment.getStatus())) {
308+
observeJobManagerExceptions(ctx, deployment, observeConfig);
309+
}
310+
302311
return cleanupTerminalJmAfterTtl(ctx.getFlinkService(), deployment, observeConfig);
303312
}
304313

314+
private void observeJobManagerExceptions(
315+
FlinkResourceContext<FlinkDeployment> ctx,
316+
FlinkDeployment deployment,
317+
Configuration observeConfig) {
318+
try {
319+
var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
320+
var history = ctx.getFlinkService().getJobExceptions(deployment, jobId, observeConfig);
321+
if (history == null || history.getExceptionHistory() == null) {
322+
return;
323+
}
324+
var exceptionHistory = history.getExceptionHistory();
325+
var exceptions = exceptionHistory.getEntries();
326+
if (exceptions.isEmpty()) {
327+
LOG.info(String.format("No exceptions found in job exception history for jobId '%s'.", jobId));
328+
return;
329+
}
330+
if (exceptionHistory.isTruncated()) {
331+
LOG.warn(String.format("Job exception history is truncated for jobId '%s'. "
332+
+ "Some exceptions are not shown.", jobId));
333+
}
334+
for (var exception : exceptions) {
335+
emitJobManagerExceptionEvent(ctx, deployment, exception);
336+
}
337+
} catch (Exception e) {
338+
LOG.warn("Could not fetch JobManager exception info.", e);
339+
}
340+
}
341+
342+
private void emitJobManagerExceptionEvent(
343+
FlinkResourceContext<FlinkDeployment> ctx,
344+
FlinkDeployment deployment,
345+
JobExceptionsInfoWithHistory.RootExceptionInfo exception) {
346+
347+
String message = exception.getExceptionName();
348+
if (message == null || message.isBlank()) {
349+
return;
350+
}
351+
352+
String stacktrace = exception.getStacktrace();
353+
String taskName = exception.getTaskName();
354+
String endpoint = exception.getEndpoint();
355+
String tmId = exception.getTaskManagerId();
356+
Map<String, String> labels = exception.getFailureLabels();
357+
String time = DateTimeUtils.readable(Instant.ofEpochMilli(exception.getTimestamp()), ZoneId.systemDefault());
358+
359+
StringBuilder combined = new StringBuilder();
360+
combined.append("JobManager Exception at ").append(time).append(":\n");
361+
combined.append(message).append("\n\n");
362+
363+
if (taskName != null) {
364+
combined.append("Task: ").append(taskName).append("\n");
365+
}
366+
if (endpoint != null) {
367+
combined.append("Endpoint: ").append(endpoint).append("\n");
368+
}
369+
if (tmId != null) {
370+
combined.append("TaskManager ID: ").append(tmId).append("\n");
371+
}
372+
373+
if (labels != null && !labels.isEmpty()) {
374+
combined.append("Failure Labels:\n");
375+
labels.forEach((k, v) -> combined.append("- ").append(k).append(": ").append(v).append("\n"));
376+
}
377+
378+
if (stacktrace != null && !stacktrace.isBlank()) {
379+
combined.append("\nStacktrace:\n").append(stacktrace);
380+
}
381+
382+
eventRecorder.triggerEventOnce(
383+
deployment,
384+
EventRecorder.Type.Warning,
385+
EventRecorder.Reason.JobManagerException,
386+
combined.toString(),
387+
EventRecorder.Component.JobManagerDeployment,
388+
"jobmanager-exception-" + message.hashCode(),
389+
ctx.getKubernetesClient());
390+
}
391+
305392
private boolean shouldRestartJobBecauseUnhealthy(
306393
FlinkDeployment deployment, Configuration observeConfig) {
307394
boolean restartNeeded = false;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
6666
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
6767
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
68+
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
69+
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
6870
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
6971
import org.apache.flink.runtime.rest.messages.TriggerId;
7072
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
@@ -77,6 +79,7 @@
7779
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
7880
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
7981
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
82+
import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
8083
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
8184
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
8285
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
@@ -845,6 +848,34 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exc
845848
(c, e) -> new StandaloneClientHAServices(restServerAddress));
846849
}
847850

851+
@Override
852+
public JobExceptionsInfoWithHistory getJobExceptions(FlinkDeployment deployment,
853+
JobID jobId,
854+
Configuration conf) {
855+
JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders.getInstance();
856+
int port = conf.getInteger(RestOptions.PORT);
857+
String host =
858+
ObjectUtils.firstNonNull(
859+
operatorConfig.getFlinkServiceHostOverride(),
860+
ExternalServiceDecorator.getNamespacedExternalServiceName(
861+
deployment.getMetadata().getName(), deployment.getMetadata().getNamespace()));
862+
JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
863+
params.jobPathParameter.resolve(jobId);
864+
try (var restClient = getRestClient(conf)) {
865+
return restClient
866+
.sendRequest(
867+
host,
868+
port,
869+
jobExceptionsHeaders,
870+
params,
871+
EmptyRequestBody.getInstance())
872+
.get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
873+
} catch (Exception e) {
874+
LOG.warn(String.format("Failed to fetch job exceptions from REST API for jobId %s", jobId), e);
875+
return null;
876+
}
877+
}
878+
848879
@VisibleForTesting
849880
protected void runJar(
850881
JobSpec job,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
3434
import org.apache.flink.runtime.client.JobStatusMessage;
3535
import org.apache.flink.runtime.jobmaster.JobResult;
36+
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
3637

3738
import io.fabric8.kubernetes.api.model.ObjectMeta;
3839
import io.fabric8.kubernetes.api.model.PodList;
@@ -127,6 +128,10 @@ Map<String, String> getMetrics(Configuration conf, String jobId, List<String> me
127128

128129
RestClusterClient<String> getClusterClient(Configuration conf) throws Exception;
129130

131+
JobExceptionsInfoWithHistory getJobExceptions(FlinkDeployment deployment,
132+
JobID jobId,
133+
Configuration conf) throws Exception;
134+
130135
/** Result of a cancel operation. */
131136
@AllArgsConstructor
132137
class CancelResult {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,8 @@ public KubernetesClient getKubernetesClient() {
280280
/** The type of the events. */
281281
public enum Type {
282282
Normal,
283-
Warning
283+
Warning,
284+
Error
284285
}
285286

286287
/** The component of events. */
@@ -315,6 +316,7 @@ public enum Reason {
315316
UnsupportedFlinkVersion,
316317
SnapshotError,
317318
SnapshotAbandoned,
319+
JobManagerException,
318320
Error
319321
}
320322
}

0 commit comments

Comments
 (0)