Skip to content

Commit 37b47f4

Browse files
committed
DRA helper: support dropped fields and TimeAdded defaults
Both the new DeviceTaint.TimeAdded and dropped fields when the DRADeviceTaints feature is disabled confused the ResourceSlice controller because what is stored and sent back can be different from what the controller wants to store. It's now more lenient regarding TimeAdded (doesn't need to be exact because of rounding during serialization, only having a value on the server is okay) and dropped fields (doesn't try to store them again). It also preserves a server-side TimeAdded when updating slices.
1 parent 2499663 commit 37b47f4

File tree

3 files changed

+281
-2
lines changed

3 files changed

+281
-2
lines changed

staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"slices"
2324
"sync"
2425
"sync/atomic"
2526
"time"
@@ -31,6 +32,7 @@ import (
3132
apiequality "k8s.io/apimachinery/pkg/api/equality"
3233
apierrors "k8s.io/apimachinery/pkg/api/errors"
3334
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35+
"k8s.io/apimachinery/pkg/conversion"
3436
"k8s.io/apimachinery/pkg/fields"
3537
"k8s.io/apimachinery/pkg/types"
3638
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -546,7 +548,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
546548
if !apiequality.Semantic.DeepEqual(&currentSlice.Spec.Pool, &desiredPool) ||
547549
!apiequality.Semantic.DeepEqual(currentSlice.Spec.NodeSelector, pool.NodeSelector) ||
548550
currentSlice.Spec.AllNodes != desiredAllNodes ||
549-
!apiequality.Semantic.DeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) {
551+
!DevicesDeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) {
550552
changedDesiredSlices.Insert(i)
551553
logger.V(5).Info("Need to update slice", "slice", klog.KObj(currentSlice), "matchIndex", i)
552554
}
@@ -586,7 +588,8 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
586588
// have listed the existing slice.
587589
slice.Spec.NodeSelector = pool.NodeSelector
588590
slice.Spec.AllNodes = desiredAllNodes
589-
slice.Spec.Devices = pool.Slices[i].Devices
591+
// Preserve TimeAdded from existing device, if there is a matching device and taint.
592+
slice.Spec.Devices = copyTaintTimeAdded(slice.Spec.Devices, pool.Slices[i].Devices)
590593

591594
logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice))
592595
slice, err := c.kubeClient.ResourceV1beta1().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{})
@@ -595,6 +598,16 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
595598
}
596599
atomic.AddInt64(&c.numUpdates, 1)
597600
c.sliceStore.Mutation(slice)
601+
602+
// Some fields may have been dropped. When we receive
603+
// the updated slice through the informer, the
604+
// DeepEqual fails and the controller would try to
605+
// update again, etc. To break that cycle, update our
606+
// desired state of the world so that it matches what
607+
// we can store.
608+
//
609+
// TODO (https://github.com/kubernetes/kubernetes/issues/130856): check for dropped fields and report them to the DRA driver.
610+
pool.Slices[i].Devices = slice.Spec.Devices
598611
}
599612

