diff --git a/operator/internal/controller/podclique/components/pod/rollingupdate.go b/operator/internal/controller/podclique/components/pod/rollingupdate.go index 6466fc0d8..d92bcbca5 100644 --- a/operator/internal/controller/podclique/components/pod/rollingupdate.go +++ b/operator/internal/controller/podclique/components/pod/rollingupdate.go @@ -40,15 +40,17 @@ import ( // updateWork encapsulates the information needed to perform a rolling update of pods in a PodClique. type updateWork struct { - oldTemplateHashPendingPods []*corev1.Pod - oldTemplateHashUnhealthyPods []*corev1.Pod - oldTemplateHashReadyPods []*corev1.Pod - newTemplateHashReadyPods []*corev1.Pod + oldTemplateHashPendingPods []*corev1.Pod + oldTemplateHashUnhealthyPods []*corev1.Pod + oldTemplateHashStartingPods []*corev1.Pod + oldTemplateHashUncategorizedPods []*corev1.Pod + oldTemplateHashReadyPods []*corev1.Pod + newTemplateHashReadyPods []*corev1.Pod } // getPodNamesPendingUpdate returns names of pods with old template hash that are not already being deleted func (w *updateWork) getPodNamesPendingUpdate(deletionExpectedPodUIDs []types.UID) []string { - allOldPods := lo.Union(w.oldTemplateHashPendingPods, w.oldTemplateHashUnhealthyPods, w.oldTemplateHashReadyPods) + allOldPods := lo.Union(w.oldTemplateHashPendingPods, w.oldTemplateHashUnhealthyPods, w.oldTemplateHashStartingPods, w.oldTemplateHashUncategorizedPods, w.oldTemplateHashReadyPods) return lo.FilterMap(allOldPods, func(pod *corev1.Pod, _ int) (string, bool) { if slices.Contains(deletionExpectedPodUIDs, pod.UID) { return "", false @@ -73,8 +75,8 @@ func (w *updateWork) getNextPodToUpdate() *corev1.Pod { func (r _resource) processPendingUpdates(logger logr.Logger, sc *syncContext) error { work := r.computeUpdateWork(logger, sc) pclq := sc.pclq - // always delete pods that have old pod template hash and are either Pending or Unhealthy. - if err := r.deleteOldPendingAndUnhealthyPods(logger, sc, work); err != nil { + // Always delete old-hash pods that are not Ready (pending, unhealthy, starting, or uncategorized). + if err := r.deleteOldNonReadyPods(logger, sc, work); err != nil { return err } @@ -132,24 +134,34 @@ func (r _resource) processPendingUpdates(logger logr.Logger, sc *syncContext) er return r.markRollingUpdateEnd(sc.ctx, logger, pclq) } -// computeUpdateWork analyzes existing pods and categorizes them by template hash and health status for update planning +// computeUpdateWork categorizes pods by template hash and state. +// Old-hash pods: Pending, Unhealthy, Starting, Uncategorized, or Ready. +// New-hash pods: Ready only. func (r _resource) computeUpdateWork(logger logr.Logger, sc *syncContext) *updateWork { work := &updateWork{} for _, pod := range sc.existingPCLQPods { if pod.Labels[common.LabelPodTemplateHash] != sc.expectedPodTemplateHash { - // check if the pod has already been marked for deletion + // Old-hash pod — skip if deletion already in flight. if r.hasPodDeletionBeenTriggered(sc, pod) { logger.Info("skipping old Pod since its deletion has already been triggered", "pod", client.ObjectKeyFromObject(pod)) continue } + // Pending, unhealthy, starting, and uncategorized pods are deleted immediately; + // ready pods are queued for ordered one-at-a-time replacement. 
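+			// Note: the order of the checks below matters; IsPodPending is evaluated first so that
+			// pods still in the Pending phase land in the pending bucket instead of the starting
+			// or uncategorized ones.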
if k8sutils.IsPodPending(pod) { work.oldTemplateHashPendingPods = append(work.oldTemplateHashPendingPods, pod) } else if k8sutils.HasAnyStartedButNotReadyContainer(pod) || k8sutils.HasAnyContainerExitedErroneously(logger, pod) { work.oldTemplateHashUnhealthyPods = append(work.oldTemplateHashUnhealthyPods, pod) } else if k8sutils.IsPodReady(pod) { work.oldTemplateHashReadyPods = append(work.oldTemplateHashReadyPods, pod) + } else if k8sutils.HasAnyContainerNotStarted(pod) { + work.oldTemplateHashStartingPods = append(work.oldTemplateHashStartingPods, pod) + } else { + work.oldTemplateHashUncategorizedPods = append(work.oldTemplateHashUncategorizedPods, pod) } } else { + // New-hash pod — only count as ready; non-ready pods are not tracked so + // isCurrentPodUpdateComplete won't prematurely declare success. if k8sutils.IsPodReady(pod) { work.newTemplateHashReadyPods = append(work.newTemplateHashReadyPods, pod) } @@ -163,8 +175,10 @@ func (r _resource) hasPodDeletionBeenTriggered(sc *syncContext, pod *corev1.Pod) return k8sutils.IsResourceTerminating(pod.ObjectMeta) || r.expectationsStore.HasDeleteExpectation(sc.pclqExpectationsStoreKey, pod.GetUID()) } -// deleteOldPendingAndUnhealthyPods removes pods with old template hash that are pending or unhealthy -func (r _resource) deleteOldPendingAndUnhealthyPods(logger logr.Logger, sc *syncContext, work *updateWork) error { +// deleteOldNonReadyPods removes old-hash pods that are not Ready: pending, unhealthy, starting (startup probe), +// or uncategorized (unknown state). All of these are safe to delete immediately since they are not serving traffic +// and will be replaced with pods having the correct template hash. +func (r _resource) deleteOldNonReadyPods(logger logr.Logger, sc *syncContext, work *updateWork) error { var deletionTasks []utils.Task if len(work.oldTemplateHashPendingPods) > 0 { deletionTasks = append(deletionTasks, r.createPodDeletionTasks(logger, sc.pclq, work.oldTemplateHashPendingPods, sc.pclqExpectationsStoreKey)...) @@ -172,15 +186,26 @@ func (r _resource) deleteOldPendingAndUnhealthyPods(logger logr.Logger, sc *sync if len(work.oldTemplateHashUnhealthyPods) > 0 { deletionTasks = append(deletionTasks, r.createPodDeletionTasks(logger, sc.pclq, work.oldTemplateHashUnhealthyPods, sc.pclqExpectationsStoreKey)...) } + if len(work.oldTemplateHashStartingPods) > 0 { + deletionTasks = append(deletionTasks, r.createPodDeletionTasks(logger, sc.pclq, work.oldTemplateHashStartingPods, sc.pclqExpectationsStoreKey)...) + } + if len(work.oldTemplateHashUncategorizedPods) > 0 { + logger.Info("found old-hash pods in an unrecognized state, deleting them", + "unexpected", true, + "pods", componentutils.PodsToObjectNames(work.oldTemplateHashUncategorizedPods)) + deletionTasks = append(deletionTasks, r.createPodDeletionTasks(logger, sc.pclq, work.oldTemplateHashUncategorizedPods, sc.pclqExpectationsStoreKey)...) 
+ } if len(deletionTasks) == 0 { - logger.Info("no pending or unhealthy pods having old PodTemplateHash found") + logger.Info("no non-ready pods having old PodTemplateHash found") return nil } - logger.Info("triggering deletion of pending and unhealthy pods with old pod template hash in order to update", + logger.Info("triggering deletion of non-ready pods with old pod template hash in order to update", "oldPendingPods", componentutils.PodsToObjectNames(work.oldTemplateHashPendingPods), - "oldUnhealthyPods", componentutils.PodsToObjectNames(work.oldTemplateHashUnhealthyPods)) + "oldUnhealthyPods", componentutils.PodsToObjectNames(work.oldTemplateHashUnhealthyPods), + "oldStartingPods", componentutils.PodsToObjectNames(work.oldTemplateHashStartingPods), + "oldUncategorizedPods", componentutils.PodsToObjectNames(work.oldTemplateHashUncategorizedPods)) if runResult := utils.RunConcurrently(sc.ctx, logger, deletionTasks); runResult.HasErrors() { err := runResult.GetAggregatedError() pclqObjectKey := client.ObjectKeyFromObject(sc.pclq) @@ -191,7 +216,7 @@ func (r _resource) deleteOldPendingAndUnhealthyPods(logger logr.Logger, sc *sync fmt.Sprintf("failed to delete Pods for PodClique %v", pclqObjectKey), ) } - logger.Info("successfully deleted pods having old PodTemplateHash and in either Pending or Unhealthy state") + logger.Info("successfully deleted non-ready pods having old PodTemplateHash") return nil } diff --git a/operator/internal/controller/podclique/components/pod/rollingupdate_test.go b/operator/internal/controller/podclique/components/pod/rollingupdate_test.go new file mode 100644 index 000000000..c946b7389 --- /dev/null +++ b/operator/internal/controller/podclique/components/pod/rollingupdate_test.go @@ -0,0 +1,163 @@ +/* +Copyright 2025 The Grove Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "fmt" + "testing" + + "github.com/ai-dynamo/grove/operator/api/common" + "github.com/ai-dynamo/grove/operator/internal/expect" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" +) + +const ( + testNewHash = "new-hash-abc" + testOldHash = "old-hash-xyz" + testNS = "test-ns" +) + +// newTestPod creates a pod with the given template hash label and options applied. 
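+// Options follow the functional-options pattern; for example,
+// newTestPod(testOldHash, withPhase(corev1.PodPending)) yields an old-hash pod in the Pending phase.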
+func newTestPod(templateHash string, opts ...func(*corev1.Pod)) *corev1.Pod { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "p", + Namespace: testNS, + Labels: map[string]string{ + common.LabelPodTemplateHash: templateHash, + }, + }, + } + for _, opt := range opts { + opt(pod) + } + return pod +} + +func withPhase(phase corev1.PodPhase) func(*corev1.Pod) { + return func(pod *corev1.Pod) { pod.Status.Phase = phase } +} + +func withReadyCondition() func(*corev1.Pod) { + return func(pod *corev1.Pod) { + pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{ + Type: corev1.PodReady, Status: corev1.ConditionTrue, + }) + } +} + +func withContainerStatus(started *bool, ready bool) func(*corev1.Pod) { + return func(pod *corev1.Pod) { + pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{ + Name: "main", Started: started, Ready: ready, + }) + } +} + +func withErroneousExit() func(*corev1.Pod) { + return func(pod *corev1.Pod) { + pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{ + Name: "main", + LastTerminationState: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ExitCode: 1}, + }, + }) + } +} + +func withDeletionTimestamp() func(*corev1.Pod) { + return func(pod *corev1.Pod) { + now := metav1.Now() + pod.DeletionTimestamp = &now + pod.Finalizers = []string{"fake.finalizer/test"} + } +} + +// bucket identifies which updateWork bucket a pod should land in. +type bucket int + +const ( + bucketOldPending bucket = iota + bucketOldUnhealthy + bucketOldStarting + bucketOldUncategorized + bucketOldReady + bucketNewReady + bucketSkipped // terminating pods — not in any bucket +) + +func TestComputeUpdateWork(t *testing.T) { + tests := []struct { + name string + pod *corev1.Pod + expected bucket + }{ + {"old pending", newTestPod(testOldHash, withPhase(corev1.PodPending)), bucketOldPending}, + {"old unhealthy (started, not ready)", newTestPod(testOldHash, withPhase(corev1.PodRunning), withContainerStatus(ptr.To(true), false)), bucketOldUnhealthy}, + {"old unhealthy (erroneous exit)", newTestPod(testOldHash, withPhase(corev1.PodRunning), withErroneousExit()), bucketOldUnhealthy}, + {"old ready", newTestPod(testOldHash, withPhase(corev1.PodRunning), withReadyCondition(), withContainerStatus(ptr.To(true), true)), bucketOldReady}, + {"old starting (Started=false)", newTestPod(testOldHash, withPhase(corev1.PodRunning), withContainerStatus(ptr.To(false), false)), bucketOldStarting}, + {"old starting (Started=nil)", newTestPod(testOldHash, withPhase(corev1.PodRunning), withContainerStatus(nil, false)), bucketOldStarting}, + {"old uncategorized (no containers)", newTestPod(testOldHash, withPhase(corev1.PodRunning)), bucketOldUncategorized}, + {"old terminating is skipped", newTestPod(testOldHash, withDeletionTimestamp()), bucketSkipped}, + {"new ready", newTestPod(testNewHash, withPhase(corev1.PodRunning), withReadyCondition(), withContainerStatus(ptr.To(true), true)), bucketNewReady}, + {"new not-ready is not tracked", newTestPod(testNewHash, withPhase(corev1.PodRunning), withContainerStatus(ptr.To(false), false)), bucketSkipped}, + } + + r := _resource{expectationsStore: expect.NewExpectationsStore()} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sc := &syncContext{ + existingPCLQPods: []*corev1.Pod{tt.pod}, + expectedPodTemplateHash: testNewHash, + pclqExpectationsStoreKey: "test-key", + } + work := r.computeUpdateWork(logr.Discard(), sc) + + 
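+			// Map every bucket to its slice so the loop below can assert that the pod landed in
+			// exactly the expected bucket; bucketSkipped is deliberately absent from the map, so a
+			// skipped pod must show up in none of them.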
bucketPods := map[bucket][]*corev1.Pod{ + bucketOldPending: work.oldTemplateHashPendingPods, + bucketOldUnhealthy: work.oldTemplateHashUnhealthyPods, + bucketOldStarting: work.oldTemplateHashStartingPods, + bucketOldUncategorized: work.oldTemplateHashUncategorizedPods, + bucketOldReady: work.oldTemplateHashReadyPods, + bucketNewReady: work.newTemplateHashReadyPods, + } + + bucketNames := map[bucket]string{ + bucketOldPending: "oldPending", + bucketOldUnhealthy: "oldUnhealthy", + bucketOldStarting: "oldStarting", + bucketOldUncategorized: "oldUncategorized", + bucketOldReady: "oldReady", + bucketNewReady: "newReady", + } + for b, pods := range bucketPods { + name := bucketNames[b] + if b == tt.expected { + assert.Len(t, pods, 1, fmt.Sprintf("expected pod in bucket %s", name)) + } else { + assert.Empty(t, pods, fmt.Sprintf("expected no pods in bucket %s", name)) + } + } + }) + } +} diff --git a/operator/internal/utils/kubernetes/pod.go b/operator/internal/utils/kubernetes/pod.go index a86d0149c..34fb5cdfe 100644 --- a/operator/internal/utils/kubernetes/pod.go +++ b/operator/internal/utils/kubernetes/pod.go @@ -121,6 +121,19 @@ func HasAnyStartedButNotReadyContainer(pod *corev1.Pod) bool { return false } +// HasAnyContainerNotStarted checks if any container has not yet passed its startup probe. +// In Kubernetes, Started remains false (or nil) until the startup probe succeeds. +// This distinguishes pods in the startup probe phase from those that have started but are not ready. +// Returns false if there are no container statuses (the pod would be uncategorized by the caller). +func HasAnyContainerNotStarted(pod *corev1.Pod) bool { + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.Started == nil || !*containerStatus.Started { + return true + } + } + return false +} + // ComputeHash computes a hash given one or more corev1.PodTemplateSpec. func ComputeHash(podTemplateSpecs ...*corev1.PodTemplateSpec) string { podTemplateSpecHasher := fnv.New64a() diff --git a/operator/internal/utils/kubernetes/pod_test.go b/operator/internal/utils/kubernetes/pod_test.go index 85f97051b..64c116539 100644 --- a/operator/internal/utils/kubernetes/pod_test.go +++ b/operator/internal/utils/kubernetes/pod_test.go @@ -339,6 +339,122 @@ func TestHasAnyStartedButNotReadyContainer(t *testing.T) { } } +// TestHasAnyContainerNotStarted tests checking for containers that have not yet passed their startup probe. 
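+// ContainerStatus.Started is a *bool that remains nil or false until the startup probe succeeds,
+// so the helper must treat both values as "not started".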
+func TestHasAnyContainerNotStarted(t *testing.T) { + testCases := []struct { + // name identifies the test case + name string + // containerStatuses is the list of container statuses to set on the pod + containerStatuses []corev1.ContainerStatus + // expectedResult is whether the pod should have a not-yet-started container + expectedResult bool + }{ + { + // Container with Started==false (startup probe not yet passed) + name: "started-false", + containerStatuses: []corev1.ContainerStatus{ + { + Name: "container-1", + Started: ptr.To(false), + Ready: false, + }, + }, + expectedResult: true, + }, + { + // Container with Started==nil (startup probe not yet evaluated) + name: "started-nil", + containerStatuses: []corev1.ContainerStatus{ + { + Name: "container-1", + Started: nil, + Ready: false, + }, + }, + expectedResult: true, + }, + { + // Container that has started (startup probe passed) + name: "started-true", + containerStatuses: []corev1.ContainerStatus{ + { + Name: "container-1", + Started: ptr.To(true), + Ready: false, + }, + }, + expectedResult: false, + }, + { + // Container that has started and is ready + name: "started-and-ready", + containerStatuses: []corev1.ContainerStatus{ + { + Name: "container-1", + Started: ptr.To(true), + Ready: true, + }, + }, + expectedResult: false, + }, + { + // No container statuses + name: "no-containers", + containerStatuses: []corev1.ContainerStatus{}, + expectedResult: false, + }, + { + // Multiple containers — one started, one not started + name: "multiple-containers-one-not-started", + containerStatuses: []corev1.ContainerStatus{ + { + Name: "container-1", + Started: ptr.To(true), + Ready: true, + }, + { + Name: "container-2", + Started: ptr.To(false), + Ready: false, + }, + }, + expectedResult: true, + }, + { + // Multiple containers — all started + name: "multiple-containers-all-started", + containerStatuses: []corev1.ContainerStatus{ + { + Name: "container-1", + Started: ptr.To(true), + Ready: true, + }, + { + Name: "container-2", + Started: ptr.To(true), + Ready: false, + }, + }, + expectedResult: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: testPodName, + Namespace: testPodNamespace, + }, + Status: corev1.PodStatus{ + ContainerStatuses: tc.containerStatuses, + }, + } + assert.Equal(t, tc.expectedResult, HasAnyContainerNotStarted(pod)) + }) + } +} + // TestGetContainerStatusIfTerminatedErroneously tests finding containers that terminated with non-zero exit code. func TestGetContainerStatusIfTerminatedErroneously(t *testing.T) { testCases := []struct {