@@ -267,17 +267,28 @@ private JobGraph createJobGraphWithKeyedState(
267267 env .addSource (
268268 new NotifyingDefiniteKeySource (
269269 numberKeys , numberElements , failAfterEmission ) {
270+
271+ String lastCheckpointPath = null ;
272+
273+ /**
274+ * This wait method waits at least two checkpoint finished to
275+ * make sure the latest checkpoint contains all the source data.
276+ */
270277 @ Override
271- public void waitCheckpointCompleted () throws Exception {
278+ public boolean waitCheckpointCompleted () throws Exception {
272279 Optional <String > mostRecentCompletedCheckpointPath =
273280 getLatestCompletedCheckpointPath (
274281 jobID .get (), miniClusterRef .get ());
275- while (!mostRecentCompletedCheckpointPath .isPresent ()) {
276- Thread .sleep (50 );
277- mostRecentCompletedCheckpointPath =
278- getLatestCompletedCheckpointPath (
279- jobID .get (), miniClusterRef .get ());
282+ if (mostRecentCompletedCheckpointPath .isPresent ()) {
283+ if (lastCheckpointPath == null ) {
284+ lastCheckpointPath =
285+ mostRecentCompletedCheckpointPath .get ();
286+ } else if (!lastCheckpointPath .equals (
287+ mostRecentCompletedCheckpointPath .get ())) {
288+ return true ;
289+ }
280290 }
291+ return false ;
281292 }
282293 })
283294 .keyBy (
@@ -315,7 +326,9 @@ public NotifyingDefiniteKeySource(
315326 this .failAfterEmission = failAfterEmission ;
316327 }
317328
318- public void waitCheckpointCompleted () throws Exception {}
329+ public boolean waitCheckpointCompleted () throws Exception {
330+ return true ;
331+ }
319332
320333 @ Override
321334 public void run (SourceContext <Integer > ctx ) throws Exception {
@@ -334,7 +347,18 @@ public void run(SourceContext<Integer> ctx) throws Exception {
334347 counter ++;
335348 }
336349 } else {
337- waitCheckpointCompleted ();
350+ boolean newCheckpoint = false ;
351+ long waited = 0L ;
352+ // maximum wait 5min
353+ while (!newCheckpoint && waited < 30000L ) {
354+ synchronized (ctx .getCheckpointLock ()) {
355+ newCheckpoint = waitCheckpointCompleted ();
356+ }
357+ if (!newCheckpoint ) {
358+ waited += 10L ;
359+ Thread .sleep (10L );
360+ }
361+ }
338362 if (failAfterEmission ) {
339363 throw new FlinkRuntimeException (
340364 "Make job fail artificially, to retain completed checkpoint." );
0 commit comments