600613
// Create new slices.
@@ -652,6 +665,16 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
652665
atomic.AddInt64(&c.numCreates, 1)
653666
c.sliceStore.Mutation(slice)
654667
added = true
668+
669+
// Some fields may have been dropped. When we receive
670+
// the created slice through the informer, the
671+
// DeepEqual fails and the controller would try to
672+
// update, which again suffers from dropped fields,
673+
// etc. To break that cycle, update our desired state
674+
// of the world so that it matches what we can store.
675+
//
676+
// TODO (https://github.com/kubernetes/kubernetes/issues/130856): check for dropped fields and report them to the DRA driver.
677+
pool.Slices[i].Devices = slice.Spec.Devices
655678
}
656679
if added {
657680
// Check that the recently added slice(s) really exist even
@@ -713,3 +736,74 @@ func sameSlice(existingSlice *resourceapi.ResourceSlice, desiredSlice *Slice) bo
713736
// Same number of devices, names all present -> equal.
714737
return true
715738
}
739+
740+
// copyTaintTimeAdded copies existing TimeAdded values from one slice into
741+
// the other if the other one doesn't have it for a taint. Both input
742+
// slices are read-only.
743+
func copyTaintTimeAdded(from, to []resourceapi.Device) []resourceapi.Device {
744+
to = slices.Clone(to)
745+
for i, toDevice := range to {
746+
index := slices.IndexFunc(from, func(fromDevice resourceapi.Device) bool {
747+
return fromDevice.Name == toDevice.Name
748+
})
749+
if index < 0 {
750+
// No matching device.
751+
continue
752+
}
753+
fromDevice := from[index]
754+
if fromDevice.Basic == nil || toDevice.Basic == nil {
755+
continue
756+
}
757+
for j, toTaint := range toDevice.Basic.Taints {
758+
if toTaint.TimeAdded != nil {
759+
// Already set.
760+
continue
761+
}
762+
// Preserve the old TimeAdded if all other fields are the same.
763+
index := slices.IndexFunc(fromDevice.Basic.Taints, func(fromTaint resourceapi.DeviceTaint) bool {
764+
return toTaint.Key == fromTaint.Key &&
765+
toTaint.Value == fromTaint.Value &&
766+
toTaint.Effect == fromTaint.Effect
767+
})
768+
if index < 0 {
769+
// No matching old taint.
770+
continue
771+
}
772+
// In practice, devices are unlikely to have many
773+
// taints. Just clone the entire device before we
774+
// motify it, it's unlikely that we do this more than once.
775+
to[i] = *toDevice.DeepCopy()
776+
to[i].Basic.Taints[j].TimeAdded = fromDevice.Basic.Taints[index].TimeAdded
777+
}
778+
}
779+
return to
780+
}
781+
782+
// DevicesDeepEqual compares two slices of Devices. It behaves like
783+
// apiequality.Semantic.DeepEqual, with one small difference:
784+
// a nil DeviceTaint.TimeAdded is equal to a non-nil time.
785+
// Also, rounding to full seconds (caused by round-tripping) is
786+
// tolerated.
787+
func DevicesDeepEqual(a, b []resourceapi.Device) bool {
788+
return devicesSemantic.DeepEqual(a, b)
789+
}
790+
791+
var devicesSemantic = func() conversion.Equalities {
792+
semantic := apiequality.Semantic.Copy()
793+
if err := semantic.AddFunc(deviceTaintEqual); err != nil {
794+
panic(err)
795+
}
796+
return semantic
797+
}()
798+
799+
func deviceTaintEqual(a, b resourceapi.DeviceTaint) bool {
800+
if a.TimeAdded != nil && b.TimeAdded != nil {
801+
delta := b.TimeAdded.Time.Sub(a.TimeAdded.Time)
802+
if delta < -time.Second || delta > time.Second {
803+
return false
804+
}
805+
}
806+
return a.Key == b.Key &&
807+
a.Value == b.Value &&
808+
a.Effect == b.Effect
809+
}

staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ func TestControllerSyncPool(t *testing.T) {
8585
}},
8686
}},
8787
}
88+
timeAdded = metav1.Now()
89+
timeAddedLater = metav1.Time{Time: timeAdded.Add(time.Minute)}
8890
)
8991

