Skip to content

Commit 1057407

Browse files
committed
DRA e2e: more flexible socket listening
Instead of hard-coding two instances of the hostpathplugin which listen on certain socket paths, the hostpathplugin now gets started through Pod exec as needed. The advantage is that the helper code is in charge of socket naming, just like it would be in real deployment. One nuisance is that exec.StreamWithContext always complains in copyFromStdout and copyFromStderr when the remote hostpathplugin gets killed via context cancellation: E0312 11:56:31.637669 289446 v2.go:167] "Unhandled Error" err="next reader: read tcp [::1]:59006->[::1]:6444: use of closed network connection" logger="UnhandledError" E0312 11:56:31.637749 289446 v2.go:150] "Unhandled Error" err="next reader: read tcp [::1]:59006->[::1]:6444: use of closed network connection" logger="UnhandledError" These can be ignored.
1 parent ec12727 commit 1057407

File tree

3 files changed

+170
-39
lines changed

3 files changed

+170
-39
lines changed

staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/endpoint_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
"github.com/stretchr/testify/assert"
2626
"github.com/stretchr/testify/require"
27-
"k8s.io/kubernetes/test/utils/ktesting"
27+
"k8s.io/klog/v2/ktesting"
2828
)
2929

3030
func TestEndpointLifecycle(t *testing.T) {

test/e2e/dra/deploy.go

Lines changed: 156 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@ import (
2222
_ "embed"
2323
"errors"
2424
"fmt"
25+
"io"
2526
"net"
27+
"net/url"
2628
"path"
2729
"sort"
2830
"strings"
2931
"sync"
32+
"sync/atomic"
3033
"time"
3134

3235
"github.com/google/go-cmp/cmp"
@@ -49,11 +52,13 @@ import (
4952
"k8s.io/client-go/discovery/cached/memory"
5053
resourceapiinformer "k8s.io/client-go/informers/resource/v1beta1"
5154
"k8s.io/client-go/kubernetes"
55+
"k8s.io/client-go/kubernetes/scheme"
5256
"k8s.io/client-go/rest"
5357
"k8s.io/client-go/restmapper"
5458
"k8s.io/client-go/tools/cache"
5559
"k8s.io/dynamic-resource-allocation/kubeletplugin"
5660
"k8s.io/klog/v2"
61+
"k8s.io/kubectl/pkg/cmd/exec"
5762
"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
5863
"k8s.io/kubernetes/test/e2e/framework"
5964
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@@ -356,11 +361,9 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
356361
},
357362
}
358363
item.Spec.Template.Spec.Volumes[0].HostPath.Path = pluginDataDirectoryPath
359-
item.Spec.Template.Spec.Volumes[2].HostPath.Path = registrarDirectoryPath
360-
item.Spec.Template.Spec.Containers[0].Args = append(item.Spec.Template.Spec.Containers[0].Args, "--endpoint="+path.Join(registrarDirectoryPath, registrarSocketFilename))
361-
item.Spec.Template.Spec.Containers[0].VolumeMounts[0].MountPath = registrarDirectoryPath
362-
item.Spec.Template.Spec.Containers[1].Args = append(item.Spec.Template.Spec.Containers[1].Args, "--endpoint="+path.Join(pluginDataDirectoryPath, "dra.sock"))
363-
item.Spec.Template.Spec.Containers[1].VolumeMounts[0].MountPath = pluginDataDirectoryPath
364+
item.Spec.Template.Spec.Volumes[1].HostPath.Path = registrarDirectoryPath
365+
item.Spec.Template.Spec.Containers[0].VolumeMounts[0].MountPath = pluginDataDirectoryPath
366+
item.Spec.Template.Spec.Containers[0].VolumeMounts[1].MountPath = registrarDirectoryPath
364367
}
365368
return nil
366369
}, manifests...)
@@ -423,6 +426,9 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
423426
} else {
424427
fileOps.NumDevices = numDevices
425428
}
429+
// All listeners running in this pod use a new unique local port number
430+
// by atomically incrementing this variable.
431+
listenerPort := int32(9000)
426432
plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps,
427433
kubeletplugin.GRPCVerbosity(0),
428434
kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
@@ -432,15 +438,12 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
432438
return d.streamInterceptor(nodename, srv, ss, info, handler)
433439
}),
434440

435-
// TODO: start socat on-demand in listen. Then we can properly test
436-
// the socket path handling in kubeletplugin and do rolling updates.
437-
438441
kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath),
439-
kubeletplugin.PluginListener(listen(d.f, pod.Name, "plugin", 9001)),
442+
kubeletplugin.PluginListener(listen(d.f, &pod, &listenerPort)),
440443

