Skip to content

Commit d14b0b0

Browse files
authored
Merge pull request kubernetes#114136 from claudiubelu/kubelet-plugins
Updates Kubelet Plugin Registration process
2 parents 7bd36b5 + 2604d7d commit d14b0b0

File tree

9 files changed

+108
-71
lines changed

9 files changed

+108
-71
lines changed

pkg/kubelet/pluginmanager/cache/actual_state_of_world.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"sync"
2626
"time"
2727

28+
"k8s.io/apimachinery/pkg/types"
2829
"k8s.io/klog/v2"
2930
)
3031

@@ -53,9 +54,15 @@ type ActualStateOfWorld interface {
5354
// If a plugin does not exist with the given socket path, this is a no-op.
5455
RemovePlugin(socketPath string)
5556

56-
// PluginExists checks if the given plugin exists in the current actual
57-
// state of world cache with the correct timestamp
57+
// PluginExistsWithCorrectTimestamp checks if the given plugin exists in the current actual
58+
// state of world cache with the correct timestamp.
59+
// Deprecated: please use `PluginExistsWithCorrectUUID` instead as it provides a better
60+
// cross-platform support
5861
PluginExistsWithCorrectTimestamp(pluginInfo PluginInfo) bool
62+
63+
// PluginExistsWithCorrectUUID checks if the given plugin exists in the current actual
64+
// state of world cache with the correct UUID
65+
PluginExistsWithCorrectUUID(pluginInfo PluginInfo) bool
5966
}
6067

6168
// NewActualStateOfWorld returns a new instance of ActualStateOfWorld
@@ -79,6 +86,7 @@ var _ ActualStateOfWorld = &actualStateOfWorld{}
7986
type PluginInfo struct {
8087
SocketPath string
8188
Timestamp time.Time
89+
UUID types.UID
8290
Handler PluginHandler
8391
Name string
8492
}
@@ -124,3 +132,13 @@ func (asw *actualStateOfWorld) PluginExistsWithCorrectTimestamp(pluginInfo Plugi
124132
actualStatePlugin, exists := asw.socketFileToInfo[pluginInfo.SocketPath]
125133
return exists && (actualStatePlugin.Timestamp == pluginInfo.Timestamp)
126134
}
135+
136+
func (asw *actualStateOfWorld) PluginExistsWithCorrectUUID(pluginInfo PluginInfo) bool {
137+
asw.RLock()
138+
defer asw.RUnlock()
139+
140+
// We need to check both if the socket file path exists, and the UUID
141+
// matches the given plugin (from the desired state cache) UUID
142+
actualStatePlugin, exists := asw.socketFileToInfo[pluginInfo.SocketPath]
143+
return exists && (actualStatePlugin.UUID == pluginInfo.UUID)
144+
}

pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,23 @@ limitations under the License.
1717
package cache
1818

1919
import (
20-
"runtime"
20+
goruntime "runtime"
2121
"testing"
2222
"time"
2323

2424
"github.com/stretchr/testify/require"
25+
"k8s.io/apimachinery/pkg/util/uuid"
2526
)
2627

2728
// Calls AddPlugin() to add a plugin
2829
// Verifies newly added plugin exists in GetRegisteredPlugins()
29-
// Verifies PluginExistsWithCorrectTimestamp returns true for the plugin
30+
// Verifies PluginExistsWithCorrectUUID returns true for the plugin
31+
// Verifies PluginExistsWithCorrectTimestamp returns true for the plugin (excluded on Windows)
3032
func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
3133
pluginInfo := PluginInfo{
3234
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
3335
Timestamp: time.Now(),
36+
UUID: uuid.NewUUID(),
3437
Handler: nil,
3538
Name: "test",
3639
}
@@ -50,20 +53,29 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
5053
t.Fatalf("Expected\n%v\nin actual state of world, but got\n%v\n", pluginInfo, aswPlugins[0])
5154
}
5255

56+
// Check PluginExistsWithCorrectUUID returns true
57+
if !asw.PluginExistsWithCorrectUUID(pluginInfo) {
58+
t.Fatalf("PluginExistsWithCorrectUUID returns false for plugin that should be registered")
59+
}
60+
5361
// Check PluginExistsWithCorrectTimestamp returns true
54-
if !asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
62+
// Skipped on Windows. Time measurements are not as fine-grained on Windows and can lead to
63+
// 2 consecutive time.Now() calls to be return identical timestamps.
64+
if goruntime.GOOS != "windows" && !asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
5565
t.Fatalf("PluginExistsWithCorrectTimestamp returns false for plugin that should be registered")
5666
}
5767
}
5868

