Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -409,25 +409,30 @@ protected void setUpgradeSavepointPath(
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE)
.name());

FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
conf,
ctx.getOperatorConfig(),
ctx.getKubernetesClient(),
ctx.getResource(),
savepointFormatType,
savepointLocation);
var snapshotCrOpt =
FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
conf,
ctx.getOperatorConfig(),
ctx.getKubernetesClient(),
ctx.getResource(),
savepointFormatType,
savepointLocation);
var jobStatus = ctx.getResource().getStatus().getJobStatus();
jobStatus.setUpgradeSavepointPath(savepointLocation);

// Register created savepoint in the now deprecated savepoint info and history
var savepoint =
new Savepoint(
cancelTs.toEpochMilli(),
savepointLocation,
SnapshotTriggerType.UPGRADE,
savepointFormatType,
null);
jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
if (snapshotCrOpt.isEmpty()) {
// Register created savepoint in the now deprecated savepoint info and history
// only if snapshot CR was not created, otherwise it would be double recorded
// and disposed immediately
var savepoint =
new Savepoint(
cancelTs.toEpochMilli(),
savepointLocation,
SnapshotTriggerType.UPGRADE,
savepointFormatType,
null);
jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,25 +851,15 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exc

@Override
public JobExceptionsInfoWithHistory getJobExceptions(
AbstractFlinkResource resource, JobID jobId, Configuration observeConfig) {
AbstractFlinkResource resource, JobID jobId, Configuration observeConfig)
throws IOException {
JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders.getInstance();
int port = observeConfig.getInteger(RestOptions.PORT);
String host =
ObjectUtils.firstNonNull(
operatorConfig.getFlinkServiceHostOverride(),
ExternalServiceDecorator.getNamespacedExternalServiceName(
resource.getMetadata().getName(),
resource.getMetadata().getNamespace()));
JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
params.jobPathParameter.resolve(jobId);
try (var restClient = getRestClient(observeConfig)) {
return restClient
.sendRequest(
host,
port,
jobExceptionsHeaders,
params,
EmptyRequestBody.getInstance())

try (var clusterClient = getClusterClient(observeConfig)) {
return clusterClient
.sendRequest(jobExceptionsHeaders, params, EmptyRequestBody.getInstance())
.get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,313 @@ public void testExceptionEventTriggerInitialization() throws Exception {
ctx.getExceptionCacheEntry().getLastTimestamp());
}

/**
 * Observes a session job that was last recorded as RUNNING but has been cancelled on the
 * cluster side, and checks that two events are emitted and that none of them carries the
 * {@code JOB_NOT_FOUND_ERR} message.
 */
@Test
public void testSessionJobExceptionObservedEvenWhenNewStateIsTerminal() throws Exception {
    // The resource status still says RUNNING when the observer is invoked.
    var job = initSessionJob();
    var trackedJobStatus = job.getStatus().getJobStatus();
    trackedJobStatus.setState(JobStatus.RUNNING);

    // Operator configuration with the exception event limit set to "2".
    Map<String, String> rawOptions = new HashMap<>();
    rawOptions.put(KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT.key(), "2");
    Configuration operatorConfig = Configuration.fromMap(rawOptions);
    FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
            getResourceContext(
                    job,
                    TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient),
                    operatorConfig);

    // Seed the exception cache with a timestamp older than the exception added below.
    var jobId = JobID.fromHexString(trackedJobStatus.getJobId());
    var cacheEntry = ctx.getExceptionCacheEntry();
    cacheEntry.setInitialized(true);
    cacheEntry.setJobId(jobId.toHexString());
    cacheEntry.setLastTimestamp(Instant.ofEpochMilli(500L));
    flinkService.addExceptionHistory(jobId, "SessionJobExceptionOne", "trace1", 1000L);

    // Submit the session job
    var deployConfig = ctx.getDeployConfig(job.getSpec());
    flinkService.submitJobToSessionCluster(
            job.getMetadata(), job.getSpec(), jobId, deployConfig, null);

    // Cancel the job to make it terminal
    flinkService.cancelJob(jobId, false);
    flinkService.setJobFailedErr(null);

    observer.observe(ctx);

    var namespace = job.getMetadata().getNamespace();
    var events = kubernetesClient.v1().events().inNamespace(namespace).list().getItems();

    // one will be for job status changed
    assertEquals(2, events.size());
    // assert that none of the events contain JOB_NOT_FOUND_ERR
    for (var event : events) {
        assertFalse(event.getMessage().contains(JobStatusObserver.JOB_NOT_FOUND_ERR));
    }
}

/**
 * Observes a session job whose previously recorded state is FINISHED and checks that the only
 * event emitted is the job status change, even though a newer exception exists in the history.
 */
@Test
public void testSessionJobExceptionNotObservedWhenOldStateIsTerminal() throws Exception {
    var job = initSessionJob();
    // Set to terminal state
    job.getStatus().getJobStatus().setState(JobStatus.FINISHED);

    FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
            getResourceContext(
                    job, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));

    var jobId = JobID.fromHexString(job.getStatus().getJobStatus().getJobId());
    var deployConfig = ctx.getDeployConfig(job.getSpec());
    flinkService.submitJobToSessionCluster(
            job.getMetadata(), job.getSpec(), jobId, deployConfig, null);
    ReconciliationUtils.updateStatusForDeployedSpec(job, new Configuration());

    // Exception cache is already initialized for this job with a last timestamp of 3000 ms.
    var cacheEntry = ctx.getExceptionCacheEntry();
    cacheEntry.setInitialized(true);
    cacheEntry.setJobId(jobId.toHexString());
    cacheEntry.setLastTimestamp(Instant.ofEpochMilli(3000L));

    // The exception is newer (4000 ms) than the cached timestamp.
    flinkService.addExceptionHistory(jobId, "SessionJobException", "trace", 4000L);

    // Ensure jobFailedErr is null before the observe call
    flinkService.setJobFailedErr(null);
    observer.observe(ctx);

    var namespace = job.getMetadata().getNamespace();
    var events = kubernetesClient.v1().events().inNamespace(namespace).list().getItems();

    // Only one event for job status changed, no exception events
    assertEquals(1, events.size());
    var onlyEvent = events.get(0);
    assertEquals(EventRecorder.Reason.JobStatusChanged.name(), onlyEvent.getReason());
}

/**
 * Checks that with the exception event limit configured to "2", observing a running session
 * job that has three new exceptions in its history yields exactly two events.
 */
@Test
public void testSessionJobExceptionLimitConfig() throws Exception {
    var job = initSessionJob();
    job.getStatus().getJobStatus().setState(JobStatus.RUNNING);

    Map<String, String> rawOptions = new HashMap<>();
    rawOptions.put(KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_LIMIT.key(), "2");
    Configuration operatorConfig = Configuration.fromMap(rawOptions);
    FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
            getResourceContext(
                    job,
                    TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient),
                    operatorConfig);

    var jobId = JobID.fromHexString(job.getStatus().getJobStatus().getJobId());
    var deployConfig = ctx.getDeployConfig(job.getSpec());
    flinkService.submitJobToSessionCluster(
            job.getMetadata(), job.getSpec(), jobId, deployConfig, null);
    ReconciliationUtils.updateStatusForDeployedSpec(job, new Configuration());

    // Exception cache is initialized for this job with a last timestamp of 3000 ms.
    var cacheEntry = ctx.getExceptionCacheEntry();
    cacheEntry.setInitialized(true);
    cacheEntry.setJobId(jobId.toHexString());
    cacheEntry.setLastTimestamp(Instant.ofEpochMilli(3000L));

    // Add 3 exceptions, but only 2 should be reported due to limit
    flinkService.addExceptionHistory(jobId, "SessionJobException1", "trace1", 4000L);
    flinkService.addExceptionHistory(jobId, "SessionJobException2", "trace2", 5000L);
    flinkService.addExceptionHistory(jobId, "SessionJobException3", "trace3", 6000L);

    // Ensure jobFailedErr is null before the observe call
    flinkService.setJobFailedErr(null);
    observer.observe(ctx);

    var namespace = job.getMetadata().getNamespace();
    var events = kubernetesClient.v1().events().inNamespace(namespace).list().getItems();

    // Only 2 exceptions should be reported
    assertEquals(2, events.size());
}

