@@ -19,14 +19,23 @@ package benchmark
 import (
     "context"
     "fmt"
+    "math/rand/v2"
     "path/filepath"
+    "reflect"
     "sync"

+    "github.com/stretchr/testify/require"
+
+    v1 "k8s.io/api/core/v1"
     resourceapi "k8s.io/api/resource/v1alpha3"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/client-go/informers"
     "k8s.io/client-go/util/workqueue"
+    "k8s.io/dynamic-resource-allocation/structured"
     "k8s.io/klog/v2"
+    "k8s.io/kubernetes/pkg/scheduler/util/assumecache"
     draapp "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
     "k8s.io/kubernetes/test/utils/ktesting"
     "k8s.io/utils/ptr"
@@ -261,3 +270,109 @@ func resourceSlice(driverName, nodeName string, capacity int) *resourceapi.Resou
     return slice
 }
+
+// allocResourceClaimsOp defines an op where resource claims with structured
+// parameters get allocated without being associated with a pod.
+type allocResourceClaimsOp struct {
+    // Must be allocResourceClaimsOpcode.
+    Opcode operationCode
+    // Namespace where claims are to be allocated, all namespaces if empty.
+    Namespace string
+}
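+
+// For illustration only: a workload configuration would select this op via
+// its opcode, roughly as below. This assumes that allocResourceClaimsOpcode,
+// which is not part of this diff, maps to the string "allocResourceClaims".
+//
+//	- opcode: allocResourceClaims
+//	  namespace: test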
+
+var _ realOp = &allocResourceClaimsOp{}
+var _ runnableOp = &allocResourceClaimsOp{}
+
+func (op *allocResourceClaimsOp) isValid(allowParameterization bool) error {
+    return nil
+}
+
+func (op *allocResourceClaimsOp) collectsMetrics() bool {
+    return false
+}
+func (op *allocResourceClaimsOp) patchParams(w *workload) (realOp, error) {
+    return op, op.isValid(false)
+}
+
+func (op *allocResourceClaimsOp) requiredNamespaces() []string { return nil }
+
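+// run lists the claims in the configured namespace (or all namespaces) and
+// allocates each claim that does not have an allocation yet.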
+func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
+    claims, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(op.Namespace).List(tCtx, metav1.ListOptions{})
+    tCtx.ExpectNoError(err, "list claims")
+    tCtx.Logf("allocating %d ResourceClaims", len(claims.Items))
+    tCtx = ktesting.WithCancel(tCtx)
+    defer tCtx.Cancel("allocResourceClaimsOp.run is done")
+
+    // Track cluster state.
+    informerFactory := informers.NewSharedInformerFactory(tCtx.Client(), 0)
+    claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()
+    classLister := informerFactory.Resource().V1alpha3().DeviceClasses().Lister()
+    sliceLister := informerFactory.Resource().V1alpha3().ResourceSlices().Lister()
+    nodeLister := informerFactory.Core().V1().Nodes().Lister()
+    claimCache := assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil)
+    claimLister := claimLister{cache: claimCache}
+    informerFactory.Start(tCtx.Done())
+    defer func() {
+        tCtx.Cancel("allocResourceClaimsOp.run is shutting down")
+        informerFactory.Shutdown()
+    }()
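+
+    // Make sure every informer is in sync before its lister gets used,
+    // otherwise the allocator below would work with incomplete cluster state.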
+    syncedInformers := informerFactory.WaitForCacheSync(tCtx.Done())
+    expectSyncedInformers := map[reflect.Type]bool{
+        reflect.TypeOf(&resourceapi.DeviceClass{}):   true,
+        reflect.TypeOf(&resourceapi.ResourceClaim{}): true,
+        reflect.TypeOf(&resourceapi.ResourceSlice{}): true,
+        reflect.TypeOf(&v1.Node{}):                   true,
+    }
+    require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers")
+
+    // The set of nodes is assumed to be fixed at this point.
+    nodes, err := nodeLister.List(labels.Everything())
+    tCtx.ExpectNoError(err, "list nodes")
+
+    // Allocate one claim at a time, picking nodes randomly. Each
+    // allocation is stored immediately, using the claim cache to avoid
+    // having to wait for the actual informer update.
+claims:
+    for i := range claims.Items {
+        claim := &claims.Items[i]
+        if claim.Status.Allocation != nil {
+            continue
+        }
+
+        allocator, err := structured.NewAllocator(tCtx, []*resourceapi.ResourceClaim{claim}, claimLister, classLister, sliceLister)
+        tCtx.ExpectNoError(err, "create allocator")
+
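+        // Shuffling the nodes spreads the allocations across the cluster
+        // instead of always trying the same node first for every claim.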
+        rand.Shuffle(len(nodes), func(i, j int) {
+            nodes[i], nodes[j] = nodes[j], nodes[i]
+        })
+        for _, node := range nodes {
+            result, err := allocator.Allocate(tCtx, node)
+            tCtx.ExpectNoError(err, "allocate claim")
+            if result != nil {
+                claim = claim.DeepCopy()
+                claim.Status.Allocation = result[0]
+                claim, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{})
+                tCtx.ExpectNoError(err, "update claim status with allocation")
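+                // Assuming the updated claim makes the allocation visible
+                // to the next allocator run right away, before the informer
+                // delivers the corresponding update.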
+                tCtx.ExpectNoError(claimCache.Assume(claim), "assume claim")
+                continue claims
+            }
+        }
+        tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims.Items))
+    }
+}
+
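+// claimLister implements the claim lister interface consumed by
+// structured.NewAllocator above (the interface name is inferred from usage):
+// it returns every claim that already has an allocation, including claims
+// that are so far only assumed in the local cache.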
+type claimLister struct {
+    cache *assumecache.AssumeCache
+}
+
+func (c claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
+    objs := c.cache.List(nil)
+    allocatedClaims := make([]*resourceapi.ResourceClaim, 0, len(objs))
+    for _, obj := range objs {
+        claim := obj.(*resourceapi.ResourceClaim)
+        if claim.Status.Allocation != nil {
+            allocatedClaims = append(allocatedClaims, claim)
+        }
+    }
+    return allocatedClaims, nil
+}