Skip to content

Commit 64621d1

Browse files
authored
Merge pull request kubernetes#129832 from pohly/dra-seamless-upgrade
DRA: seamless driver upgrades
2 parents fe27448 + 582b421 commit 64621d1

File tree

21 files changed

+639
-111
lines changed

21 files changed

+639
-111
lines changed

pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []s
4343
return s.connectClient(pluginName, endpoint)
4444
}
4545

46-
func (s *server) DeRegisterPlugin(pluginName string) {
47-
klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName)
46+
func (s *server) DeRegisterPlugin(pluginName, endpoint string) {
47+
klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName, "endpoint", endpoint)
4848
client := s.getClient(pluginName)
4949
if client != nil {
5050
s.disconnectClient(pluginName, client)
@@ -88,7 +88,6 @@ func (s *server) disconnectClient(name string, c Client) error {
8888
s.deregisterClient(name)
8989
return c.Disconnect()
9090
}
91-
9291
func (s *server) registerClient(name string, c Client) {
9392
s.mutex.Lock()
9493
defer s.mutex.Unlock()

pkg/kubelet/cm/dra/manager.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,20 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
9898
}
9999

100100
func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
101-
return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode))
101+
// The time that DRA drivers have to come back after being unregistered
102+
// before the kubelet removes their ResourceSlices.
103+
//
104+
// This must be long enough to actually allow stopping a pod and
105+
// starting the replacement (otherwise ResourceSlices get deleted
106+
// unnecessarily) and not too long (otherwise the time window were
107+
// pods might still get scheduled to the node after removal of a
108+
// driver is too long).
109+
//
110+
// 30 seconds might be long enough for a simple container restart.
111+
// If a DRA driver wants to be sure that slices don't get wiped,
112+
// it should use rolling updates.
113+
wipingDelay := 30 * time.Second
114+
return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode, wipingDelay))
102115
}
103116

104117
// Start starts the reconcile loop of the manager.

pkg/kubelet/cm/dra/manager_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -580,11 +580,11 @@ func TestPrepareResources(t *testing.T) {
580580
}
581581
defer draServerInfo.teardownFn()
582582

583-
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
583+
plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
584584
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
585585
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
586586
}
587-
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
587+
defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests
588588

589589
if test.claimInfo != nil {
590590
manager.cache.add(test.claimInfo)
@@ -717,11 +717,11 @@ func TestUnprepareResources(t *testing.T) {
717717
}
718718
defer draServerInfo.teardownFn()
719719

720-
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
720+
plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
721721
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
722722
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
723723
}
724-
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
724+
defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests
725725

726726
manager := &ManagerImpl{
727727
kubeClient: fakeKubeClient,
@@ -887,11 +887,11 @@ func TestParallelPrepareUnprepareResources(t *testing.T) {
887887
}
888888
defer draServerInfo.teardownFn()
889889

890-
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
890+
plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
891891
if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil); err != nil {
892892
t.Fatalf("failed to register plugin %s, err: %v", driverName, err)
893893
}
894-
defer plg.DeRegisterPlugin(driverName)
894+
defer plg.DeRegisterPlugin(driverName, draServerInfo.socketName)
895895

896896
// Create ClaimInfo cache
897897
cache, err := newClaimInfoCache(t.TempDir(), draManagerStateFileName)