9092
testCases := map[string]struct {
@@ -143,6 +145,118 @@ func TestControllerSyncPool(t *testing.T) {
143145
Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(),
144146
},
145147
},
148+
"keep-taint-unchanged": {
149+
nodeUID: nodeUID,
150+
initialObjects: []runtime.Object{
151+
MakeResourceSlice().Name(generatedName1).GenerateName(generateName).
152+
NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).
153+
Driver(driverName).Devices([]resourceapi.Device{{
154+
Name: deviceName,
155+
Basic: &resourceapi.BasicDevice{
156+
Taints: []resourceapi.DeviceTaint{{
157+
Effect: resourceapi.DeviceTaintEffectNoExecute,
158+
TimeAdded: &timeAdded,
159+
}},
160+
}}}).
161+
Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).
162+
Obj(),
163+
},
164+
inputDriverResources: &DriverResources{
165+
Pools: map[string]Pool{
166+
poolName: {
167+
Generation: 1,
168+
Slices: []Slice{{Devices: []resourceapi.Device{{
169+
Name: deviceName,
170+
Basic: &resourceapi.BasicDevice{
171+
Taints: []resourceapi.DeviceTaint{{
172+
Effect: resourceapi.DeviceTaintEffectNoExecute,
173+
// No time added here! No need to update the slice.
174+
}},
175+
}}},
176+
}},
177+
},
178+
},
179+
},
180+
expectedResourceSlices: []resourceapi.ResourceSlice{
181+
*MakeResourceSlice().Name(generatedName1).GenerateName(generateName).
182+
NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).
183+
Driver(driverName).Devices([]resourceapi.Device{{
184+
Name: deviceName,
185+
Basic: &resourceapi.BasicDevice{
186+
Taints: []resourceapi.DeviceTaint{{
187+
Effect: resourceapi.DeviceTaintEffectNoExecute,
188+
TimeAdded: &timeAdded,
189+
}},
190+
}}}).
191+
Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).
192+
Obj(),
193+
},
194+
},
195+
"add-taint": {
196+
nodeUID: nodeUID,
197+
initialObjects: []runtime.Object{
198+
MakeResourceSlice().Name(generatedName1).GenerateName(generateName).
199+
NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).
200+
Driver(driverName).Devices([]resourceapi.Device{{
201+
Name: deviceName,
202+
Basic: &resourceapi.BasicDevice{
203+
Taints: []resourceapi.DeviceTaint{{
204+
Effect: resourceapi.DeviceTaintEffectNoExecute,
205+
TimeAdded: &timeAdded,
206+
}},
207+
}}}).
208+
Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).
209+
Obj(),
210+
},
211+
inputDriverResources: &DriverResources{
212+
Pools: map[string]Pool{
213+
poolName: {
214+
Generation: 1,
215+
Slices: []Slice{{Devices: []resourceapi.Device{{
216+
Name: deviceName,
217+
Basic: &resourceapi.BasicDevice{
218+
Taints: []resourceapi.DeviceTaint{
219+
{
220+
Effect: resourceapi.DeviceTaintEffectNoExecute,
221+
// No time added here! Time from existing slice must get copied during update.
222+
},
223+
{
224+
Key: "example.com/tainted",
225+
Effect: resourceapi.DeviceTaintEffectNoSchedule,
226+
TimeAdded: &timeAddedLater,
227+
},
228+
},
229+
}}},
230+
}},
231+
},
232+
},
233+
},
234+
expectedStats: Stats{
235+
NumUpdates: 1,
236+
},
237+
expectedResourceSlices: []resourceapi.ResourceSlice{
238+
*MakeResourceSlice().Name(generatedName1).GenerateName(generateName).
239+
ResourceVersion("1").
240+
NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).
241+
Driver(driverName).Devices([]resourceapi.Device{{
242+
Name: deviceName,
243+
Basic: &resourceapi.BasicDevice{
244+
Taints: []resourceapi.DeviceTaint{
245+
{
246+
Effect: resourceapi.DeviceTaintEffectNoExecute,
247+
TimeAdded: &timeAdded,
248+
},
249+
{
250+
Key: "example.com/tainted",
251+
Effect: resourceapi.DeviceTaintEffectNoSchedule,
252+
TimeAdded: &timeAddedLater,
253+
},
254+
},
255+
}}}).
256+
Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).
257+
Obj(),
258+
},
259+
},
146260
"remove-pool": {
147261
nodeUID: nodeUID,
148262
syncDelay: ptr.To(time.Duration(0)), // Ensure that the initial object causes an immediate sync of the pool.

test/integration/dra/dra_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
utilfeature "k8s.io/apiserver/pkg/util/feature"
4141
"k8s.io/component-base/featuregate"
4242
featuregatetesting "k8s.io/component-base/featuregate/testing"
43+
"k8s.io/dynamic-resource-allocation/resourceslice"
4344
"k8s.io/klog/v2"
4445
kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
4546
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
@@ -129,6 +130,7 @@ func TestDRA(t *testing.T) {
129130
tCtx.Run("AdminAccess", func(tCtx ktesting.TContext) { testAdminAccess(tCtx, false) })
130131
tCtx.Run("PrioritizedList", func(tCtx ktesting.TContext) { testPrioritizedList(tCtx, false) })
131132
tCtx.Run("Pod", func(tCtx ktesting.TContext) { testPod(tCtx, true) })
133+
tCtx.Run("PublishResourceSlices", func(tCtx ktesting.TContext) { testPublishResourceSlices(tCtx) })
132134
},
133135
},
134136
"all": {
@@ -148,6 +150,7 @@ func TestDRA(t *testing.T) {
148150
tCtx.Run("AdminAccess", func(tCtx ktesting.TContext) { testAdminAccess(tCtx, true) })
149151
tCtx.Run("Convert", testConvert)
150152
tCtx.Run("PrioritizedList", func(tCtx ktesting.TContext) { testPrioritizedList(tCtx, true) })
153+
tCtx.Run("PublishResourceSlices", func(tCtx ktesting.TContext) { testPublishResourceSlices(tCtx) })
151154
},
152155
},
153156
} {
@@ -308,3 +311,71 @@ func testPrioritizedList(tCtx ktesting.TContext, enabled bool) {
308311
}).WithTimeout(time.Minute).WithPolling(time.Second).Should(schedulingAttempted)
309312
})
310313
}
314+
315+
func testPublishResourceSlices(tCtx ktesting.TContext) {
316+
tCtx.Parallel()
317+
318+
driverName := "dra.example.com"
319+
poolName := "global"
320+
resources := &resourceslice.DriverResources{
321+
Pools: map[string]resourceslice.Pool{
322+
poolName: {
323+
Slices: []resourceslice.Slice{
324+
{
325+
Devices: []resourceapi.Device{
326+
{
327+
Name: "device-simple",
328+
Basic: &resourceapi.BasicDevice{},
329+
},
330+
// TODO: once https://github.com/kubernetes/kubernetes/pull/130764 is merged,
331+
// add tests which detect dropped fields related to it.
332+
},
333+
},
334+
{
335+
Devices: []resourceapi.Device{
336+
{
337+
Name: "device-tainted-default",
338+
Basic: &resourceapi.BasicDevice{
339+
Taints: []resourceapi.DeviceTaint{{
340+
Effect: resourceapi.DeviceTaintEffectNoExecute,
341+
// TimeAdded is added by apiserver.
342+
}},
343+
},
344+
},
345+
{
346+
Name: "device-tainted-time-added",
347+
Basic: &resourceapi.BasicDevice{
348+
Taints: []resourceapi.DeviceTaint{{
349+
Effect: resourceapi.DeviceTaintEffectNoExecute,
350+
TimeAdded: ptr.To(metav1.Now()),
351+
}},
352+
},
353+
},
354+
},
355+
},
356+
},
357+
},
358+
},
359+
}
360+
opts := resourceslice.Options{
361+
DriverName: driverName,
362+
KubeClient: tCtx.Client(),
363+
SyncDelay: ptr.To(0 * time.Second),
364+
Resources: resources,
365+
}
366+
controller, err := resourceslice.StartController(tCtx, opts)
367+
tCtx.ExpectNoError(err, "start controller")
368+
defer controller.Stop()
369+
370+
// Two create calls should be all that are needed.
371+
expectedStats := resourceslice.Stats{
372+
NumCreates: 2,
373+
}
374+
getStats := func(tCtx ktesting.TContext) resourceslice.Stats {
375+
return controller.GetStats()
376+
}
377+
ktesting.Eventually(tCtx, getStats).WithTimeout(10 * time.Second).Should(gomega.Equal(expectedStats))
378+
379+
// No further changes necessary.
380+
ktesting.Consistently(tCtx, getStats).WithTimeout(10 * time.Second).Should(gomega.Equal(expectedStats))
381+
}

0 commit comments

Comments
 (0)