Skip to content

Commit 148f835

Browse files
mateczagany authored and gyfora committed
[FLINK-35265] Gracefully handle errors when fetching checkpoint path
1 parent e711ac0 commit 148f835

File tree

2 files changed

+24
-7
lines changed

2 files changed

+24
-7
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/snapshot/StateSnapshotObserver.java

Lines changed: 23 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
3434

35+
import static org.apache.flink.configuration.WebOptions.CHECKPOINTS_HISTORY_SIZE;
3536
import static org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
3637

3738
/** The observer of {@link org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot}. */
@@ -142,12 +143,29 @@ private void handleCheckpoint(
142143

143144
if (checkpointStatsResult.isPending()) {
144145
return;
145-
} else if (checkpointStatsResult.getError() != null) {
146-
throw new ReconciliationException(checkpointStatsResult.getError());
147146
}
148147

149-
LOG.info("Checkpoint {} successful: {}", resourceName, checkpointStatsResult.getPath());
150-
FlinkStateSnapshotUtils.snapshotSuccessful(
151-
resource, checkpointStatsResult.getPath(), false);
148+
String path = checkpointStatsResult.getPath();
149+
// At this point the checkpoint is already assumed to be complete, so we can mark the
150+
// snapshot complete with empty path and trigger an event.
151+
if (checkpointStatsResult.getError() != null) {
152+
path = "";
153+
var error =
154+
String.format(
155+
"Checkpoint %s was successful, but failed to fetch path. Flink webserver stores only a limited amount of checkpoints in its cache, try increasing '%s' config for this job.\n%s",
156+
resourceName,
157+
CHECKPOINTS_HISTORY_SIZE.key(),
158+
checkpointStatsResult.getError());
159+
eventRecorder.triggerSnapshotEvent(
160+
resource,
161+
EventRecorder.Type.Warning,
162+
EventRecorder.Reason.CheckpointError,
163+
EventRecorder.Component.Snapshot,
164+
error,
165+
ctx.getKubernetesClient());
166+
}
167+
168+
LOG.info("Checkpoint {} successful: {}", resourceName, path);
169+
FlinkStateSnapshotUtils.snapshotSuccessful(resource, path, false);
152170
}
153171
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -776,8 +776,7 @@ public CheckpointStatsResult fetchCheckpointStats(
776776
stats.getClass().getSimpleName()));
777777
}
778778
} catch (Exception e) {
779-
LOG.error("Exception while fetching checkpoint statistics", e);
780-
return CheckpointStatsResult.pending();
779+
return CheckpointStatsResult.error(e.getMessage());
781780
}
782781
}
783782

0 commit comments

Comments
 (0)