Skip to content

Commit 2665037

Browse files
committed
DRA resourceslice controller: support publishing multiple slices
The driver determines what each slice is meant to look like. The controller then ensures that only those slices exist. It reuses existing slices where the set of devices, as identified by their names, is the same as in some desired slice. Such slices get updated to match the desired state. In other words, attributes and the order of devices can be changed by updating an existing slice, but adding or removing a device is done by deleting and re-creating slices. Co-authored-by: googs1025 <[email protected]> The test update is partly based on kubernetes#127645.
1 parent 8b063a6 commit 2665037

File tree

5 files changed

+1175
-222
lines changed

5 files changed

+1175
-222
lines changed
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package workqueue
18+
19+
import (
20+
"maps"
21+
"slices"
22+
"sync"
23+
"time"
24+
25+
"k8s.io/client-go/util/workqueue"
26+
)
27+
28+
// TODO (pohly): move this to k8s.io/client-go/util/workqueue/mockqueue.go
29+
// if it turns out to be generally useful. Doc comments are already written
30+
// as if the code was there.
31+
32+
// Mock is an implementation of [TypedRateLimitingInterface] which
33+
// can be used to test a function which pulls work items out of a queue
34+
// and processes them. It is thread-safe.
35+
//
36+
// The zero value is directly usable. The usual usage is:
37+
//
38+
//	var m workqueue.Mock[string]
39+
//	m.SyncOne("some-item", func(queue workqueue.TypedRateLimitingInterface[string]) { ... } )
40+
//	if diff := cmp.Diff(workqueue.MockState[string]{}, m.State()); diff != "" {
41+
//		t.Errorf("unexpected state of mock work queue after sync (-want, +got):\n%s", diff)
42+
//	}
43+
//
44+
// All slices get reset to nil when they become empty, so there are no spurious
45+
// differences because of nil vs. empty slice.
46+
type Mock[T comparable] struct {
47+
mutex sync.Mutex // guards state; every method locks it before touching state
48+
state MockState[T] // current queue state; snapshots are handed out by State
49+
}
50+
51+
// MockState is the observable state of a [Mock] queue, as returned by
// [Mock.State]. Tests typically compare it against an expected value.
type MockState[T comparable] struct {
52+
// Ready contains the items which are ready for processing, in the
// order in which Get will return them.
53+
Ready []T
54+
55+
// InFlight contains the items which are currently being processed (= Get
56+
// was called, Done not yet).
57+
InFlight []T
58+
59+
// MismatchedDone contains the items for which Done was called without
60+
// a matching Get.
61+
MismatchedDone []T
62+
63+
// Later contains the items which are meant to be added to the queue after
64+
// a certain delay (= AddAfter was called for them). They appear in the
65+
// order in which AddAfter got called.
66+
Later []MockDelayedItem[T]
67+
68+
// Failures contains the items and their retry count which failed to be
69+
// processed (AddRateLimited called at least once, Forget not yet).
70+
// The retry count is always larger than zero.
71+
Failures map[T]int
72+
73+
// ShutDownCalled tracks how often ShutDown got called.
74+
ShutDownCalled int
75+
76+
// ShutDownWithDrainCalled tracks how often ShutDownWithDrain got called.
77+
ShutDownWithDrainCalled int
78+
}
79+
80+
// DeepCopy takes a snapshot of all slices. It cannot do a deep copy of the items in those slices,
81+
// but typically those keys are immutable.
82+
func (m MockState[T]) DeepCopy() *MockState[T] {
83+
m.Ready = slices.Clone(m.Ready)
84+
m.InFlight = slices.Clone(m.InFlight)
85+
m.MismatchedDone = slices.Clone(m.MismatchedDone)
86+
m.Later = slices.Clone(m.Later)
87+
m.Failures = maps.Clone(m.Failures)
88+
return &m
89+
}
90+
91+
// MockDelayedItem is an item which was queued for later processing.
92+
type MockDelayedItem[T comparable] struct {
93+
Item T // the item that was passed to AddAfter
94+
Duration time.Duration // the delay passed to AddAfter; a repeated call for the same item only ever shortens it
95+
}
96+
97+
// SyncOne adds the item to the work queue and calls sync.
98+
// That sync function can pull one or more items from the work
99+
// queue until the queue is empty. Then it is told that the queue
100+
// is shutting down, which must cause it to return.
101+
//
102+
// The test can then retrieve the state of the queue to check the result.
103+
func (m *Mock[T]) SyncOne(item T, sync func(workqueue.TypedRateLimitingInterface[T])) {
104+
// sync must run with the mutex not locked.
105+
defer sync(m)
106+
m.mutex.Lock()
107+
defer m.mutex.Unlock()
108+
109+
m.state.Ready = append(m.state.Ready, item)
110+
}
111+
112+
// State returns the current state of the queue.
113+
func (m *Mock[T]) State() MockState[T] {
114+
m.mutex.Lock()
115+
defer m.mutex.Unlock()
116+
117+
return *m.state.DeepCopy()
118+
}
119+
120+
// Add implements [TypedInterface].
121+
func (m *Mock[T]) Add(item T) {
122+
m.mutex.Lock()
123+
defer m.mutex.Unlock()
124+
125+
if !slices.Contains(m.state.Ready, item) {
126+
m.state.Ready = append(m.state.Ready, item)
127+
}
128+
}
129+
130+
// Len implements [TypedInterface].
131+
func (m *Mock[T]) Len() int {
132+
m.mutex.Lock()
133+
defer m.mutex.Unlock()
134+
135+
return len(m.state.Ready)
136+
}
137+
138+
// Get implements [TypedInterface].
139+
func (m *Mock[T]) Get() (item T, shutdown bool) {
140+
m.mutex.Lock()
141+
defer m.mutex.Unlock()
142+
143+
if len(m.state.Ready) == 0 {
144+
shutdown = true
145+
return
146+
}
147+
item = m.state.Ready[0]
148+
m.state.Ready = m.state.Ready[1:]
149+
if len(m.state.Ready) == 0 {
150+
m.state.Ready = nil
151+
}
152+
m.state.InFlight = append(m.state.InFlight, item)
153+
return item, false
154+
}
155+
156+
// Done implements [TypedInterface].
157+
func (m *Mock[T]) Done(item T) {
158+
m.mutex.Lock()
159+
defer m.mutex.Unlock()
160+
161+
index := slices.Index(m.state.InFlight, item)
162+
if index < 0 {
163+
m.state.MismatchedDone = append(m.state.MismatchedDone, item)
164+
}
165+
m.state.InFlight = slices.Delete(m.state.InFlight, index, index+1)
166+
if len(m.state.InFlight) == 0 {
167+
m.state.InFlight = nil
168+
}
169+
}
170+
171+
// ShutDown implements [TypedInterface].
172+
func (m *Mock[T]) ShutDown() {
173+
m.mutex.Lock()
174+
defer m.mutex.Unlock()
175+
176+
m.state.ShutDownCalled++
177+
}
178+
179+
// ShutDownWithDrain implements [TypedInterface].
180+
func (m *Mock[T]) ShutDownWithDrain() {
181+
m.mutex.Lock()
182+
defer m.mutex.Unlock()
183+
184+
m.state.ShutDownWithDrainCalled++
185+
}
186+
187+
// ShuttingDown implements [TypedInterface].
188+
func (m *Mock[T]) ShuttingDown() bool {
189+
m.mutex.Lock()
190+
defer m.mutex.Unlock()
191+
192+
return m.state.ShutDownCalled > 0 || m.state.ShutDownWithDrainCalled > 0
193+
}
194+
195+
// AddAfter implements [TypedDelayingInterface.AddAfter]
196+
func (m *Mock[T]) AddAfter(item T, duration time.Duration) {
197+
if duration == 0 {
198+
m.Add(item)
199+
return
200+
}
201+
202+
m.mutex.Lock()
203+
defer m.mutex.Unlock()
204+
205+
for i := range m.state.Later {
206+
if m.state.Later[i].Item == item {
207+
// https://github.com/kubernetes/client-go/blob/270e5ab1714527c455865953da8ceba2810dbb50/util/workqueue/delaying_queue.go#L340-L349
208+
// only shortens the delay for an existing item. It does not make it longer.
209+
if m.state.Later[i].Duration > duration {
210+
m.state.Later[i].Duration = duration
211+
}
212+
return
213+
}
214+
}
215+
216+
m.state.Later = append(m.state.Later, MockDelayedItem[T]{Item: item, Duration: duration})
217+
}
218+
219+
// AddRateLimited implements [TypedRateLimitingInterface.AddRateLimited].
220+
func (m *Mock[T]) AddRateLimited(item T) {
221+
m.mutex.Lock()
222+
defer m.mutex.Unlock()
223+
224+
if m.state.Failures == nil {
225+
m.state.Failures = make(map[T]int)
226+
}
227+
m.state.Failures[item]++
228+
}
229+
230+
// Forget implements [TypedRateLimitingInterface.Forget].
231+
func (m *Mock[T]) Forget(item T) {
232+
m.mutex.Lock()
233+
defer m.mutex.Unlock()
234+
235+
if m.state.Failures == nil {
236+
return
237+
}
238+
delete(m.state.Failures, item)
239+
}
240+
241+
// NumRequeues implements [TypedRateLimitingInterface.NumRequeues].
242+
func (m *Mock[T]) NumRequeues(item T) int {
243+
m.mutex.Lock()
244+
defer m.mutex.Unlock()
245+
246+
return m.state.Failures[item]
247+
}

staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,9 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e
393393
driverResources := &resourceslice.DriverResources{
394394
Pools: map[string]resourceslice.Pool{
395395
d.nodeName: {
396-
Devices: resources.Devices,
396+
Slices: []resourceslice.Slice{{
397+
Devices: resources.Devices,
398+
}},
397399
},
398400
},
399401
}
@@ -407,7 +409,13 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e
407409
controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller")
408410
controllerCtx = klog.NewContext(controllerCtx, controllerLogger)
409411
var err error
410-
if d.resourceSliceController, err = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources); err != nil {
412+
if d.resourceSliceController, err = resourceslice.StartController(controllerCtx,
413+
resourceslice.Options{
414+
DriverName: d.driverName,
415+
KubeClient: d.kubeClient,
416+
Owner: &owner,
417+
Resources: driverResources,
418+
}); err != nil {
411419
return fmt.Errorf("start ResourceSlice controller: %w", err)
412420
}
413421
return nil

0 commit comments

Comments
 (0)