@@ -73,8 +73,9 @@ func (u removeProcessGroups) reconcile(ctx context.Context, r *FoundationDBClust
73
73
// If no process groups are marked to remove we have to check if all process groups are excluded.
74
74
if len (processGroupsToRemove ) == 0 {
75
75
if ! allExcluded {
76
- return & requeue {message : "Reconciliation needs to exclude more processes" }
76
+ return & requeue {message : "Reconciliation needs to exclude more processes" , delay : 15 * time . Second }
77
77
}
78
+
78
79
return nil
79
80
}
80
81
@@ -132,7 +133,7 @@ func (u removeProcessGroups) reconcile(ctx context.Context, r *FoundationDBClust
132
133
logger .Info ("Removing process groups" , "zone" , zone , "count" , len (zoneRemovals ), "deletionMode" , cluster .GetRemovalMode ())
133
134
// This will return a map of the newly removed ProcessGroups and the ProcessGroups with the ResourcesTerminating condition
134
135
removedProcessGroups := r .removeProcessGroups (ctx , logger , cluster , zoneRemovals , zonedRemovals [removals .TerminatingZone ])
135
- err = includeProcessGroup (ctx , logger , r , cluster , removedProcessGroups , status , adminClient )
136
+ err = includeProcessGroup (ctx , logger , r , status , cluster , removedProcessGroups , adminClient )
136
137
if err != nil {
137
138
// If the inclusion is blocked or another issue happened, we will retry in 60 seconds.
138
139
return & requeue {curError : err , delayedRequeue : true , delay : 60 * time .Second }
@@ -221,24 +222,20 @@ func confirmRemoval(ctx context.Context, logger logr.Logger, r *FoundationDBClus
221
222
canBeIncluded = false
222
223
}
223
224
224
- // TODO(johscheuer): https://github.com/FoundationDB/fdb-kubernetes-operator/v2/issues/1638
225
- pvcs := & corev1.PersistentVolumeClaimList {}
226
- err = r .List (ctx , pvcs , internal .GetSinglePodListOptions (cluster , processGroup .ProcessGroupID )... )
227
- if err != nil {
225
+ pvc := & corev1.PersistentVolumeClaim {}
226
+ err = r .Get (ctx , client.ObjectKey {Name : processGroup .GetPvcName (cluster ), Namespace : cluster .Namespace }, pvc )
227
+ if err != nil && ! k8serrors .IsNotFound (err ) {
228
228
return false , err
229
229
}
230
230
231
- if len (pvcs .Items ) == 1 {
232
- pvc := pvcs .Items [0 ]
233
- if pvc .DeletionTimestamp == nil {
234
- logger .Info ("Waiting for volume claim to get torn down" , "processGroupID" , processGroup .ProcessGroupID , "pvc" , pvc .Name )
235
- return false , internal.ResourceNotDeleted {Resource : & pvc }
231
+ if err == nil {
232
+ if pvc .DeletionTimestamp .IsZero () {
233
+ logger .Info ("Waiting for process group to get torn down" , "processGroupID" , processGroup .ProcessGroupID , "pod" , podName )
234
+ return false , internal.ResourceNotDeleted {Resource : pod }
236
235
}
237
236
238
237
// PVC is in terminating state so we don't want to block, but we also don't want to include it
239
238
canBeIncluded = false
240
- } else if len (pvcs .Items ) > 1 {
241
- return false , fmt .Errorf ("multiple PVCs found for cluster %s, processGroupID %s" , cluster .Name , processGroup .ProcessGroupID )
242
239
}
243
240
244
241
service := & corev1.Service {}
@@ -262,7 +259,13 @@ func confirmRemoval(ctx context.Context, logger logr.Logger, r *FoundationDBClus
262
259
return canBeIncluded , nil
263
260
}
264
261
265
- func includeProcessGroup (ctx context.Context , logger logr.Logger , r * FoundationDBClusterReconciler , cluster * fdbv1beta2.FoundationDBCluster , removedProcessGroups map [fdbv1beta2.ProcessGroupID ]bool , status * fdbv1beta2.FoundationDBStatus , adminClient fdbadminclient.AdminClient ) error {
262
+ func includeProcessGroup (ctx context.Context , logger logr.Logger , r * FoundationDBClusterReconciler , status * fdbv1beta2.FoundationDBStatus , cluster * fdbv1beta2.FoundationDBCluster , removedProcessGroups map [fdbv1beta2.ProcessGroupID ]bool , adminClient fdbadminclient.AdminClient ) error {
263
+ // Fetch the current exclusions from the admin client to ensure the excluded server list is the latest one.
264
+ currentExclusions , err := adminClient .GetExclusions ()
265
+ if err != nil {
266
+ return err
267
+ }
268
+
266
269
// Update here for ready inclusion --> Check here
267
270
var readyForInclusion map [fdbv1beta2.ProcessGroupID ]time.Time
268
271
readyForInclusionUpdates := map [fdbv1beta2.ProcessGroupID ]fdbv1beta2.UpdateAction {}
@@ -274,11 +277,8 @@ func includeProcessGroup(ctx context.Context, logger logr.Logger, r *FoundationD
274
277
}
275
278
}
276
279
277
- fdbProcessesToInclude , newProcessGroups , err := getProcessesToInclude (logger , cluster , removedProcessGroups , status , readyForInclusion , readyForInclusionUpdates )
278
- if err != nil {
279
- return err
280
- }
281
-
280
+ // In this step we will check if any of the "local" processes should be included.
281
+ fdbProcessesToInclude , newProcessGroups := getProcessesToInclude (logger , cluster , removedProcessGroups , currentExclusions , readyForInclusion , readyForInclusionUpdates )
282
282
if cluster .GetSynchronizationMode () == fdbv1beta2 .SynchronizationModeGlobal {
283
283
err = adminClient .UpdateReadyForInclusion (readyForInclusionUpdates )
284
284
if err != nil {
@@ -303,6 +303,12 @@ func includeProcessGroup(ctx context.Context, logger logr.Logger, r *FoundationD
303
303
return err
304
304
}
305
305
306
+ // Make sure the inclusions are coordinated across multiple operator instances.
307
+ err = r .takeLock (logger , cluster , "include removed process groups" )
308
+ if err != nil {
309
+ return err
310
+ }
311
+
306
312
if cluster .GetSynchronizationMode () == fdbv1beta2 .SynchronizationModeGlobal {
307
313
pendingForInclusion , err := adminClient .GetPendingForInclusion ("" )
308
314
if err != nil {
@@ -318,12 +324,13 @@ func includeProcessGroup(ctx context.Context, logger logr.Logger, r *FoundationD
318
324
if err != nil {
319
325
return err
320
326
}
321
- }
322
327
323
- // Make sure the inclusion are coordinated across multiple operator instances.
324
- err = r .takeLock (logger , cluster , "include removed process groups" )
325
- if err != nil {
326
- return err
328
+ fdbProcessesToInclude , err = coordination .GetAddressesFromCoordinationState (logger , adminClient , readyForInclusion , true , true )
329
+ if err != nil {
330
+ return err
331
+ }
332
+
333
+ fdbProcessesToInclude = filterAddressesToInclude (fdbProcessesToInclude , currentExclusions )
327
334
}
328
335
329
336
r .Recorder .Event (cluster , corev1 .EventTypeNormal , "IncludingProcesses" , fmt .Sprintf ("Including removed processes: %v" , fdbProcessesToInclude ))
@@ -340,17 +347,33 @@ func includeProcessGroup(ctx context.Context, logger logr.Logger, r *FoundationD
340
347
return r .updateOrApply (ctx , cluster )
341
348
}
342
349
343
- func getProcessesToInclude (logger logr.Logger , cluster * fdbv1beta2.FoundationDBCluster , removedProcessGroups map [fdbv1beta2.ProcessGroupID ]bool , status * fdbv1beta2.FoundationDBStatus , readyForInclusion map [fdbv1beta2.ProcessGroupID ]time.Time , readyForInclusionUpdates map [fdbv1beta2.ProcessGroupID ]fdbv1beta2.UpdateAction ) ([]fdbv1beta2.ProcessAddress , []* fdbv1beta2.ProcessGroupStatus , error ) {
350
+ // filterAddressesToInclude will remove all addresses that are part of the fdbProcessesToInclude slice but are not excluded in FDB itself.
351
+ func filterAddressesToInclude (fdbProcessesToInclude []fdbv1beta2.ProcessAddress , excludedServers []fdbv1beta2.ProcessAddress ) []fdbv1beta2.ProcessAddress {
352
+ excludedServersMap := make (map [string ]fdbv1beta2.None , len (excludedServers ))
353
+ for _ , excludedServer := range excludedServers {
354
+ excludedServersMap [excludedServer .String ()] = fdbv1beta2.None {}
355
+ }
356
+
357
+ result := make ([]fdbv1beta2.ProcessAddress , 0 , len (fdbProcessesToInclude ))
358
+ for _ , addr := range fdbProcessesToInclude {
359
+ _ , ok := excludedServersMap [addr .String ()]
360
+ if ! ok {
361
+ continue
362
+ }
363
+
364
+ result = append (result , addr )
365
+ }
366
+
367
+ return result
368
+ }
369
+
370
+ func getProcessesToInclude (logger logr.Logger , cluster * fdbv1beta2.FoundationDBCluster , removedProcessGroups map [fdbv1beta2.ProcessGroupID ]bool , excludedServers []fdbv1beta2.ProcessAddress , readyForInclusion map [fdbv1beta2.ProcessGroupID ]time.Time , readyForInclusionUpdates map [fdbv1beta2.ProcessGroupID ]fdbv1beta2.UpdateAction ) ([]fdbv1beta2.ProcessAddress , []* fdbv1beta2.ProcessGroupStatus ) {
344
371
fdbProcessesToInclude := make ([]fdbv1beta2.ProcessAddress , 0 )
345
372
346
373
if len (removedProcessGroups ) == 0 {
347
- return fdbProcessesToInclude , cluster .Status .ProcessGroups , nil
374
+ return fdbProcessesToInclude , cluster .Status .ProcessGroups
348
375
}
349
376
350
- excludedServers , err := fdbstatus .GetExclusions (status )
351
- if err != nil {
352
- return fdbProcessesToInclude , nil , fmt .Errorf ("unable to get excluded servers from status, %w" , err )
353
- }
354
377
excludedServersMap := make (map [string ]fdbv1beta2.None , len (excludedServers ))
355
378
for _ , excludedServer := range excludedServers {
356
379
excludedServersMap [excludedServer .String ()] = fdbv1beta2.None {}
@@ -359,6 +382,11 @@ func getProcessesToInclude(logger logr.Logger, cluster *fdbv1beta2.FoundationDBC
359
382
processGroups := cluster .Status .DeepCopy ().ProcessGroups
360
383
idx := 0
361
384
for _ , processGroup := range processGroups {
385
+ // Tester processes are not excluded, so there is no need to include them.
386
+ if processGroup .ProcessClass == fdbv1beta2 .ProcessClassTest {
387
+ continue
388
+ }
389
+
362
390
if processGroup .IsMarkedForRemoval () && removedProcessGroups [processGroup .ProcessGroupID ] {
363
391
foundInExcludedServerList := false
364
392
exclusionString := processGroup .GetExclusionString ()
@@ -374,7 +402,7 @@ func getProcessesToInclude(logger logr.Logger, cluster *fdbv1beta2.FoundationDBC
374
402
for _ , pAddr := range processGroup .Addresses {
375
403
// Ensure we include the process address if the removed process group is a log process as we are always
376
404
// excluding the log process with locality and IP address when validating that the log process was
377
- // fully excluded. Otherwise we might leave the IP address in the excluded list.
405
+ // fully excluded. Otherwise, we might leave the IP address in the excluded list.
378
406
if foundInExcludedServerList && processGroup .ProcessClass .IsLogProcess () {
379
407
fdbProcessesToInclude = append (fdbProcessesToInclude , fdbv1beta2.ProcessAddress {IPAddress : net .ParseIP (pAddr )})
380
408
continue
@@ -403,7 +431,7 @@ func getProcessesToInclude(logger logr.Logger, cluster *fdbv1beta2.FoundationDBC
403
431
idx ++
404
432
}
405
433
406
- return fdbProcessesToInclude , processGroups [:idx ], nil
434
+ return fdbProcessesToInclude , processGroups [:idx ]
407
435
}
408
436
409
437
func (r * FoundationDBClusterReconciler ) getProcessGroupsToRemove (logger logr.Logger , cluster * fdbv1beta2.FoundationDBCluster , remainingMap map [string ]bool , cordSet map [string ]fdbv1beta2.None ) (bool , bool , []* fdbv1beta2.ProcessGroupStatus ) {
0 commit comments