Skip to content

Commit 6ecc28c

Browse files
committed
Addressing the test sync issues that caused intermittent NullPointerExceptions.
1 parent 8addc70 commit 6ecc28c

File tree

1 file changed

+93
-93
lines changed

1 file changed

+93
-93
lines changed

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 93 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ public class FlinkBlueGreenDeploymentControllerTest {
7777
public static final String IMAGE_POLICY = "IfNotPresent";
7878

7979
private static final String CUSTOM_CONFIG_FIELD = "custom-configuration-field";
80-
private static final int ALT_DELAY_VALUE = 1200;
80+
private static final int DEFAULT_DELETION_DELAY_VALUE = 500;
81+
private static final int ALT_DELETION_DELAY_VALUE = 1000;
8182
private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
8283
private TestingFlinkService flinkService;
8384
private Context<FlinkBlueGreenDeployment> context;
@@ -107,81 +108,105 @@ public void setup() {
107108
public void verifyBasicDeployment(FlinkVersion flinkVersion) throws Exception {
108109
var blueGreenDeployment =
109110
buildSessionCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE, flinkVersion);
111+
executeBasicDeployment(flinkVersion, blueGreenDeployment, true);
112+
}
113+
114+
private TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult
115+
executeBasicDeployment(
116+
FlinkVersion flinkVersion,
117+
FlinkBlueGreenDeployment blueGreenDeployment,
118+
boolean execAssertions)
119+
throws Exception {
110120

111121
// 1. Initiate the Blue deployment
112122
var bgSpecBefore = blueGreenDeployment.getSpec();
113123
Long minReconciliationTs = System.currentTimeMillis() - 1;
114124
var rs = reconcile(blueGreenDeployment);
115125

116-
assertTrue(rs.updateControl.isPatchStatus());
117-
assertTrue(minReconciliationTs < rs.reconciledStatus.getLastReconciledTimestamp());
126+
if (execAssertions) {
127+
assertTrue(rs.updateControl.isPatchStatus());
128+
assertTrue(minReconciliationTs < rs.reconciledStatus.getLastReconciledTimestamp());
118129

119-
// check the status (reconciled spec, reconciled ts, a/b state)
120-
assertEquals(
121-
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE,
122-
rs.reconciledStatus.getBlueGreenState());
123-
assertNotNull(rs.reconciledStatus.getLastReconciledSpec());
124-
assertNull(rs.reconciledStatus.getJobStatus().getState());
125-
assertEquals(0, rs.reconciledStatus.getDeploymentReadyTimestamp());
130+
// check the status (reconciled spec, reconciled ts, a/b state)
131+
assertEquals(
132+
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE,
133+
rs.reconciledStatus.getBlueGreenState());
134+
assertNotNull(rs.reconciledStatus.getLastReconciledSpec());
135+
assertNull(rs.reconciledStatus.getJobStatus().getState());
136+
assertEquals(0, rs.reconciledStatus.getDeploymentReadyTimestamp());
137+
}
126138

127-
var flinkDeploymentList = getFlinkDeployments();
128-
assertEquals(1, flinkDeploymentList.size());
129-
var deploymentA = flinkDeploymentList.get(0);
139+
var flinkDeployments = getFlinkDeployments();
140+
var deploymentA = flinkDeployments.get(0);
130141

131-
verifyOwnerReferences(rs.deployment, deploymentA);
142+
if (execAssertions) {
143+
assertEquals(1, flinkDeployments.size());
144+
verifyOwnerReferences(rs.deployment, deploymentA);
145+
}
132146

133147
simulateSubmitAndSuccessfulJobStart(deploymentA);
134148

135149
// 2. Mark the Blue deployment ready
136150
rs = reconcile(rs.deployment);
137151

152+
// 3. Logic for the deployment to get deleted
153+
assertDeploymentDeleted(rs, DEFAULT_DELETION_DELAY_VALUE, bgSpecBefore);
154+
155+
// 4. Finalize the Blue deployment
156+
minReconciliationTs = System.currentTimeMillis() - 1;
157+
rs = reconcile(rs.deployment);
158+
159+
if (execAssertions) {
160+
assertEquals(JobStatus.RUNNING, rs.reconciledStatus.getJobStatus().getState());
161+
assertTrue(minReconciliationTs < rs.reconciledStatus.getLastReconciledTimestamp());
162+
assertEquals(0, rs.reconciledStatus.getDeploymentReadyTimestamp());
163+
assertEquals(
164+
FlinkBlueGreenDeploymentState.ACTIVE_BLUE,
165+
rs.reconciledStatus.getBlueGreenState());
166+
167+
// 5. Subsequent reconciliation calls = NO-OP
168+
var rs2 = reconcile(rs.deployment);
169+
assertTrue(rs2.updateControl.isNoUpdate());
170+
}
171+
172+
return rs;
173+
}
174+
175+
private void assertDeploymentDeleted(
176+
TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult rs,
177+
long expectedDeletionDelay,
178+
FlinkBlueGreenDeploymentSpec bgSpecBefore)
179+
throws Exception {
180+
var deletionDelay = rs.updateControl.getScheduleDelay().get();
181+
138182
assertTrue(rs.updateControl.isPatchStatus());
183+
assertEquals(expectedDeletionDelay, deletionDelay);
139184
assertTrue(rs.reconciledStatus.getDeploymentReadyTimestamp() > 0);
140185
assertEquals(
141186
SpecUtils.serializeObject(bgSpecBefore, "spec"),
142187
rs.reconciledStatus.getLastReconciledSpec());
143188

144-
// 3. Finalize the Blue deployment
145-
minReconciliationTs = System.currentTimeMillis() - 1;
146-
rs = reconcile(rs.deployment);
147-
148-
assertEquals(JobStatus.RUNNING, rs.reconciledStatus.getJobStatus().getState());
149-
assertTrue(minReconciliationTs < rs.reconciledStatus.getLastReconciledTimestamp());
150-
assertEquals(0, rs.reconciledStatus.getDeploymentReadyTimestamp());
151-
assertEquals(
152-
FlinkBlueGreenDeploymentState.ACTIVE_BLUE, rs.reconciledStatus.getBlueGreenState());
189+
// A reconciliation before the deletion delay has expired should result in no-op
190+
var rs2 = reconcile(rs.deployment);
191+
var remainingDeletionDelay = rs2.updateControl.getScheduleDelay().get();
192+
assertTrue(remainingDeletionDelay < expectedDeletionDelay);
193+
assertTrue(rs2.updateControl.isNoUpdate());
153194

154-
// 4. Subsequent reconciliation calls = NO-OP
155-
rs = reconcile(rs.deployment);
156-
assertTrue(rs.updateControl.isNoUpdate());
195+
Thread.sleep(remainingDeletionDelay);
157196
}
158197

159198
@ParameterizedTest
160199
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
161200
public void verifyBasicTransition(FlinkVersion flinkVersion) throws Exception {
162201
var blueGreenDeployment =
163202
buildSessionCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE, flinkVersion);
203+
var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false);
164204

