Skip to content

Commit afb13ec

Browse files
committed
Clearer comments
1 parent 9abfd87 commit afb13ec

File tree

5 files changed

+40
-20
lines changed

5 files changed

+40
-20
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@
5858
*
5959
* <p>Orchestration Process
6060
*
61-
* <p>FlinkBlueGreenDeploymentController.reconcile() 1. Create BlueGreenContext with current
62-
* deployment state 2. Query StateHandlerRegistry for appropriate handler 3. Delegate to
63-
* specific StateHandler.handle(context) 4. StateHandler invokes BlueGreenDeploymentService
64-
* operations ↓ 5. Return UpdateControl with next reconciliation schedule
61+
* <p>FlinkBlueGreenDeploymentController.reconcile() 1. Create BlueGreenContext with current
62+
* deployment state 2. Query StateHandlerRegistry for appropriate handler 3. Delegate to specific
63+
* StateHandler.handle(context) 4. StateHandler invokes BlueGreenDeploymentService operations 5.
64+
* Return UpdateControl with next reconciliation schedule
6565
*/
6666
@ControllerConfiguration
6767
public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueGreenDeployment> {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
114114
if (isFlinkDeploymentReady(currentFlinkDeployment)) {
115115
if (specDiff == BlueGreenDiffType.TRANSITION) {
116116
if (handleSavepoint(context, currentFlinkDeployment)) {
117-
// This is the only portion where the last reconciled spec is not set,
118-
// so we can reprocess TRANSITION after the savepoint is done
117+
// Spec is intentionally not marked as reconciled here to allow
118+
// reprocessing the TRANSITION once savepoint creation completes
119119
var savepointingState = calculateSavepointingState(currentDeploymentType);
120120
return patchStatusUpdateControl(context, savepointingState, null)
121121
.rescheduleAfter(getReconciliationReschedInterval(context));
@@ -218,7 +218,8 @@ private Savepoint configureInitialSavepoint(
218218
context.getCtxFactory()
219219
.getResourceContext(currentFlinkDeployment, context.getJosdkContext());
220220

221-
// For now UpgradeMode != STATELESS will use a savepoint, originally only SAVEPOINT
221+
// Create savepoint for all upgrade modes except STATELESS
222+
// (originally only SAVEPOINT mode required savepoints)
222223
if (isSavepointRequired(context)) {
223224
String savepointTriggerId = context.getDeploymentStatus().getSavepointTriggerId();
224225
var savepointFetchResult = fetchSavepointInfo(ctx, savepointTriggerId);
@@ -236,8 +237,8 @@ private Savepoint configureInitialSavepoint(
236237
savepointFormatType);
237238
}
238239

239-
// The logic below looked for the last checkpoint in case upgradeMode = LAST_STATE
240-
// We don't want to rely on last checkpoint for now.
240+
// Currently not using last checkpoint recovery for LAST_STATE upgrade mode
241+
// This could be re-enabled in the future by uncommenting the logic below
241242
return null;
242243

243244
// if (!lookForCheckpoint(context)) {
@@ -315,12 +316,12 @@ private void handleSpecChangesDuringTransition(
315316
FlinkDeployment incomingFlinkDeployment = transitionState.nextDeployment;
316317

317318
if (diffType != BlueGreenDiffType.IGNORE) {
318-
// Apply the new spec changes to the currently active deployment
319+
// Apply new spec changes to the deployment currently being transitioned to
319320
incomingFlinkDeployment.setSpec(
320321
context.getBgDeployment().getSpec().getTemplate().getSpec());
321-
// Resetting the abort grace period
322+
// Reset abort grace period to allow more time for the updated deployment
322323
setAbortTimestamp(context);
323-
// Ack'ing the spec change
324+
// Mark spec change as reconciled
324325
setLastReconciledSpec(context);
325326
updateFlinkDeployment(incomingFlinkDeployment, context);
326327

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/InitializingBlueStateHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ public InitializingBlueStateHandler(BlueGreenDeploymentService deploymentService
4141
public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext context) {
4242
FlinkBlueGreenDeploymentStatus deploymentStatus = context.getDeploymentStatus();
4343

44-
// We only allow a deployment if it's indeed the first (null last spec)
45-
// or if we're recovering (failing status) and the spec has changed
44+
// Deploy only if this is the initial deployment (no previous spec exists)
45+
// or if we're recovering from a failure and the spec has changed since the last attempt
4646
if (deploymentStatus.getLastReconciledSpec() == null
4747
|| (deploymentStatus.getJobStatus().getState().equals(JobStatus.FAILING)
4848
&& hasSpecChanged(context))) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/SavepointingStateHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext context)
4242
DeploymentType currentType = getCurrentDeploymentType();
4343
var isSavepointReady = deploymentService.monitorSavepoint(context, currentType);
4444

45-
// Savepoint complete, continue with the transition
45+
// Savepoint creation completed, transition back to active state to continue deployment
4646
if (isSavepointReady) {
4747
var nextState =
4848
getSupportedState() == FlinkBlueGreenDeploymentState.SAVEPOINTING_BLUE

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,13 @@ public static void setAbortTimestamp(BlueGreenContext context) {
219219

220220
// ==================== Savepoint/Checkpoint Operations ====================
221221

222+
/**
223+
* Determines if a savepoint is required based on the deployment's upgrade mode. Currently all
224+
* upgrade modes except STATELESS require savepoints.
225+
*
226+
* @param context the Blue/Green transition context
227+
* @return true if savepoint is required, false otherwise
228+
*/
222229
public static boolean isSavepointRequired(BlueGreenContext context) {
223230
UpgradeMode upgradeMode =
224231
context.getBgDeployment()
@@ -228,7 +235,8 @@ public static boolean isSavepointRequired(BlueGreenContext context) {
228235
.getJob()
229236
.getUpgradeMode();
230237
// return UpgradeMode.SAVEPOINT == upgradeMode;
231-
// For now we're taking savepoints in STATELESS or LAST-STATE
238+
// Currently taking savepoints for all modes except STATELESS
239+
// (previously only SAVEPOINT mode required savepoints)
232240
return UpgradeMode.STATELESS != upgradeMode;
233241
}
234242

@@ -289,7 +297,7 @@ public static Savepoint getLastCheckpoint(
289297
.getJobId()),
290298
resourceContext.getObserveConfig());
291299

292-
// Alternative action if no checkpoint is available?
300+
// Alternative fallback if no checkpoint is available could be implemented here
293301
if (lastCheckpoint.isEmpty()) {
294302
throw new IllegalStateException(
295303
"Last Checkpoint for Job "
@@ -302,6 +310,17 @@ public static Savepoint getLastCheckpoint(
302310

303311
// ==================== Deployment Preparation Utilities ====================
304312

313+
/**
314+
* Creates a new FlinkDeployment resource for a Blue/Green deployment transition. This method
315+
* prepares the deployment with proper metadata, specs, and savepoint configuration.
316+
*
317+
* @param context the Blue/Green transition context
318+
* @param deploymentType the type of deployment (BLUE or GREEN)
319+
* @param lastCheckpoint the savepoint/checkpoint to restore from (can be null)
320+
* @param isFirstDeployment whether this is the initial deployment
321+
* @param bgMeta the metadata of the parent Blue/Green deployment
322+
* @return configured FlinkDeployment ready for deployment
323+
*/
305324
public static FlinkDeployment prepareFlinkDeployment(
306325
BlueGreenContext context,
307326
DeploymentType deploymentType,
@@ -323,7 +342,7 @@ public static FlinkDeployment prepareFlinkDeployment(
323342
"spec",
324343
FlinkBlueGreenDeploymentSpec.class);
325344

326-
// The B/G initialSavepointPath is only used in first time deployments
345+
// The Blue/Green initialSavepointPath is only used for first-time deployments
327346
if (isFirstDeployment) {
328347
String initialSavepointPath =
329348
adjustedSpec.getTemplate().getSpec().getJob().getInitialSavepointPath();
@@ -335,11 +354,11 @@ public static FlinkDeployment prepareFlinkDeployment(
335354
.getJob()
336355
.setInitialSavepointPath(initialSavepointPath);
337356
} else {
338-
LOG.info("Clean start up, no checkpoint/savepoint");
357+
LOG.info("Clean startup with no checkpoint/savepoint restoration");
339358
}
340359
} else if (lastCheckpoint != null) {
341360
String location = lastCheckpoint.getLocation().replace("file:", "");
342-
LOG.info("Using B/G checkpoint: " + location);
361+
LOG.info("Using Blue/Green savepoint/checkpoint: " + location);
343362
adjustedSpec.getTemplate().getSpec().getJob().setInitialSavepointPath(location);
344363
}
345364

0 commit comments

Comments
 (0)