5969
// Calls AddPlugin() to add an empty string for socket path
6070
// Verifies the plugin does not exist in GetRegisteredPlugins()
61-
// Verifies PluginExistsWithCorrectTimestamp returns false
71+
// Verifies PluginExistsWithCorrectUUID returns false
72+
// Verifies PluginExistsWithCorrectTimestamp returns false (excluded on Windows)
6273
func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
6374
asw := NewActualStateOfWorld()
6475
pluginInfo := PluginInfo{
6576
SocketPath: "",
6677
Timestamp: time.Now(),
78+
UUID: uuid.NewUUID(),
6779
Handler: nil,
6880
Name: "test",
6981
}
@@ -76,21 +88,30 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
7688
t.Fatalf("Actual state of world length should be zero but it's %d", len(aswPlugins))
7789
}
7890

91+
// Check PluginExistsWithCorrectUUID returns false
92+
if asw.PluginExistsWithCorrectUUID(pluginInfo) {
93+
t.Fatalf("PluginExistsWithCorrectUUID returns true for plugin that's not registered")
94+
}
95+
7996
// Check PluginExistsWithCorrectTimestamp returns false
80-
if asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
97+
// Skipped on Windows. Time measurements are not as fine-grained on Windows and can lead to
98+
// 2 consecutive time.Now() calls to be return identical timestamps.
99+
if goruntime.GOOS != "windows" && asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
81100
t.Fatalf("PluginExistsWithCorrectTimestamp returns true for plugin that's not registered")
82101
}
83102
}
84103

85104
// Calls RemovePlugin() to remove a plugin
86105
// Verifies newly removed plugin no longer exists in GetRegisteredPlugins()
87-
// Verifies PluginExistsWithCorrectTimestamp returns false
106+
// Verifies PluginExistsWithCorrectUUID returns false
107+
// Verifies PluginExistsWithCorrectTimestamp returns false (excluded on Windows)
88108
func Test_ASW_RemovePlugin_Positive(t *testing.T) {
89109
// First, add a plugin
90110
asw := NewActualStateOfWorld()
91111
pluginInfo := PluginInfo{
92112
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
93113
Timestamp: time.Now(),
114+
UUID: uuid.NewUUID(),
94115
Handler: nil,
95116
Name: "test",
96117
}
@@ -109,25 +130,28 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) {
109130
t.Fatalf("Actual state of world length should be zero but it's %d", len(aswPlugins))
110131
}
111132

133+
// Check PluginExistsWithCorrectUUID returns false
134+
if asw.PluginExistsWithCorrectUUID(pluginInfo) {
135+
t.Fatalf("PluginExistsWithCorrectUUID returns true for the removed plugin")
136+
}
137+
112138
// Check PluginExistsWithCorrectTimestamp returns false
113-
if asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
139+
// Skipped on Windows. Time measurements are not as fine-grained on Windows and can lead to
140+
// 2 consecutive time.Now() calls to be return identical timestamps.
141+
if goruntime.GOOS != "windows" && asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
114142
t.Fatalf("PluginExistsWithCorrectTimestamp returns true for the removed plugin")
115143
}
116144
}
117145

