
Commit a6d180c

DRA: validate set of devices in a pool before using the pool
The ResourceSlice controller (theoretically) might end up creating too many slices if it syncs again before its informer cache was updated. This could cause the scheduler to allocate a device from a duplicated slice. The duplicates should be identical, but it's still better to fail and wait until the controller removes the redundant slice.
1 parent 2665037 commit a6d180c
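
Concretely, the check this commit adds boils down to scanning all slices of one pool for duplicate device names. A minimal standalone sketch of that idea, using a simplified stand-in type instead of the real resourceapi.ResourceSlice (the real helper is the poolIsInvalid function added to pools.go below):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// slice is a simplified stand-in for resourceapi.ResourceSlice: only the
// device names matter for this check.
type slice struct {
	deviceNames []string
}

// poolIsInvalid reports whether the slices of one pool publish the same device
// name more than once, which is what happens when the controller accidentally
// creates the same slice twice under different object names.
func poolIsInvalid(slices []slice) (bool, string) {
	devices := sets.New[string]()
	for _, s := range slices {
		for _, name := range s.deviceNames {
			if devices.Has(name) {
				return true, fmt.Sprintf("duplicate device name %s", name)
			}
			devices.Insert(name)
		}
	}
	return false, ""
}

func main() {
	// Two slices publishing the same device: the pool must not be used until
	// the redundant slice is gone.
	invalid, reason := poolIsInvalid([]slice{
		{deviceNames: []string{"gpu-0"}},
		{deviceNames: []string{"gpu-0"}},
	})
	fmt.Println(invalid, reason) // true duplicate device name gpu-0
}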

File tree

4 files changed, +69 -5 lines changed


staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go

Lines changed: 14 additions & 0 deletions
@@ -578,6 +578,20 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 				Devices: pool.Slices[i].Devices,
 			},
 		}
+
+		// It can happen that we create a missing slice, some
+		// other change than the create causes another sync of
+		// the pool, and then a second slice for the same set
+		// of devices gets created because the controller has
+		// no copy of the first slice instance in its informer
+		// cache yet.
+		//
+		// This is not a problem: a client will either see one
+		// of the two slices and use it or see both and do
+		// nothing because of the duplicated device IDs.
+		//
+		// To avoid creating a second slice, we would have to use a
+		// https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache.
 		logger.V(5).Info("Creating new resource slice")
 		if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
 			return fmt.Errorf("create resource slice: %w", err)
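
The comment above points to client-go's MutationCache as the way to avoid the duplicate create altogether: the controller would record its own writes locally so that the next sync sees them before the informer cache catches up. The commit deliberately does not go that route; purely for illustration, a rough sketch of what it could look like, assuming the four-argument constructor documented for the linked package (not code from this commit, and the exact signature may differ between client-go releases):

package example

import (
	"time"

	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/client-go/tools/cache"
)

// newSliceCache layers a MutationCache over the informer-backed store so that
// slices created by this controller are visible to its own reads immediately,
// before the corresponding watch event has updated the informer cache.
func newSliceCache(informerStore cache.Store) cache.MutationCache {
	return cache.NewIntegerResourceVersionMutationCache(
		informerStore, // reads fall through to the informer's store
		nil,           // no extra indexer needed for plain key lookups
		time.Minute,   // keep locally recorded writes around briefly
		false,         // no index-based "adopted children" behavior
	)
}

// rememberCreate records a freshly created slice so that a sync triggered
// before the watch event arrives does not create it a second time.
func rememberCreate(mc cache.MutationCache, created *resourceapi.ResourceSlice) {
	mc.Mutation(created)
}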

staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go

Lines changed: 10 additions & 0 deletions
@@ -187,6 +187,9 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
 		if pool.IsIncomplete {
 			return nil, fmt.Errorf("claim %s, request %s: asks for all devices, but resource pool %s is currently being updated", klog.KObj(claim), request.Name, pool.PoolID)
 		}
+		if pool.IsInvalid {
+			return nil, fmt.Errorf("claim %s, request %s: asks for all devices, but resource pool %s is currently invalid", klog.KObj(claim), request.Name, pool.PoolID)
+		}
 
 		for _, slice := range pool.Slices {
 			for deviceIndex := range slice.Spec.Devices {
@@ -574,6 +577,13 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) {
 			continue
 		}
 
+		// If the pool is not valid, then fail now. It's okay when pools of one driver
+		// are invalid if we allocate from some other pool, but it's not safe to
+		// allocate from an invalid pool.
+		if pool.IsInvalid {
+			return false, fmt.Errorf("pool %s is invalid: %s", pool.Pool, pool.InvalidReason)
+		}
+
 		// Finally treat as allocated and move on to the next device.
 		allocated, deallocate, err := alloc.allocateDevice(r, slice.Spec.Devices[deviceIndex].Basic, deviceID, false)
 		if err != nil {

staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go

Lines changed: 22 additions & 0 deletions
@@ -529,6 +529,28 @@ func TestAllocator(t *testing.T) {
 				deviceAllocationResult(req0, driverA, pool1, device1),
 			)},
 		},
+		"duplicate-slice": {
+			claimsToAllocate: objects(claim(claim0, req0, classA)),
+			classes:          objects(class(classA, driverA)),
+			slices: func() []*resourceapi.ResourceSlice {
+				// This simulates the problem that can
+				// (theoretically) occur when the resource
+				// slice controller wants to publish a pool
+				// with two slices but ends up creating some
+				// identical slices under different names
+				// because its informer cache was out-dated on
+				// another sync (see
+				// resourceslicecontroller.go).
+				sliceA := sliceWithOneDevice(slice1, node1, pool1, driverA)
+				sliceA.Spec.Pool.ResourceSliceCount = 2
+				sliceB := sliceA.DeepCopy()
+				sliceB.Name += "-2"
+				return []*resourceapi.ResourceSlice{sliceA, sliceB}
+			}(),
+			node: node(node1, region1),
+
+			expectError: gomega.MatchError(gomega.ContainSubstring(fmt.Sprintf("pool %s is invalid: duplicate device name %s", pool1, device1))),
+		},
 		"no-slices": {
 			claimsToAllocate: objects(claim(claim0, req0, classA)),
 			classes:          objects(class(classA, driverA)),
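
Stripped of the test helpers (sliceWithOneDevice and friends, whose exact output is not shown here), the simulated input is presumably just two ResourceSlice objects that differ only in their object name. A hand-written approximation with hypothetical driver, node, pool, and device names:

package main

import (
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	sliceA := &resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "pool-1-slice"},
		Spec: resourceapi.ResourceSliceSpec{
			Driver:   "driver-a.example.com",
			NodeName: "node-1",
			Pool: resourceapi.ResourcePool{
				Name:               "pool-1",
				Generation:         1,
				ResourceSliceCount: 2, // the pool claims to consist of two slices
			},
			Devices: []resourceapi.Device{
				{Name: "device-1", Basic: &resourceapi.BasicDevice{}},
			},
		},
	}

	// The duplicate: identical content, different object name.
	sliceB := sliceA.DeepCopy()
	sliceB.Name += "-2"

	fmt.Println(sliceA.Name, sliceB.Name) // pool-1-slice pool-1-slice-2
}

GatherPools groups both slices into the same pool, sees the device name twice, and marks the pool invalid, which is what the expected error string in the test checks for.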

staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go

Lines changed: 23 additions & 5 deletions
@@ -23,15 +23,17 @@ import (
 	v1 "k8s.io/api/core/v1"
 	resourceapi "k8s.io/api/resource/v1alpha3"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/sets"
 	resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
 	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 )
 
 // GatherPools collects information about all resource pools which provide
 // devices that are accessible from the given node.
 //
-// Out-dated slices are silently ignored. Pools may be incomplete, which is
-// recorded in the result.
+// Out-dated slices are silently ignored. Pools may be incomplete (not all
+// required slices available) or invalid (for example, device names not unique).
+// Both are recorded in the result.
 func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceLister, node *v1.Node) ([]*Pool, error) {
 	pools := make(map[PoolID]*Pool)
 
@@ -75,6 +77,7 @@ func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceL
 	result := make([]*Pool, 0, len(pools))
 	for _, pool := range pools {
 		pool.IsIncomplete = int64(len(pool.Slices)) != pool.Slices[0].Spec.Pool.ResourceSliceCount
+		pool.IsInvalid, pool.InvalidReason = poolIsInvalid(pool)
 		result = append(result, pool)
 	}
 
@@ -101,17 +104,32 @@ func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) {
 
 	if slice.Spec.Pool.Generation > pool.Slices[0].Spec.Pool.Generation {
 		// Newer, replaces all old slices.
-		pool.Slices = []*resourceapi.ResourceSlice{slice}
+		pool.Slices = nil
 	}
 
 	// Add to pool.
 	pool.Slices = append(pool.Slices, slice)
 }
 
+func poolIsInvalid(pool *Pool) (bool, string) {
+	devices := sets.New[string]()
+	for _, slice := range pool.Slices {
+		for _, device := range slice.Spec.Devices {
+			if devices.Has(device.Name) {
+				return true, fmt.Sprintf("duplicate device name %s", device.Name)
+			}
+			devices.Insert(device.Name)
+		}
+	}
+	return false, ""
+}
+
 type Pool struct {
 	PoolID
-	IsIncomplete bool
-	Slices       []*resourceapi.ResourceSlice
+	IsIncomplete  bool
+	IsInvalid     bool
+	InvalidReason string
+	Slices        []*resourceapi.ResourceSlice
 }
 
 type PoolID struct {

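From a consumer's perspective the new Pool fields are meant to be checked before allocating. A sketch of a hypothetical caller that gathers pools for a node and skips invalid ones (the allocator above fails the allocation instead of skipping, but the check itself is the same):

package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
	"k8s.io/dynamic-resource-allocation/structured"
	"k8s.io/klog/v2"
)

// usablePools gathers all pools visible from the node and filters out those
// that must not be allocated from.
func usablePools(ctx context.Context, lister resourcelisters.ResourceSliceLister, node *v1.Node) ([]*structured.Pool, error) {
	pools, err := structured.GatherPools(ctx, lister, node)
	if err != nil {
		return nil, fmt.Errorf("gather pools: %w", err)
	}
	logger := klog.FromContext(ctx)
	result := make([]*structured.Pool, 0, len(pools))
	for _, pool := range pools {
		if pool.IsInvalid {
			// For example "duplicate device name ...": wait for the
			// ResourceSlice controller to remove the redundant slice.
			logger.V(5).Info("Ignoring invalid pool", "pool", pool.PoolID, "reason", pool.InvalidReason)
			continue
		}
		result = append(result, pool)
	}
	return result, nil
}

Incomplete pools stay in the result on purpose: as the allocator change shows, only requests that ask for all devices of a pool have to fail while the pool is still being updated.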