Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -396,7 +397,10 @@ func hasPodGangSchedulingGate(pod *corev1.Pod) bool {
// createPods creates the specified number of new pods for the PodClique with proper indexing and concurrency control
func (r _resource) createPods(ctx context.Context, logger logr.Logger, sc *syncContext, numPods int) (int, error) {
// Pre-calculate all needed indices to avoid race conditions
availableIndices, err := index.GetAvailableIndices(logger, sc.existingPCLQPods, numPods)
// Indices assigned to in-flight creations (not yet observed in cache) must be reserved so we do not assign them again.
reservedIndices := sets.New(r.expectationsStore.GetCreateExpectationIndices(sc.pclqExpectationsStoreKey)...)

availableIndices, err := index.GetAvailableIndices(logger, sc.existingPCLQPods, reservedIndices, numPods)
if err != nil {
return 0, groveerr.WrapError(err,
errCodeGetAvailablePodHostNameIndices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (r _resource) createPodCreationTask(logger logr.Logger, pcs *grovecorev1alp
)
}
logger.Info("Created Pod for PodClique", "podName", pod.Name, "podUID", pod.GetUID())
if err := r.expectationsStore.ExpectCreations(logger, pclqExpectationsKey, pod.GetUID()); err != nil {
if err := r.expectationsStore.ExpectCreations(logger, pclqExpectationsKey, pod.GetUID(), podHostNameIndex); err != nil {
utilruntime.HandleErrorWithLogger(logger, err, "could not record create expectations for Pod", "pclqObjKey", pclqObjKey, "pod", pod.Name)
}
r.eventRecorder.Eventf(pclq, corev1.EventTypeNormal, constants.ReasonPodCreateSuccessful, "Created Pod: %s", pod.Name)
Expand Down
60 changes: 55 additions & 5 deletions operator/internal/expect/expectations.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type ControlleeExpectations struct {
uidsToDelete sets.Set[types.UID]
// uidsToAdd are the set of resource UIDs that are expected to be created.
uidsToAdd sets.Set[types.UID]
// creationIndices maps UID to the index (e.g. hostname index) assigned to that creation. Optional; only set when
// ExpectCreations is used so that callers can reserve those indices until the creation is observed.
creationIndices map[types.UID]int
}

// NewExpectationsStore creates a new expectations store.
Expand All @@ -70,14 +73,16 @@ func NewExpectationsStore() *ExpectationsStore {
}
}

// ExpectCreations records resource creation expectations for uids for a controlled resource identified by a controlleeKey.
func (s *ExpectationsStore) ExpectCreations(logger logr.Logger, controlleeKey string, uids ...types.UID) error {
return s.createOrRaiseExpectations(logger, controlleeKey, uids, nil)
// ExpectCreations records a creation expectation with the given index (e.g. hostname index) so that
// GetCreateExpectationIndices can return it. Call this when the created resource is assigned a specific index
// that must be reserved until the creation is observed (e.g. to avoid assigning the same index to another resource).
func (s *ExpectationsStore) ExpectCreations(logger logr.Logger, controlleeKey string, uid types.UID, index int) error {
return s.createOrRaiseExpectations(logger, controlleeKey, []types.UID{uid}, nil, map[types.UID]int{uid: index})
}

// ExpectDeletions records resource deletion expectations for uids for a controlled resource identified by a controlleeKey.
func (s *ExpectationsStore) ExpectDeletions(logger logr.Logger, controlleeKey string, uids ...types.UID) error {
return s.createOrRaiseExpectations(logger, controlleeKey, nil, uids)
return s.createOrRaiseExpectations(logger, controlleeKey, nil, uids, nil)
}

// ObserveDeletions lowers the delete expectations removing the uids passed.
Expand Down Expand Up @@ -123,6 +128,9 @@ func (s *ExpectationsStore) SyncExpectations(controlleeKey string, existingNonTe
// Remove the UIDs from `uidsToAdd` if the informer cache is already up-to-date and certain events have
// been missed/dropped by the watch resulting in missed calls to `CreationObserved`.
exp.uidsToAdd.Delete(existingNonTerminatingUIDs...)
for _, uid := range existingNonTerminatingUIDs {
delete(exp.creationIndices, uid)
}
// Remove stale entries in `uidsToDelete` if the `existingUIDS` no longer has those UIDs.
staleUIDs := exp.uidsToDelete.Difference(sets.New(existingNonTerminatingUIDs...))
exp.uidsToDelete.Delete(staleUIDs.UnsortedList()...)
Expand All @@ -144,6 +152,22 @@ func (s *ExpectationsStore) GetCreateExpectations(controlleeKey string) []types.
return nil
}

// GetCreateExpectationIndices returns the set of indices (e.g. hostname indices) for creations that have not yet been
// synced, when those were recorded via ExpectCreations. Returns nil if none or if no indices were recorded.
func (s *ExpectationsStore) GetCreateExpectationIndices(controlleeKey string) []int {
exp, exists, _ := s.GetExpectations(controlleeKey)
if !exists || exp.creationIndices == nil {
return nil
}
var out []int
for uid := range exp.uidsToAdd {
if idx, ok := exp.creationIndices[uid]; ok {
out = append(out, idx)
}
}
return out
}

// GetDeleteExpectations is a convenience method which gives a slice of resource UIDs for which deletion has not yet been synced
// in the informer cache.
func (s *ExpectationsStore) GetDeleteExpectations(controlleeKey string) []types.UID {
Expand All @@ -162,7 +186,9 @@ func (s *ExpectationsStore) HasDeleteExpectation(controlleeKey string, uid types
}

// createOrRaiseExpectations creates or raises create/delete expectations for the given controlleeKey.
func (s *ExpectationsStore) createOrRaiseExpectations(logger logr.Logger, controlleeKey string, uidsToAdd, uidsToDelete []types.UID) error {
// creationIndexByUID is optional: when non-nil, each uid in uidsToAdd that has an entry will have that index stored
// for GetCreateExpectationIndices (e.g. hostname index to reserve until the creation is observed).
func (s *ExpectationsStore) createOrRaiseExpectations(logger logr.Logger, controlleeKey string, uidsToAdd, uidsToDelete []types.UID, creationIndexByUID map[types.UID]int) error {
s.mu.Lock()
defer s.mu.Unlock()
exp, exists, err := s.GetExpectations(controlleeKey)
Expand All @@ -175,12 +201,33 @@ func (s *ExpectationsStore) createOrRaiseExpectations(logger logr.Logger, contro
uidsToAdd: sets.New(uidsToAdd...),
uidsToDelete: sets.New(uidsToDelete...),
}
if len(creationIndexByUID) > 0 {
exp.creationIndices = make(map[types.UID]int)
for _, uid := range uidsToAdd {
if idx, ok := creationIndexByUID[uid]; ok {
exp.creationIndices[uid] = idx
}
}
}
logger.Info("created expectations for controller resource", "controlleeKey", controlleeKey, "uidsToAdd", uidsToAdd, "uidsToDelete", uidsToDelete)
} else {
exp.uidsToAdd.Insert(uidsToAdd...)
if len(creationIndexByUID) > 0 {
if exp.creationIndices == nil {
exp.creationIndices = make(map[types.UID]int)
}
for _, uid := range uidsToAdd {
if idx, ok := creationIndexByUID[uid]; ok {
exp.creationIndices[uid] = idx
}
}
}
// If there are UIDs in uidsToDelete that also have a presence in uidsToAdd then remove these UIDs from uidsToAdd
// as those add expectations are now no longer valid.
exp.uidsToAdd.Delete(uidsToDelete...)
for _, uid := range uidsToDelete {
delete(exp.creationIndices, uid)
}
exp.uidsToDelete.Insert(uidsToDelete...)
logger.Info("raised expectations for controller resource", "controlleeKey", controlleeKey, "uidsToAdd", uidsToAdd, "uidsToDelete", uidsToDelete)
}
Expand All @@ -193,6 +240,9 @@ func (s *ExpectationsStore) lowerExpectations(logger logr.Logger, controlleeKey
defer s.mu.Unlock()
if exp, exists, _ := s.GetExpectations(controlleeKey); exists {
exp.uidsToAdd.Delete(addUIDs...)
for _, uid := range addUIDs {
delete(exp.creationIndices, uid)
}
exp.uidsToDelete.Delete(deleteUIDs...)
logger.Info("lowered expectations for controlled resource", "controlleeKey", controlleeKey, "addUIDs", addUIDs, "deleteUIDs", deleteUIDs)
}
Expand Down
28 changes: 17 additions & 11 deletions operator/internal/expect/expectations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,9 @@ func TestExpectCreations(t *testing.T) {
if tc.existingUIDs != nil {
assert.NoError(t, initializeControlleeExpectations(expStore, controlleeKey, tc.existingUIDs, nil))
}
// test the method
wg := sync.WaitGroup{}
wg.Add(len(tc.newUIDs))
for _, uid := range tc.newUIDs {
go func() {
defer wg.Done()
err := expStore.ExpectCreations(logr.Discard(), controlleeKey, uid)
assert.NoError(t, err)
}()
for i, uid := range tc.newUIDs {
assert.NoError(t, expStore.ExpectCreations(logr.Discard(), controlleeKey, uid, i))
}
wg.Wait()
// compare the expected with actual
assert.ElementsMatch(t, tc.expectedCreateExpectationUIDs, expStore.GetCreateExpectations(controlleeKey))
})
}
Expand Down Expand Up @@ -233,6 +224,21 @@ func TestSyncExpectations(t *testing.T) {
}
}

func TestExpectCreations_with_GetCreateExpectationIndices(t *testing.T) {
expStore := NewExpectationsStore()
assert.NoError(t, expStore.ExpectCreations(logr.Discard(), controlleeKey, types.UID("uid-1"), 2))
assert.NoError(t, expStore.ExpectCreations(logr.Discard(), controlleeKey, types.UID("uid-2"), 3))

assert.ElementsMatch(t, []types.UID{"uid-1", "uid-2"}, expStore.GetCreateExpectations(controlleeKey))
indices := expStore.GetCreateExpectationIndices(controlleeKey)
assert.Len(t, indices, 2)
assert.ElementsMatch(t, []int{2, 3}, indices)

// After sync (observe uid-1), only uid-2's index remains reserved
expStore.SyncExpectations(controlleeKey, []types.UID{"uid-1"}, nil)
assert.ElementsMatch(t, []int{3}, expStore.GetCreateExpectationIndices(controlleeKey))
}

func initializeControlleeExpectations(expStore *ExpectationsStore, controlleeKey string, uidsToAdd, uidsToDelete []types.UID) error {
return expStore.Add(&ControlleeExpectations{
key: controlleeKey,
Expand Down
12 changes: 8 additions & 4 deletions operator/internal/index/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,19 @@ import (
)

// GetAvailableIndices returns the `requiredIndicesCount` available indices for pods.
// Extracts indices from active pod hostnames and returns the available indices,
// Extracts indices from hostnames of active pods and returns the available indices,
// filling holes from lowest to highest (starting from 0).
// It considers as used: indices from active existing pods (by hostname) and any reservedIndices
// (e.g. indices assigned to in-flight creations not yet observed in the pod list).
// Available indices are filled from lowest to highest (starting from 0).
// Active pods are those that are not terminating, not permanently failed (RestartPolicy=Never), and not succeeded.
func GetAvailableIndices(logger logr.Logger, existingPods []*corev1.Pod, requiredIndicesCount int) ([]int, error) {
// reservedIndices can be nil; it is used to reserve indices that are already assigned but not yet visible in existingPods.
func GetAvailableIndices(logger logr.Logger, existingPods []*corev1.Pod, reservedIndices sets.Set[int], requiredIndicesCount int) ([]int, error) {
usedIndices, err := extractUsedIndices(logger, existingPods)
if err != nil {
return nil, err
}
if len(reservedIndices) > 0 {
usedIndices = usedIndices.Union(reservedIndices)
}
return findAvailableIndices(&usedIndices, requiredIndicesCount), nil
}

Expand Down
22 changes: 18 additions & 4 deletions operator/internal/index/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// ==================== GetAvailableIndices Tests ====================

func TestGetNextAvailableIndices_EmptyPods(t *testing.T) {
indices, err := GetAvailableIndices(logr.Logger{}, []*corev1.Pod{}, 3)
indices, err := GetAvailableIndices(logr.Logger{}, []*corev1.Pod{}, nil, 3)

assert.NoError(t, err)
assert.Equal(t, []int{0, 1, 2}, indices)
Expand All @@ -45,7 +45,7 @@ func TestGetNextAvailableIndices_WithExistingPods(t *testing.T) {
createTestPod("pod-c", "test-clique-4"),
}

indices, err := GetAvailableIndices(logr.Logger{}, pods, 3)
indices, err := GetAvailableIndices(logr.Logger{}, pods, nil, 3)

assert.NoError(t, err)
// Should fill holes: 1, 3, 5
Expand All @@ -59,7 +59,7 @@ func TestGetNextAvailableIndices_Sequential(t *testing.T) {
createTestPod("pod-c", "test-clique-2"),
}

indices, err := GetAvailableIndices(logr.Logger{}, pods, 2)
indices, err := GetAvailableIndices(logr.Logger{}, pods, nil, 2)

assert.NoError(t, err)
// Should continue sequence: 3, 4
Expand All @@ -74,13 +74,27 @@ func TestGetNextAvailableIndices_InvalidHostnames(t *testing.T) {
createTestPod("pod-valid2", "test-clique-2"),
}

indices, err := GetAvailableIndices(logr.Logger{}, pods, 3)
indices, err := GetAvailableIndices(logr.Logger{}, pods, nil, 3)

// Should return error for invalid hostname
assert.Error(t, err)
assert.Nil(t, indices)
}

func TestGetAvailableIndices_WithReservedIndices(t *testing.T) {
// Existing pods have indices 0, 1. Reserved (in-flight) has 2, 3. So available for 2 new pods should be 4, 5.
pods := []*corev1.Pod{
createTestPod("pod-a", "test-clique-0"),
createTestPod("pod-b", "test-clique-1"),
}
reserved := sets.New[int](2, 3)

indices, err := GetAvailableIndices(logr.Logger{}, pods, reserved, 2)

assert.NoError(t, err)
assert.Equal(t, []int{4, 5}, indices)
}

// ==================== extractUsedIndices Tests ====================
func TestExtractUsedIndices_ValidPods(t *testing.T) {
pods := []*corev1.Pod{
Expand Down
Loading