118-
// Verifies PluginExistsWithCorrectTimestamp returns false for an existing
119-
// plugin with the wrong timestamp
120-
func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testing.T) {
121-
// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
122-
if runtime.GOOS == "windows" {
123-
t.Skip("Skipping test that fails on Windows")
124-
}
125-
146+
// Verifies PluginExistsWithCorrectUUID returns false for an existing
147+
// plugin with the wrong UUID
148+
func Test_ASW_PluginExistsWithCorrectUUID_Negative_WrongUUID(t *testing.T) {
126149
// First, add a plugin
127150
asw := NewActualStateOfWorld()
128151
pluginInfo := PluginInfo{
129152
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
130153
Timestamp: time.Now(),
154+
UUID: uuid.NewUUID(),
131155
Handler: nil,
132156
Name: "test",
133157
}
@@ -140,9 +164,10 @@ func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testin
140164
newerPlugin := PluginInfo{
141165
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
142166
Timestamp: time.Now(),
167+
UUID: uuid.NewUUID(),
143168
}
144-
// Check PluginExistsWithCorrectTimestamp returns false
145-
if asw.PluginExistsWithCorrectTimestamp(newerPlugin) {
146-
t.Fatalf("PluginExistsWithCorrectTimestamp returns true for a plugin with newer timestamp")
169+
// Check PluginExistsWithCorrectUUID returns false
170+
if asw.PluginExistsWithCorrectUUID(newerPlugin) {
171+
t.Fatalf("PluginExistsWithCorrectUUID returns true for a plugin with a different UUID")
147172
}
148173
}

pkg/kubelet/pluginmanager/cache/desired_state_of_world.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"sync"
2626
"time"
2727

28+
"k8s.io/apimachinery/pkg/util/uuid"
2829
"k8s.io/klog/v2"
2930
)
3031

@@ -132,12 +133,13 @@ func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string) error {
132133
}
133134

134135
// Update the PluginInfo object.
135-
// Note that we only update the timestamp in the desired state of world, not the actual state of world
136+
// Note that we only update the timestamp and UUID in the desired state of world, not the actual state of world
136137
// because in the reconciler, we need to check if the plugin in the actual state of world is the same
137138
// version as the plugin in the desired state of world
138139
dsw.socketFileToInfo[socketPath] = PluginInfo{
139140
SocketPath: socketPath,
140141
Timestamp: time.Now(),
142+
UUID: uuid.NewUUID(),
141143
}
142144
return nil
143145
}

pkg/kubelet/pluginmanager/cache/desired_state_of_world_test.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package cache
1818

