Skip to content
Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]

### Added
- Added support for parentless queues by adopting them under a synthetic default root queue [#845](https://github.com/NVIDIA/KAI-Scheduler/pull/845) [gshaibi](https://github.com/gshaibi)
- Added the option to disable prometheus service monitor creation [#810](https://github.com/NVIDIA/KAI-Scheduler/pull/810) [itsomri](https://github.com/itsomri)
- Fixed prometheus instance deprecation - ensure single instance [#779](https://github.com/NVIDIA/KAI-Scheduler/pull/779) [itsomri](https://github.com/itsomri)
- Added clear error messages for jobs referencing missing or orphan queues, reporting via events and conditions [#820](https://github.com/NVIDIA/KAI-Scheduler/pull/820) [gshaibi](https://github.com/gshaibi)
Expand Down
62 changes: 54 additions & 8 deletions pkg/scheduler/cache/cluster_info/cluster_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestSnapshot(t *testing.T) {
},
},
expectedNodes: 1,
expectedQueues: 2,
expectedQueues: 3, // my-department, my-queue, and synthetic "default" root
expectedBindRequests: 1,
},
"SingleFromEach2": {
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestSnapshot(t *testing.T) {
},
},
expectedNodes: 1,
expectedQueues: 2,
expectedQueues: 3, // my-department, my-queue, and synthetic "default" root
},
}

Expand Down Expand Up @@ -1418,15 +1418,46 @@ func TestSnapshotQueues(t *testing.T) {
)
snapshot, err := clusterInfo.Snapshot()
assert.Nil(t, err)
assert.Equal(t, 2, len(snapshot.Queues))
// Expecting 3 queues: default (synthetic root), department0, and queue0
// department0-a is filtered out by nodepool label
assert.Equal(t, 3, len(snapshot.Queues))
assert.Equal(t, common_info.QueueID("queue0"), snapshot.Queues["queue0"].UID)
assert.Equal(t, common_info.QueueID("department0"), snapshot.Queues["department0"].UID)
assert.Equal(t, "queue0", snapshot.Queues["queue0"].Name)
assert.Equal(t, "department-zero", snapshot.Queues["department0"].Name)
assert.Equal(t, common_info.QueueID(""), snapshot.Queues["department0"].ParentQueue)
// Parentless queues (queues without a parent) are adopted by the synthetic "default" root queue
assert.Equal(t, common_info.QueueID("department0"), snapshot.Queues["queue0"].ParentQueue)
assert.Equal(t, []common_info.QueueID{"queue0"}, snapshot.Queues["department0"].ChildQueues)
assert.Equal(t, []common_info.QueueID{}, snapshot.Queues["queue0"].ChildQueues)
// Verify the default root queue exists
assert.Equal(t, common_info.QueueID(defaultQueueName), snapshot.Queues[defaultQueueName].UID)
}

func TestSnapshotQueues_TwoLevelHierarchyLimit(t *testing.T) {
// Create 3-level hierarchy: grandparent -> parent -> child
objs := []runtime.Object{
&enginev2.Queue{
ObjectMeta: metav1.ObjectMeta{Name: "grandparent"},
Spec: enginev2.QueueSpec{},
},
&enginev2.Queue{
ObjectMeta: metav1.ObjectMeta{Name: "parent"},
Spec: enginev2.QueueSpec{ParentQueue: "grandparent"},
},
&enginev2.Queue{
ObjectMeta: metav1.ObjectMeta{Name: "child"},
Spec: enginev2.QueueSpec{ParentQueue: "parent"},
},
}

clusterInfo := newClusterInfoTests(t, clusterInfoTestParams{kaiSchedulerObjects: objs})
snapshot, err := clusterInfo.Snapshot()
assert.Nil(t, err)

// Parent should be detached from grandparent to enforce 2-level limit
assert.Equal(t, common_info.QueueID(""), snapshot.Queues["parent"].ParentQueue)
assert.Equal(t, common_info.QueueID("parent"), snapshot.Queues["child"].ParentQueue)
}

