Skip to content

Commit 347117d

Browse files
committed
Addressing edge error cases when patching an existing FlinkDeployment resource.
1 parent 3d55610 commit 347117d

File tree

4 files changed

+66
-50
lines changed

4 files changed

+66
-50
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,10 @@ public FlinkDeployment getGreenDeployment() {
5656
public FlinkDeployment getDeploymentByType(BlueGreenDeploymentType type) {
5757
return type == BlueGreenDeploymentType.BLUE ? getBlueDeployment() : getGreenDeployment();
5858
}
59+
60+
public BlueGreenDeploymentType getOppositeDeploymentType(BlueGreenDeploymentType type) {
61+
return type == BlueGreenDeploymentType.BLUE
62+
? BlueGreenDeploymentType.GREEN
63+
: BlueGreenDeploymentType.BLUE;
64+
}
5965
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.deployCluster;
4646
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.isFlinkDeploymentReady;
4747
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.suspendFlinkDeployment;
48-
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.updateFlinkDeployment;
4948
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.fetchSavepointInfo;
5049
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getReconciliationReschedInterval;
5150
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getSpecDiff;
@@ -54,7 +53,6 @@
5453
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.isSavepointRequired;
5554
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.millisToInstantStr;
5655
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.prepareFlinkDeployment;
57-
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.setAbortTimestamp;
5856
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.setLastReconciledSpec;
5957
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.triggerSavepoint;
6058

@@ -131,7 +129,7 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
131129
context, currentBlueGreenDeploymentType, currentFlinkDeployment);
132130
} else {
133131
setLastReconciledSpec(context);
134-
return patchFlinkDeployment(context, currentBlueGreenDeploymentType, specDiff);
132+
return patchFlinkDeployment(context, currentBlueGreenDeploymentType);
135133
}
136134
} else {
137135
if (context.getDeploymentStatus().getJobStatus().getState() != JobStatus.FAILING) {
@@ -152,25 +150,25 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
152150
}
153151

154152
private UpdateControl<FlinkBlueGreenDeployment> patchFlinkDeployment(
155-
BlueGreenContext context,
156-
BlueGreenDeploymentType currentBlueGreenDeploymentType,
157-
BlueGreenDiffType specDiff) {
153+
BlueGreenContext context, BlueGreenDeploymentType currentBlueGreenDeploymentType) {
158154

159-
if (specDiff == BlueGreenDiffType.PATCH_CHILD) {
160-
FlinkDeployment nextFlinkDeployment =
161-
context.getDeploymentByType(currentBlueGreenDeploymentType);
155+
String childDeploymentName =
156+
context.getBgDeployment().getMetadata().getName()
157+
+ "-"
158+
+ currentBlueGreenDeploymentType.toString().toLowerCase();
159+
LOG.info("Patching FlinkDeployment '{}'", childDeploymentName);
162160

163-
nextFlinkDeployment.setSpec(
164-
context.getBgDeployment().getSpec().getTemplate().getSpec());
161+
// We are patching, so the transition must point to the existing deployment's
162+
// details
163+
var patchingState = calculatePatchingState(currentBlueGreenDeploymentType);
165164

166-
updateFlinkDeployment(nextFlinkDeployment, context);
167-
}
165+
// If we're not transitioning between deployments, treat this as a single deployment so it
166+
// does not wait for synchronization
167+
var isFirstDeployment = context.getDeployments().getNumberOfDeployments() != 2;
168168

169-
return patchStatusUpdateControl(
170-
context,
171-
calculatePatchingState(currentBlueGreenDeploymentType),
172-
JobStatus.RECONCILING)
173-
.rescheduleAfter(BlueGreenUtils.getReconciliationReschedInterval(context));
169+
// No checkpoint will be used when patching
170+
return initiateDeployment(
171+
context, currentBlueGreenDeploymentType, patchingState, null, isFirstDeployment);
174172
}
175173

176174
private UpdateControl<FlinkBlueGreenDeployment> startTransition(
@@ -284,7 +282,7 @@ private boolean handleSavepoint(
284282
return true;
285283
}
286284

287-
LOG.debug("Savepoint previously requested (triggerId: {})", savepointTriggerId);
285+
LOG.info("Savepoint previously requested (triggerId: {})", savepointTriggerId);
288286
return false;
289287
}
290288

@@ -309,11 +307,16 @@ private FlinkBlueGreenDeploymentState calculateSavepointingState(
309307
public UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
310308
BlueGreenContext context, BlueGreenDeploymentType currentBlueGreenDeploymentType) {
311309

310+
var updateControl =
311+
handleSpecChangesDuringTransition(context, currentBlueGreenDeploymentType);
312+
313+
if (updateControl != null) {
314+
return updateControl;
315+
}
316+
312317
TransitionState transitionState =
313318
determineTransitionState(context, currentBlueGreenDeploymentType);
314319

315-
handleSpecChangesDuringTransition(context, transitionState);
316-
317320
if (isFlinkDeploymentReady(transitionState.nextDeployment)) {
318321
return shouldWeDelete(
319322
context,
@@ -326,28 +329,20 @@ public UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
326329
}
327330
}
328331

329-
private void handleSpecChangesDuringTransition(
330-
BlueGreenContext context, TransitionState transitionState) {
332+
private UpdateControl<FlinkBlueGreenDeployment> handleSpecChangesDuringTransition(
333+
BlueGreenContext context, BlueGreenDeploymentType currentBlueGreenDeploymentType) {
331334
if (hasSpecChanged(context)) {
332335
BlueGreenDiffType diffType = getSpecDiff(context);
333336

334-
FlinkDeployment incomingFlinkDeployment = transitionState.nextDeployment;
335-
336337
if (diffType != BlueGreenDiffType.IGNORE) {
337-
// Apply new spec changes to the deployment currently being transitioned to
338-
incomingFlinkDeployment.setSpec(
339-
context.getBgDeployment().getSpec().getTemplate().getSpec());
340-
// Reset abort grace period to allow more time for the updated deployment
341-
setAbortTimestamp(context);
342-
// Mark spec change as reconciled
343338
setLastReconciledSpec(context);
344-
updateFlinkDeployment(incomingFlinkDeployment, context);
345-
346-
LOG.info(
347-
"Blue/Green Spec change detected during transition, patching incoming '{}' deployment with new changes",
348-
incomingFlinkDeployment.getMetadata().getName());
339+
var oppositeDeploymentType =
340+
context.getOppositeDeploymentType(currentBlueGreenDeploymentType);
341+
return patchFlinkDeployment(context, oppositeDeploymentType);
349342
}
350343
}
344+
345+
return null;
351346
}
352347

353348
private TransitionState determineTransitionState(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -261,25 +261,38 @@ public static boolean lookForCheckpoint(BlueGreenContext context) {
261261

262262
@SneakyThrows
263263
public static String triggerSavepoint(FlinkResourceContext<FlinkDeployment> ctx) {
264-
265264
var jobId = ctx.getResource().getStatus().getJobStatus().getJobId();
266265
var conf = ctx.getObserveConfig();
267266
var savepointFormatType =
268267
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
269268
var savepointDirectory =
270269
Preconditions.checkNotNull(conf.get(CheckpointingOptions.SAVEPOINT_DIRECTORY));
271270

272-
return ctx.getFlinkService()
273-
.triggerSavepoint(jobId, savepointFormatType, savepointDirectory, conf);
271+
LOG.info("About to trigger savepoint info for jobId: {}", jobId);
272+
273+
String triggerId =
274+
ctx.getFlinkService()
275+
.triggerSavepoint(jobId, savepointFormatType, savepointDirectory, conf);
276+
277+
LOG.info(
278+
"Triggered savepoint for jobId: {}, triggerId: {}, savepoint dir: {}",
279+
jobId,
280+
triggerId,
281+
savepointDirectory);
282+
283+
return triggerId;
274284
}
275285

276286
public static SavepointFetchResult fetchSavepointInfo(
277287
FlinkResourceContext<FlinkDeployment> ctx, String triggerId) {
278-
return ctx.getFlinkService()
279-
.fetchSavepointInfo(
280-
triggerId,
281-
ctx.getResource().getStatus().getJobStatus().getJobId(),
282-
ctx.getObserveConfig());
288+
String jobId = ctx.getResource().getStatus().getJobStatus().getJobId();
289+
LOG.info("About to fetch savepoint info for jobId: {}, triggerId: {}", jobId, triggerId);
290+
291+
var savepointFetchResult =
292+
ctx.getFlinkService().fetchSavepointInfo(triggerId, jobId, ctx.getObserveConfig());
293+
294+
LOG.info("Fetched savepoint info for jobId: {}, triggerId: {}", jobId, triggerId);
295+
return savepointFetchResult;
283296
}
284297

285298
public static Savepoint getLastCheckpoint(

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,6 @@ public void applyChanges(FlinkBlueGreenDeployment deployment, KubernetesClient c
562562
public void verifySpecificBehavior(
563563
ReconcileResult result, List<FlinkDeployment> deployments) {
564564
assertEquals(1, deployments.size());
565-
var updatedDeployment = deployments.get(0);
566565

567566
// Child spec change should be applied to FlinkDeployment
568567
assertEquals(
@@ -617,20 +616,23 @@ private ReconcileResult reconcileAndVerifyPatchBehavior(
617616
TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult rs)
618617
throws Exception {
619618

620-
var flinkDeployments = getFlinkDeployments();
621-
assertEquals(1, flinkDeployments.size());
622-
var existingFlinkDeployment = flinkDeployments.get(0);
623-
619+
// Initiating a patch operation
624620
var minReconciliationTs = System.currentTimeMillis() - 1;
625621
rs = reconcile(rs.deployment);
626622

627623
assertPatchOperationTriggered(rs, minReconciliationTs);
628624
assertTransitioningState(rs);
629625

626+
var flinkDeployments = getFlinkDeployments();
627+
assertEquals(1, flinkDeployments.size());
628+
629+
// The patch operation reinitialized the deployment, simulating startup
630+
simulateSuccessfulJobStart(flinkDeployments.get(0));
631+
630632
minReconciliationTs = System.currentTimeMillis() - 1;
631633
rs = reconcile(rs.deployment);
632634

633-
return new ReconcileResult(minReconciliationTs, rs, existingFlinkDeployment);
635+
return new ReconcileResult(minReconciliationTs, rs, null);
634636
}
635637

636638
private ReconcileResult reconcileAndVerifyIgnoreBehavior(

0 commit comments

Comments
 (0)