Skip to content

Commit f712125

Browse files
committed
Addressing PR comments. Corrected abort/delay logic. Added the e2e tests to ci.yml. Missing Licenses.
1 parent ec1382f commit f712125

File tree

9 files changed

+65
-13
lines changed

9 files changed

+65
-13
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ jobs:
172172
- test_flink_operator_ha.sh
173173
- test_snapshot.sh
174174
- test_batch_job.sh
175+
- test_bluegreen_laststate.sh
176+
- test_bluegreen_stateless.sh
175177
exclude:
176178
- flink-version: v1_16
177179
test: test_autoscaler.sh

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.ArrayList;
4343
import java.util.List;
4444

45-
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.ABORT_GRACE_PERIOD;
4645
import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState.INITIALIZING_BLUE;
4746

4847
/**
@@ -72,8 +71,6 @@ public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueG
7271
private final FlinkResourceContextFactory ctxFactory;
7372
private final BlueGreenStateHandlerRegistry handlerRegistry;
7473

75-
public static long minimumAbortGracePeriodMs = ABORT_GRACE_PERIOD.defaultValue().toMillis();
76-
7774
public FlinkBlueGreenDeploymentController(FlinkResourceContextFactory ctxFactory) {
7875
this.ctxFactory = ctxFactory;
7976
this.handlerRegistry = new BlueGreenStateHandlerRegistry();

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenContext.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.flink.kubernetes.operator.controller.bluegreen;
219

320
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -426,14 +426,15 @@ private UpdateControl<FlinkBlueGreenDeployment> shouldWeAbort(
426426
if (abortTimestamp < System.currentTimeMillis()) {
427427
return abortDeployment(context, nextDeployment, nextState, deploymentName);
428428
} else {
429-
return retryDeployment(context, abortTimestamp, deploymentName);
429+
return retryDeployment(context, deploymentName);
430430
}
431431
}
432432

433433
private UpdateControl<FlinkBlueGreenDeployment> retryDeployment(
434-
BlueGreenContext context, long abortTimestamp, String deploymentName) {
434+
BlueGreenContext context, String deploymentName) {
435+
436+
long delay = getReconciliationReschedInterval(context);
435437

436-
long delay = abortTimestamp - System.currentTimeMillis();
437438
LOG.info(
438439
"FlinkDeployment '{}' not ready yet, retrying in {} seconds.",
439440
deploymentName,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/InitializingBlueStateHandler.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.flink.kubernetes.operator.controller.bluegreen.handlers;
219

320
import org.apache.flink.api.common.JobStatus;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/SavepointingStateHandler.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.flink.kubernetes.operator.controller.bluegreen.handlers;
219

320
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.kubernetes.operator.utils.bluegreen;
1919

20-
import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeploymentController;
2120
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
2221

2322
import java.time.Instant;
@@ -90,8 +89,10 @@ public static long getAbortGracePeriod(BlueGreenContext context) {
9089
long abortGracePeriod =
9190
BlueGreenSpecUtils.getConfigOption(context.getBgDeployment(), ABORT_GRACE_PERIOD)
9291
.toMillis();
93-
return Math.max(
94-
abortGracePeriod, FlinkBlueGreenDeploymentController.minimumAbortGracePeriodMs);
92+
return abortGracePeriod;
93+
// return Math.max(
94+
// abortGracePeriod,
95+
// FlinkBlueGreenDeploymentController.minimumAbortGracePeriodMs);
9596
}
9697

9798
/**

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public class FlinkBlueGreenDeploymentControllerTest {
8484
private static final String CUSTOM_CONFIG_FIELD = "custom-configuration-field";
8585
private static final int DEFAULT_DELETION_DELAY_VALUE = 500;
8686
private static final int ALT_DELETION_DELAY_VALUE = 1000;
87+
private static final int MINIMUM_ABORT_GRACE_PERIOD = 1000;
8788
private static final String TEST_CHECKPOINT_PATH = "/tmp/checkpoints";
8889
private static final String TEST_INITIAL_SAVEPOINT_PATH = "/tmp/savepoints";
8990
private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
@@ -240,7 +241,7 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce
240241

241242
// Overriding the maxNumRetries and Reschedule Interval
242243
var abortGracePeriodMs = 1200;
243-
var reconciliationReschedulingIntervalMs = 5000;
244+
var reconciliationReschedulingIntervalMs = 3000;
244245
Map<String, String> configuration =
245246
blueGreenDeployment.getSpec().getTemplate().getConfiguration();
246247
configuration.put(ABORT_GRACE_PERIOD.key(), String.valueOf(abortGracePeriodMs));
@@ -280,7 +281,7 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce
280281
assertFalse(rs.updateControl.isPatchResource());
281282
assertTrue(rs.updateControl.getScheduleDelay().isPresent());
282283
reschedDelayMs = rs.updateControl.getScheduleDelay().get();
283-
assertTrue(reschedDelayMs < abortGracePeriodMs && reschedDelayMs > 0);
284+
assertTrue(reschedDelayMs == reconciliationReschedulingIntervalMs && reschedDelayMs > 0);
284285
assertTrue(
285286
instantStrToMillis(rs.reconciledStatus.getAbortTimestamp())
286287
> System.currentTimeMillis());
@@ -365,7 +366,7 @@ public void verifyFailureBeforeFirstDeployment(FlinkVersion flinkVersion) throws
365366
var rs = initialPhaseBasicDeployment(blueGreenDeployment, false);
366367

367368
// Simulating the job did not start correctly before the AbortGracePeriodMs
368-
Thread.sleep(FlinkBlueGreenDeploymentController.minimumAbortGracePeriodMs);
369+
Thread.sleep(MINIMUM_ABORT_GRACE_PERIOD);
369370

370371
rs = reconcile(rs.deployment);
371372

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkBlueGreenDeploymentController.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public TestingFlinkBlueGreenDeploymentController(
5151
null);
5252

5353
flinkBlueGreenDeploymentController = new FlinkBlueGreenDeploymentController(contextFactory);
54-
flinkBlueGreenDeploymentController.minimumAbortGracePeriodMs = 1000;
5554
}
5655

5756
@Override

0 commit comments

Comments
 (0)