pkg/kubelet/cm/dra/plugin/plugin_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func TestGRPCConnIsReused(t *testing.T) {
136136

137137
// ensure the plugin we are using is registered
138138
draPlugins.add(p)
139-
defer draPlugins.delete(pluginName)
139+
defer draPlugins.remove(pluginName, addr)
140140

141141
// we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused
142142
for i := 0; i < 2; i++ {
@@ -210,7 +210,7 @@ func TestNewDRAPluginClient(t *testing.T) {
210210
setup: func(name string) tearDown {
211211
draPlugins.add(&Plugin{name: name})
212212
return func() {
213-
draPlugins.delete(name)
213+
draPlugins.remove(name, "")
214214
}
215215
},
216216
pluginName: "dummy-plugin",
@@ -298,7 +298,7 @@ func TestGRPCMethods(t *testing.T) {
298298
}
299299

300300
draPlugins.add(p)
301-
defer draPlugins.delete(pluginName)
301+
defer draPlugins.remove(pluginName, addr)
302302

303303
client, err := NewDRAPluginClient(pluginName)
304304
if err != nil {

pkg/kubelet/cm/dra/plugin/plugins_store.go

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@ package plugin
1818

1919
import (
2020
"errors"
21+
"fmt"
22+
"slices"
2123
"sync"
2224
)
2325

2426
// PluginsStore holds a list of DRA Plugins.
2527
type pluginsStore struct {
2628
sync.RWMutex
27-
store map[string]*Plugin
29+
// plugin name -> Plugin in the order in which they got added
30+
store map[string][]*Plugin
2831
}
2932

3033
// draPlugins map keeps track of all registered DRA plugins on the node
@@ -37,43 +40,57 @@ func (s *pluginsStore) get(pluginName string) *Plugin {
3740
s.RLock()
3841
defer s.RUnlock()
3942

40-
return s.store[pluginName]
43+
instances := s.store[pluginName]
44+
if len(instances) == 0 {
45+
return nil
46+
}
47+
// Heuristic: pick the most recent one. It's most likely
48+
// the newest, except when kubelet got restarted and registered
49+
// all running plugins in random order.
50+
return instances[len(instances)-1]
4151
}
4252

4353
// Set lets you save a DRA Plugin to the list and give it a specific name.
4454
// This method is protected by a mutex.
45-
func (s *pluginsStore) add(p *Plugin) (replacedPlugin *Plugin, replaced bool) {
55+
func (s *pluginsStore) add(p *Plugin) error {
4656
s.Lock()
4757
defer s.Unlock()
4858

4959
if s.store == nil {
50-
s.store = make(map[string]*Plugin)
60+
s.store = make(map[string][]*Plugin)
5161
}
52-
53-
replacedPlugin, exists := s.store[p.name]
54-
s.store[p.name] = p
55-
56-
if replacedPlugin != nil && replacedPlugin.cancel != nil {
57-
replacedPlugin.cancel(errors.New("plugin got replaced"))
62+
for _, oldP := range s.store[p.name] {
63+
if oldP.endpoint == p.endpoint {
64+
// One plugin instance cannot hijack the endpoint of another instance.
65+
return fmt.Errorf("endpoint %s already registered for plugin %s", p.endpoint, p.name)
66+
}
5867
}
59-
60-
return replacedPlugin, exists
68+
s.store[p.name] = append(s.store[p.name], p)
69+
return nil
6170
}
6271

63-
// Delete lets you delete a DRA Plugin by name.
64-
// This method is protected by a mutex.
65-
func (s *pluginsStore) delete(pluginName string) *Plugin {
72+
// remove lets you remove one endpoint for a DRA Plugin.
73+
// This method is protected by a mutex. It returns the
74+
// plugin if found and true if that was the last instance
75+
func (s *pluginsStore) remove(pluginName, endpoint string) (*Plugin, bool) {
6676
s.Lock()
6777
defer s.Unlock()
6878

69-
p, exists := s.store[pluginName]
70-
if !exists {
71-
return nil
79+
instances := s.store[pluginName]
80+
i := slices.IndexFunc(instances, func(p *Plugin) bool { return p.endpoint == endpoint })
81+
if i == -1 {
82+
return nil, false
83+
}
84+
p := instances[i]
85+
last := len(instances) == 1
86+
if last {
87+
delete(s.store, pluginName)
88+
} else {
89+
s.store[pluginName] = slices.Delete(instances, i, i+1)
7290
}
91+
7392
if p.cancel != nil {
7493
p.cancel(errors.New("plugin got removed"))
7594
}
76-
delete(s.store, pluginName)
77-
78-
return p
95+
return p, last
7996
}

pkg/kubelet/cm/dra/plugin/plugins_store_test.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"testing"
2323

2424
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
2526
)
2627

2728
func TestAddSameName(t *testing.T) {
@@ -30,26 +31,35 @@ func TestAddSameName(t *testing.T) {
3031

3132
firstWasCancelled := false
3233
p := &Plugin{
33-
name: pluginName,
34-
cancel: func(err error) { firstWasCancelled = true },
34+
name: pluginName,
35+
endpoint: "old",
36+
cancel: func(err error) { firstWasCancelled = true },
3537
}
3638

3739
// ensure the plugin we are using is registered
38-
draPlugins.add(p)
39-
defer draPlugins.delete(p.name)
40+
require.NoError(t, draPlugins.add(p))
41+
defer draPlugins.remove(p.name, p.endpoint)
4042

4143
assert.False(t, firstWasCancelled, "should not cancel context after the first call")
4244

45+
// Same name, same endpoint -> error.
46+
require.Error(t, draPlugins.add(p))
47+
4348
secondWasCancelled := false
4449
p2 := &Plugin{
45-
name: pluginName,
46-
cancel: func(err error) { secondWasCancelled = true },
50+
name: pluginName,
51+
endpoint: "new",
52+
cancel: func(err error) { secondWasCancelled = true },
4753
}
54+
require.NoError(t, draPlugins.add(p2))
55+
defer draPlugins.remove(p2.name, p2.endpoint)
4856

49-
draPlugins.add(p2)
50-
defer draPlugins.delete(p2.name)
57+
assert.False(t, firstWasCancelled, "should not cancel context after registering the second instance")
58+
assert.False(t, secondWasCancelled, "should not cancel context of a new plugin")
5159

52-
assert.True(t, firstWasCancelled, "should cancel context after the second call")
60+
// Remove old plugin.
61+
draPlugins.remove(p.name, p.endpoint)
62+
assert.True(t, firstWasCancelled, "should have canceled context after the explicit removal")
5363
assert.False(t, secondWasCancelled, "should not cancel context of a new plugin")
5464
}
5565

@@ -65,7 +75,7 @@ func TestDelete(t *testing.T) {
6575
// ensure the plugin we are using is registered
6676
draPlugins.add(p)
6777

68-
draPlugins.delete(p.name)
78+
draPlugins.remove(p.name, "")
6979

7080
assert.True(t, wasCancelled, "should cancel context after the second call")
7181
}

0 commit comments

Comments
 (0)