Skip to content

Commit 342d328

Browse files
authored
Merge pull request kubernetes#89934 from tedyu/unregister-plugin
Simplify the unregistration of csiplugin
2 parents a1dc52e + 1001be8 commit 342d328

File tree

6 files changed

+34
-78
lines changed

6 files changed

+34
-78
lines changed

pkg/kubelet/pluginmanager/cache/actual_state_of_world.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,22 +75,12 @@ type actualStateOfWorld struct {
7575

7676
var _ ActualStateOfWorld = &actualStateOfWorld{}
7777

78-
// NamedPluginHandler holds information for handler and the name of the plugin
79-
type NamedPluginHandler struct {
80-
Handler PluginHandler
81-
Name string
82-
}
83-
84-
// SocketPluginHandlers contains the map from socket path to NamedPluginHandler
85-
type SocketPluginHandlers struct {
86-
Handlers map[string]NamedPluginHandler
87-
sync.Mutex
88-
}
89-
9078
// PluginInfo holds information of a plugin
9179
type PluginInfo struct {
9280
SocketPath string
9381
Timestamp time.Time
82+
Handler PluginHandler
83+
Name string
9484
}
9585

9686
func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error {

pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
3030
pluginInfo := PluginInfo{
3131
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
3232
Timestamp: time.Now(),
33+
Handler: nil,
34+
Name: "test",
3335
}
3436
asw := NewActualStateOfWorld()
3537
err := asw.AddPlugin(pluginInfo)
@@ -61,6 +63,8 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
6163
pluginInfo := PluginInfo{
6264
SocketPath: "",
6365
Timestamp: time.Now(),
66+
Handler: nil,
67+
Name: "test",
6468
}
6569
err := asw.AddPlugin(pluginInfo)
6670
require.EqualError(t, err, "socket path is empty")
@@ -86,6 +90,8 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) {
8690
pluginInfo := PluginInfo{
8791
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
8892
Timestamp: time.Now(),
93+
Handler: nil,
94+
Name: "test",
8995
}
9096
err := asw.AddPlugin(pluginInfo)
9197
// Assert
@@ -116,6 +122,8 @@ func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testin
116122
pluginInfo := PluginInfo{
117123
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
118124
Timestamp: time.Now(),
125+
Handler: nil,
126+
Name: "test",
119127
}
120128
err := asw.AddPlugin(pluginInfo)
121129
// Assert

pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ import (
4545
type OperationExecutor interface {
4646
// RegisterPlugin registers the given plugin using the a handler in the plugin handler map.
4747
// It then updates the actual state of the world to reflect that.
48-
RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error
48+
RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
4949

5050
// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
5151
// It then updates the actual state of the world to reflect that.
52-
UnregisterPlugin(socketPath string, pluginHandlers map[string]cache.PluginHandler, pathToHandlers *cache.SocketPluginHandlers, actualStateOfWorld ActualStateOfWorldUpdater) error
52+
UnregisterPlugin(pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error
5353
}
5454

5555
// NewOperationExecutor returns a new instance of OperationExecutor.
@@ -96,23 +96,20 @@ func (oe *operationExecutor) RegisterPlugin(
9696
socketPath string,
9797
timestamp time.Time,
9898
pluginHandlers map[string]cache.PluginHandler,
99-
pathToHandlers *cache.SocketPluginHandlers,
10099
actualStateOfWorld ActualStateOfWorldUpdater) error {
101100
generatedOperation :=
102-
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, pathToHandlers, actualStateOfWorld)
101+
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)
103102

104103
return oe.pendingOperations.Run(
105104
socketPath, generatedOperation)
106105
}
107106

108107
func (oe *operationExecutor) UnregisterPlugin(
109-
socketPath string,
110-
pluginHandlers map[string]cache.PluginHandler,
111-
pathToHandlers *cache.SocketPluginHandlers,
108+
pluginInfo cache.PluginInfo,
112109
actualStateOfWorld ActualStateOfWorldUpdater) error {
113110
generatedOperation :=
114-
oe.operationGenerator.GenerateUnregisterPluginFunc(socketPath, pluginHandlers, pathToHandlers, actualStateOfWorld)
111+
oe.operationGenerator.GenerateUnregisterPluginFunc(pluginInfo, actualStateOfWorld)
115112

116113
return oe.pendingOperations.Run(
117-
socketPath, generatedOperation)
114+
pluginInfo.SocketPath, generatedOperation)
118115
}

pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,9 @@ func init() {
4444

4545
func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) {
4646
ch, quit, oe := setup()
47-
hdlr := cache.SocketPluginHandlers{}
4847
for i := 0; i < numPluginsToRegister; i++ {
4948
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
50-
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */)
49+
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
5150
}
5251
if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
5352
t.Fatalf("Unable to start register operations in Concurrent for plugins")
@@ -57,9 +56,8 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T)
5756
func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
5857
ch, quit, oe := setup()
5958
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
60-
hdlr := cache.SocketPluginHandlers{}
6159
for i := 0; i < numPluginsToRegister; i++ {
62-
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */)
60+
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
6361

