@@ -19,6 +19,7 @@ package dra
 import (
 	"bytes"
 	"context"
+	_ "embed"
 	"errors"
 	"fmt"
 	"net"
@@ -38,10 +39,19 @@ import (
 	v1 "k8s.io/api/core/v1"
 	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/selection"
+	"k8s.io/apiserver/pkg/authentication/serviceaccount"
+	"k8s.io/client-go/discovery/cached/memory"
 	resourceapiinformer "k8s.io/client-go/informers/resource/v1alpha2"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/dynamic-resource-allocation/kubeletplugin"
 	"k8s.io/klog/v2"
@@ -52,6 +62,7 @@ import (
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 	"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
 	"k8s.io/kubernetes/test/e2e/storage/utils"
+	"sigs.k8s.io/yaml"
 )
 
 const (
@@ -63,10 +74,14 @@ type Nodes struct {
 	NodeNames []string
 }
 
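+// pluginPermissions is the content of test-driver/deploy/example/plugin-permissions.yaml.
+// The names in it are placeholders; SetUp substitutes the test namespace and a per-test driver name.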
+//go:embed test-driver/deploy/example/plugin-permissions.yaml
+var pluginPermissions string
+
 // NewNodes selects nodes to run the test on.
 func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
 	nodes := &Nodes{}
 	ginkgo.BeforeEach(func(ctx context.Context) {
+
 		ginkgo.By("selecting nodes")
 		// The kubelet plugin is harder. We deploy the builtin manifest
 		// after patching in the driver name and all nodes on which we
@@ -166,15 +181,19 @@ type MethodInstance struct {
 }
 
 type Driver struct {
-	f       *framework.Framework
-	ctx     context.Context
-	cleanup []func() // executed first-in-first-out
-	wg      sync.WaitGroup
+	f                  *framework.Framework
+	ctx                context.Context
+	cleanup            []func() // executed first-in-first-out
+	wg                 sync.WaitGroup
+	serviceAccountName string
 
 	NameSuffix string
 	Controller *app.ExampleController
 	Name       string
-	Nodes      map[string]*app.ExamplePlugin
+
+	// Nodes contains entries for each node selected for a test when the test runs.
+	// In addition, there is one entry for a fictional node.
+	Nodes map[string]KubeletPlugin
 
 	parameterMode     parameterMode
 	parameterAPIGroup string
@@ -189,6 +208,11 @@ type Driver struct {
 	callCounts map[MethodInstance]int64
 }
 
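+// KubeletPlugin combines an ExamplePlugin instance with the clientset that it uses.
+// That clientset is restricted to the permissions of the plugin's service account via impersonation.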
+type KubeletPlugin struct {
+	*app.ExamplePlugin
+	ClientSet kubernetes.Interface
+}
+
 type parameterMode string
 
 const (
@@ -199,7 +223,7 @@ const (
 
 func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
 	ginkgo.By(fmt.Sprintf("deploying driver on nodes %v", nodes.NodeNames))
-	d.Nodes = map[string]*app.ExamplePlugin{}
+	d.Nodes = make(map[string]KubeletPlugin)
 	d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
 	resources.DriverName = d.Name
 
@@ -250,6 +274,13 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
 		framework.Failf("unknown test driver parameter mode: %s", d.parameterMode)
 	}
 
+	// Create service account and corresponding RBAC rules.
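+	// The embedded manifest uses "dra-kubelet-plugin" and "dra-kubelet-plugin-namespace"
+	// as placeholders which get replaced with per-test values below.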
+	d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + "-service-account"
+	content := pluginPermissions
+	content = strings.ReplaceAll(content, "dra-kubelet-plugin-namespace", d.f.Namespace.Name)
+	content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name)
+	d.createFromYAML(ctx, []byte(content), d.f.Namespace.Name)
+
 	instanceKey := "app.kubernetes.io/instance"
 	rsName := ""
 	draAddr := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name+".sock")
@@ -262,6 +293,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
 			item.Spec.Replicas = &numNodes
 			item.Spec.Selector.MatchLabels[instanceKey] = d.Name
 			item.Spec.Template.Labels[instanceKey] = d.Name
+			item.Spec.Template.Spec.ServiceAccountName = d.serviceAccountName
 			item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = d.Name
 			item.Spec.Template.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{
 				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
@@ -305,15 +337,31 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
 	framework.ExpectNoError(err, "list proxy pods")
 	gomega.Expect(numNodes).To(gomega.Equal(int32(len(pods.Items))), "number of proxy pods")
 
-	// Run registar and plugin for each of the pods.
+	// Run registrar and plugin for each of the pods.
 	for _, pod := range pods.Items {
 		// Need a local variable, not the loop variable, for the anonymous
 		// callback functions below.
 		pod := pod
 		nodename := pod.Spec.NodeName
+
+		// Authenticate the plugin so that it has the exact same
+		// permissions as the daemonset pod. This includes RBAC and a
+		// validating admission policy which limits writes to per-node
+		// ResourceSlices.
+		//
+		// We could retrieve
+		// /var/run/secrets/kubernetes.io/serviceaccount/token from
+		// each pod and use it. That would check that
+		// ServiceAccountTokenNodeBindingValidation works. But that's
+		// better covered by a test owned by SIG Auth (like the one in
+		// https://github.com/kubernetes/kubernetes/pull/124711).
+		//
+		// Here we merely use impersonation, which is faster.
+		driverClient := d.impersonateKubeletPlugin(&pod)
+
 		logger := klog.LoggerWithValues(klog.LoggerWithName(klog.Background(), "kubelet plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
 		loggerCtx := klog.NewContext(ctx, logger)
-		plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, d.f.ClientSet, nodename,
+		plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename,
 			app.FileOperations{
 				Create: func(name string, content []byte) error {
 					klog.Background().Info("creating CDI file", "node", nodename, "filename", name, "content", string(content))
@@ -342,7 +390,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
 			// Depends on cancel being called first.
 			plugin.Stop()
 		})
-		d.Nodes[nodename] = plugin
+		d.Nodes[nodename] = KubeletPlugin{ExamplePlugin: plugin, ClientSet: driverClient}
 	}
 
 	// Wait for registration.
@@ -359,6 +407,26 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) {
 	}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "hosts where the plugin has not been registered yet")
 }
 
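+// impersonateKubeletPlugin returns a client which impersonates the plugin's service account
+// as it would be used by the given pod on its node.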
+func (d *Driver) impersonateKubeletPlugin(pod *v1.Pod) kubernetes.Interface {
+	ginkgo.GinkgoHelper()
+	driverUserInfo := (&serviceaccount.ServiceAccountInfo{
+		Name:      d.serviceAccountName,
+		Namespace: pod.Namespace,
+		NodeName:  pod.Spec.NodeName,
+		PodName:   pod.Name,
+		PodUID:    string(pod.UID),
+	}).UserInfo()
+	driverClientConfig := d.f.ClientConfig()
+	driverClientConfig.Impersonate = rest.ImpersonationConfig{
+		UserName: driverUserInfo.GetName(),
+		Groups:   driverUserInfo.GetGroups(),
+		Extra:    driverUserInfo.GetExtra(),
+	}
+	driverClient, err := kubernetes.NewForConfig(driverClientConfig)
+	framework.ExpectNoError(err, "create client for driver")
+	return driverClient
+}
+
 func (d *Driver) createFile(pod *v1.Pod, name string, content []byte) error {
 	buffer := bytes.NewBuffer(content)
 	// Writing the content can be slow. Better create a temporary file and
@@ -375,6 +443,57 @@ func (d *Driver) removeFile(pod *v1.Pod, name string) error {
 	return d.podIO(pod).RemoveAll(name)
 }
 
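+// createFromYAML creates all objects from the given multi-document YAML, using strict field
+// validation, and registers cleanup callbacks which delete the objects when the test completes.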
+func (d *Driver) createFromYAML(ctx context.Context, content []byte, namespace string) {
+	// Not caching the discovery result isn't very efficient, but good enough.
+	discoveryCache := memory.NewMemCacheClient(d.f.ClientSet.Discovery())
+	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryCache)
+
+	for _, content := range bytes.Split(content, []byte("---\n")) {
+		if len(content) == 0 {
+			continue
+		}
+
+		var obj *unstructured.Unstructured
+		framework.ExpectNoError(yaml.UnmarshalStrict(content, &obj), fmt.Sprintf("Full YAML:\n%s\n", string(content)))
+
+		gv, err := schema.ParseGroupVersion(obj.GetAPIVersion())
+		framework.ExpectNoError(err, fmt.Sprintf("extract group+version from object %q", klog.KObj(obj)))
+		gk := schema.GroupKind{Group: gv.Group, Kind: obj.GetKind()}
+
+		mapping, err := restMapper.RESTMapping(gk, gv.Version)
+		framework.ExpectNoError(err, fmt.Sprintf("map %q to resource", gk))
+
+		resourceClient := d.f.DynamicClient.Resource(mapping.Resource)
+		options := metav1.CreateOptions{
+			// If the YAML input is invalid, then we want the
+			// apiserver to tell us via an error. This can
+			// happen because decoding into an unstructured object
+			// doesn't validate.
+			FieldValidation: "Strict",
+		}
+		switch mapping.Scope.Name() {
+		case meta.RESTScopeNameRoot:
+			_, err = resourceClient.Create(ctx, obj, options)
+		case meta.RESTScopeNameNamespace:
+			if namespace == "" {
+				framework.Failf("need namespace for object type %s", gk)
+			}
+			_, err = resourceClient.Namespace(namespace).Create(ctx, obj, options)
+		}
+		framework.ExpectNoError(err, "create object")
+		ginkgo.DeferCleanup(func(ctx context.Context) {
+			del := resourceClient.Delete
+			if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
+				del = resourceClient.Namespace(namespace).Delete
+			}
+			err := del(ctx, obj.GetName(), metav1.DeleteOptions{})
+			if !apierrors.IsNotFound(err) {
+				framework.ExpectNoError(err, fmt.Sprintf("deleting %s.%s %s", obj.GetKind(), obj.GetAPIVersion(), klog.KObj(obj)))
+			}
+		})
+	}
+}
+
 func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO {
 	logger := klog.Background()
 	return proxy.PodDirIO{