Skip to content

Commit 3d55610

Browse files
committed
Addressing PR comments
1 parent a731b09 commit 3d55610

File tree

12 files changed

+91
-71
lines changed

12 files changed

+91
-71
lines changed
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
* Enumeration of the two possible Flink Blue/Green deployment types. Only one of each type will be
2424
* present at all times for a particular job.
2525
*/
26-
public enum DeploymentType {
26+
public enum BlueGreenDeploymentType {
2727
/** Identifier for the first or "Blue" deployment type. */
2828
BLUE,
2929

@@ -32,8 +32,8 @@ public enum DeploymentType {
3232

3333
public static final String LABEL_KEY = "flink/blue-green-deployment-type";
3434

35-
public static DeploymentType fromDeployment(FlinkDeployment flinkDeployment) {
35+
public static BlueGreenDeploymentType fromDeployment(FlinkDeployment flinkDeployment) {
3636
String typeAnnotation = flinkDeployment.getMetadata().getLabels().get(LABEL_KEY);
37-
return DeploymentType.valueOf(typeAnnotation);
37+
return BlueGreenDeploymentType.valueOf(typeAnnotation);
3838
}
3939
}

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentConfigOptions.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public static ConfigOptions.OptionBuilder operatorConfig(String key) {
3434
}
3535

3636
/**
37-
* NOTE: The string durations need to be in format "{length value}{time unit label}", e.g.
37+
* NOTE: The string durations need to be in format "{time unit value}{time unit label}", e.g.
3838
* "123ms", "321 s". If no time unit label is specified, it will be considered as milliseconds.
3939
* There is no fall back to parse ISO-8601 duration format, until Flink 2.x
4040
*
@@ -46,7 +46,7 @@ public static ConfigOptions.OptionBuilder operatorConfig(String key) {
4646
* <li>MINUTES: "m", "min", "minute"
4747
* <li>SECONDS: "s", "sec", "second"
4848
* <li>MILLISECONDS: "ms", "milli", "millisecond"
49-
* <li>MICROSECONDS: "µs", "micro", "microsecond"
49+
* <li>MICROSECONDS: "us", "micro", "microsecond"
5050
* <li>NANOSECONDS: "ns", "nano", "nanosecond"
5151
* </ul>
5252
*/
@@ -55,19 +55,19 @@ public static ConfigOptions.OptionBuilder operatorConfig(String key) {
5555
.durationType()
5656
.defaultValue(Duration.ofMinutes(10))
5757
.withDescription(
58-
"The max time to wait in milliseconds for a deployment to become ready before aborting it. Cannot be smaller than 10 minutes.");
58+
"The max time to wait for a deployment to become ready before aborting it.");
5959

6060
public static final ConfigOption<Duration> RECONCILIATION_RESCHEDULING_INTERVAL =
6161
operatorConfig("reconciliation.reschedule-interval")
6262
.durationType()
6363
.defaultValue(Duration.ofSeconds(15))
6464
.withDescription(
65-
"Configurable delay in milliseconds to use when the operator reschedules a reconciliation.");
65+
"Configurable delay to use when the operator reschedules a reconciliation.");
6666

6767
public static final ConfigOption<Duration> DEPLOYMENT_DELETION_DELAY =
6868
operatorConfig("deployment-deletion.delay")
6969
.durationType()
7070
.defaultValue(Duration.ofMillis(0))
7171
.withDescription(
72-
"Configurable delay in milliseconds before deleting a deployment after being marked done.");
72+
"Configurable delay before deleting a deployment after being marked done.");
7373
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
112112
ctxFactory);
113113
return BlueGreenDeploymentService.patchStatusUpdateControl(
114114
context, INITIALIZING_BLUE, null)
115-
.rescheduleAfter(100);
115+
.rescheduleAfter(0);
116116
} else {
117117
FlinkBlueGreenDeploymentState currentState = deploymentStatus.getBlueGreenState();
118118
var context =

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
2121
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
22-
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
22+
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
2323

2424
import io.javaoperatorsdk.operator.api.reconciler.Context;
2525
import lombok.Data;
@@ -58,9 +58,10 @@ static FlinkBlueGreenDeployments fromSecondaryResources(
5858
FlinkBlueGreenDeployments flinkBlueGreenDeployments = new FlinkBlueGreenDeployments();
5959

6060
for (FlinkDeployment dependentDeployment : secondaryResources) {
61-
var flinkBlueGreenDeploymentType = DeploymentType.fromDeployment(dependentDeployment);
61+
var flinkBlueGreenDeploymentType =
62+
BlueGreenDeploymentType.fromDeployment(dependentDeployment);
6263

63-
if (flinkBlueGreenDeploymentType == DeploymentType.BLUE) {
64+
if (flinkBlueGreenDeploymentType == BlueGreenDeploymentType.BLUE) {
6465
if (flinkBlueGreenDeployments.getFlinkDeploymentBlue() != null) {
6566
FlinkBlueGreenDeploymentController.logAndThrow(
6667
"Detected multiple Dependent Deployments of type BLUE");

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
2121
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
22-
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
22+
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
2323
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
2424
import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments;
2525
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
@@ -53,7 +53,7 @@ public FlinkDeployment getGreenDeployment() {
5353
return deployments != null ? deployments.getFlinkDeploymentGreen() : null;
5454
}
5555

56-
public FlinkDeployment getDeploymentByType(DeploymentType type) {
57-
return type == DeploymentType.BLUE ? getBlueDeployment() : getGreenDeployment();
56+
public FlinkDeployment getDeploymentByType(BlueGreenDeploymentType type) {
57+
return type == BlueGreenDeploymentType.BLUE ? getBlueDeployment() : getGreenDeployment();
5858
}
5959
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 51 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import org.apache.flink.api.common.JobStatus;
2121
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
2222
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
23+
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
2324
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDiffType;
24-
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
2525
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
2626
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
2727
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
@@ -70,23 +70,27 @@ public class BlueGreenDeploymentService {
7070
* Initiates a new Blue/Green deployment.
7171
*
7272
* @param context the transition context
73-
* @param nextDeploymentType the type of deployment to create
73+
* @param nextBlueGreenDeploymentType the type of deployment to create
7474
* @param nextState the next state to transition to
7575
* @param lastCheckpoint the checkpoint to restore from (can be null)
7676
* @param isFirstDeployment whether this is the first deployment
7777
* @return UpdateControl for the deployment
7878
*/
7979
public UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
8080
BlueGreenContext context,
81-
DeploymentType nextDeploymentType,
81+
BlueGreenDeploymentType nextBlueGreenDeploymentType,
8282
FlinkBlueGreenDeploymentState nextState,
8383
Savepoint lastCheckpoint,
8484
boolean isFirstDeployment) {
8585
ObjectMeta bgMeta = context.getBgDeployment().getMetadata();
8686

8787
FlinkDeployment flinkDeployment =
8888
prepareFlinkDeployment(
89-
context, nextDeploymentType, lastCheckpoint, isFirstDeployment, bgMeta);
89+
context,
90+
nextBlueGreenDeploymentType,
91+
lastCheckpoint,
92+
isFirstDeployment,
93+
bgMeta);
9094

9195
deployCluster(context, flinkDeployment);
9296

@@ -100,35 +104,44 @@ public UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
100104
* Checks if a full transition can be initiated and initiates it if conditions are met.
101105
*
102106
* @param context the transition context
103-
* @param currentDeploymentType the current deployment type
107+
* @param currentBlueGreenDeploymentType the current deployment type
104108
* @return UpdateControl for the deployment
105109
*/
106110
public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
107-
BlueGreenContext context, DeploymentType currentDeploymentType) {
111+
BlueGreenContext context, BlueGreenDeploymentType currentBlueGreenDeploymentType) {
108112
BlueGreenDiffType specDiff = getSpecDiff(context);
109113

110114
if (specDiff != BlueGreenDiffType.IGNORE) {
111115
FlinkDeployment currentFlinkDeployment =
112-
context.getDeploymentByType(currentDeploymentType);
116+
context.getDeploymentByType(currentBlueGreenDeploymentType);
113117

114118
if (isFlinkDeploymentReady(currentFlinkDeployment)) {
115119
if (specDiff == BlueGreenDiffType.TRANSITION) {
116120
if (handleSavepoint(context, currentFlinkDeployment)) {
117121
// Spec is intentionally not marked as reconciled here to allow
118122
// reprocessing the TRANSITION once savepoint creation completes
119-
var savepointingState = calculateSavepointingState(currentDeploymentType);
123+
var savepointingState =
124+
calculateSavepointingState(currentBlueGreenDeploymentType);
120125
return patchStatusUpdateControl(context, savepointingState, null)
121126
.rescheduleAfter(getReconciliationReschedInterval(context));
122127
}
123128

124129
setLastReconciledSpec(context);
125-
return startTransition(context, currentDeploymentType, currentFlinkDeployment);
130+
return startTransition(
131+
context, currentBlueGreenDeploymentType, currentFlinkDeployment);
126132
} else {
127133
setLastReconciledSpec(context);
128-
return patchFlinkDeployment(context, currentDeploymentType, specDiff);
134+
return patchFlinkDeployment(context, currentBlueGreenDeploymentType, specDiff);
129135
}
130136
} else {
131137
if (context.getDeploymentStatus().getJobStatus().getState() != JobStatus.FAILING) {
138+
LOG.warn(
139+
"Transition to {} not possible, current Flink Deployment '{}' is not READY. FAILING '{}'",
140+
calculateTransition(currentBlueGreenDeploymentType)
141+
.nextBlueGreenDeploymentType,
142+
currentFlinkDeployment.getMetadata().getName(),
143+
context.getBgDeployment().getMetadata().getName());
144+
132145
setLastReconciledSpec(context);
133146
return patchStatusUpdateControl(context, null, JobStatus.FAILING);
134147
}
@@ -140,12 +153,12 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
140153

141154
private UpdateControl<FlinkBlueGreenDeployment> patchFlinkDeployment(
142155
BlueGreenContext context,
143-
DeploymentType currentDeploymentType,
156+
BlueGreenDeploymentType currentBlueGreenDeploymentType,
144157
BlueGreenDiffType specDiff) {
145158

146159
if (specDiff == BlueGreenDiffType.PATCH_CHILD) {
147160
FlinkDeployment nextFlinkDeployment =
148-
context.getDeploymentByType(currentDeploymentType);
161+
context.getDeploymentByType(currentBlueGreenDeploymentType);
149162

150163
nextFlinkDeployment.setSpec(
151164
context.getBgDeployment().getSpec().getTemplate().getSpec());
@@ -155,39 +168,42 @@ private UpdateControl<FlinkBlueGreenDeployment> patchFlinkDeployment(
155168

156169
return patchStatusUpdateControl(
157170
context,
158-
calculatePatchingState(currentDeploymentType),
171+
calculatePatchingState(currentBlueGreenDeploymentType),
159172
JobStatus.RECONCILING)
160173
.rescheduleAfter(BlueGreenUtils.getReconciliationReschedInterval(context));
161174
}
162175

163176
private UpdateControl<FlinkBlueGreenDeployment> startTransition(
164177
BlueGreenContext context,
165-
DeploymentType currentDeploymentType,
178+
BlueGreenDeploymentType currentBlueGreenDeploymentType,
166179
FlinkDeployment currentFlinkDeployment) {
167-
DeploymentTransition transition = calculateTransition(currentDeploymentType);
180+
DeploymentTransition transition = calculateTransition(currentBlueGreenDeploymentType);
168181

169182
Savepoint lastCheckpoint = configureInitialSavepoint(context, currentFlinkDeployment);
170183

171184
return initiateDeployment(
172185
context,
173-
transition.nextDeploymentType,
186+
transition.nextBlueGreenDeploymentType,
174187
transition.nextState,
175188
lastCheckpoint,
176189
false);
177190
}
178191

179-
private DeploymentTransition calculateTransition(DeploymentType currentType) {
180-
if (DeploymentType.BLUE == currentType) {
192+
private DeploymentTransition calculateTransition(BlueGreenDeploymentType currentType) {
193+
if (BlueGreenDeploymentType.BLUE == currentType) {
181194
return new DeploymentTransition(
182-
DeploymentType.GREEN, FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN);
195+
BlueGreenDeploymentType.GREEN,
196+
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN);
183197
} else {
184198
return new DeploymentTransition(
185-
DeploymentType.BLUE, FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE);
199+
BlueGreenDeploymentType.BLUE,
200+
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE);
186201
}
187202
}
188203

189-
private FlinkBlueGreenDeploymentState calculatePatchingState(DeploymentType currentType) {
190-
if (DeploymentType.BLUE == currentType) {
204+
private FlinkBlueGreenDeploymentState calculatePatchingState(
205+
BlueGreenDeploymentType currentType) {
206+
if (BlueGreenDeploymentType.BLUE == currentType) {
191207
return FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE;
192208
} else {
193209
return FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN;
@@ -197,12 +213,12 @@ private FlinkBlueGreenDeploymentState calculatePatchingState(DeploymentType curr
197213
// ==================== Savepointing Methods ====================
198214

199215
public boolean monitorSavepoint(
200-
BlueGreenContext context, DeploymentType currentDeploymentType) {
216+
BlueGreenContext context, BlueGreenDeploymentType currentBlueGreenDeploymentType) {
201217

202218
FlinkResourceContext<FlinkDeployment> ctx =
203219
context.getCtxFactory()
204220
.getResourceContext(
205-
context.getDeploymentByType(currentDeploymentType),
221+
context.getDeploymentByType(currentBlueGreenDeploymentType),
206222
context.getJosdkContext());
207223

208224
String savepointTriggerId = context.getDeploymentStatus().getSavepointTriggerId();
@@ -272,8 +288,9 @@ private boolean handleSavepoint(
272288
return false;
273289
}
274290

275-
private FlinkBlueGreenDeploymentState calculateSavepointingState(DeploymentType currentType) {
276-
if (DeploymentType.BLUE == currentType) {
291+
private FlinkBlueGreenDeploymentState calculateSavepointingState(
292+
BlueGreenDeploymentType currentType) {
293+
if (BlueGreenDeploymentType.BLUE == currentType) {
277294
return FlinkBlueGreenDeploymentState.SAVEPOINTING_BLUE;
278295
} else {
279296
return FlinkBlueGreenDeploymentState.SAVEPOINTING_GREEN;
@@ -286,13 +303,14 @@ private FlinkBlueGreenDeploymentState calculateSavepointingState(DeploymentType
286303
* Monitors an ongoing Blue/Green deployment transition.
287304
*
288305
* @param context the transition context
289-
* @param currentDeploymentType the current deployment type being transitioned from
306+
* @param currentBlueGreenDeploymentType the current deployment type being transitioned from
290307
* @return UpdateControl for the transition
291308
*/
292309
public UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
293-
BlueGreenContext context, DeploymentType currentDeploymentType) {
310+
BlueGreenContext context, BlueGreenDeploymentType currentBlueGreenDeploymentType) {
294311

295-
TransitionState transitionState = determineTransitionState(context, currentDeploymentType);
312+
TransitionState transitionState =
313+
determineTransitionState(context, currentBlueGreenDeploymentType);
296314

297315
handleSpecChangesDuringTransition(context, transitionState);
298316

@@ -333,10 +351,10 @@ private void handleSpecChangesDuringTransition(
333351
}
334352

335353
private TransitionState determineTransitionState(
336-
BlueGreenContext context, DeploymentType currentDeploymentType) {
354+
BlueGreenContext context, BlueGreenDeploymentType currentBlueGreenDeploymentType) {
337355
TransitionState transitionState;
338356

339-
if (DeploymentType.BLUE == currentDeploymentType) {
357+
if (BlueGreenDeploymentType.BLUE == currentBlueGreenDeploymentType) {
340358
transitionState =
341359
new TransitionState(
342360
context.getBlueDeployment(), // currentDeployment
@@ -355,7 +373,7 @@ private TransitionState determineTransitionState(
355373
"Target Dependent Deployment resource not found. Blue/Green deployment name: "
356374
+ context.getDeploymentName()
357375
+ ", current deployment type: "
358-
+ currentDeploymentType);
376+
+ currentBlueGreenDeploymentType);
359377

360378
return transitionState;
361379
}
@@ -544,7 +562,7 @@ public static UpdateControl<FlinkBlueGreenDeployment> patchStatusUpdateControl(
544562
@Getter
545563
@AllArgsConstructor
546564
private static class DeploymentTransition {
547-
final DeploymentType nextDeploymentType;
565+
final BlueGreenDeploymentType nextBlueGreenDeploymentType;
548566
final FlinkBlueGreenDeploymentState nextState;
549567
}
550568

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveStateHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.flink.kubernetes.operator.controller.bluegreen.handlers;
1919

2020
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
21-
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
21+
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
2222
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
2323
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
2424
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
@@ -36,13 +36,13 @@ public ActiveStateHandler(
3636

3737
@Override
3838
public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext context) {
39-
DeploymentType currentType = getCurrentDeploymentType();
39+
BlueGreenDeploymentType currentType = getCurrentDeploymentType();
4040
return deploymentService.checkAndInitiateDeployment(context, currentType);
4141
}
4242

43-
private DeploymentType getCurrentDeploymentType() {
43+
private BlueGreenDeploymentType getCurrentDeploymentType() {
4444
return getSupportedState() == FlinkBlueGreenDeploymentState.ACTIVE_BLUE
45-
? DeploymentType.BLUE
46-
: DeploymentType.GREEN;
45+
? BlueGreenDeploymentType.BLUE
46+
: BlueGreenDeploymentType.GREEN;
4747
}
4848
}

0 commit comments

Comments
 (0)