@@ -40,15 +40,17 @@ import (

// updateWork encapsulates the information needed to perform a rolling update of pods in a PodClique.
type updateWork struct {
-oldTemplateHashPendingPods   []*corev1.Pod
-oldTemplateHashUnhealthyPods []*corev1.Pod
-oldTemplateHashReadyPods     []*corev1.Pod
-newTemplateHashReadyPods     []*corev1.Pod
+oldTemplateHashPendingPods       []*corev1.Pod
+oldTemplateHashUnhealthyPods     []*corev1.Pod
+oldTemplateHashStartingPods      []*corev1.Pod
+oldTemplateHashUncategorizedPods []*corev1.Pod
+oldTemplateHashReadyPods         []*corev1.Pod
+newTemplateHashReadyPods         []*corev1.Pod
}

// getPodNamesPendingUpdate returns names of pods with old template hash that are not already being deleted
func (w *updateWork) getPodNamesPendingUpdate(deletionExpectedPodUIDs []types.UID) []string {
-allOldPods := lo.Union(w.oldTemplateHashPendingPods, w.oldTemplateHashUnhealthyPods, w.oldTemplateHashReadyPods)
+allOldPods := lo.Union(w.oldTemplateHashPendingPods, w.oldTemplateHashUnhealthyPods, w.oldTemplateHashStartingPods, w.oldTemplateHashUncategorizedPods, w.oldTemplateHashReadyPods)
return lo.FilterMap(allOldPods, func(pod *corev1.Pod, _ int) (string, bool) {
if slices.Contains(deletionExpectedPodUIDs, pod.UID) {
return "", false
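As an aside for readers unfamiliar with samber/lo: lo.Union flattens its argument slices and drops duplicates, and lo.FilterMap combines filtering and mapping in one pass. A minimal, self-contained sketch of the same union-then-filter shape (pod names and UIDs here are invented for illustration):

package main

import (
    "fmt"
    "slices"

    "github.com/samber/lo"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
)

// podNamesNotBeingDeleted mirrors the shape of getPodNamesPendingUpdate:
// union the buckets, then keep only names of pods whose deletion is not
// already expected.
func podNamesNotBeingDeleted(buckets [][]*corev1.Pod, deleting []types.UID) []string {
    all := lo.Union(buckets...) // distinct pods across all buckets
    return lo.FilterMap(all, func(p *corev1.Pod, _ int) (string, bool) {
        if slices.Contains(deleting, p.UID) {
            return "", false // drop: deletion already in flight
        }
        return p.Name, true
    })
}

func main() {
    mk := func(name, uid string) *corev1.Pod {
        return &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, UID: types.UID(uid)}}
    }
    a, b := mk("pod-a", "uid-a"), mk("pod-b", "uid-b")
    // pod-a appears in two buckets but is emitted once; pod-b is filtered out.
    fmt.Println(podNamesNotBeingDeleted([][]*corev1.Pod{{a}, {b, a}}, []types.UID{"uid-b"}))
    // Output: [pod-a]
}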
@@ -73,8 +75,8 @@ func (w *updateWork) getNextPodToUpdate() *corev1.Pod {
func (r _resource) processPendingUpdates(logger logr.Logger, sc *syncContext) error {
work := r.computeUpdateWork(logger, sc)
pclq := sc.pclq
-// always delete pods that have old pod template hash and are either Pending or Unhealthy.
-if err := r.deleteOldPendingAndUnhealthyPods(logger, sc, work); err != nil {
+// Always delete old-hash pods that are not Ready (pending, unhealthy, starting, or uncategorized).
+if err := r.deleteOldNonReadyPods(logger, sc, work); err != nil {
return err
}

@@ -132,24 +134,34 @@ func (r _resource) processPendingUpdates(logger logr.Logger, sc *syncContext) error {
return r.markRollingUpdateEnd(sc.ctx, logger, pclq)
}

-// computeUpdateWork analyzes existing pods and categorizes them by template hash and health status for update planning
+// computeUpdateWork categorizes pods by template hash and state.
+// Old-hash pods: Pending, Unhealthy, Starting, Uncategorized, or Ready.
+// New-hash pods: Ready only.
func (r _resource) computeUpdateWork(logger logr.Logger, sc *syncContext) *updateWork {
work := &updateWork{}
for _, pod := range sc.existingPCLQPods {
if pod.Labels[common.LabelPodTemplateHash] != sc.expectedPodTemplateHash {
-// check if the pod has already been marked for deletion
+// Old-hash pod: skip if deletion already in flight.
if r.hasPodDeletionBeenTriggered(sc, pod) {
logger.Info("skipping old Pod since its deletion has already been triggered", "pod", client.ObjectKeyFromObject(pod))
continue
}
+// Pending, unhealthy, starting, and uncategorized pods are deleted immediately;
+// ready pods are queued for ordered one-at-a-time replacement.
if k8sutils.IsPodPending(pod) {
work.oldTemplateHashPendingPods = append(work.oldTemplateHashPendingPods, pod)
} else if k8sutils.HasAnyStartedButNotReadyContainer(pod) || k8sutils.HasAnyContainerExitedErroneously(logger, pod) {
work.oldTemplateHashUnhealthyPods = append(work.oldTemplateHashUnhealthyPods, pod)
} else if k8sutils.IsPodReady(pod) {
work.oldTemplateHashReadyPods = append(work.oldTemplateHashReadyPods, pod)
+} else if k8sutils.HasAnyContainerNotStarted(pod) {
+work.oldTemplateHashStartingPods = append(work.oldTemplateHashStartingPods, pod)
+} else {
+work.oldTemplateHashUncategorizedPods = append(work.oldTemplateHashUncategorizedPods, pod)
}
} else {
+// New-hash pod: only count as ready; non-ready pods are not tracked so
+// isCurrentPodUpdateComplete won't prematurely declare success.
if k8sutils.IsPodReady(pod) {
work.newTemplateHashReadyPods = append(work.newTemplateHashReadyPods, pod)
}
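The asymmetry between the two branches is deliberate. isCurrentPodUpdateComplete itself is not shown in this diff, so the following is only a hypothetical sketch of the rationale: because computeUpdateWork records new-hash pods only once they are Ready, a completion check built on these buckets cannot report success while a replacement pod is still starting up.

// Hypothetical sketch, not the real isCurrentPodUpdateComplete. The
// desiredReplicas parameter is invented for illustration.
func updateLooksComplete(work *updateWork, desiredReplicas int) bool {
    oldRemaining := len(work.oldTemplateHashPendingPods) +
        len(work.oldTemplateHashUnhealthyPods) +
        len(work.oldTemplateHashStartingPods) +
        len(work.oldTemplateHashUncategorizedPods) +
        len(work.oldTemplateHashReadyPods)
    // A new-hash pod that is still starting counts toward neither term,
    // so the check stays false until it becomes Ready.
    return oldRemaining == 0 && len(work.newTemplateHashReadyPods) >= desiredReplicas
}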
@@ -163,24 +175,37 @@ func (r _resource) hasPodDeletionBeenTriggered(sc *syncContext, pod *corev1.Pod) bool {
return k8sutils.IsResourceTerminating(pod.ObjectMeta) || r.expectationsStore.HasDeleteExpectation(sc.pclqExpectationsStoreKey, pod.GetUID())
}
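hasPodDeletionBeenTriggered combines the API-visible signal (a deletion timestamp) with an in-memory expectations store. The store bridges the window between issuing a delete and the informer cache reflecting it, so the controller does not re-delete the same pod on the next sync. Grove's expect package is only referenced, not shown, in this diff; the following is a generic sketch of the idea with hypothetical names:

import (
    "sync"

    "k8s.io/apimachinery/pkg/types"
)

// deleteExpectations is a generic sketch of a delete-expectations store
// (hypothetical; not Grove's actual expect package).
type deleteExpectations struct {
    mu      sync.Mutex
    pending map[string]map[types.UID]struct{} // store key -> UIDs expected to disappear
}

// ExpectDelete records that a delete call was issued for uid.
func (e *deleteExpectations) ExpectDelete(key string, uid types.UID) {
    e.mu.Lock()
    defer e.mu.Unlock()
    if e.pending == nil {
        e.pending = map[string]map[types.UID]struct{}{}
    }
    if e.pending[key] == nil {
        e.pending[key] = map[types.UID]struct{}{}
    }
    e.pending[key][uid] = struct{}{}
}

// HasDeleteExpectation reports whether a delete was already issued for uid.
func (e *deleteExpectations) HasDeleteExpectation(key string, uid types.UID) bool {
    e.mu.Lock()
    defer e.mu.Unlock()
    _, ok := e.pending[key][uid]
    return ok
}

// ObserveDeletion clears the expectation once the informer sees the delete.
func (e *deleteExpectations) ObserveDeletion(key string, uid types.UID) {
    e.mu.Lock()
    defer e.mu.Unlock()
    delete(e.pending[key], uid)
}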

-// deleteOldPendingAndUnhealthyPods removes pods with old template hash that are pending or unhealthy
-func (r _resource) deleteOldPendingAndUnhealthyPods(logger logr.Logger, sc *syncContext, work *updateWork) error {
+// deleteOldNonReadyPods removes old-hash pods that are not Ready: pending, unhealthy, starting (startup probe),
+// or uncategorized (unknown state). All of these are safe to delete immediately since they are not serving traffic
+// and will be replaced with pods having the correct template hash.
+func (r _resource) deleteOldNonReadyPods(logger logr.Logger, sc *syncContext, work *updateWork) error {
var deletionTasks []utils.Task
if len(work.oldTemplateHashPendingPods) > 0 {
deletionTasks = append(deletionTasks, r.createPodDeletionTasks(logger, sc.pclq, work.oldTemplateHashPendingPods, sc.pclqExpectationsStoreKey)...)
}
if len(work.oldTemplateHashUnhealthyPods) > 0 {
deletionTasks = append(deletionTasks, r.createPodDeletionTasks(logger, sc.pclq, work.oldTemplateHashUnhealthyPods, sc.pclqExpectationsStoreKey)...)
}
+if len(work.oldTemplateHashStartingPods) > 0 {
+deletionTasks = append(deletionTasks, r.createPodDeletionTasks(logger, sc.pclq, work.oldTemplateHashStartingPods, sc.pclqExpectationsStoreKey)...)
+}
+if len(work.oldTemplateHashUncategorizedPods) > 0 {
+logger.Info("found old-hash pods in an unrecognized state, deleting them",
+"unexpected", true,
+"pods", componentutils.PodsToObjectNames(work.oldTemplateHashUncategorizedPods))
+deletionTasks = append(deletionTasks, r.createPodDeletionTasks(logger, sc.pclq, work.oldTemplateHashUncategorizedPods, sc.pclqExpectationsStoreKey)...)
+}

if len(deletionTasks) == 0 {
-logger.Info("no pending or unhealthy pods having old PodTemplateHash found")
+logger.Info("no non-ready pods having old PodTemplateHash found")
return nil
}

-logger.Info("triggering deletion of pending and unhealthy pods with old pod template hash in order to update",
+logger.Info("triggering deletion of non-ready pods with old pod template hash in order to update",
"oldPendingPods", componentutils.PodsToObjectNames(work.oldTemplateHashPendingPods),
-"oldUnhealthyPods", componentutils.PodsToObjectNames(work.oldTemplateHashUnhealthyPods))
+"oldUnhealthyPods", componentutils.PodsToObjectNames(work.oldTemplateHashUnhealthyPods),
+"oldStartingPods", componentutils.PodsToObjectNames(work.oldTemplateHashStartingPods),
+"oldUncategorizedPods", componentutils.PodsToObjectNames(work.oldTemplateHashUncategorizedPods))
if runResult := utils.RunConcurrently(sc.ctx, logger, deletionTasks); runResult.HasErrors() {
err := runResult.GetAggregatedError()
pclqObjectKey := client.ObjectKeyFromObject(sc.pclq)
@@ -191,7 +216,7 @@ func (r _resource) deleteOldPendingAndUnhealthyPods(logger logr.Logger, sc *syncContext, work *updateWork) error {
fmt.Sprintf("failed to delete Pods for PodClique %v", pclqObjectKey),
)
}
-logger.Info("successfully deleted pods having old PodTemplateHash and in either Pending or Unhealthy state")
+logger.Info("successfully deleted non-ready pods having old PodTemplateHash")
return nil
}
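utils.RunConcurrently and utils.Task are Grove internals whose implementations are not part of this diff. The general fan-out shape can be sketched with golang.org/x/sync/errgroup; note that errgroup returns only the first error, whereas the code above aggregates all of them:

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
    corev1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// Illustrative only; not Grove's utils.RunConcurrently. Fan out one delete
// per pod and wait for all of them.
func deletePodsConcurrently(ctx context.Context, cl client.Client, pods []*corev1.Pod) error {
    g, ctx := errgroup.WithContext(ctx)
    for _, pod := range pods {
        pod := pod // redundant on Go 1.22+, kept for clarity
        g.Go(func() error {
            // Tolerate pods that vanished between listing and deleting.
            if err := cl.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
                return fmt.Errorf("deleting pod %s/%s: %w", pod.Namespace, pod.Name, err)
            }
            return nil
        })
    }
    return g.Wait() // first non-nil error, if any; errgroup cancels ctx on failure
}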

@@ -0,0 +1,163 @@
/*
Copyright 2025 The Grove Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pod

import (
"fmt"
"testing"

"github.com/ai-dynamo/grove/operator/api/common"
"github.com/ai-dynamo/grove/operator/internal/expect"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
)

const (
testNewHash = "new-hash-abc"
testOldHash = "old-hash-xyz"
testNS = "test-ns"
)

// newTestPod creates a pod with the given template hash label and options applied.
func newTestPod(templateHash string, opts ...func(*corev1.Pod)) *corev1.Pod {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "p",
Namespace: testNS,
Labels: map[string]string{
common.LabelPodTemplateHash: templateHash,
},
},
}
for _, opt := range opts {
opt(pod)
}
return pod
}

func withPhase(phase corev1.PodPhase) func(*corev1.Pod) {
return func(pod *corev1.Pod) { pod.Status.Phase = phase }
}

func withReadyCondition() func(*corev1.Pod) {
return func(pod *corev1.Pod) {
pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{
Type: corev1.PodReady, Status: corev1.ConditionTrue,
})
}
}

func withContainerStatus(started *bool, ready bool) func(*corev1.Pod) {
return func(pod *corev1.Pod) {
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{
Name: "main", Started: started, Ready: ready,
})
}
}

func withErroneousExit() func(*corev1.Pod) {
return func(pod *corev1.Pod) {
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{
Name: "main",
LastTerminationState: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{ExitCode: 1},
},
})
}
}

func withDeletionTimestamp() func(*corev1.Pod) {
return func(pod *corev1.Pod) {
now := metav1.Now()
pod.DeletionTimestamp = &now
pod.Finalizers = []string{"fake.finalizer/test"}
}
}
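The helpers above follow the functional-options pattern and compose left to right, so a single constructor covers every case in the table below. For instance, a hypothetical helper showing how the options stack for the old-hash Ready case:

// newOldHashReadyPod is an illustrative wrapper (not part of the diff):
// it builds an old-hash pod that computeUpdateWork should place in the
// Ready bucket.
func newOldHashReadyPod() *corev1.Pod {
    return newTestPod(testOldHash,
        withPhase(corev1.PodRunning),
        withReadyCondition(),
        withContainerStatus(ptr.To(true), true), // started and ready
    )
}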

// bucket identifies which updateWork bucket a pod should land in.
type bucket int

const (
bucketOldPending bucket = iota
bucketOldUnhealthy
bucketOldStarting
bucketOldUncategorized
bucketOldReady
bucketNewReady
bucketSkipped // terminating pods — not in any bucket
)

func TestComputeUpdateWork(t *testing.T) {
tests := []struct {
name string
pod *corev1.Pod
expected bucket
}{
{"old pending", newTestPod(testOldHash, withPhase(corev1.PodPending)), bucketOldPending},
{"old unhealthy (started, not ready)", newTestPod(testOldHash, withPhase(corev1.PodRunning), withContainerStatus(ptr.To(true), false)), bucketOldUnhealthy},
{"old unhealthy (erroneous exit)", newTestPod(testOldHash, withPhase(corev1.PodRunning), withErroneousExit()), bucketOldUnhealthy},
{"old ready", newTestPod(testOldHash, withPhase(corev1.PodRunning), withReadyCondition(), withContainerStatus(ptr.To(true), true)), bucketOldReady},
{"old starting (Started=false)", newTestPod(testOldHash, withPhase(corev1.PodRunning), withContainerStatus(ptr.To(false), false)), bucketOldStarting},
{"old starting (Started=nil)", newTestPod(testOldHash, withPhase(corev1.PodRunning), withContainerStatus(nil, false)), bucketOldStarting},
{"old uncategorized (no containers)", newTestPod(testOldHash, withPhase(corev1.PodRunning)), bucketOldUncategorized},
{"old terminating is skipped", newTestPod(testOldHash, withDeletionTimestamp()), bucketSkipped},
{"new ready", newTestPod(testNewHash, withPhase(corev1.PodRunning), withReadyCondition(), withContainerStatus(ptr.To(true), true)), bucketNewReady},
{"new not-ready is not tracked", newTestPod(testNewHash, withPhase(corev1.PodRunning), withContainerStatus(ptr.To(false), false)), bucketSkipped},
}

r := _resource{expectationsStore: expect.NewExpectationsStore()}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sc := &syncContext{
existingPCLQPods: []*corev1.Pod{tt.pod},
expectedPodTemplateHash: testNewHash,
pclqExpectationsStoreKey: "test-key",
}
work := r.computeUpdateWork(logr.Discard(), sc)

bucketPods := map[bucket][]*corev1.Pod{
bucketOldPending: work.oldTemplateHashPendingPods,
bucketOldUnhealthy: work.oldTemplateHashUnhealthyPods,
bucketOldStarting: work.oldTemplateHashStartingPods,
bucketOldUncategorized: work.oldTemplateHashUncategorizedPods,
bucketOldReady: work.oldTemplateHashReadyPods,
bucketNewReady: work.newTemplateHashReadyPods,
}

bucketNames := map[bucket]string{
bucketOldPending: "oldPending",
bucketOldUnhealthy: "oldUnhealthy",
bucketOldStarting: "oldStarting",
bucketOldUncategorized: "oldUncategorized",
bucketOldReady: "oldReady",
bucketNewReady: "newReady",
}
for b, pods := range bucketPods {
name := bucketNames[b]
if b == tt.expected {
assert.Len(t, pods, 1, fmt.Sprintf("expected pod in bucket %s", name))
} else {
assert.Empty(t, pods, fmt.Sprintf("expected no pods in bucket %s", name))
}
}
})
}
}
13 changes: 13 additions & 0 deletions operator/internal/utils/kubernetes/pod.go
@@ -121,6 +121,19 @@ func HasAnyStartedButNotReadyContainer(pod *corev1.Pod) bool {
return false
}

+// HasAnyContainerNotStarted checks if any container has not yet passed its startup probe.
+// In Kubernetes, Started remains false (or nil) until the startup probe succeeds.
+// This distinguishes pods in the startup probe phase from those that have started but are not ready.
+// Returns false if there are no container statuses (the pod would be uncategorized by the caller).
+func HasAnyContainerNotStarted(pod *corev1.Pod) bool {
+for _, containerStatus := range pod.Status.ContainerStatuses {
+if containerStatus.Started == nil || !*containerStatus.Started {
+return true
+}
+}
+return false
+}
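A quick illustration of the three Started states the helper distinguishes (a sketch; assumes fmt and k8s.io/utils/ptr are imported):

func ExampleHasAnyContainerNotStarted_sketch() {
    pod := &corev1.Pod{Status: corev1.PodStatus{
        ContainerStatuses: []corev1.ContainerStatus{{Name: "main", Started: nil}},
    }}
    fmt.Println(HasAnyContainerNotStarted(pod)) // true: kubelet has not reported Started yet

    pod.Status.ContainerStatuses[0].Started = ptr.To(false)
    fmt.Println(HasAnyContainerNotStarted(pod)) // true: startup probe has not succeeded yet

    pod.Status.ContainerStatuses[0].Started = ptr.To(true)
    fmt.Println(HasAnyContainerNotStarted(pod)) // false: every container has started
}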

// ComputeHash computes a hash given one or more corev1.PodTemplateSpec.
func ComputeHash(podTemplateSpecs ...*corev1.PodTemplateSpec) string {
podTemplateSpecHasher := fnv.New64a()
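The rest of ComputeHash is truncated in this diff. As background, a typical way to turn an FNV-64a hasher over pod template specs into a short string looks like the following sketch (an assumption for illustration, not necessarily Grove's implementation):

// Background sketch only; ComputeHash's real body is not shown here.
func computeHashSketch(specs ...*corev1.PodTemplateSpec) string {
    h := fnv.New64a()
    for _, s := range specs {
        // %v gives a deterministic rendering of the spec for a given value;
        // production controllers often use a canonical encoder instead.
        fmt.Fprintf(h, "%v", s)
    }
    return fmt.Sprintf("%x", h.Sum64())
}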