Skip to content

Commit 8575350

Browse files
authored
Limit range over processes to DC (#1994)
* Limit range over processes to DC (needs removal of notes) --------- Co-authored-by: Nicole Morales <[email protected]>
1 parent d25f068 commit 8575350

File tree

6 files changed

+27
-9
lines changed

6 files changed

+27
-9
lines changed

api/v1beta2/foundationdbcluster_types.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2919,3 +2919,15 @@ func (cluster *FoundationDBCluster) GetProcessGroupID(processClass ProcessClass,
29192919
func (cluster *FoundationDBCluster) IsPodIPFamily6() bool {
29202920
return cluster.Spec.Routing.PodIPFamily != nil && *cluster.Spec.Routing.PodIPFamily == 6
29212921
}
2922+
2923+
// ProcessSharesDC returns true if the process's locality matches the cluster's Datacenter.
2924+
// If there is insufficient cluster information, it will return true to avoid filtering when there is insufficient data
2925+
func (cluster *FoundationDBCluster) ProcessSharesDC(process FoundationDBStatusProcessInfo) bool {
2926+
if cluster == nil || cluster.Spec.DataCenter == "" {
2927+
return true
2928+
}
2929+
if cluster.Spec.DataCenter == process.Locality[FDBLocalityDCIDKey] {
2930+
return true
2931+
}
2932+
return false
2933+
}

controllers/bounce_processes.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ func getAddressesForUpgrade(logger logr.Logger, r *FoundationDBClusterReconciler
323323
}
324324

325325
// We don't want to check for fault tolerance here to make sure the operator is able to restart processes if some
326-
// processes where restarted before the operator issued the cluster wide restart. For version incompatible upgrades
326+
// processes were restarted before the operator issued the cluster wide restart. For version incompatible upgrades
327327
// that would mean that the processes restarted earlier are not part of the cluster anymore leading to a fault tolerance
328328
// drop.
329329
if !status.Client.DatabaseStatus.Available {

controllers/choose_removals.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ func (c chooseRemovals) reconcile(ctx context.Context, r *FoundationDBClusterRec
7070

7171
localityMap := make(map[string]locality.Info)
7272
for _, process := range status.Cluster.Processes {
73+
if !cluster.ProcessSharesDC(process) {
74+
continue
75+
}
7376
id := process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey]
7477
localityMap[id] = locality.Info{ID: id, Address: process.Address, LocalityData: process.Locality}
7578
}
@@ -78,12 +81,12 @@ func (c chooseRemovals) reconcile(ctx context.Context, r *FoundationDBClusterRec
7881

7982
for _, processClass := range fdbv1beta2.ProcessClasses {
8083
desiredCount := desiredCounts[processClass]
81-
removedCount := currentCounts[processClass] - desiredCount
84+
excessCount := currentCounts[processClass] - desiredCount
8285
processClassLocality := make([]locality.Info, 0, currentCounts[processClass])
8386

8487
for _, processGroup := range cluster.Status.ProcessGroupsByProcessClass(processClass) {
8588
if processGroup.IsMarkedForRemoval() {
86-
removedCount--
89+
excessCount--
8790
continue
8891
}
8992
localityInfo, present := localityMap[string(processGroup.ProcessGroupID)]
@@ -92,8 +95,8 @@ func (c chooseRemovals) reconcile(ctx context.Context, r *FoundationDBClusterRec
9295
}
9396
}
9497

95-
if removedCount > 0 {
96-
r.Recorder.Event(cluster, corev1.EventTypeNormal, "ShrinkingProcesses", fmt.Sprintf("Removing %d %s processes", removedCount, processClass))
98+
if excessCount > 0 {
99+
r.Recorder.Event(cluster, corev1.EventTypeNormal, "ShrinkingProcesses", fmt.Sprintf("Removing %d %s processes", excessCount, processClass))
97100

98101
remainingProcesses, err := locality.ChooseDistributedProcesses(cluster, processClassLocality, desiredCount, locality.ProcessSelectionConstraint{})
99102
if err != nil {

controllers/remove_incompatible_processes.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func processIncompatibleProcesses(ctx context.Context, r *FoundationDBClusterRec
8282

8383
logger.Info("incompatible connections", "incompatibleConnections", status.Cluster.IncompatibleConnections)
8484

85-
incompatibleConnections := parseIncompatibleConnections(logger, status)
85+
incompatibleConnections := parseIncompatibleConnections(logger, status, cluster)
8686
incompatiblePods := make([]*corev1.Pod, 0, len(incompatibleConnections))
8787
for _, processGroup := range cluster.Status.ProcessGroups {
8888
pod, err := r.PodLifecycleManager.GetPod(ctx, r, cluster, processGroup.GetPodName(cluster))
@@ -111,9 +111,12 @@ func processIncompatibleProcesses(ctx context.Context, r *FoundationDBClusterRec
111111

112112
// parseIncompatibleConnections parses the incompatible connections string slice to a map and removes all false reported incompatible processes.
113113
// If a process is still part of the cluster status we can assume it's not an incompatible process.
114-
func parseIncompatibleConnections(logger logr.Logger, status *fdbv1beta2.FoundationDBStatus) map[string]fdbv1beta2.None {
114+
func parseIncompatibleConnections(logger logr.Logger, status *fdbv1beta2.FoundationDBStatus, cluster *fdbv1beta2.FoundationDBCluster) map[string]fdbv1beta2.None {
115115
processAddressMap := map[string]fdbv1beta2.None{}
116116
for _, process := range status.Cluster.Processes {
117+
if !cluster.ProcessSharesDC(process) {
118+
continue
119+
}
117120
processAddressMap[process.Address.MachineAddress()] = fdbv1beta2.None{}
118121
}
119122

controllers/remove_incompatible_processes_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ var _ = Describe("restart_incompatible_pods", func() {
7070
)
7171

7272
DescribeTable("when parsing incompatible connections", func(status *fdbv1beta2.FoundationDBStatus, expected map[string]fdbv1beta2.None) {
73-
Expect(parseIncompatibleConnections(logr.Discard(), status)).To(Equal(expected))
73+
Expect(parseIncompatibleConnections(logr.Discard(), status, nil)).To(Equal(expected))
7474
},
7575
Entry("empty incompatible map",
7676
&fdbv1beta2.FoundationDBStatus{

controllers/update_status.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (updateStatus) reconcile(ctx context.Context, r *FoundationDBClusterReconci
7272
for _, process := range databaseStatus.Cluster.Processes {
7373
versionMap[process.Version]++
7474
// Ignore all processes for the process map that are for a different data center
75-
if cluster.Spec.DataCenter != "" && cluster.Spec.DataCenter != process.Locality[fdbv1beta2.FDBLocalityDCIDKey] {
75+
if !cluster.ProcessSharesDC(process) {
7676
continue
7777
}
7878

0 commit comments

Comments
 (0)