6462
}
6563
if !isOperationRunSerially(ch, quit) {
@@ -69,10 +67,10 @@ func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
6967

7068
func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) {
7169
ch, quit, oe := setup()
72-
hdlr := cache.SocketPluginHandlers{}
7370
for i := 0; i < numPluginsToUnregister; i++ {
7471
socketPath := "socket-path" + strconv.Itoa(i)
75-
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */)
72+
pluginInfo := cache.PluginInfo{SocketPath: socketPath}
73+
oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */)
7674

7775
}
7876
if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) {
@@ -83,9 +81,9 @@ func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testin
8381
func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) {
8482
ch, quit, oe := setup()
8583
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
86-
hdlr := cache.SocketPluginHandlers{}
8784
for i := 0; i < numPluginsToUnregister; i++ {
88-
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, &hdlr, nil /* actual state of the world updator */)
85+
pluginInfo := cache.PluginInfo{SocketPath: socketPath}
86+
oe.UnregisterPlugin(pluginInfo, nil /* actual state of the world updator */)
8987

9088
}
9189
if !isOperationRunSerially(ch, quit) {
@@ -109,7 +107,6 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
109107
socketPath string,
110108
timestamp time.Time,
111109
pluginHandlers map[string]cache.PluginHandler,
112-
pathToHandlers *cache.SocketPluginHandlers,
113110
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
114111

115112
opFunc := func() error {
@@ -120,9 +117,7 @@ func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
120117
}
121118

122119
func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc(
123-
socketPath string,
124-
pluginHandlers map[string]cache.PluginHandler,
125-
pathToHandlers *cache.SocketPluginHandlers,
120+
pluginInfo cache.PluginInfo,
126121
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
127122
opFunc := func() error {
128123
startOperationAndBlock(fopg.ch, fopg.quit)

pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go

Lines changed: 9 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -63,22 +63,18 @@ type OperationGenerator interface {
6363
socketPath string,
6464
timestamp time.Time,
6565
pluginHandlers map[string]cache.PluginHandler,
66-
pathToHandlers *cache.SocketPluginHandlers,
6766
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
6867

6968
// Generates the UnregisterPlugin function needed to perform the unregistration of a plugin
7069
GenerateUnregisterPluginFunc(
71-
socketPath string,
72-
pluginHandlers map[string]cache.PluginHandler,
73-
pathToHandlers *cache.SocketPluginHandlers,
70+
pluginInfo cache.PluginInfo,
7471
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
7572
}
7673

7774
func (og *operationGenerator) GenerateRegisterPluginFunc(
7875
socketPath string,
7976
timestamp time.Time,
8077
pluginHandlers map[string]cache.PluginHandler,
81-
pathToHandlers *cache.SocketPluginHandlers,
8278
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
8379

8480
registerPluginFunc := func() error {
@@ -118,19 +114,15 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
118114
err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
119115
SocketPath: socketPath,
120116
Timestamp: timestamp,
117+
Handler: handler,
118+
Name: infoResp.Name,
121119
})
122120
if err != nil {
123121
klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err)
124122
}
125123
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
126124
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
127125
}
128-
pathToHandlers.Lock()
129-
if pathToHandlers.Handlers == nil {
130-
pathToHandlers.Handlers = make(map[string]cache.NamedPluginHandler)
131-
}
132-
pathToHandlers.Handlers[socketPath] = cache.NamedPluginHandler{Handler: handler, Name: infoResp.Name}
133-
pathToHandlers.Unlock()
134126

135127
// Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate
136128
if err := og.notifyPlugin(client, true, ""); err != nil {
@@ -142,37 +134,20 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
142134
}
143135

144136
func (og *operationGenerator) GenerateUnregisterPluginFunc(
145-
socketPath string,
146-
pluginHandlers map[string]cache.PluginHandler,
147-
pathToHandlers *cache.SocketPluginHandlers,
137+
pluginInfo cache.PluginInfo,
148138
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
149139

150140
unregisterPluginFunc := func() error {
151-
_, conn, err := dial(socketPath, dialTimeoutDuration)
152-
if err != nil {
153-
klog.V(4).Infof("unable to dial: %v", err)
154-
} else {
155-
conn.Close()
156-
}
157-
158-
var handlerWithName cache.NamedPluginHandler
159-
pathToHandlers.Lock()
160-
handlerWithName, handlerFound := pathToHandlers.Handlers[socketPath]
161-
pathToHandlers.Unlock()
162-
163-
if !handlerFound {
164-
return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", socketPath)
141+
if pluginInfo.Handler == nil {
142+
return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", pluginInfo.SocketPath)
165143
}
166144
// We remove the plugin to the actual state of world cache before calling a plugin consumer's Unregister handle
167145
// so that if we receive a register event during Register Plugin, we can process it as a Register call.
168-
actualStateOfWorldUpdater.RemovePlugin(socketPath)
146+
actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath)
169147

170-
handlerWithName.Handler.DeRegisterPlugin(handlerWithName.Name)
148+
pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name)
171149

172-
pathToHandlers.Lock()
173-
delete(pathToHandlers.Handlers, socketPath)
174-
pathToHandlers.Unlock()
175-
klog.V(4).Infof("DeRegisterPlugin called for %s on %v", handlerWithName.Name, handlerWithName.Handler)
150+
klog.V(4).Infof("DeRegisterPlugin called for %s on %v", pluginInfo.Name, pluginInfo.Handler)
176151
return nil
177152
}
178153
return unregisterPluginFunc

pkg/kubelet/pluginmanager/reconciler/reconciler.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ func NewReconciler(
6767
desiredStateOfWorld: desiredStateOfWorld,
6868
actualStateOfWorld: actualStateOfWorld,
6969
handlers: make(map[string]cache.PluginHandler),
70-
pathToHandlers: cache.SocketPluginHandlers{Handlers: make(map[string]cache.NamedPluginHandler)},
7170
}
7271
}
7372

@@ -77,7 +76,6 @@ type reconciler struct {
7776
desiredStateOfWorld cache.DesiredStateOfWorld
7877
actualStateOfWorld cache.ActualStateOfWorld
7978
handlers map[string]cache.PluginHandler
80-
pathToHandlers cache.SocketPluginHandlers
8179
sync.RWMutex
8280
}
8381

@@ -105,13 +103,6 @@ func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
105103
return rc.handlers
106104
}
107105