165-
// 1. Initiate the Blue deployment
166-
var rs = reconcile(blueGreenDeployment);
167-
168-
// 2. Mark the Blue deployment ready
169-
simulateSubmitAndSuccessfulJobStart(getFlinkDeployments().get(0));
170-
rs = reconcile(rs.deployment);
171-
assertTrue(rs.reconciledStatus.getDeploymentReadyTimestamp() > 0);
172-
173-
// 3. Finalize the Blue deployment
174-
rs = reconcile(rs.deployment);
175-
176-
// Verify noUpdate if reconciliation is triggered without a spec change
177-
var rs2 = reconcile(rs.deployment);
178-
assertTrue(rs2.updateControl.isNoUpdate());
179-
180-
// 3. Simulate a change in the spec to trigger a Green deployment
205+
// Simulate a change in the spec to trigger a Green deployment
181206
String customValue = UUID.randomUUID().toString();
182-
simulateChangeInSpec(rs.deployment, customValue, ALT_DELAY_VALUE);
207+
simulateChangeInSpec(rs.deployment, customValue, ALT_DELETION_DELAY_VALUE);
183208

184-
// 4. Transitioning to the Green deployment
209+
// Transitioning to the Green deployment
185210
var bgUpdatedSpec = rs.deployment.getSpec();
186211
testTransitionToGreen(rs, customValue, bgUpdatedSpec);
187212
}
@@ -191,23 +216,26 @@ private void testTransitionToGreen(
191216
String customValue,
192217
FlinkBlueGreenDeploymentSpec bgUpdatedSpec)
193218
throws Exception {
194-
// Initiating Green deployment
219+
220+
// Initiate the Green deployment
195221
Long minReconciliationTs = System.currentTimeMillis() - 1;
222+
var bgSpecBefore = rs.deployment.getSpec();
196223
rs = reconcile(rs.deployment);
197224

198225
var flinkDeployments = getFlinkDeployments();
199226
var greenDeploymentName = flinkDeployments.get(1).getMetadata().getName();
200227

201228
assertTrue(rs.updateControl.isPatchStatus());
202229
assertTrue(minReconciliationTs < rs.reconciledStatus.getLastReconciledTimestamp());
203-
204230
assertEquals(2, flinkDeployments.size());
205231
assertNull(flinkDeployments.get(0).getSpec().getJob().getInitialSavepointPath());
206232
assertNotNull(flinkDeployments.get(1).getSpec().getJob().getInitialSavepointPath());
207233

208234
assertEquals(
209235
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN,
210236
rs.reconciledStatus.getBlueGreenState());
237+
assertNotNull(rs.reconciledStatus.getLastReconciledSpec());
238+
assertEquals(0, rs.reconciledStatus.getDeploymentReadyTimestamp());
211239
assertEquals(
212240
customValue,
213241
rs.deployment
@@ -216,22 +244,13 @@ private void testTransitionToGreen(
216244
.getSpec()
217245
.getFlinkConfiguration()
218246
.get(CUSTOM_CONFIG_FIELD));
219-
assertNotNull(rs.reconciledStatus.getLastReconciledSpec());
220247

221-
// New Green deployment successfully started
248+
// Initiate and mark the Green deployment ready
222249
simulateSuccessfulJobStart(getFlinkDeployments().get(1));
223-
224-
// New Green deployment marked ready
225250
rs = reconcile(rs.deployment);
226-
assertEquals(ALT_DELAY_VALUE, rs.updateControl.getScheduleDelay().get());
227251

228-
// Subsequent reconciliation calls = NO-OP and the delay to delete should be decreasing
229-
var rs2 = reconcile(rs.deployment);
230-
assertTrue(rs2.updateControl.isNoUpdate());
231-
Long currentDelay = rs2.updateControl.getScheduleDelay().get();
232-
assertTrue(currentDelay < ALT_DELAY_VALUE && currentDelay > 0);
233-
234-
Thread.sleep(ALT_DELAY_VALUE);
252+
// Logic for the deployment to get deleted
253+
assertDeploymentDeleted(rs, ALT_DELETION_DELAY_VALUE, bgSpecBefore);
235254

236255
// Calling the rescheduled reconciliation (will delete the deployment)
237256
reconcile(rs.deployment);
@@ -262,25 +281,15 @@ private void testTransitionToGreen(
262281
public void verifyFailureBeforeTransition(FlinkVersion flinkVersion) throws Exception {
263282
var blueGreenDeployment =
264283
buildSessionCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE, flinkVersion);
265-
var originalSpec = blueGreenDeployment.getSpec();
266-
267-
// 1. Initiate the Green deployment
268-
var rs = reconcile(blueGreenDeployment);
284+
var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false);
269285

270-
// 2a. Mark the Green deployment ready
271-
simulateSubmitAndSuccessfulJobStart(getFlinkDeployments().get(0));
272-
rs = reconcile(rs.deployment);
273-
274-
// 2b. Finalizing the Green deployment
275-
rs = reconcile(rs.deployment);
276-
277-
// 3. Simulate a change in the spec to trigger a Blue deployment
286+
// Simulate a change in the spec to trigger a Blue deployment
278287
simulateChangeInSpec(rs.deployment, UUID.randomUUID().toString(), 0);
279288

280289
// Simulate a failure in the running deployment
281290
simulateJobFailure(getFlinkDeployments().get(0));
282291

283-
// 4. Initiate the Blue deployment
292+
// Initiate the Green deployment
284293
var minReconciliationTs = System.currentTimeMillis() - 1;
285294
rs = reconcile(rs.deployment);
286295

@@ -298,7 +307,7 @@ public void verifyFailureBeforeTransition(FlinkVersion flinkVersion) throws Exce
298307
ReconciliationState.UPGRADING,
299308
flinkDeployments.get(0).getStatus().getReconciliationStatus().getState());
300309

301-
// 5. No update
310+
// No update
302311
rs = reconcile(rs.deployment);
303312
assertTrue(rs.updateControl.isNoUpdate());
304313
}
@@ -308,7 +317,6 @@ public void verifyFailureBeforeTransition(FlinkVersion flinkVersion) throws Exce
308317
public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exception {
309318
var blueGreenDeployment =
310319
buildSessionCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE, flinkVersion);
311-
var originalSpec = blueGreenDeployment.getSpec();
312320

