Skip to content

Commit 4ea2aed

Browse files
Fix checkpointing not enabled exception and add batch e2e test
1 parent 9eb3c38 commit 4ea2aed

File tree

3 files changed

+46
-3
lines changed

3 files changed

+46
-3
lines changed

examples/basic-batch.yaml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
apiVersion: flink.apache.org/v1beta1
20+
kind: FlinkDeployment
21+
metadata:
22+
name: basic-batch-example
23+
spec:
24+
image: flink:1.20
25+
flinkVersion: v1_20
26+
flinkConfiguration:
27+
taskmanager.numberOfTaskSlots: "2"
28+
serviceAccount: flink
29+
jobManager:
30+
resource:
31+
memory: "2048m"
32+
cpu: 1
33+
taskManager:
34+
resource:
35+
memory: "2048m"
36+
cpu: 1
37+
job:
38+
jarURI: local:///opt/flink/examples/streaming/WordCount.jar
39+
parallelism: 2
40+
upgradeMode: stateless

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
import java.util.Optional;
143143
import java.util.concurrent.Callable;
144144
import java.util.concurrent.CompletableFuture;
145+
import java.util.concurrent.ExecutionException;
145146
import java.util.concurrent.ExecutorService;
146147
import java.util.concurrent.TimeUnit;
147148
import java.util.concurrent.TimeoutException;
@@ -546,7 +547,7 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
546547
try {
547548
latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0;
548549
} catch (Exception e) {
549-
if (e instanceof RestClientException
550+
if (e instanceof ExecutionException
550551
&& e.getMessage() != null
551552
&& e.getMessage().contains("Checkpointing has not been enabled")) {
552553
LOG.warn("Checkpointing not enabled for job {}", jobId, e);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import java.util.Random;
9999
import java.util.Set;
100100
import java.util.concurrent.CompletableFuture;
101+
import java.util.concurrent.ExecutionException;
101102
import java.util.concurrent.TimeoutException;
102103
import java.util.function.Consumer;
103104
import java.util.stream.Collectors;
@@ -595,8 +596,9 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
595596
Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
596597
getCheckpointInfo(JobID jobId, Configuration conf) throws Exception {
597598
if (throwCheckpointingDisabledError) {
598-
throw new RestClientException(
599-
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST);
599+
throw new ExecutionException(
600+
new RestClientException(
601+
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST));
600602
}
601603

602604
if (checkpointInfo != null) {

0 commit comments

Comments
 (0)