Skip to content

Commit b471c2c

Browse files
committed
DRA kubelet: support rolling upgrades
The key difference is that the kubelet must remember all plugin instances because it could always happen that the new instance dies and leaves only the old one running. The endpoints of each instance must be different. Registering a plugin with the same endpoint as some other instance is not supported and triggers an error, which should get reported as "not registered" to the plugin. This should only happen when the kubelet missed some unregistration event and re-registers the same instance again. The recovery in this case is for the plugin to shut down, remove its socket, which should get observed by kubelet, and then try again after a restart.
1 parent 760903c commit b471c2c

File tree

14 files changed

+128
-68
lines changed

14 files changed

+128
-68
lines changed

pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []s
4343
return s.connectClient(pluginName, endpoint)
4444
}
4545

46-
func (s *server) DeRegisterPlugin(pluginName string) {
47-
klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName)
46+
func (s *server) DeRegisterPlugin(pluginName, endpoint string) {
47+
klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName, "endpoint", endpoint)
4848
client := s.getClient(pluginName)
4949
if client != nil {
5050
s.disconnectClient(pluginName, client)
@@ -88,7 +88,6 @@ func (s *server) disconnectClient(name string, c Client) error {
8888
s.deregisterClient(name)
8989
return c.Disconnect()
9090
}
91-
9291
func (s *server) registerClient(name string, c Client) {
9392
s.mutex.Lock()
9493
defer s.mutex.Unlock()

pkg/kubelet/cm/dra/manager_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ func TestPrepareResources(t *testing.T) {
584584
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
585585
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
586586
}
587-
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
587+
defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests
588588

589589
if test.claimInfo != nil {
590590
manager.cache.add(test.claimInfo)
@@ -721,7 +721,7 @@ func TestUnprepareResources(t *testing.T) {
721721
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
722722
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
723723
}
724-
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
724+
defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests
725725

726726
manager := &ManagerImpl{
727727
kubeClient: fakeKubeClient,
@@ -891,7 +891,7 @@ func TestParallelPrepareUnprepareResources(t *testing.T) {
891891
if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil); err != nil {
892892
t.Fatalf("failed to register plugin %s, err: %v", driverName, err)
893893
}
894-
defer plg.DeRegisterPlugin(driverName)
894+
defer plg.DeRegisterPlugin(driverName, draServerInfo.socketName)
895895

896896
// Create ClaimInfo cache
897897
cache, err := newClaimInfoCache(t.TempDir(), draManagerStateFileName)

pkg/kubelet/cm/dra/plugin/plugin_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func TestGRPCConnIsReused(t *testing.T) {
136136

137137
// ensure the plugin we are using is registered
138138
draPlugins.add(p)
139-
defer draPlugins.delete(pluginName)
139+
defer draPlugins.remove(pluginName, addr)
140140

141141
// we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused
142142
for i := 0; i < 2; i++ {
@@ -210,7 +210,7 @@ func TestNewDRAPluginClient(t *testing.T) {
210210
setup: func(name string) tearDown {
211211
draPlugins.add(&Plugin{name: name})
212212
return func() {
213-
draPlugins.delete(name)
213+
draPlugins.remove(name, "")
214214
}
215215
},
216216
pluginName: "dummy-plugin",
@@ -298,7 +298,7 @@ func TestGRPCMethods(t *testing.T) {
298298
}
299299

300300
draPlugins.add(p)
301-
defer draPlugins.delete(pluginName)
301+
defer draPlugins.remove(pluginName, addr)
302302

303303
client, err := NewDRAPluginClient(pluginName)
304304
if err != nil {

pkg/kubelet/cm/dra/plugin/plugins_store.go

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@ package plugin
1818

1919
import (
2020
"errors"
21+
"fmt"
22+
"slices"
2123
"sync"
2224
)
2325

2426
// PluginsStore holds a list of DRA Plugins.
2527
type pluginsStore struct {
2628
sync.RWMutex
27-
store map[string]*Plugin
29+
// plugin name -> Plugin in the order in which they got added
30+
store map[string][]*Plugin
2831
}
2932

3033
// draPlugins map keeps track of all registered DRA plugins on the node
@@ -37,43 +40,57 @@ func (s *pluginsStore) get(pluginName string) *Plugin {
3740
s.RLock()
3841
defer s.RUnlock()
3942

40-
return s.store[pluginName]
43+
instances := s.store[pluginName]
44+
if len(instances) == 0 {
45+
return nil
46+
}
47+
// Heuristic: pick the most recent one. It's most likely
48+
// the newest, except when kubelet got restarted and registered
49+
// all running plugins in random order.
50+
return instances[len(instances)-1]
4151
}
4252

4353
// Set lets you save a DRA Plugin to the list and give it a specific name.
4454
// This method is protected by a mutex.
45-
func (s *pluginsStore) add(p *Plugin) (replacedPlugin *Plugin, replaced bool) {
55+
func (s *pluginsStore) add(p *Plugin) error {
4656
s.Lock()
4757
defer s.Unlock()
4858

4959
if s.store == nil {
50-
s.store = make(map[string]*Plugin)
60+
s.store = make(map[string][]*Plugin)
5161
}
52-
53-
replacedPlugin, exists := s.store[p.name]
54-
s.store[p.name] = p
55-
56-
if replacedPlugin != nil && replacedPlugin.cancel != nil {
57-
replacedPlugin.cancel(errors.New("plugin got replaced"))
62+
for _, oldP := range s.store[p.name] {
63+
if oldP.endpoint == p.endpoint {
64+
// One plugin instance cannot hijack the endpoint of another instance.
65+
return fmt.Errorf("endpoint %s already registered for plugin %s", p.endpoint, p.name)
66+
}
5867
}
59-
60-
return replacedPlugin, exists
68+
s.store[p.name] = append(s.store[p.name], p)
69+
return nil
6170
}
6271

63-
// Delete lets you delete a DRA Plugin by name.
64-
// This method is protected by a mutex.
65-
func (s *pluginsStore) delete(pluginName string) *Plugin {
72+
// remove lets you remove one endpoint for a DRA Plugin.
73+
// This method is protected by a mutex. It returns the
74+
// plugin if found and true if that was the last instance
75+
func (s *pluginsStore) remove(pluginName, endpoint string) (*Plugin, bool) {
6676
s.Lock()
6777
defer s.Unlock()
6878

69-
p, exists := s.store[pluginName]
70-
if !exists {
71-
return nil
79+
instances := s.store[pluginName]
80+
i := slices.IndexFunc(instances, func(p *Plugin) bool { return p.endpoint == endpoint })
81+
if i == -1 {
82+
return nil, false
83+
}
84+
p := instances[i]
85+
last := len(instances) == 1
86+
if last {
87+
delete(s.store, pluginName)
88+
} else {
89+
s.store[pluginName] = slices.Delete(instances, i, i+1)
7290
}
91+
7392
if p.cancel != nil {
7493
p.cancel(errors.New("plugin got removed"))
7594
}
76-
delete(s.store, pluginName)
77-
78-
return p
95+
return p, last
7996
}

pkg/kubelet/cm/dra/plugin/plugins_store_test.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"testing"
2323

2424
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
2526
)
2627

2728
func TestAddSameName(t *testing.T) {
@@ -30,26 +31,35 @@ func TestAddSameName(t *testing.T) {
3031

3132
firstWasCancelled := false
3233
p := &Plugin{
33-
name: pluginName,
34-
cancel: func(err error) { firstWasCancelled = true },
34+
name: pluginName,
35+
endpoint: "old",
36+
cancel: func(err error) { firstWasCancelled = true },
3537
}
3638

3739
// ensure the plugin we are using is registered
38-
draPlugins.add(p)
39-
defer draPlugins.delete(p.name)
40+
require.NoError(t, draPlugins.add(p))
41+
defer draPlugins.remove(p.name, p.endpoint)
4042

4143
assert.False(t, firstWasCancelled, "should not cancel context after the first call")
4244

45+
// Same name, same endpoint -> error.
46+
require.Error(t, draPlugins.add(p))
47+
4348
secondWasCancelled := false
4449
p2 := &Plugin{
45-
name: pluginName,
46-
cancel: func(err error) { secondWasCancelled = true },
50+
name: pluginName,
51+
endpoint: "new",
52+
cancel: func(err error) { secondWasCancelled = true },
4753
}
54+
require.NoError(t, draPlugins.add(p2))
55+
defer draPlugins.remove(p2.name, p2.endpoint)
4856

49-
draPlugins.add(p2)
50-
defer draPlugins.delete(p2.name)
57+
assert.False(t, firstWasCancelled, "should not cancel context after registering the second instance")
58+
assert.False(t, secondWasCancelled, "should not cancel context of a new plugin")
5159

52-
assert.True(t, firstWasCancelled, "should cancel context after the second call")
60+
// Remove old plugin.
61+
draPlugins.remove(p.name, p.endpoint)
62+
assert.True(t, firstWasCancelled, "should have canceled context after the explicit removal")
5363
assert.False(t, secondWasCancelled, "should not cancel context of a new plugin")
5464
}
5565

@@ -65,7 +75,7 @@ func TestDelete(t *testing.T) {
6575
// ensure the plugin we are using is registered
6676
draPlugins.add(p)
6777

68-
draPlugins.delete(p.name)
78+
draPlugins.remove(p.name, "")
6979

7080
assert.True(t, wasCancelled, "should cancel context after the second call")
7181
}

pkg/kubelet/cm/dra/plugin/registration.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,10 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
178178
// into all log output related to the plugin.
179179
ctx := h.backgroundCtx
180180
logger := klog.FromContext(ctx)
181-
logger = klog.LoggerWithValues(logger, "pluginName", pluginName)
181+
logger = klog.LoggerWithValues(logger, "pluginName", pluginName, "endpoint", endpoint)
182182
ctx = klog.NewContext(ctx, logger)
183183

184-
logger.V(3).Info("Register new DRA plugin", "endpoint", endpoint)
184+
logger.V(3).Info("Register new DRA plugin")
185185

186186
chosenService, err := h.validateSupportedServices(pluginName, supportedServices)
187187
if err != nil {
@@ -209,9 +209,10 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
209209

210210
// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key
211211
// all other DRA components will be able to get the actual socket of DRA plugins by its name.
212-
213-
if oldPlugin, replaced := draPlugins.add(pluginInstance); replaced {
214-
logger.V(1).Info("DRA plugin already registered, the old plugin was replaced and will be forgotten by the kubelet till the next kubelet restart", "oldEndpoint", oldPlugin.endpoint)
212+
if err := draPlugins.add(pluginInstance); err != nil {
213+
cancel(err)
214+
// No wrapping, the error already contains details.
215+
return err
215216
}
216217

217218
// Now cancel any pending ResourceSlice wiping for this plugin.
@@ -259,10 +260,14 @@ func (h *RegistrationHandler) validateSupportedServices(pluginName string, suppo
259260

260261
// DeRegisterPlugin is called when a plugin has removed its socket,
261262
// signaling it is no longer available.
262-
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
263-
if p := draPlugins.delete(pluginName); p != nil {
263+
func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) {
264+
if p, last := draPlugins.remove(pluginName, endpoint); p != nil {
265+
// This logger includes endpoint and pluginName.
264266
logger := klog.FromContext(p.backgroundCtx)
265-
logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint)
267+
logger.V(3).Info("Deregister DRA plugin", "lastInstance", last)
268+
if !last {
269+
return
270+
}
266271

267272
// Prepare for canceling the background wiping. This needs to run
268273
// in the context of the registration handler, the one from

pkg/kubelet/cm/dra/plugin/registration_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ func TestRegistrationHandler(t *testing.T) {
176176
// Simulate one existing plugin A.
177177
err := handler.RegisterPlugin(pluginA, endpointA, []string{drapb.DRAPluginService}, nil)
178178
require.NoError(t, err)
179+
t.Cleanup(func() {
180+
tCtx.Logf("Removing plugin %s", pluginA)
181+
handler.DeRegisterPlugin(pluginA, endpointA)
182+
})
179183

180184
err = handler.ValidatePlugin(test.pluginName, test.endpoint, test.supportedServices)
181185
if test.shouldError {
@@ -206,9 +210,10 @@ func TestRegistrationHandler(t *testing.T) {
206210
assert.NoError(t, err, "recreate slice")
207211
}
208212

209-
handler.DeRegisterPlugin(test.pluginName)
213+
tCtx.Logf("Removing plugin %s", test.pluginName)
214+
handler.DeRegisterPlugin(test.pluginName, test.endpoint)
210215
// Nop.
211-
handler.DeRegisterPlugin(test.pluginName)
216+
handler.DeRegisterPlugin(test.pluginName, test.endpoint)
212217

213218
requireNoSlices()
214219
})

pkg/kubelet/pluginmanager/cache/actual_state_of_world.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ type PluginInfo struct {
8989
UUID types.UID
9090
Handler PluginHandler
9191
Name string
92+
Endpoint string
9293
}
9394

9495
func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error {

pkg/kubelet/pluginmanager/cache/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,5 +56,5 @@ type PluginHandler interface {
5656
RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error
5757
// DeRegisterPlugin is called once the pluginwatcher observes that the socket has
5858
// been deleted.
59-
DeRegisterPlugin(pluginName string)
59+
DeRegisterPlugin(pluginName, endpoint string)
6060
}

pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
118118
UUID: pluginUUID,
119119
Handler: handler,
120120
Name: infoResp.Name,
121+
Endpoint: infoResp.Endpoint,
121122
})
122123
if err != nil {
123124
klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
@@ -147,7 +148,7 @@ func (og *operationGenerator) GenerateUnregisterPluginFunc(
147148
// so that if we receive a register event during Register Plugin, we can process it as a Register call.
148149
actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath)
149150

150-
pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name)
151+
pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name, pluginInfo.Endpoint)
151152

152153
klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler)
153154
return nil

0 commit comments

Comments
 (0)