Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.

Commit e7267ca

Browse files
yzeng1618zengyi
andauthored
[Improve][Fink] NO_CDC source support checkpoint (apache#10094)
Co-authored-by: zengyi <zengyi@chinatelecom.cn>
1 parent 27c966d commit e7267ca

File tree

5 files changed

+84
-39
lines changed

5 files changed

+84
-39
lines changed

seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,13 @@ static JobMode getJobMode(Config config) {
6161
}
6262

6363
static boolean getEnableCheckpoint(Config config) {
64-
boolean enableCheckpoint;
6564
Config envConfig = config.getConfig("env");
65+
long checkpointInterval = -1;
6666
if (envConfig.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
67-
enableCheckpoint = envConfig.getInt(EnvCommonOptions.CHECKPOINT_INTERVAL.key()) > 0;
68-
} else {
69-
enableCheckpoint = false;
67+
checkpointInterval = envConfig.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
68+
} else if (envConfig.hasPath("execution.checkpoint.interval")) {
69+
checkpointInterval = envConfig.getLong("execution.checkpoint.interval");
7070
}
71-
return enableCheckpoint || getJobMode(config) == JobMode.STREAMING;
71+
return checkpointInterval > 0 || getJobMode(config) == JobMode.STREAMING;
7272
}
7373
}

seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,23 @@ void testEnableCheckpoint() {
4545
+ " checkpoint.interval = 10\n"
4646
+ "}");
4747
Assertions.assertTrue(RuntimeEnvironment.getEnableCheckpoint(config));
48+
49+
config =
50+
ConfigFactory.parseString(
51+
"env {\n"
52+
+ " parallelism = 1\n"
53+
+ " job.mode = \"BATCH\"\n"
54+
+ " execution.checkpoint.interval = 10\n"
55+
+ "}");
56+
Assertions.assertTrue(RuntimeEnvironment.getEnableCheckpoint(config));
57+
58+
config =
59+
ConfigFactory.parseString(
60+
"env {\n"
61+
+ " parallelism = 1\n"
62+
+ " job.mode = \"BATCH\"\n"
63+
+ " checkpoint.interval = 0\n"
64+
+ "}");
65+
Assertions.assertFalse(RuntimeEnvironment.getEnableCheckpoint(config));
4866
}
4967
}

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.ArrayList;
4747
import java.util.List;
4848
import java.util.Objects;
49+
import java.util.OptionalLong;
4950
import java.util.stream.Collectors;
5051

5152
@Slf4j
@@ -56,6 +57,8 @@ public abstract class AbstractFlinkRuntimeEnvironment implements RuntimeEnvironm
5657
protected JobMode jobMode;
5758
protected String jobName = Constants.LOGO;
5859

60+
private static final long DEFAULT_CHECKPOINT_INTERVAL_MS = 10000L;
61+
5962
protected AbstractFlinkRuntimeEnvironment(Config config) {
6063
this.initialize(config);
6164
}
@@ -77,19 +80,23 @@ public StreamExecutionEnvironment getStreamExecutionEnvironment() {
7780
}
7881

7982
protected void setCheckpoint() {
80-
if (jobMode == JobMode.BATCH) {
81-
log.warn(
82-
"Disabled Checkpointing. In flink execution environment, checkpointing is not supported and not needed when executing jobs in BATCH mode");
83+
OptionalLong intervalOpt = resolveCheckpointInterval(true);
84+
boolean hasExplicitInterval = intervalOpt.isPresent();
85+
boolean positiveInterval = intervalOpt.isPresent() && intervalOpt.getAsLong() > 0;
86+
long interval = intervalOpt.orElse(DEFAULT_CHECKPOINT_INTERVAL_MS);
87+
88+
if (jobMode == JobMode.BATCH && !positiveInterval) {
89+
log.info(
90+
"Checkpoint is disabled for batch job because 'checkpoint.interval' is not set or <= 0.");
91+
return;
8392
}
84-
long interval;
85-
if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
86-
interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
87-
} else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
93+
94+
if (hasExplicitInterval && !positiveInterval) {
8895
log.warn(
89-
"the parameter 'execution.checkpoint.interval' will be deprecated, please use common parameter 'checkpoint.interval' to set it");
90-
interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
91-
} else {
92-
interval = 10000L;
96+
"checkpoint.interval is set to {} which is not positive, fallback to default {} ms for streaming job.",
97+
interval,
98+
DEFAULT_CHECKPOINT_INTERVAL_MS);
99+
interval = DEFAULT_CHECKPOINT_INTERVAL_MS;
93100
}
94101

95102
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
@@ -195,8 +202,28 @@ protected void createStreamEnvironment() {
195202
}
196203

