Skip to content

Commit 79d75bb

Browse files
committed
Optimizing the State Handling
1 parent 05e9425 commit 79d75bb

File tree

12 files changed

+96
-108
lines changed

12 files changed

+96
-108
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
2222
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
2323
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
24-
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenStateHandlerRegistry;
2524
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
26-
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.BlueGreenStateHandler;
2725
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
26+
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenStateHandlerRegistry;
27+
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.BlueGreenStateHandler;
2828
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
2929

3030
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
@@ -95,8 +95,8 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
9595
josdkContext,
9696
null,
9797
ctxFactory);
98-
return BlueGreenDeploymentService
99-
.patchStatusUpdateControl(context, INITIALIZING_BLUE, null)
98+
return BlueGreenDeploymentService.patchStatusUpdateControl(
99+
context, INITIALIZING_BLUE, null)
100100
.rescheduleAfter(100);
101101
} else {
102102
FlinkBlueGreenDeploymentState currentState = deploymentStatus.getBlueGreenState();
@@ -118,7 +118,7 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
118118

119119
BlueGreenStateHandler handler = handlerRegistry.getHandler(currentState);
120120
return handler.handle(context);
121-
// return stateMachine.processState(context);
121+
// return stateMachine.processState(context);
122122
}
123123
}
124124

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenContext.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package org.apache.flink.kubernetes.operator.controller.bluegreen;
22

3-
import io.javaoperatorsdk.operator.api.reconciler.Context;
4-
import lombok.Getter;
5-
import lombok.RequiredArgsConstructor;
63
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
74
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
85
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
96
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
107
import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments;
118
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
129

