Commit 9e83428

Ensure process groups are removed from the pending restart list if they are stuck in terminating or the process is missing (#2325)

* Ensure process groups are removed from the pending restart list if they are stuck in terminating or the process is missing

1 parent 9db598c
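Distilled from the coordination.go changes below, here is a minimal runnable Go sketch of the rule this commit adds. The processGroup type, the condition strings, and the plain map sets are simplified stand-ins for the operator's fdbv1beta2 API, not the real types: a group whose resources are stuck in terminating and whose processes are missing is dropped from the pending and ready restart sets, because there is nothing left to restart.

package main

import "fmt"

// Stand-ins for the operator's condition types (hypothetical simplification).
const (
	resourcesTerminating = "ResourcesTerminating"
	missingProcesses     = "MissingProcesses"
)

// processGroup is a simplified stand-in for fdbv1beta2.ProcessGroupStatus.
type processGroup struct {
	id         string
	conditions map[string]bool
}

func (p processGroup) hasCondition(name string) bool {
	return p.conditions[name]
}

func main() {
	// Restart bookkeeping sets, keyed by process group ID.
	pendingForRestart := map[string]bool{"storage-1": true, "log-2": true}
	readyForRestart := map[string]bool{"storage-1": true, "log-2": true}

	groups := []processGroup{
		// Stuck in terminating with no running process: nothing to restart.
		{id: "storage-1", conditions: map[string]bool{resourcesTerminating: true, missingProcesses: true}},
		// Healthy group: stays in the restart sets.
		{id: "log-2", conditions: map[string]bool{}},
	}

	for _, pg := range groups {
		if pg.hasCondition(resourcesTerminating) && pg.hasCondition(missingProcesses) {
			delete(pendingForRestart, pg.id)
			delete(readyForRestart, pg.id)
		}
	}

	fmt.Println("pendingForRestart:", pendingForRestart) // only log-2 remains
	fmt.Println("readyForRestart:", readyForRestart)     // only log-2 remains
}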

6 files changed, +141 −11 lines

e2e/test_operator/operator_test.go

Lines changed: 1 addition & 1 deletion

@@ -1789,7 +1789,7 @@ var _ = Describe("Operator", Label("e2e", "pr"), func() {
 					fdbCluster.GetCluster().Status.ProcessGroups,
 					processGroupID,
 				)
-			}).WithTimeout(5 * time.Minute).WithPolling(5 * time.Second).Should(BeNil())
+			}).WithTimeout(10 * time.Minute).WithPolling(5 * time.Second).Should(BeNil())

 			// Make sure the Pod is actually deleted after some time.
 			Eventually(func() bool {

e2e/test_operator_ha_upgrades/operator_ha_upgrade_test.go

Lines changed: 37 additions & 2 deletions

@@ -388,20 +388,55 @@ var _ = Describe("Operator HA Upgrades", Label("e2e", "pr"), func() {
 			log.Println("Ensure cluster(s) are not upgraded")
 			fdbCluster.VerifyVersion(beforeVersion)
 		} else {
-			// If we do a version compatible upgrade, ensure the partition is present for 2 minutes.
-			time.Sleep(2 * time.Minute)
+			// If we do a version compatible upgrade, ensure the partition is present for 30 seconds.
+			time.Sleep(30 * time.Second)
 		}

 		log.Println("Restoring connectivity")
 		factory.DeleteChaosMeshExperimentSafe(partitionExperiment)

+		// When using protocol compatible versions, the other operator instances are able to move forward. In some
+		// cases it can happen that new coordinators are selected and all the old coordinators are deleted. In this
+		// case the remote satellite operator will not be able to connect to the cluster anymore and needs an
+		// update to the connection string.
+		if fixtures.VersionsAreProtocolCompatible(beforeVersion, targetVersion) {
+			Eventually(func(g Gomega) {
+				currentConnectionString := fdbCluster.GetPrimary().
+					GetStatus().
+					Cluster.ConnectionString
+				remoteSat := fdbCluster.GetRemoteSatellite()
+				remoteConnectionString := remoteSat.GetCluster().Status.ConnectionString
+
+				// If the connection string is different we have to update it on the remote satellite side
+				// as the operator instances were partitioned.
+				if currentConnectionString != remoteConnectionString {
+					if !remoteSat.GetCluster().Spec.Skip {
+						remoteSat.SetSkipReconciliation(true)
+						// Wait one minute, that should be enough time for the operator to end the reconciliation loop
+						// if started.
+						time.Sleep(1 * time.Minute)
+					}
+
+					remoteSatStatus := remoteSat.GetCluster().Status.DeepCopy()
+					remoteSatStatus.ConnectionString = currentConnectionString
+					fdbCluster.GetRemoteSatellite().
+						UpdateClusterStatusWithStatus(remoteSatStatus)
+				}
+
+				g.Expect(remoteConnectionString).To(Equal(currentConnectionString))
+			}).WithTimeout(5 * time.Minute).WithPolling(15 * time.Second).Should(Succeed())
+		}
+
 		// Delete the operator Pods to ensure they pick up the work directly otherwise it could take a long time
 		// until the operator tries to reconcile the cluster again. If the operator is not able to reconcile a
 		// cluster it will be put into a queue again, at some time the queue will delay the next reconcile attempt
 		// for a long time and since the network partition is not emitting any events for the operator this won't trigger
 		// a reconciliation either. So this step is only to speed up the reconcile process.
 		factory.RecreateOperatorPods(fdbCluster.GetRemoteSatellite().Namespace())

+		// Ensure that the remote satellite is not set to skip.
+		fdbCluster.GetRemoteSatellite().SetSkipReconciliation(false)
+
 		// Upgrade should make progress now - wait until all processes have upgraded
 		// to "targetVersion".
 		fdbCluster.VerifyVersion(targetVersion)
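The block above encodes a small pattern worth naming: pause reconciliation before hand-editing the cluster status, patch only the field that diverged, then resume. A condensed sketch of that ordering follows, assuming the e2e fixture methods visible in the diff (GetCluster, SetSkipReconciliation, UpdateClusterStatusWithStatus) and a fixtures.FdbCluster receiver type; note that in the real test the unskip happens later, after the operator pods are recreated.

// repairConnectionString is a hypothetical helper that condenses the pattern
// used in the test above; it assumes the repo's e2e fixtures package.
func repairConnectionString(remoteSat *fixtures.FdbCluster, connectionString string) {
	if !remoteSat.GetCluster().Spec.Skip {
		// Pause the operator for this cluster so a concurrent reconciliation
		// cannot race with the status patch below.
		remoteSat.SetSkipReconciliation(true)
		// Give any in-flight reconciliation time to finish.
		time.Sleep(1 * time.Minute)
	}

	// Patch only the connection string, preserving the rest of the status.
	status := remoteSat.GetCluster().Status.DeepCopy()
	status.ConnectionString = connectionString
	remoteSat.UpdateClusterStatusWithStatus(status)

	// Resume reconciliation so the upgrade can make progress again.
	remoteSat.SetSkipReconciliation(false)
}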

e2e/test_operator_upgrades/operator_upgrades_test.go

Lines changed: 30 additions & 2 deletions

@@ -510,7 +510,7 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
 	)

 	DescribeTable(
-		"one process is marked for removal",
+		"one process is marked for removal and is stuck in removal",
 		func(beforeVersion string, targetVersion string) {
 			if fixtures.VersionsAreProtocolCompatible(beforeVersion, targetVersion) {
 				Skip("this test only affects version incompatible upgrades")
@@ -555,13 +555,30 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
 				var processesToUpdate int

 				cluster := fdbCluster.GetCluster()
-
 				for _, processGroup := range cluster.Status.ProcessGroups {
 					if processGroup.ProcessGroupID == processGroupMarkedForRemoval {
 						continue
 					}

 					if len(processGroup.ProcessGroupConditions) > 0 {
+						// Ignore process groups that are stuck in terminating. If the global synchronization mode is active
+						// this will be the case for all the transaction system process groups as one process group is
+						// blocked from being removed.
+						if processGroup.GetConditionTime(fdbv1beta2.ResourcesTerminating) != nil {
+							log.Println(
+								"processGroup",
+								processGroup.ProcessGroupID,
+								"will be ignored since the process group is in terminating",
+							)
+							continue
+						}
+
+						log.Println(
+							"processGroup",
+							processGroup.ProcessGroupID,
+							"processes conditions:",
+							processGroup.ProcessGroupConditions,
+						)
 						processesToUpdate++
 					}
 				}
@@ -571,6 +588,17 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
 				return processesToUpdate
 			}).WithTimeout(30 * time.Minute).WithPolling(5 * time.Second).MustPassRepeatedly(5).Should(BeNumerically("==", 0))

+			// Remove the buggify option and make sure that the terminating processes are removed.
+			fdbCluster.SetBuggifyBlockRemoval(nil)
+			Eventually(func(g Gomega) {
+				processGroups := fdbCluster.GetCluster().Status.ProcessGroups
+
+				for _, processGroup := range processGroups {
+					g.Expect(processGroup.GetConditionTime(fdbv1beta2.ResourcesTerminating)).
+						To(BeNil())
+				}
+			}).WithTimeout(5 * time.Minute).WithPolling(5 * time.Second).Should(Succeed())
+
 			// Make sure the cluster has no data loss.
 			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
 		},
e2e/test_operator_velocity/operator_velocity_test.go

Lines changed: 18 additions & 3 deletions

@@ -340,6 +340,11 @@ var _ = Describe("Test Operator Velocity", Label("e2e", "nightly"), func() {
 			fdbCluster.GetPrimary().ReplacePod(*pod, false)
 		})

+		AfterEach(func() {
+			// Wait until the replaced process group is removed.
+			Expect(fdbCluster.WaitForReconciliation()).To(Succeed())
+		})
+
 		It("should roll out knob changes within expected time", func() {
 			Expect(
 				fdbCluster.SetCustomParameters(
@@ -351,13 +356,23 @@ var _ = Describe("Test Operator Velocity", Label("e2e", "nightly"), func() {
 				),
 			).To(Succeed())

+			knobRolloutTimeout := normalKnobRolloutTimeoutSeconds
+			// In our testing pipeline we see failures due to the fact that the replacement takes a long time, e.g.
+			// when a new node must be created for the replaced pod.
+			if fdbCluster.GetPrimary().
+				GetCluster().
+				GetSynchronizationMode() ==
+				fdbv1beta2.SynchronizationModeLocal {
+				knobRolloutTimeout += int(
+					fdbCluster.GetPrimary().GetCluster().GetLockDuration().Seconds() * 2,
+				)
+			}
+
 			CheckKnobRollout(
 				fdbCluster,
 				newGeneralCustomParameters,
 				newStorageCustomParameters,
-				normalKnobRolloutTimeoutSeconds+int(
-					fdbCluster.GetPrimary().GetCluster().GetLockDuration().Seconds(),
-				),
+				knobRolloutTimeout,
 				totalGeneralProcessCount,
 				totalStorageProcessCount,
 			)
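To make the new timeout arithmetic concrete, here is a worked example with hypothetical numbers (the real constants live in the e2e suite): the budget previously always included one lock duration; now it is just the base, except in local synchronization mode, where two lock durations are added.

package main

import "fmt"

func main() {
	// Hypothetical values for illustration only.
	normalKnobRolloutTimeoutSeconds := 600 // base rollout budget
	lockDurationSeconds := 600             // assumed 10-minute operator lock

	// Old computation: base plus one lock duration, regardless of mode.
	fmt.Println(normalKnobRolloutTimeoutSeconds + lockDurationSeconds) // 1200

	// New computation in local synchronization mode: base plus two lock
	// durations, leaving headroom for a slow pod replacement (e.g. when a
	// new node must be provisioned).
	fmt.Println(normalKnobRolloutTimeoutSeconds + lockDurationSeconds*2) // 1800
}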

internal/coordination/coordination.go

Lines changed: 53 additions & 2 deletions

@@ -408,7 +408,6 @@ func UpdateGlobalCoordinationState(
 		}

 		processes := GetProcessesFromProcessMap(processGroup.ProcessGroupID, processesMap)
-
 		var excluded bool
 		for _, process := range processes {
 			excluded = excluded || process.Excluded
@@ -418,33 +417,85 @@ func UpdateGlobalCoordinationState(
 		// exclusion timestamp set or because the processes are excluded.
 		if !(processGroup.IsExcluded() || excluded) {
 			if _, ok := pendingForExclusion[processGroup.ProcessGroupID]; !ok {
+				logger.V(1).
+					Info("Adding to pendingForExclusion", "processGroupID", processGroup.ProcessGroupID, "reason", "process group is marked for removal but not excluded")
 				updatesPendingForExclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
 			}

 			if _, ok := pendingForInclusion[processGroup.ProcessGroupID]; !ok {
+				logger.V(1).
+					Info("Adding to pendingForInclusion", "processGroupID", processGroup.ProcessGroupID, "reason", "process group is marked for removal but not excluded")
 				updatesPendingForInclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
 			}
 		} else {
+			reason := "process group is excluded and marked for removal"
 			// Check if the process group is present in pendingForExclusion or readyForExclusion.
 			// If so, add them to the set to remove those entries as the process is already excluded.
 			if _, ok := pendingForExclusion[processGroup.ProcessGroupID]; ok {
+				logger.V(1).
+					Info("Removing from pendingForExclusion", "processGroupID", processGroup.ProcessGroupID, "reason", reason)
 				updatesPendingForExclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionDelete
 			}

 			if _, ok := readyForExclusion[processGroup.ProcessGroupID]; ok {
+				logger.V(1).
+					Info("Removing from readyForExclusion", "processGroupID", processGroup.ProcessGroupID, "reason", reason)
 				updatesReadyForExclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionDelete
 			}

 			// Ensure the process is added to the pending for inclusion list.
 			if _, ok := pendingForInclusion[processGroup.ProcessGroupID]; !ok {
+				logger.V(1).
+					Info("Adding to pendingForInclusion", "processGroupID", processGroup.ProcessGroupID, "reason", reason)
 				updatesPendingForInclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
 			}

 			if processGroup.ExclusionSkipped {
 				if _, ok := readyForInclusion[processGroup.ProcessGroupID]; !ok {
+					logger.V(1).
+						Info("Adding to readyForInclusion", "processGroupID", processGroup.ProcessGroupID, "reason", reason)
 					updatesReadyForInclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
 				}
 			}
+
+			// If the process group is excluded, we don't need to restart it.
+			if _, ok := pendingForRestart[processGroup.ProcessGroupID]; ok {
+				logger.V(1).
+					Info("Removing from pendingForRestart", "processGroupID", processGroup.ProcessGroupID, "reason", reason)
+				updatesPendingForRestart[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionDelete
+			}
+
+			if _, ok := readyForRestart[processGroup.ProcessGroupID]; ok {
+				logger.V(1).
+					Info("Removing from readyForRestart", "processGroupID", processGroup.ProcessGroupID, "reason", reason)
+				updatesReadyForRestart[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionDelete
+			}
+		}
+
+		// If the process group is stuck in terminating, we can add it to the ready for inclusion list.
+		if processGroup.GetConditionTime(fdbv1beta2.ResourcesTerminating) != nil {
+			if _, ok := pendingForInclusion[processGroup.ProcessGroupID]; !ok {
+				logger.V(1).
+					Info("Adding to pendingForInclusion and readyForInclusion", "processGroupID", processGroup.ProcessGroupID, "reason", "process group is marked for removal and in terminating")
+				updatesPendingForInclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
+				updatesReadyForInclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
+			}
+
+			// If the process group is marked for removal and the resources are stuck in terminating or the processes are not running, we should
+			// remove them from the restart list, because there are no processes to restart.
+			if processGroup.GetConditionTime(fdbv1beta2.MissingProcesses) != nil {
+				if _, ok := pendingForRestart[processGroup.ProcessGroupID]; ok {
+					logger.V(1).
+						Info("Removing from pendingForRestart", "processGroupID", processGroup.ProcessGroupID, "reason", "process group is marked for removal")
+					updatesPendingForRestart[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionDelete
+				}
+
+				if _, ok := readyForRestart[processGroup.ProcessGroupID]; ok {
+					logger.V(1).
+						Info("Removing from readyForRestart", "processGroupID", processGroup.ProcessGroupID, "reason", "process group is marked for removal")
+					updatesReadyForRestart[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionDelete
+				}
+			}
 		}

 		addresses, ok := processAddresses[processGroup.ProcessGroupID]
@@ -457,7 +508,7 @@ func UpdateGlobalCoordinationState(
 			continue
 		}

-		// If the process groups is missing long enough to be ignored, ensure that it's removed from the pending
+		// If the process group is missing long enough to be ignored, ensure that it's removed from the pending
 		// and the ready list.
 		if processGroup.GetConditionTime(fdbv1beta2.IncorrectCommandLine) != nil &&
 			!restarts.ShouldBeIgnoredBecauseMissing(logger, cluster, processGroup) {
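The hunk above leans on a bookkeeping pattern worth spelling out: the loop never mutates the stored pending/ready sets directly, it records intent in the updates* maps keyed by process group ID, and a later step folds them into the stored sets in one pass. A minimal runnable sketch, with a simplified updateAction type standing in for fdbv1beta2.UpdateAction:

package main

import "fmt"

// updateAction is a simplified stand-in for fdbv1beta2.UpdateAction.
type updateAction int

const (
	updateActionAdd updateAction = iota
	updateActionDelete
)

// apply folds the recorded updates into the stored set in one pass.
func apply(set map[string]struct{}, updates map[string]updateAction) {
	for id, action := range updates {
		switch action {
		case updateActionAdd:
			set[id] = struct{}{}
		case updateActionDelete:
			delete(set, id)
		}
	}
}

func main() {
	pendingForRestart := map[string]struct{}{"storage-1": {}, "log-2": {}}

	// The reconciliation loop records intent instead of mutating directly.
	updates := map[string]updateAction{
		"storage-1": updateActionDelete, // stuck in terminating, nothing to restart
		"log-7":     updateActionAdd,    // newly needs a restart
	}

	apply(pendingForRestart, updates)
	fmt.Println(pendingForRestart) // map[log-2:{} log-7:{}]
}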

internal/restarts/restarts.go

Lines changed: 2 additions & 1 deletion

@@ -83,5 +83,6 @@ func ShouldBeIgnoredBecauseMissing(
 		return true
 	}

-	return false
+	// If a process group is stuck in terminating we don't want to block further actions because of that.
+	return processGroup.GetConditionTime(fdbv1beta2.ResourcesTerminating) != nil
 }
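The behavioral change is easiest to see as a truth table. Below is a simplified stand-in using plain booleans instead of the operator's condition timestamps (the real function inspects processGroup.GetConditionTime):

package main

import "fmt"

// shouldBeIgnoredBecauseMissing mirrors the updated control flow with plain
// booleans instead of the operator's condition timestamps.
func shouldBeIgnoredBecauseMissing(missingLongEnough, stuckInTerminating bool) bool {
	if missingLongEnough {
		return true
	}

	// Previously this was an unconditional `return false`; now a process
	// group stuck in terminating is also ignored.
	return stuckInTerminating
}

func main() {
	fmt.Println(shouldBeIgnoredBecauseMissing(false, false)) // false: still blocks further actions
	fmt.Println(shouldBeIgnoredBecauseMissing(true, false))  // true: missing long enough
	fmt.Println(shouldBeIgnoredBecauseMissing(false, true))  // true: stuck in terminating (this commit)
}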
