Skip to content

Commit 6435489

Browse files
authored
Merge pull request kubernetes#128275 from pohly/dra-resourceslice-controller-multiple-slices
DRA resourceslice controller: support publishing multiple slices
2 parents d001d56 + 1088f4f commit 6435489

File tree

9 files changed

+1485
-239
lines changed

9 files changed

+1485
-239
lines changed
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package workqueue
18+
19+
import (
20+
"maps"
21+
"slices"
22+
"sync"
23+
"time"
24+
25+
"k8s.io/client-go/util/workqueue"
26+
)
27+
28+
// TODO (pohly): move this to k8s.io/client-go/util/workqueue/mockqueue.go
29+
// if it turns out to be generally useful. Doc comments are already written
30+
// as if the code was there.
31+
32+
// Mock is an implementation of [TypedRateLimitingInterface] which
33+
// can be used to test a function which pulls work items out of a queue
34+
// and processes them. It is thread-safe.
35+
//
36+
// A zero-value instance is directly usable. The usual usage is:
37+
//
38+
//	var m workqueue.Mock[string]
39+
//	m.SyncOne("some-item", func(queue workqueue.TypedRateLimitingInterface[string]) { ... } )
40+
//	if diff := cmp.Diff(workqueue.MockState[string]{}, m.State()); diff != "" {
41+
//		t.Errorf("unexpected state of mock work queue after sync (-want, +got):\n%s", diff)
42+
//	}
43+
//
44+
// All slices get reset to nil when they become empty, so there are no spurious
45+
// differences because of nil vs. empty slice.
46+
type Mock[T comparable] struct {
47+
// mutex is held by every method while it reads or modifies state.
mutex sync.Mutex
48+
// state is the bookkeeping of the queue. State returns a copy of it.
state MockState[T]
49+
}
50+
51+
// MockState is the bookkeeping of a [Mock]. [Mock.State] returns a copy of it
// which a test can compare against the state that it expects.
type MockState[T comparable] struct {
52+
// Ready contains the items which are ready for processing.
53+
Ready []T
54+
55+
// InFlight contains the items which are currently being processed (= Get
56+
// was called, Done not yet).
57+
InFlight []T
58+
59+
// MismatchedDone contains the items for which Done was called without
60+
// a matching Get.
61+
MismatchedDone []T
62+
63+
// Later contains the items which are meant to be added to the queue after
64+
// a certain delay (= AddAfter was called for them). They appear in the
65+
// order in which AddAfter got called.
66+
Later []MockDelayedItem[T]
67+
68+
// Failures contains the items and their retry count which failed to be
69+
// processed (AddRateLimited called at least once, Forget not yet).
70+
// The retry count is always larger than zero.
71+
Failures map[T]int
72+
73+
// ShutDownCalled tracks how often ShutDown got called.
74+
ShutDownCalled int
75+
76+
// ShutDownWithDrainCalled tracks how often ShutDownWithDrain got called.
77+
ShutDownWithDrainCalled int
78+
}
79+
80+
// DeepCopy takes a snapshot of all slices. It cannot do a deep copy of the items in those slices,
81+
// but typically those keys are immutable.
82+
func (m MockState[T]) DeepCopy() *MockState[T] {
83+
m.Ready = slices.Clone(m.Ready)
84+
m.InFlight = slices.Clone(m.InFlight)
85+
m.MismatchedDone = slices.Clone(m.MismatchedDone)
86+
m.Later = slices.Clone(m.Later)
87+
m.Failures = maps.Clone(m.Failures)
88+
return &m
89+
}
90+
91+
// MockDelayedItem is an item which was queued for later processing.
92+
type MockDelayedItem[T comparable] struct {
93+
// Item is the work item that was passed to AddAfter.
Item T
94+
// Duration is the delay that was requested for the item.
Duration time.Duration
95+
}
96+
97+
// SyncOne adds the item to the work queue and calls sync.
98+
// That sync function can pull one or more items from the work
99+
// queue until the queue is empty. Then it is told that the queue
100+
// is shutting down, which must cause it to return.
101+
//
102+
// The test can then retrieve the state of the queue to check the result.
103+
func (m *Mock[T]) SyncOne(item T, sync func(workqueue.TypedRateLimitingInterface[T])) {
104+
// sync must run with the mutex not locked.
105+
defer sync(m)
106+
m.mutex.Lock()
107+
defer m.mutex.Unlock()
108+
109+
m.state.Ready = append(m.state.Ready, item)
110+
}
111+
112+
// State returns the current state of the queue.
113+
func (m *Mock[T]) State() MockState[T] {
114+
m.mutex.Lock()
115+
defer m.mutex.Unlock()
116+
117+
return *m.state.DeepCopy()
118+
}
119+
120+
// Add implements [TypedInterface].
121+
func (m *Mock[T]) Add(item T) {
122+
m.mutex.Lock()
123+
defer m.mutex.Unlock()
124+
125+
if !slices.Contains(m.state.Ready, item) {
126+
m.state.Ready = append(m.state.Ready, item)
127+
}
128+
}
129+
130+
// Len implements [TypedInterface].
131+
func (m *Mock[T]) Len() int {
132+
m.mutex.Lock()
133+
defer m.mutex.Unlock()
134+
135+
return len(m.state.Ready)
136+
}
137+
138+
// Get implements [TypedInterface].
139+
func (m *Mock[T]) Get() (item T, shutdown bool) {
140+
m.mutex.Lock()
141+
defer m.mutex.Unlock()
142+
143+
if len(m.state.Ready) == 0 {
144+
shutdown = true
145+
return
146+
}
147+
item = m.state.Ready[0]
148+
m.state.Ready = m.state.Ready[1:]
149+
if len(m.state.Ready) == 0 {
150+
m.state.Ready = nil
151+
}
152+
m.state.InFlight = append(m.state.InFlight, item)
153+
return item, false
154+
}
155+
156+
// Done implements [TypedInterface].
157+
func (m *Mock[T]) Done(item T) {
158+
m.mutex.Lock()
159+
defer m.mutex.Unlock()
160+
161+
index := slices.Index(m.state.InFlight, item)
162+
if index < 0 {
163+
m.state.MismatchedDone = append(m.state.MismatchedDone, item)
164+
}
165+
m.state.InFlight = slices.Delete(m.state.InFlight, index, index+1)
166+
if len(m.state.InFlight) == 0 {
167+
m.state.InFlight = nil
168+
}
169+
}
170+
171+
// ShutDown implements [TypedInterface].
172+
func (m *Mock[T]) ShutDown() {
173+
m.mutex.Lock()
174+
defer m.mutex.Unlock()
175+
176+
m.state.ShutDownCalled++
177+
}
178+
179+
// ShutDownWithDrain implements [TypedInterface].
180+
func (m *Mock[T]) ShutDownWithDrain() {
181+
m.mutex.Lock()
182+
defer m.mutex.Unlock()
183+
184+
m.state.ShutDownWithDrainCalled++
185+
}
186+
187+
// ShuttingDown implements [TypedInterface].
188+
func (m *Mock[T]) ShuttingDown() bool {
189+
m.mutex.Lock()
190+
defer m.mutex.Unlock()
191+
192+
return m.state.ShutDownCalled > 0 || m.state.ShutDownWithDrainCalled > 0
193+
}
194+
195+
// AddAfter implements [TypedDelayingInterface.AddAfter]
196+
func (m *Mock[T]) AddAfter(item T, duration time.Duration) {
197+
if duration == 0 {
198+
m.Add(item)
199+
return
200+
}
201+
202+
m.mutex.Lock()
203+
defer m.mutex.Unlock()
204+
205+
for i := range m.state.Later {
206+
if m.state.Later[i].Item == item {
207+
// https://github.com/kubernetes/client-go/blob/270e5ab1714527c455865953da8ceba2810dbb50/util/workqueue/delaying_queue.go#L340-L349
208+
// only shortens the delay for an existing item. It does not make it longer.
209+
if m.state.Later[i].Duration > duration {
210+
m.state.Later[i].Duration = duration
211+
}
212+
return
213+
}
214+
}
215+
216+
m.state.Later = append(m.state.Later, MockDelayedItem[T]{Item: item, Duration: duration})
217+
}
218+
219+
// AddRateLimited implements [TypedRateLimitingInterface.AddRateLimited].
220+
func (m *Mock[T]) AddRateLimited(item T) {
221+
m.mutex.Lock()
222+
defer m.mutex.Unlock()
223+
224+
if m.state.Failures == nil {
225+
m.state.Failures = make(map[T]int)
226+
}
227+
m.state.Failures[item]++
228+
}
229+
230+
// Forget implements [TypedRateLimitingInterface.Forget].
231+
func (m *Mock[T]) Forget(item T) {
232+
m.mutex.Lock()
233+
defer m.mutex.Unlock()
234+
235+
if m.state.Failures == nil {
236+
return
237+
}
238+
delete(m.state.Failures, item)
239+
}
240+
241+
// NumRequeues implements [TypedRateLimitingInterface.NumRequeues].
242+
func (m *Mock[T]) NumRequeues(item T) int {
243+
m.mutex.Lock()
244+
defer m.mutex.Unlock()
245+
246+
return m.state.Failures[item]
247+
}

staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,9 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e
393393
driverResources := &resourceslice.DriverResources{
394394
Pools: map[string]resourceslice.Pool{
395395
d.nodeName: {
396-
Devices: resources.Devices,
396+
Slices: []resourceslice.Slice{{
397+
Devices: resources.Devices,
398+
}},
397399
},
398400
},
399401
}
@@ -407,7 +409,13 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e
407409
controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller")
408410
controllerCtx = klog.NewContext(controllerCtx, controllerLogger)
409411
var err error
410-
if d.resourceSliceController, err = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources); err != nil {
412+
if d.resourceSliceController, err = resourceslice.StartController(controllerCtx,
413+
resourceslice.Options{
414+
DriverName: d.driverName,
415+
KubeClient: d.kubeClient,
416+
Owner: &owner,
417+
Resources: driverResources,
418+
}); err != nil {
411419
return fmt.Errorf("start ResourceSlice controller: %w", err)
412420
}
413421
return nil

0 commit comments

Comments
 (0)