313321
// Overriding the maxNumRetries and Reschedule Interval
314322
var maxNumRetries = 2;
@@ -319,21 +327,13 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce
319327
.getTemplate()
320328
.setReconciliationReschedulingIntervalMs(reconciliationReschedulingIntervalMs);
321329

322-
// 1. Initiate the Green deployment
323-
var rs = reconcile(blueGreenDeployment);
330+
var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false);
324331

325-
// 2a. Mark the Green deployment ready
326-
simulateSubmitAndSuccessfulJobStart(getFlinkDeployments().get(0));
327-
rs = reconcile(rs.deployment);
328-
329-
// 2b. Finalize the Green deployment
330-
rs = reconcile(rs.deployment);
331-
332-
// 3. Simulate a change in the spec to trigger a Blue deployment
332+
// Simulate a change in the spec to trigger a Blue deployment
333333
String customValue = UUID.randomUUID().toString();
334334
simulateChangeInSpec(rs.deployment, customValue, 0);
335335

336-
// 4. Initiate the Blue deployment
336+
// Initiate the Green deployment
337337
rs = reconcile(rs.deployment);
338338

339339
// We should be TRANSITIONING_TO_GREEN at this point
@@ -349,7 +349,7 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce
349349
.getFlinkConfiguration()
350350
.get(CUSTOM_CONFIG_FIELD));
351351