108-
func (rc *reconciler) getPathToHandlers() *cache.SocketPluginHandlers {
109-
rc.RLock()
110-
defer rc.RUnlock()
111-
112-
return &rc.pathToHandlers
113-
}
114-
115106
func (rc *reconciler) reconcile() {
116107
// Unregisterations are triggered before registrations
117108

@@ -136,7 +127,7 @@ func (rc *reconciler) reconcile() {
136127

137128
if unregisterPlugin {
138129
klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("Starting operationExecutor.UnregisterPlugin", ""))
139-
err := rc.operationExecutor.UnregisterPlugin(registeredPlugin.SocketPath, rc.getHandlers(), rc.getPathToHandlers(), rc.actualStateOfWorld)
130+
err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)
140131
if err != nil &&
141132
!goroutinemap.IsAlreadyExists(err) &&
142133
!exponentialbackoff.IsExponentialBackoff(err) {
@@ -154,7 +145,7 @@ func (rc *reconciler) reconcile() {
154145
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
155146
if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
156147
klog.V(5).Infof(pluginToRegister.GenerateMsgDetailed("Starting operationExecutor.RegisterPlugin", ""))
157-
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.getPathToHandlers(), rc.actualStateOfWorld)
148+
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
158149
if err != nil &&
159150
!goroutinemap.IsAlreadyExists(err) &&
160151
!exponentialbackoff.IsExponentialBackoff(err) {

0 commit comments

Comments
 (0)