197204
if (this.jobMode.equals(JobMode.BATCH)) {
198-
environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
205+
OptionalLong intervalOpt = resolveCheckpointInterval(false);
206+
if (intervalOpt.isPresent() && intervalOpt.getAsLong() > 0) {
207+
log.info(
208+
"Flink batch runtime does not support checkpoint-based restore; 'checkpoint.interval' > 0 will make this batch job run in streaming runtime.");
209+
} else {
210+
environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
211+
}
212+
}
213+
}
214+
215+
protected OptionalLong resolveCheckpointInterval(boolean warnLegacy) {
216+
if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
217+
return OptionalLong.of(config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
218+
}
219+
if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
220+
if (warnLegacy) {
221+
log.warn(
222+
"the parameter 'execution.checkpoint.interval' will be deprecated, please use common parameter 'checkpoint.interval' to set it");
223+
}
224+
return OptionalLong.of(config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL));
199225
}
226+
return OptionalLong.empty();
200227
}
201228

202229
private void setTimeCharacteristic() {

seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.Collections;
5353
import java.util.HashSet;
5454
import java.util.List;
55+
import java.util.OptionalLong;
5556
import java.util.Set;
5657
import java.util.stream.Collectors;
5758
import java.util.stream.Stream;
@@ -135,11 +136,17 @@ public void execute() throws TaskExecuteException {
135136
"Flink Execution Plan: {}",
136137
flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
137138
LOGGER.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
138-
if (!flinkRuntimeEnvironment.isStreaming()) {
139-
flinkRuntimeEnvironment
140-
.getStreamExecutionEnvironment()
141-
.setRuntimeMode(RuntimeExecutionMode.BATCH);
142-
LOGGER.info("Flink job Mode: {}", JobMode.BATCH);
139+
if (flinkRuntimeEnvironment.getJobMode() == JobMode.BATCH) {
140+
OptionalLong checkpointInterval =
141+
flinkRuntimeEnvironment.resolveCheckpointInterval(false);
142+
boolean enableCheckpointForBatch =
143+
checkpointInterval.isPresent() && checkpointInterval.getAsLong() > 0;
144+
if (!enableCheckpointForBatch) {
145+
flinkRuntimeEnvironment
146+
.getStreamExecutionEnvironment()
147+
.setRuntimeMode(RuntimeExecutionMode.BATCH);
148+
LOGGER.info("Flink job Mode: {}", JobMode.BATCH);
149+
}
143150
}
144151
try {
145152
final long jobStartTime = System.currentTimeMillis();

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.seatunnel.e2e.common.TestSuiteBase;
2222
import org.apache.seatunnel.e2e.common.container.EngineType;
2323
import org.apache.seatunnel.e2e.common.container.TestContainer;
24-
import org.apache.seatunnel.e2e.common.container.TestContainerId;
2524
import org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer;
2625
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
2726
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
@@ -208,9 +207,10 @@ public void testZetaStreamingCheckpointNoInterval(TestContainer container)
208207
public void testFlinkCheckpointEnable(AbstractTestFlinkContainer container)
209208
throws IOException, InterruptedException {
210209
/**
211-
* In flink execution environment, checkpoint is not supported and not needed when executing
212-
* jobs in BATCH mode. So it is only necessary to determine whether flink has enabled
213-
* checkpoint by configuring tasks with 'checkpoint.interval'.
210+
* In Flink execution environment, batch jobs normally do not enable checkpointing. When
211+
* 'checkpoint.interval' is configured for a batch job, SeaTunnel will submit it in
212+
* streaming runtime with the same checkpoint interval. This test verifies that Flink has
213+
* enabled checkpointing and uses the configured interval.
214214
*/
215215
Container.ExecResult enableExecResult =
216216
container.executeJob(
@@ -228,19 +228,12 @@ public void testFlinkCheckpointEnable(AbstractTestFlinkContainer container)
228228
jobId)),
229229
String.class,
230230
Object.class);
231-
/**
232-
* when the checkpoint interval is 0x7fffffffffffffff, indicates that checkpoint is
233-
* disabled. reference {@link
234-
* org.apache.flink.runtime.jobgraph.JobGraph#isCheckpointingEnabled()}
235-
*/
236-
if (container.identifier().equals(TestContainerId.FLINK_1_13)
237-
|| container.identifier().equals(TestContainerId.FLINK_1_14)
238-
|| container.identifier().equals(TestContainerId.FLINK_1_15)
239-
|| container.identifier().equals(TestContainerId.FLINK_1_16)) {
240-
Assertions.assertEquals(Long.MAX_VALUE, jobConfig.getOrDefault("interval", 0L));
241-
} else {
242-
Assertions.assertEquals(0, jobConfig.getOrDefault("interval", 0));
243-
}
231+
Object intervalObject = jobConfig.get("interval");
232+
Assertions.assertNotNull(intervalObject);
233+
long interval = ((Number) intervalObject).longValue();
234+
// the value here should be consistent with `checkpoint.interval` in
235+
// batch_fakesource_to_localfile_checkpoint_enable.conf
236+
Assertions.assertEquals(1000L, interval);
244237
}
245238

246239
@TestTemplate

0 commit comments

Comments
 (0)