441444
kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath),
442445
kubeletplugin.RegistrarSocketFilename(registrarSocketFilename),
443-
kubeletplugin.RegistrarListener(listen(d.f, pod.Name, "registrar", 9000)),
446+
kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)),
444447
)
445448
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
446449
d.cleanup = append(d.cleanup, func() {
@@ -557,27 +560,163 @@ func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO {
557560
F: d.f,
558561
Namespace: pod.Namespace,
559562
PodName: pod.Name,
560-
ContainerName: "plugin",
563+
ContainerName: pod.Spec.Containers[0].Name,
561564
Logger: &logger,
562565
}
563566
}
564567

565-
func listen(f *framework.Framework, podName, containerName string, port int) func(ctx context.Context, path string) (net.Listener, error) {
566-
return func(ctx context.Context, path string) (net.Listener, error) {
568+
// errListenerDone is the special error that we use to shut down.
569+
// It doesn't need to be logged.
570+
var errListenerDone = errors.New("listener is shutting down")
571+
572+
// listen returns the function which the kubeletplugin helper needs to open a listening socket.
573+
// For that it spins up hostpathplugin in the pod for the desired node
574+
// and connects to hostpathplugin via port forwarding.
575+
func listen(f *framework.Framework, pod *v1.Pod, port *int32) func(ctx context.Context, path string) (net.Listener, error) {
576+
return func(ctx context.Context, path string) (l net.Listener, e error) {
577+
// "Allocate" a new port by by bumping the per-pod counter by one.
578+
port := atomic.AddInt32(port, 1)
579+
580+
logger := klog.FromContext(ctx)
581+
logger = klog.LoggerWithName(logger, "socket-listener")
582+
logger = klog.LoggerWithValues(logger, "endpoint", path, "port", port)
583+
ctx = klog.NewContext(ctx, logger)
584+
585+
// Start hostpathplugin in proxy mode and keep it running until the listener gets closed.
586+
req := f.ClientSet.CoreV1().RESTClient().Post().
587+
Resource("pods").
588+
Namespace(f.Namespace.Name).
589+
Name(pod.Name).
590+
SubResource("exec").
591+
VersionedParams(&v1.PodExecOptions{
592+
Container: pod.Spec.Containers[0].Name,
593+
Command: []string{
594+
"/hostpathplugin",
595+
"--v=5",
596+
"--endpoint=" + path,
597+
fmt.Sprintf("--proxy-endpoint=tcp://:%d", port),
598+
},
599+
Stdout: true,
600+
Stderr: true,
601+
}, scheme.ParameterCodec)
602+
var wg sync.WaitGroup
603+
wg.Add(1)
604+
cmdCtx, cmdCancel := context.WithCancelCause(ctx)
605+
go func() {
606+
defer wg.Done()
607+
cmdLogger := klog.LoggerWithName(logger, "hostpathplugin")
608+
cmdCtx := klog.NewContext(cmdCtx, cmdLogger)
609+
logger.V(1).Info("Starting...")
610+
defer logger.V(1).Info("Stopped")
611+
if err := execute(cmdCtx, req.URL(), f.ClientConfig(), 5); err != nil {
612+
// errors.Is(err, listenerDoneErr) would be nicer, but we don't get
613+
// that error from remotecommand. Instead forgo logging when we already shut down.
614+
if cmdCtx.Err() == nil {
615+
logger.Error(err, "execution failed")
616+
}
617+
}
618+
619+
// Killing hostpathplugin does not remove the socket. Need to do that manually.
620+
req := f.ClientSet.CoreV1().RESTClient().Post().
621+
Resource("pods").
622+
Namespace(f.Namespace.Name).
623+
Name(pod.Name).
624+
SubResource("exec").
625+
VersionedParams(&v1.PodExecOptions{
626+
Container: pod.Spec.Containers[0].Name,
627+
Command: []string{
628+
"rm",
629+
"-f",
630+
path,
631+
},
632+
Stdout: true,
633+
Stderr: true,
634+
}, scheme.ParameterCodec)
635+
cleanupLogger := klog.LoggerWithName(logger, "cleanup")
636+
cleanupCtx := klog.NewContext(ctx, cleanupLogger)
637+
if err := execute(cleanupCtx, req.URL(), f.ClientConfig(), 0); err != nil {
638+
cleanupLogger.Error(err, "Socket removal failed")
639+
}
640+
}()
641+
defer func() {
642+
// If we don't return a functional listener, then clean up.
643+
if e != nil {
644+
cmdCancel(e)
645+
}
646+
}()
647+
stopHostpathplugin := func() {
648+
cmdCancel(errListenerDone)
649+
wg.Wait()
650+
}
651+
567652
addr := proxy.Addr{
568653
Namespace: f.Namespace.Name,
569-
PodName: podName,
570-
ContainerName: containerName,
571-
Port: port,
654+
PodName: pod.Name,
655+
ContainerName: pod.Spec.Containers[0].Name,
656+
Port: int(port),
572657
}
573658
listener, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(), addr)
574659
if err != nil {
575660
return nil, fmt.Errorf("listen for connections from %+v: %w", addr, err)
576661
}
577-
return listener, nil
662+
return &listenerWithClose{Listener: listener, close: stopHostpathplugin}, nil
578663
}
579664
}
580665

666+
// listenerWithClose wraps Close so that it also shuts down hostpathplugin.
667+
type listenerWithClose struct {
668+
net.Listener
669+
close func()
670+
}
671+
672+
func (l *listenerWithClose) Close() error {
673+
// First close connections, then shut down the remote command.
674+
// Otherwise the connection code is unhappy and logs errors.
675+
err := l.Listener.Close()
676+
l.close()
677+
return err
678+
}
679+
680+
// execute runs a remote command with stdout/stderr redirected to log messages at the chosen verbosity level.
681+
func execute(ctx context.Context, url *url.URL, config *rest.Config, verbosity int) error {
682+
// Stream output as long as we run, i.e. ignore cancellation.
683+
stdout := pipe(context.WithoutCancel(ctx), "STDOUT", verbosity)
684+
stderr := pipe(context.WithoutCancel(ctx), "STDERR", verbosity)
685+
defer func() { _ = stdout.Close() }()
686+
defer func() { _ = stderr.Close() }()
687+
688+
executor := exec.DefaultRemoteExecutor{}
689+
return executor.ExecuteWithContext(ctx, url, config, nil, stdout, stderr, false, nil)
690+
}
691+
692+
// pipe creates an in-memory pipe and starts logging whatever is sent through that pipe in the background.
693+
func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter {
694+
logger := klog.FromContext(ctx)
695+
696+
reader, writer := io.Pipe()
697+
go func() {
698+
buffer := make([]byte, 10*1024)
699+
for {
700+
n, err := reader.Read(buffer)
701+
if n > 0 {
702+
logger.V(verbosity).Info(msg, "msg", string(buffer[0:n]))
703+
}
704+
if err != nil {
705+
if !errors.Is(err, io.EOF) {
706+
logger.Error(err, msg)
707+
}
708+
reader.CloseWithError(err)
709+
return
710+
}
711+
if ctx.Err() != nil {
712+
reader.CloseWithError(context.Cause(ctx))
713+
return
714+
}
715+
}
716+
}()
717+
return writer
718+
}
719+
581720
func (d *Driver) TearDown() {
582721
for _, c := range d.cleanup {
583722
c()

test/e2e/testing-manifests/dra/dra-test-driver-proxy.yaml

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -46,29 +46,21 @@ spec:
4646
topologyKey: kubernetes.io/hostname
4747

4848
containers:
49-
- name: registrar
49+
- name: pause
5050
image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3
51-
args:
52-
- "--v=5"
53-
- "--endpoint=/var/lib/kubelet/plugins_registry/test-driver.dra.k8s.io-reg.sock"
54-
- "--proxy-endpoint=tcp://:9000"
55-
volumeMounts:
56-
- mountPath: /var/lib/kubelet/plugins_registry
57-
name: registration-dir
58-
59-
- name: plugin
60-
image: registry.k8s.io/sig-storage/hostpathplugin:v1.7.3
61-
args:
62-
- "--v=5"
63-
- "--endpoint=/var/lib/kubelet/plugins/test-driver.dra.k8s.io/dra.sock"
64-
- "--proxy-endpoint=tcp://:9001"
65-
securityContext:
66-
privileged: true
51+
command:
52+
- /bin/sh
53+
- -c
54+
- while true; do sleep 10000; done
6755
volumeMounts:
6856
- mountPath: /var/lib/kubelet/plugins/test-driver.dra.k8s.io
6957
name: socket-dir
58+
- mountPath: /var/lib/kubelet/plugins_registry
59+
name: registration-dir
7060
- mountPath: /cdi
7161
name: cdi-dir
62+
securityContext:
63+
privileged: true
7264

7365
volumes:
7466
- hostPath:
@@ -78,11 +70,11 @@ spec:
7870
path: /var/lib/kubelet/plugins/test-driver.dra.k8s.io
7971
type: DirectoryOrCreate
8072
name: socket-dir
81-
- hostPath:
82-
path: /var/run/cdi
83-
type: DirectoryOrCreate
84-
name: cdi-dir
8573
- hostPath:
8674
path: /var/lib/kubelet/plugins_registry
8775
type: DirectoryOrCreate
8876
name: registration-dir
77+
- hostPath:
78+
path: /var/run/cdi
79+
type: DirectoryOrCreate
80+
name: cdi-dir

0 commit comments

Comments
 (0)