1919
import (
20-
"runtime"
2120
"testing"
2221

2322
"github.com/stretchr/testify/require"
@@ -54,11 +53,6 @@ func Test_DSW_AddOrUpdatePlugin_Positive_NewPlugin(t *testing.T) {
5453
// Verifies the timestamp the existing plugin is updated
5554
// Verifies newly added plugin returns true for PluginExists()
5655
func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
57-
// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
58-
if runtime.GOOS == "windows" {
59-
t.Skip("Skipping test that fails on Windows")
60-
}
61-
6256
dsw := NewDesiredStateOfWorld()
6357
socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock"
6458
// Adding the plugin for the first time
@@ -75,7 +69,7 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
7569
if dswPlugins[0].SocketPath != socketPath {
7670
t.Fatalf("Expected\n%s\nin desired state of world, but got\n%v\n", socketPath, dswPlugins[0])
7771
}
78-
oldTimestamp := dswPlugins[0].Timestamp
72+
oldUUID := dswPlugins[0].UUID
7973

8074
// Adding the plugin again so that the timestamp will be updated
8175
err = dsw.AddOrUpdatePlugin(socketPath)
@@ -90,9 +84,9 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
9084
t.Fatalf("Expected\n%s\nin desired state of world, but got\n%v\n", socketPath, newDswPlugins[0])
9185
}
9286

93-
// Verify that the new timestamp is newer than the old timestamp
94-
if !newDswPlugins[0].Timestamp.After(oldTimestamp) {
95-
t.Fatal("New timestamp is not newer than the old timestamp", newDswPlugins[0].Timestamp, oldTimestamp)
87+
// Verify that the new UUID is different from the old UUID
88+
if newDswPlugins[0].UUID == oldUUID {
89+
t.Fatal("New UUID is not different from the old UUID", newDswPlugins[0].UUID, oldUUID)
9690
}
9791

9892
}

pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ limitations under the License.
2121
package operationexecutor
2222

2323
import (
24-
"time"
24+
"k8s.io/apimachinery/pkg/types"
2525

2626
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
2727
"k8s.io/kubernetes/pkg/util/goroutinemap"
@@ -45,7 +45,7 @@ import (
4545
type OperationExecutor interface {
4646
// RegisterPlugin registers the given plugin using a handler in the plugin handler map.
4747
// It then updates the actual state of the world to reflect that.
48-
RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
48+
RegisterPlugin(socketPath string, UUID types.UID, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
4949

5050
// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
5151
// It then updates the actual state of the world to reflect that.
@@ -94,11 +94,11 @@ func (oe *operationExecutor) IsOperationPending(socketPath string) bool {
9494

9595
func (oe *operationExecutor) RegisterPlugin(
9696
socketPath string,
97-
timestamp time.Time,
97+
pluginUUID types.UID,
9898
pluginHandlers map[string]cache.PluginHandler,
9999
actualStateOfWorld ActualStateOfWorldUpdater) error {
100100
generatedOperation :=
101-
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)
101+
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, pluginUUID, pluginHandlers, actualStateOfWorld)
102102

103103
return oe.pendingOperations.Run(
104104
socketPath, generatedOperation)

pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ import (
2323
"testing"
2424
"time"
2525

26+
"github.com/stretchr/testify/assert"
27+
"k8s.io/apimachinery/pkg/types"
28+
"k8s.io/apimachinery/pkg/util/uuid"
2629
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
2730
)
2831

@@ -46,7 +49,8 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T)
4649
ch, quit, oe := setup()
4750
for i := 0; i < numPluginsToRegister; i++ {
4851
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
49-
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
52+
err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
53+
assert.NoError(t, err)
5054
}
5155
if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
5256
t.Fatalf("Unable to start register operations in Concurrent for plugins")
@@ -56,8 +60,16 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T)
5660
func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
5761
ch, quit, oe := setup()
5862
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
59-
for i := 0; i < numPluginsToRegister; i++ {
60-
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
63+
64+
// First registration should not fail.
65+
err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
66+
assert.NoError(t, err)
67+
68+
for i := 1; i < numPluginsToRegister; i++ {
69+
err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
70+
if err == nil {
71+
t.Fatalf("RegisterPlugin did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name is already executing.> Actual: <no error>", socketPath)
72+
}
6173

6274
}
6375
if !isOperationRunSerially(ch, quit) {
@@ -105,7 +117,7 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera
105117

106118
func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
107119
socketPath string,
108-
timestamp time.Time,
120+
pluginUUID types.UID,
109121
pluginHandlers map[string]cache.PluginHandler,
110122
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
111123

pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"net"
2828
"time"
2929

30+
"k8s.io/apimachinery/pkg/types"
3031
"k8s.io/klog/v2"
3132

3233
"google.golang.org/grpc"
@@ -62,7 +63,7 @@ type OperationGenerator interface {
6263
// Generates the RegisterPlugin function needed to perform the registration of a plugin
6364
GenerateRegisterPluginFunc(
6465
socketPath string,
65-
timestamp time.Time,
66+
UUID types.UID,
6667
pluginHandlers map[string]cache.PluginHandler,
6768
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
6869

@@ -74,7 +75,7 @@ type OperationGenerator interface {
7475

7576
func (og *operationGenerator) GenerateRegisterPluginFunc(
7677
socketPath string,
77-
timestamp time.Time,
78+
pluginUUID types.UID,
7879
pluginHandlers map[string]cache.PluginHandler,
7980
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
8081

@@ -114,7 +115,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
114115
// so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call.
115116
err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
116117
SocketPath: socketPath,
117-
Timestamp: timestamp,
118+
UUID: pluginUUID,
118119
Handler: handler,
119120
Name: infoResp.Name,
120121
})

pkg/kubelet/pluginmanager/reconciler/reconciler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func (rc *reconciler) reconcile() {
122122
// Iterate through desired state of world plugins and see if there's any plugin
123123
// with the same socket path but different timestamp.
124124
for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
125-
if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.Timestamp != registeredPlugin.Timestamp {
125+
if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.UUID != registeredPlugin.UUID {
126126
klog.V(5).InfoS("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin)
127127
unregisterPlugin = true
128128
break
@@ -148,9 +148,9 @@ func (rc *reconciler) reconcile() {
148148

149149
// Ensure plugins that should be registered are registered
150150
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
151-
if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
151+
if !rc.actualStateOfWorld.PluginExistsWithCorrectUUID(pluginToRegister) {
152152
klog.V(5).InfoS("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister)
153-
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
153+
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.UUID, rc.getHandlers(), rc.actualStateOfWorld)
154154
if err != nil &&
155155
!goroutinemap.IsAlreadyExists(err) &&
156156
!exponentialbackoff.IsExponentialBackoff(err) {

0 commit comments

Comments
 (0)