/**
 * Checks that with the stack-trace line option configured to "2", the event emitted for a
 * four-line stack trace contains the first two lines, omits the third, and ends with a
 * "... (2 more lines)" marker.
 */
@Test
public void testSessionJobStackTraceTruncationConfig() throws Exception {
    var job = initSessionJob();
    job.getStatus().getJobStatus().setState(JobStatus.RUNNING);

    Map<String, String> rawOptions = new HashMap<>();
    rawOptions.put(
            KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES.key(),
            "2");
    Configuration operatorConfig = Configuration.fromMap(rawOptions);
    FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
            getResourceContext(
                    job,
                    TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient),
                    operatorConfig);

    var jobId = JobID.fromHexString(job.getStatus().getJobStatus().getJobId());
    var deployConfig = ctx.getDeployConfig(job.getSpec());
    flinkService.submitJobToSessionCluster(
            job.getMetadata(), job.getSpec(), jobId, deployConfig, null);
    ReconciliationUtils.updateStatusForDeployedSpec(job, new Configuration());

    // Exception cache is initialized for this job with a last timestamp of 3000 ms.
    var cacheEntry = ctx.getExceptionCacheEntry();
    cacheEntry.setInitialized(true);
    cacheEntry.setJobId(jobId.toHexString());
    cacheEntry.setLastTimestamp(Instant.ofEpochMilli(3000L));

    // A single new exception (4000 ms) whose stack trace has four lines.
    flinkService.addExceptionHistory(
            jobId, "SessionJobStackTraceCheck", "line1\nline2\nline3\nline4", 4000L);

    // Ensure jobFailedErr is null before the observe call
    flinkService.setJobFailedErr(null);
    observer.observe(ctx);

    var namespace = job.getMetadata().getNamespace();
    var events = kubernetesClient.v1().events().inNamespace(namespace).list().getItems();
    assertEquals(1, events.size());

    String eventMessage = events.get(0).getMessage();
    assertTrue(eventMessage.contains("line1"));
    assertTrue(eventMessage.contains("line2"));
    assertFalse(eventMessage.contains("line3"));
    assertTrue(eventMessage.contains("... (2 more lines)"));
}