func TestSnapshotFlatHierarchy(t *testing.T) {
Expand Down Expand Up @@ -1486,14 +1517,16 @@ func TestSnapshotFlatHierarchy(t *testing.T) {

snapshot, err := clusterInfo.Snapshot()
assert.Nil(t, err)
assert.Equal(t, 3, len(snapshot.Queues))
// In ProjectLevelFairness mode, all queues are flattened under the synthetic "default" root:
// default, department0, department1, queue0, queue1
assert.Equal(t, 5, len(snapshot.Queues))

defaultParentQueueId := common_info.QueueID(defaultQueueName)
parentQueue, found := snapshot.Queues[defaultParentQueueId]
defaultQueue, found := snapshot.Queues[defaultParentQueueId]
assert.True(t, found)
assert.Equal(t, parentQueue.Name, defaultQueueName)
assert.Equal(t, parentQueue.UID, defaultParentQueueId)
assert.Equal(t, parentQueue.Resources, queue_info.QueueQuota{
assert.Equal(t, defaultQueue.Name, defaultQueueName)
assert.Equal(t, defaultQueue.UID, defaultParentQueueId)
assert.Equal(t, defaultQueue.Resources, queue_info.QueueQuota{
GPU: queue_info.ResourceQuota{
Quota: -1,
OverQuotaWeight: 1,
Expand All @@ -1510,13 +1543,26 @@ func TestSnapshotFlatHierarchy(t *testing.T) {
Limit: -1,
},
})

// In flat hierarchy mode, all queues should have "default" as their parent
snapshotDept0, found := snapshot.Queues[common_info.QueueID(parentQueue0.Name)]
assert.True(t, found)
assert.Equal(t, snapshotDept0.ParentQueue, defaultParentQueueId)

snapshotDept1, found := snapshot.Queues[common_info.QueueID(parentQueue1.Name)]
assert.True(t, found)
assert.Equal(t, snapshotDept1.ParentQueue, defaultParentQueueId)

snapshotQueue0, found := snapshot.Queues[common_info.QueueID(queue0.Name)]
assert.True(t, found)
assert.Equal(t, snapshotQueue0.ParentQueue, defaultParentQueueId)

snapshotQueue1, found := snapshot.Queues[common_info.QueueID(queue1.Name)]
assert.True(t, found)
assert.Equal(t, snapshotQueue1.ParentQueue, defaultParentQueueId)

// Default queue should have all 4 user queues as children
assert.ElementsMatch(t, []common_info.QueueID{"department0", "department1", "queue0", "queue1"}, defaultQueue.ChildQueues)
}

func TestGetPodGroupPriority(t *testing.T) {
Expand Down
44 changes: 28 additions & 16 deletions pkg/scheduler/cache/cluster_info/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/common_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/api/queue_info"
"github.com/NVIDIA/KAI-scheduler/pkg/scheduler/log"

"github.com/samber/lo"
)

const (
defaultQueueName = "default"
defaultQueueName = "__default__"
)

func (c *ClusterInfo) getDefaultParentQueue() *queue_info.QueueInfo {
Expand Down Expand Up @@ -55,25 +57,33 @@ func (c *ClusterInfo) snapshotQueues() (map[common_info.QueueID]*queue_info.Queu
return nil, err
}

result := map[common_info.QueueID]*queue_info.QueueInfo{}
if c.fairnessLevelType == FullFairness {
for _, queue := range queues {
queueInfo := queue_info.NewQueueInfo(queue)
result[queueInfo.UID] = queueInfo
for _, queue := range queues {
// Adopt parentless queues (no parent specified) under the default root queue.
// In ProjectLevelFairness mode, flatten all queues under the default root.
if len(queue.Spec.ParentQueue) == 0 || c.fairnessLevelType == ProjectLevelFairness {
queue.Spec.ParentQueue = defaultQueueName
}
} else if c.fairnessLevelType == ProjectLevelFairness {
defaultParentQueue := c.getDefaultParentQueue()
result[defaultParentQueue.UID] = defaultParentQueue

for _, queue := range queues {
if len(queue.Spec.ParentQueue) > 0 {
queue.Spec.ParentQueue = defaultQueueName
queueInfo := queue_info.NewQueueInfo(queue)
result[queueInfo.UID] = queueInfo
}
}

queuesByName := lo.SliceToMap(queues, func(queue *enginev2.Queue) (string, *enginev2.Queue) {
return queue.Name, queue
})
// Detach queues from their grandparents to enforce 2-level hierarchy limit
// TODO: Remove this restriction when n-level hierarchy support is added
for _, queue := range queues {
if queue.Spec.ParentQueue != "" && queuesByName[queue.Spec.ParentQueue] != nil {
queuesByName[queue.Spec.ParentQueue].Spec.ParentQueue = ""
}
}

result := map[common_info.QueueID]*queue_info.QueueInfo{}
defaultParentQueue := c.getDefaultParentQueue()
result[defaultParentQueue.UID] = defaultParentQueue
for _, queue := range queues {
queueInfo := queue_info.NewQueueInfo(queue)
result[queueInfo.UID] = queueInfo
}

return result, nil
}

Expand Down Expand Up @@ -102,6 +112,8 @@ func updateQueueChildren(queues map[common_info.QueueID]*queue_info.QueueInfo) {
}
}

// cleanQueueOrphans removes queues that reference a non-existent parent queue
// (parentless queues are not considered orphans)
func cleanQueueOrphans(queues map[common_info.QueueID]*queue_info.QueueInfo) {
for queueId, queue := range queues {
if queue.ParentQueue != "" {
Expand Down
Loading