10+
import io.javaoperatorsdk.operator.api.reconciler.Context;
11+
import lombok.Getter;
12+
import lombok.RequiredArgsConstructor;
13+
1314
/**
1415
* Simplified context object containing all the necessary state and dependencies for Blue/Green
1516
* deployment state transitions.

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,7 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
107107
lastCheckpoint,
108108
false);
109109
} else {
110-
if (context.getDeploymentStatus().getJobStatus().getState()
111-
!= JobStatus.FAILING) {
110+
if (context.getDeploymentStatus().getJobStatus().getState() != JobStatus.FAILING) {
112111
return patchStatusUpdateControl(context, null, JobStatus.FAILING);
113112
}
114113
}
@@ -168,15 +167,17 @@ private TransitionState determineTransitionState(
168167
TransitionState transitionState;
169168

170169
if (DeploymentType.BLUE == currentDeploymentType) {
171-
transitionState = new TransitionState(
172-
context.getBlueDeployment(), // currentDeployment
173-
context.getGreenDeployment(), // nextDeployment
174-
FlinkBlueGreenDeploymentState.ACTIVE_GREEN); // next State
170+
transitionState =
171+
new TransitionState(
172+
context.getBlueDeployment(), // currentDeployment
173+
context.getGreenDeployment(), // nextDeployment
174+
FlinkBlueGreenDeploymentState.ACTIVE_GREEN); // next State
175175
} else {
176-
transitionState = new TransitionState(
177-
context.getGreenDeployment(), // currentDeployment
178-
context.getBlueDeployment(), // nextDeployment
179-
FlinkBlueGreenDeploymentState.ACTIVE_BLUE); // next State
176+
transitionState =
177+
new TransitionState(
178+
context.getGreenDeployment(), // currentDeployment
179+
context.getBlueDeployment(), // nextDeployment
180+
FlinkBlueGreenDeploymentState.ACTIVE_BLUE); // next State
180181
}
181182

182183
Preconditions.checkNotNull(
@@ -263,8 +264,7 @@ private UpdateControl<FlinkBlueGreenDeployment> shouldWeAbort(
263264
FlinkBlueGreenDeploymentState nextState) {
264265

265266
String deploymentName = nextDeployment.getMetadata().getName();
266-
long abortTimestamp =
267-
instantStrToMillis(context.getDeploymentStatus().getAbortTimestamp());
267+
long abortTimestamp = instantStrToMillis(context.getDeploymentStatus().getAbortTimestamp());
268268

269269
if (abortTimestamp == 0) {
270270
throw new IllegalStateException("Unexpected abortTimestamp == 0");
@@ -337,10 +337,7 @@ private static FlinkBlueGreenDeploymentState getPreviousState(
337337
public UpdateControl<FlinkBlueGreenDeployment> finalizeBlueGreenDeployment(
338338
BlueGreenContext context, FlinkBlueGreenDeploymentState nextState) {
339339

340-
LOG.info(
341-
"Finalizing deployment '{}' to {} state",
342-
context.getDeploymentName(),
343-
nextState);
340+
LOG.info("Finalizing deployment '{}' to {} state", context.getDeploymentName(), nextState);
344341

345342
context.getDeploymentStatus().setDeploymentReadyTimestamp(millisToInstantStr(0));
346343
context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
@@ -398,4 +395,4 @@ private static class TransitionState {
398395
this.nextState = nextState;
399396
}
400397
}
401-
}
398+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenStateHandlerRegistry.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,10 @@
1818
package org.apache.flink.kubernetes.operator.controller.bluegreen;
1919

2020
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
21-
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.ActiveBlueStateHandler;
22-
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.ActiveGreenStateHandler;
21+
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.ActiveStateHandler;
2322
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.BlueGreenStateHandler;
2423
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.InitializingBlueStateHandler;
25-
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.TransitioningToBlueStateHandler;
26-
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.TransitioningToGreenStateHandler;
27-
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
24+
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.TransitioningStateHandler;
2825

2926
import java.util.Map;
3027

@@ -47,12 +44,14 @@ public BlueGreenStateHandlerRegistry() {
4744
this.handlers =
4845
Map.of(
4946
INITIALIZING_BLUE, new InitializingBlueStateHandler(deploymentService),
50-
ACTIVE_BLUE, new ActiveBlueStateHandler(deploymentService),
51-
ACTIVE_GREEN, new ActiveGreenStateHandler(deploymentService),
47+
ACTIVE_BLUE, new ActiveStateHandler(ACTIVE_BLUE, deploymentService),
48+
ACTIVE_GREEN, new ActiveStateHandler(ACTIVE_GREEN, deploymentService),
5249
TRANSITIONING_TO_BLUE,
53-
new TransitioningToBlueStateHandler(deploymentService),
50+
new TransitioningStateHandler(
51+
TRANSITIONING_TO_BLUE, deploymentService),
5452
TRANSITIONING_TO_GREEN,
55-
new TransitioningToGreenStateHandler(deploymentService));
53+
new TransitioningStateHandler(
54+
TRANSITIONING_TO_GREEN, deploymentService));
5655
}
5756

5857
/**

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/AbstractBlueGreenStateHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.flink.kubernetes.operator.controller.bluegreen.handlers;
1919

2020
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
21-
2221
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
22+
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
2525

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveBlueStateHandler.java

Lines changed: 0 additions & 22 deletions
This file was deleted.

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveGreenStateHandler.java

Lines changed: 0 additions & 22 deletions
This file was deleted.
Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,24 @@
2525

2626
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
2727

28-
/** State handler for the TRANSITIONING_TO_GREEN state. */
29-
public class TransitioningToGreenStateHandler extends AbstractBlueGreenStateHandler {
28+
/** Consolidated state handler for both ACTIVE_BLUE and ACTIVE_GREEN states. */
29+
public class ActiveStateHandler extends AbstractBlueGreenStateHandler {
3030

31-
public TransitioningToGreenStateHandler(BlueGreenDeploymentService deploymentService) {
32-
super(FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN, deploymentService);
31+
public ActiveStateHandler(
32+
FlinkBlueGreenDeploymentState supportedState,
33+
BlueGreenDeploymentService deploymentService) {
34+
super(supportedState, deploymentService);
3335
}
3436

3537
@Override
3638
public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext context) {
37-
return deploymentService.monitorTransition(context, DeploymentType.BLUE);
39+
DeploymentType currentType = getCurrentDeploymentType();
40+
return deploymentService.checkAndInitiateDeployment(context, currentType);
41+
}
42+
43+
private DeploymentType getCurrentDeploymentType() {
44+
return getSupportedState() == FlinkBlueGreenDeploymentState.ACTIVE_BLUE
45+
? DeploymentType.BLUE
46+
: DeploymentType.GREEN;
3847
}
3948
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.controller.bluegreen.handlers;
19+
20+
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
21+
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
22+
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
23+
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
24+
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
25+
26+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
27+
28+
/** Consolidated state handler for both TRANSITIONING_TO_BLUE and TRANSITIONING_TO_GREEN states. */
29+
public class TransitioningStateHandler extends AbstractBlueGreenStateHandler {
30+
31+
public TransitioningStateHandler(
32+
FlinkBlueGreenDeploymentState supportedState,
33+
BlueGreenDeploymentService deploymentService) {
34+
super(supportedState, deploymentService);
35+
}
36+
37+
@Override
38+
public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext context) {
39+
DeploymentType currentType = getCurrentDeploymentType();
40+
return deploymentService.monitorTransition(context, currentType);
41+
}
42+
43+
private DeploymentType getCurrentDeploymentType() {
44+
return getSupportedState() == FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE
45+
? DeploymentType.GREEN // Transitioning FROM green TO blue
46+
: DeploymentType.BLUE; // Transitioning FROM blue TO green
47+
}
48+
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/TransitioningToBlueStateHandler.java

Lines changed: 0 additions & 22 deletions
This file was deleted.

0 commit comments

Comments
 (0)