/**
 * Checks that only exceptions newer than the cached last timestamp produce events: with the
 * cache at 2500 ms and exceptions at 1000, 2000 and 3000 ms, exactly one event is expected and
 * it must mention {@code org.apache.NewSessionException}.
 */
@Test
public void testSessionJobIgnoreOldExceptions() throws Exception {
    var job = initSessionJob();
    // set a non-terminal state
    job.getStatus().getJobStatus().setState(JobStatus.RUNNING);

    FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
            getResourceContext(
                    job, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));

    // Exception cache is initialized for this job with a last timestamp of 2500 ms.
    var cacheEntry = ctx.getExceptionCacheEntry();
    cacheEntry.setInitialized(true);
    cacheEntry.setJobId(job.getStatus().getJobStatus().getJobId());
    cacheEntry.setLastTimestamp(Instant.ofEpochMilli(2500L));

    var jobId = JobID.fromHexString(job.getStatus().getJobStatus().getJobId());
    var deployConfig = ctx.getDeployConfig(job.getSpec());
    flinkService.submitJobToSessionCluster(
            job.getMetadata(), job.getSpec(), jobId, deployConfig, null);

    // Map exception names to timestamps
    Map<String, Long> timestampsByName =
            Map.of(
                    "OldSessionException", 1000L,
                    "MidSessionException", 2000L,
                    "NewSessionException", 3000L);
    String traceTemplate =
            "org.apache.%s\n"
                    + "\tat org.apache.flink.kubernetes.operator.observer.JobStatusObserverTest.testSessionJobIgnoreOldExceptions(JobStatusObserverTest.java:1)\n"
                    + "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n"
                    + "\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n"
                    + "\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n"
                    + "\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n";

    // Add mapped exceptions
    for (var entry : timestampsByName.entrySet()) {
        String renderedTrace = String.format(traceTemplate, entry.getKey());
        flinkService.addExceptionHistory(
                jobId, "org.apache." + entry.getKey(), renderedTrace, entry.getValue());
    }

    // Ensure jobFailedErr is null before the observe call
    flinkService.setJobFailedErr(null);
    observer.observe(ctx);

    var namespace = job.getMetadata().getNamespace();
    var events = kubernetesClient.v1().events().inNamespace(namespace).list().getItems();
    assertEquals(1, events.size());

    String eventMessage = events.get(0).getMessage();
    assertTrue(eventMessage.contains("org.apache.NewSessionException"));
}

/**
 * Observes a running session job without pre-populating the exception cache in the test, with
 * one exception an hour old and one a minute old. Expects a single event for the newer
 * exception, the cache to be marked initialized afterwards, and its last timestamp to equal
 * the newer exception's time at millisecond precision.
 */
@Test
public void testSessionJobExceptionEventTriggerInitialization() throws Exception {
    var job = initSessionJob();
    // set a non-terminal state
    job.getStatus().getJobStatus().setState(JobStatus.RUNNING);

    FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx =
            getResourceContext(
                    job, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));

    var now = Instant.now();
    var oneHourAgo = now.minus(Duration.ofHours(1));
    var oneMinuteAgo = now.minus(Duration.ofMinutes(1));

    var jobId = JobID.fromHexString(job.getStatus().getJobStatus().getJobId());
    var deployConfig = ctx.getDeployConfig(job.getSpec());
    flinkService.submitJobToSessionCluster(
            job.getMetadata(), job.getSpec(), jobId, deployConfig, null);

    // Old exception that happened outside of kubernetes event retention should be ignored
    flinkService.addExceptionHistory(
            jobId, "OldSessionException", "OldSessionException", oneHourAgo.toEpochMilli());
    flinkService.addExceptionHistory(
            jobId, "NewSessionException", "NewSessionException", oneMinuteAgo.toEpochMilli());

    // Ensure jobFailedErr is null before the observe call
    flinkService.setJobFailedErr(null);
    observer.observe(ctx);

    var namespace = job.getMetadata().getNamespace();
    var events = kubernetesClient.v1().events().inNamespace(namespace).list().getItems();
    assertEquals(1, events.size());
    assertTrue(events.get(0).getMessage().contains("NewSessionException"));

    // The observer is expected to have initialized the cache and recorded the newest timestamp.
    var cacheEntry = ctx.getExceptionCacheEntry();
    assertTrue(cacheEntry.isInitialized());
    assertEquals(oneMinuteAgo.truncatedTo(ChronoUnit.MILLIS), cacheEntry.getLastTimestamp());
}

private static Stream<Arguments> cancellingArgs() {
var args = new ArrayList<Arguments>();
for (var status : JobStatus.values()) {
Expand Down
Loading