352-
// 4a. Simulating the Blue deployment doesn't start correctly (status will remain the same)
352+
// Simulating the Blue deployment doesn't start correctly (status will remain the same)
353353
// Asserting the status retry count is incremented by 1
354354
for (int i = 1; i <= maxNumRetries; i++) {
355355
Thread.sleep(1);
@@ -364,7 +364,7 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce
364364
System.out.println();
365365
}
366366

367-
// 4b. After the retries are exhausted
367+
// After the retries are exhausted
368368
rs = reconcile(rs.deployment);
369369

370370
assertTrue(rs.updateControl.isPatchStatus());
@@ -387,11 +387,11 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce
387387
ReconciliationState.UPGRADING,
388388
flinkDeployments.get(1).getStatus().getReconciliationStatus().getState());
389389

390-
// 5. Simulate another change in the spec to trigger a redeployment
390+
// Simulate another change in the spec to trigger a redeployment
391391
customValue = UUID.randomUUID().toString();
392-
simulateChangeInSpec(rs.deployment, customValue, ALT_DELAY_VALUE);
392+
simulateChangeInSpec(rs.deployment, customValue, ALT_DELETION_DELAY_VALUE);
393393

394-
// 6. Initiate the redeployment
394+
// Initiate the redeployment
395395
var bgUpdatedSpec = rs.deployment.getSpec();
396396
testTransitionToGreen(rs, customValue, bgUpdatedSpec);
397397
}
@@ -402,14 +402,14 @@ public void verifySpecChangeDuringTransition(FlinkVersion flinkVersion) throws E
402402
var blueGreenDeployment =
403403
buildSessionCluster(TEST_DEPLOYMENT_NAME, TEST_NAMESPACE, flinkVersion);
404404

405-
// 1. Initiate the Blue deployment
405+
// Initiate the Blue deployment
406406
var originalSpec = blueGreenDeployment.getSpec();
407407
var rs = reconcile(blueGreenDeployment);
408408

409-
// 2. Job starting...
409+
// Job starting...
410410
simulateSubmitAndSuccessfulJobStart(getFlinkDeployments().get(0));
411411

412-
// 3. Simulate a spec change before the transition is complete
412+
// Simulate a spec change before the transition is complete
413413
simulateChangeInSpec(rs.deployment, "MODIFIED_VALUE", 0);
414414
rs = reconcile(rs.deployment);
415415

@@ -548,7 +548,7 @@ private static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVers
548548

549549
var flinkDeploymentTemplateSpec =
550550
FlinkDeploymentTemplateSpec.builder()
551-
.deploymentDeletionDelayMs(1)
551+
.deploymentDeletionDelayMs(DEFAULT_DELETION_DELAY_VALUE)
552552
.maxNumRetries(1)
553553
.reconciliationReschedulingIntervalMs(500)
554554
.spec(flinkDeploymentSpec)

0 commit comments

Comments
 (0)