Commit 14e271f

Ensure that the operator checks if the processes for newly created pods are up and running for the update pod config reconciler (#2244)
* Ensure that the operator checks if the processes for newly created pods are up and running for the update pod config reconciler
1 parent d7c2895 commit 14e271f
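
In short, the reconciler now fetches the machine-readable status before it collects pods to update, groups the reported processes by their instance-ID locality (ignoring processes from other data centers), and refuses to pick a pod for bouncing if that pod was created less than a minute ago and its processes are either absent from the status or report an uptime larger than the pod's age, which indicates the operator is looking at a stale, cached status. Below is a minimal, self-contained Go sketch of that gate; ProcessInfo and checkRecentlyCreatedPod are illustrative stand-ins rather than the operator's actual types and helpers, and the InSimulation carve-out from the real code is omitted. Run as-is, the first call in main reports the stale-status error and the second returns nil.

package main

import (
	"fmt"
	"time"
)

// ProcessInfo stands in for the process entries from the FDB machine-readable
// status (fdbv1beta2.FoundationDBStatusProcessInfo in the operator).
type ProcessInfo struct {
	UptimeSeconds float64
}

// checkRecentlyCreatedPod mirrors the idea of the new gate in getPodsToUpdate:
// a pod that is younger than one minute may only be scheduled for an update if
// its processes are reported in the status and their uptime does not exceed
// the pod's age.
func checkRecentlyCreatedPod(podName string, podCreated time.Time, processes []ProcessInfo) error {
	timeSincePodCreation := time.Since(podCreated)
	if timeSincePodCreation >= time.Minute {
		// Old enough: no freshness check needed.
		return nil
	}

	if len(processes) == 0 {
		return fmt.Errorf("%s was recently created and the processes are not yet running", podName)
	}

	for _, process := range processes {
		// An uptime larger than the pod's age means the status entry predates
		// the recreated pod, i.e. the cached status is stale.
		if process.UptimeSeconds > timeSincePodCreation.Seconds() {
			return fmt.Errorf("%s was recently created but the reported uptime (%.0fs) is older than the pod (%.0fs)",
				podName, process.UptimeSeconds, timeSincePodCreation.Seconds())
		}
	}

	return nil
}

func main() {
	created := time.Now().Add(-30 * time.Second)

	// Stale cached status: the process claims 60000s of uptime for a 30s old pod.
	fmt.Println(checkRecentlyCreatedPod("storage-1", created, []ProcessInfo{{UptimeSeconds: 60000}}))

	// Fresh status: the process restarted together with the pod.
	fmt.Println(checkRecentlyCreatedPod("storage-1", created, []ProcessInfo{{UptimeSeconds: 12}}))
}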

File tree (2 files changed: +197 -46)

controllers/update_pods.go
controllers/update_pods_test.go

controllers/update_pods.go

Lines changed: 78 additions & 35 deletions
@@ -24,6 +24,7 @@ import (
 	"context"
 	"fmt"
 	"github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbadminclient"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/utils/pointer"
 	"time"

@@ -43,7 +44,23 @@ type updatePods struct{}

 // reconcile runs the reconciler's work.
 func (u updatePods) reconcile(ctx context.Context, r *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster, status *fdbv1beta2.FoundationDBStatus, logger logr.Logger) *requeue {
-	updates, err := getPodsToUpdate(ctx, logger, r, cluster)
+	adminClient, err := r.getAdminClient(logger, cluster)
+	if err != nil {
+		return &requeue{curError: err, delayedRequeue: true}
+	}
+	defer func() {
+		_ = adminClient.Close()
+	}()
+
+	// If the status is not cached, we have to fetch it.
+	if status == nil {
+		status, err = adminClient.GetStatus()
+		if err != nil {
+			return &requeue{curError: err}
+		}
+	}
+
+	updates, err := getPodsToUpdate(ctx, logger, r, cluster, getProcessesByProcessGroup(cluster, status))
 	if err != nil {
 		return &requeue{curError: err, delay: podSchedulingDelayDuration, delayedRequeue: true}
 	}
@@ -65,22 +82,6 @@ func (u updatePods) reconcile(ctx context.Context, r *FoundationDBClusterReconci
 		return nil
 	}

-	adminClient, err := r.getAdminClient(logger, cluster)
-	if err != nil {
-		return &requeue{curError: err, delayedRequeue: true}
-	}
-	defer func() {
-		_ = adminClient.Close()
-	}()
-
-	// If the status is not cached, we have to fetch it.
-	if status == nil {
-		status, err = adminClient.GetStatus()
-		if err != nil {
-			return &requeue{curError: err}
-		}
-	}
-
 	return deletePodsForUpdates(ctx, r, cluster, updates, logger, status, adminClient)
 }

@@ -131,8 +132,32 @@ func getFaultDomainsWithUnavailablePods(ctx context.Context, logger logr.Logger,
 	return faultDomainsWithUnavailablePods
 }

+func getProcessesByProcessGroup(cluster *fdbv1beta2.FoundationDBCluster, status *fdbv1beta2.FoundationDBStatus) map[string][]fdbv1beta2.FoundationDBStatusProcessInfo {
+	processMap := map[string][]fdbv1beta2.FoundationDBStatusProcessInfo{}
+
+	for _, process := range status.Cluster.Processes {
+		if len(process.Locality) == 0 {
+			continue
+		}
+
+		processGroupID, ok := process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey]
+		if !ok {
+			continue
+		}
+
+		// Ignore all processes for the process map that are for a different data center
+		if !cluster.ProcessSharesDC(process) {
+			continue
+		}
+
+		processMap[processGroupID] = append(processMap[processGroupID], process)
+	}
+
+	return processMap
+}
+
 // getPodsToUpdate returns a map of Zone to Pods mapping. The map has the fault domain as key and all Pods in that fault domain will be present as a slice of *corev1.Pod.
-func getPodsToUpdate(ctx context.Context, logger logr.Logger, reconciler *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster) (map[string][]*corev1.Pod, error) {
+func getPodsToUpdate(ctx context.Context, logger logr.Logger, reconciler *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster, processInformation map[string][]fdbv1beta2.FoundationDBStatusProcessInfo) (map[string][]*corev1.Pod, error) {
 	updates := make(map[string][]*corev1.Pod)

 	faultDomainsWithUnavailablePods := getFaultDomainsWithUnavailablePods(ctx, logger, reconciler, cluster)
@@ -182,29 +207,50 @@ func getPodsToUpdate(ctx context.Context, logger logr.Logger, reconciler *Founda
 		pod, err := reconciler.PodLifecycleManager.GetPod(ctx, reconciler, cluster, processGroup.GetPodName(cluster))
 		// If a Pod is not found ignore it for now.
 		if err != nil {
-			logger.V(1).Info("Could not find Pod for process group ID",
-				"processGroupID", processGroup.ProcessGroupID)
+			if k8serrors.IsNotFound(err) {
+				logger.V(1).Info("Could not find Pod for process group ID",
+					"processGroupID", processGroup.ProcessGroupID)
+
+				// Check when the Pod went missing. If the condition is unset the current timestamp will be used, in that case
+				// the fdbv1beta2.MissingPod duration will be smaller than the 90 seconds buffer. The 90 seconds buffer
+				// was chosen as per default the failure detection in FDB takes 60 seconds to detect a failing fdbserver
+				// process (or actually to mark it failed). Without this check there could be a race condition where the
+				// Pod is already removed, so the process group would be skipped here but the fdbserver process is not yet
+				// marked as failed in FDB, which causes FDB to return full replication in the cluster status.
+				//
+				// With the unified image there is support for delaying the shutdown to reduce this risk even further.
+				missingPodDuration := time.Since(time.Unix(pointer.Int64Deref(processGroup.GetConditionTime(fdbv1beta2.MissingPod), time.Now().Unix()), 0))
+				if missingPodDuration < 90*time.Second {
+					podMissingError = fmt.Errorf("ProcessGroup: %s is missing the associated Pod for %s will be blocking until the Pod is missing for at least 90 seconds", processGroup.ProcessGroupID, missingPodDuration.String())
+				}

-			// Check when the Pod went missing. If the condition is unset the current timestamp will be used, in that case
-			// the fdbv1beta2.MissingPod duration will be smaller than the 90 seconds buffer. The 90 seconds buffer
-			// was chosen as per default the failure detection in FDB takes 60 seconds to detect a failing fdbserver
-			// process (or actually to mark it failed). Without this check there could be a race condition where the
-			// Pod is already removed, so the process group would be skipped here but the fdbserver process is not yet
-			// marked as failed in FDB, which causes FDB to return full replication in the cluster status.
-			//
-			// With the unified image there is support for delaying the shutdown to reduce this risk even further.
-			missingPodDuration := time.Since(time.Unix(pointer.Int64Deref(processGroup.GetConditionTime(fdbv1beta2.MissingPod), time.Now().Unix()), 0))
-			if missingPodDuration < 90*time.Second {
-				podMissingError = fmt.Errorf("ProcessGroup: %s is missing the associated Pod for %s will be blocking until the Pod is missing for at least 90 seconds", processGroup.ProcessGroupID, missingPodDuration.String())
+				continue
 			}

-			continue
+			return nil, err
 		}

 		if shouldRequeueDueToTerminatingPod(pod, cluster, processGroup.ProcessGroupID) {
 			return nil, fmt.Errorf("cluster has Pod %s that is pending deletion", pod.Name)
 		}

+		// If the pod was recently created, check if the processes are already running, if not return an error.
+		timeSincePodCreation := time.Since(pod.CreationTimestamp.Time)
+		if timeSincePodCreation < 1*time.Minute {
+			processes, ok := processInformation[string(processGroup.ProcessGroupID)]
+			if len(processes) == 0 || !ok {
+				return nil, fmt.Errorf("%s was recently created and the processes are not yet running", pod.Name)
+			}
+
+			for _, process := range processes {
+				// If the uptime is higher than the time since the pod was created, that means the reported process
+				// has some stale data. This could happen in cases where the status is cached in the operator.
+				if process.UptimeSeconds > timeSincePodCreation.Seconds() && !reconciler.InSimulation {
+					return nil, fmt.Errorf("%s was recently created but the process uptime reports old uptime, time since pod was created: %f.2 seconds and process up time: %f.2", pod.Name, timeSincePodCreation.Seconds(), process.UptimeSeconds)
+				}
+			}
+		}
+
 		specHash, err := internal.GetPodSpecHash(cluster, processGroup, nil)
 		if err != nil {
 			logger.Info("Skipping Pod due to error generating spec hash",
@@ -262,9 +308,6 @@ func getPodsToUpdate(ctx context.Context, logger logr.Logger, reconciler *Founda
 			zone = "simulation"
 		}

-		if updates[zone] == nil {
-			updates[zone] = make([]*corev1.Pod, 0)
-		}
 		updates[zone] = append(updates[zone], pod)
 	}

controllers/update_pods_test.go

Lines changed: 119 additions & 11 deletions
@@ -25,6 +25,8 @@ import (
 	"fmt"
 	"time"

+	"github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbadminclient/mock"
+
 	"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal"
 	ctrlClient "sigs.k8s.io/controller-runtime/pkg/client"

@@ -317,7 +319,7 @@ var _ = Describe("update_pods", func() {
 	When("fetching all Pods that needs an update", func() {
 		var cluster *fdbv1beta2.FoundationDBCluster
 		var updates map[string][]*corev1.Pod
-		var err error
+		var updateErr error

 		BeforeEach(func() {
 			cluster = internal.CreateDefaultCluster()
@@ -329,13 +331,18 @@ var _ = Describe("update_pods", func() {
 		})

 		JustBeforeEach(func() {
-			updates, err = getPodsToUpdate(context.Background(), globalControllerLogger, clusterReconciler, cluster)
+			adminClient, err := mock.NewMockAdminClient(cluster, k8sClient)
+			Expect(err).NotTo(HaveOccurred())
+			status, err := adminClient.GetStatus()
+			Expect(err).NotTo(HaveOccurred())
+
+			updates, updateErr = getPodsToUpdate(context.Background(), globalControllerLogger, clusterReconciler, cluster, getProcessesByProcessGroup(cluster, status))
 		})

 		When("the cluster has no changes", func() {
 			It("should return no errors and an empty map", func() {
 				Expect(updates).To(HaveLen(0))
-				Expect(err).NotTo(HaveOccurred())
+				Expect(updateErr).NotTo(HaveOccurred())
 			})

 			When("a Pod is missing", func() {
@@ -352,7 +359,7 @@ var _ = Describe("update_pods", func() {

 				It("should return no errors and an empty map", func() {
 					Expect(updates).To(HaveLen(0))
-					Expect(err).NotTo(HaveOccurred())
+					Expect(updateErr).NotTo(HaveOccurred())
 				})
 			})
 		})
@@ -368,7 +375,7 @@ var _ = Describe("update_pods", func() {
 			It("should return no errors and a map with one zone", func() {
 				// We only have one zone in this case, the simulation zone
 				Expect(updates).To(HaveLen(1))
-				Expect(err).NotTo(HaveOccurred())
+				Expect(updateErr).NotTo(HaveOccurred())
 			})

 			When("a Pod is missing", func() {
@@ -388,7 +395,7 @@ var _ = Describe("update_pods", func() {
 				When("the process group has no MissingPod condition", func() {
 					It("should return an error and an empty map", func() {
 						Expect(updates).To(HaveLen(0))
-						Expect(err).To(HaveOccurred())
+						Expect(updateErr).To(HaveOccurred())
 					})
 				})

@@ -400,7 +407,7 @@ var _ = Describe("update_pods", func() {

 					It("should return an error and an empty map", func() {
 						Expect(updates).To(HaveLen(0))
-						Expect(err).To(HaveOccurred())
+						Expect(updateErr).To(HaveOccurred())
 					})
 				})

@@ -412,7 +419,108 @@ var _ = Describe("update_pods", func() {

 					It("should return no error updates", func() {
 						Expect(updates).To(HaveLen(1))
+						Expect(updateErr).NotTo(HaveOccurred())
+					})
+				})
+			})
+
+			When("a Pod was recently created", func() {
+				var picked *fdbv1beta2.ProcessGroupStatus
+
+				BeforeEach(func() {
+					picked = internal.PickProcessGroups(cluster, fdbv1beta2.ProcessClassStorage, 1)[0]
+
+					podList := &corev1.PodList{}
+					Expect(k8sClient.List(context.Background(), podList, ctrlClient.InNamespace(cluster.Namespace), ctrlClient.MatchingLabels(cluster.GetMatchLabels()))).To(Succeed())
+
+					for _, pod := range podList.Items {
+						currentPod := pod.DeepCopy()
+						Expect(k8sClient.Delete(context.Background(), currentPod)).To(Succeed())
+
+						Expect(currentPod.Labels).NotTo(BeNil())
+						processGroupID, ok := currentPod.Labels[fdbv1beta2.FDBProcessGroupIDLabel]
+						Expect(ok).To(BeTrue())
+
+						var creationTimestamp metav1.Time
+						if processGroupID == string(picked.ProcessGroupID) {
+							creationTimestamp = metav1.NewTime(time.Now())
+						} else {
+							// Reset the metadata and ensure that all pods were created 24 hours ago
+							creationTimestamp = metav1.NewTime(time.Now().Add(-24 * time.Hour))
+						}

+						// Reset the metadata and ensure that all pods were created 24 hours ago
+						currentPod.ObjectMeta = metav1.ObjectMeta{
+							Name:              pod.Name,
+							Namespace:         pod.Namespace,
+							Annotations:       pod.Annotations,
+							Labels:            pod.Labels,
+							CreationTimestamp: creationTimestamp,
+						}
+
+						// Recreate Pod
+						Expect(k8sClient.Create(context.Background(), currentPod)).To(Succeed())
+
+					}
+
+					clusterReconciler.InSimulation = false
+				})
+
+				AfterEach(func() {
+					clusterReconciler.InSimulation = true
+				})
+
+				When("the process is not yet running", func() {
+					BeforeEach(func() {
+						adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
 						Expect(err).NotTo(HaveOccurred())
+						adminClient.MockMissingProcessGroup(picked.ProcessGroupID, true)
+					})
+
+					It("should return an error and an empty map", func() {
+						Expect(updates).To(HaveLen(0))
+						Expect(updateErr).To(MatchError(And(ContainSubstring("was recently created and the processes are not yet running"), ContainSubstring(string(picked.ProcessGroupID)))))
+					})
+				})
+
+				When("the process is running but the uptime seconds is greater than the pod uptime ", func() {
+					It("should return an error and an empty map", func() {
+						Expect(updates).To(HaveLen(0))
+						Expect(updateErr).To(MatchError(And(ContainSubstring("was recently created but the process uptime reports old uptime"), ContainSubstring(string(picked.ProcessGroupID)))))
+					})
+				})
+
+				When("the process is running and the uptime seconds is less than the pod uptime ", func() {
+					BeforeEach(func() {
+						pod := &corev1.Pod{}
+						Expect(k8sClient.Get(context.Background(), ctrlClient.ObjectKey{Name: picked.GetPodName(cluster), Namespace: cluster.Namespace}, pod)).To(Succeed())
+
+						pod.CreationTimestamp = metav1.NewTime(time.Now().Add(-6 * time.Hour))
+						Expect(k8sClient.Delete(context.Background(), pod)).To(Succeed())
+
+						creationTimestamp := time.Now().Add(-24 * time.Hour)
+						// We have to recreate the pod
+						pod.ObjectMeta = metav1.ObjectMeta{
+							Name:        pod.Name,
+							Namespace:   pod.Namespace,
+							Annotations: pod.Annotations,
+							Labels:      pod.Labels,
+							// Default uptime is 60000 seconds.
+							CreationTimestamp: metav1.NewTime(creationTimestamp),
+						}
+
+						// Recreate Pod
+						Expect(k8sClient.Create(context.Background(), pod)).To(Succeed())
+
+						newPod := &corev1.Pod{}
+						Expect(k8sClient.Get(context.Background(), ctrlClient.ObjectKey{Name: picked.GetPodName(cluster), Namespace: cluster.Namespace}, newPod)).To(Succeed())
+						Expect(newPod.CreationTimestamp.Time.Unix()).To(Equal(creationTimestamp.Unix()))
+
+					})
+
+					It("should return not error", func() {
+						Expect(updates).To(HaveLen(4))
+						Expect(updateErr).NotTo(HaveOccurred())
 					})
 				})
 			})
@@ -429,7 +537,7 @@ var _ = Describe("update_pods", func() {

 			It("should return no updates", func() {
 				Expect(updates).To(HaveLen(0))
-				Expect(err).NotTo(HaveOccurred())
+				Expect(updateErr).NotTo(HaveOccurred())
 			})
 		})

@@ -453,7 +561,7 @@ var _ = Describe("update_pods", func() {
 			It("should return no errors and a map with the zone and all pods to update", func() {
 				Expect(updates).To(HaveLen(1))
 				Expect(updates["simulation"]).To(HaveLen(4))
-				Expect(err).NotTo(HaveOccurred())
+				Expect(updateErr).NotTo(HaveOccurred())
 			})
 		})

@@ -470,7 +578,7 @@ var _ = Describe("update_pods", func() {
 			It("should return no errors and a map with the zone and two pods to update", func() {
 				Expect(updates).To(HaveLen(1))
 				Expect(updates["simulation"]).To(HaveLen(2))
-				Expect(err).NotTo(HaveOccurred())
+				Expect(updateErr).NotTo(HaveOccurred())
 			})
 		})

@@ -486,7 +594,7 @@ var _ = Describe("update_pods", func() {

 			It("should return no errors and a an empty update map", func() {
 				Expect(updates).To(HaveLen(0))
-				Expect(err).NotTo(HaveOccurred())
+				Expect(updateErr).NotTo(HaveOccurred())
 			})
 		})
 	})