Skip to content

Commit a85409b

Browse files
committed
Updates Kubelet Plugin Registration process
Currently, when a Kubelet Plugin is added to the DesiredStateOfWorld, a timestamp is saved in its PluginInfo. This timestamp is then updated on subsequent plugin reregistrations. When the Reconciler detects different timestamps for a Plugin in its DesiredStateOfWorld and ActualStateOfWorld, it triggers a Plugin unregistration followed by a new Plugin registration. Basically, the timestamp is being used to detect whether or not a Plugin needs to be reregistered. However, this can be an issue on Windows, where time measurements are not as fine-grained: time.Now() calls within the same ~1-15ms window will return the same timestamp. This means that Plugin reregistration events can be missed on Windows [1]. Because of this, some of the Plugin registration unit tests fail on Windows. This commit updates the behaviour: instead of relying on different timestamps, the Reconciler checks the UUID set in the PluginInfo to detect a Plugin reregistration. With this change, the unit tests mentioned above will also pass on Windows. [1] golang/go#8687
1 parent 8e5b26b commit a85409b

File tree

9 files changed

+105
-70
lines changed

9 files changed

+105
-70
lines changed

pkg/kubelet/pluginmanager/cache/actual_state_of_world.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"sync"
2626
"time"
2727

28+
"k8s.io/apimachinery/pkg/types"
2829
"k8s.io/klog/v2"
2930
)
3031

@@ -53,9 +54,13 @@ type ActualStateOfWorld interface {
5354
// If a plugin does not exist with the given socket path, this is a no-op.
5455
RemovePlugin(socketPath string)
5556

56-
// PluginExists checks if the given plugin exists in the current actual
57+
// Deprecated: PluginExistsWithCorrectTimestamp checks if the given plugin exists in the current actual
5758
// state of world cache with the correct timestamp
5859
PluginExistsWithCorrectTimestamp(pluginInfo PluginInfo) bool
60+
61+
// PluginExistsWithCorrectUUID checks if the given plugin exists in the current actual
62+
// state of world cache with the correct UUID
63+
PluginExistsWithCorrectUUID(pluginInfo PluginInfo) bool
5964
}
6065

6166
// NewActualStateOfWorld returns a new instance of ActualStateOfWorld
@@ -79,6 +84,7 @@ var _ ActualStateOfWorld = &actualStateOfWorld{}
7984
type PluginInfo struct {
8085
SocketPath string
8186
Timestamp time.Time
87+
UUID types.UID
8288
Handler PluginHandler
8389
Name string
8490
}
@@ -124,3 +130,13 @@ func (asw *actualStateOfWorld) PluginExistsWithCorrectTimestamp(pluginInfo Plugi
124130
actualStatePlugin, exists := asw.socketFileToInfo[pluginInfo.SocketPath]
125131
return exists && (actualStatePlugin.Timestamp == pluginInfo.Timestamp)
126132
}
133+
134+
func (asw *actualStateOfWorld) PluginExistsWithCorrectUUID(pluginInfo PluginInfo) bool {
135+
asw.RLock()
136+
defer asw.RUnlock()
137+
138+
// We need to check both if the socket file path exists, and the UUID
139+
// matches the given plugin (from the desired state cache) UUID
140+
actualStatePlugin, exists := asw.socketFileToInfo[pluginInfo.SocketPath]
141+
return exists && (actualStatePlugin.UUID == pluginInfo.UUID)
142+
}

pkg/kubelet/pluginmanager/cache/actual_state_of_world_test.go

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,23 @@ limitations under the License.
1717
package cache
1818

1919
import (
20-
"runtime"
20+
goruntime "runtime"
2121
"testing"
2222
"time"
2323

2424
"github.com/stretchr/testify/require"
25+
"k8s.io/apimachinery/pkg/util/uuid"
2526
)
2627

2728
// Calls AddPlugin() to add a plugin
2829
// Verifies newly added plugin exists in GetRegisteredPlugins()
29-
// Verifies PluginExistsWithCorrectTimestamp returns true for the plugin
30+
// Verifies PluginExistsWithCorrectUUID returns true for the plugin
31+
// Verifies PluginExistsWithCorrectTimestamp returns true for the plugin (excluded on Windows)
3032
func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
3133
pluginInfo := PluginInfo{
3234
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
3335
Timestamp: time.Now(),
36+
UUID: uuid.NewUUID(),
3437
Handler: nil,
3538
Name: "test",
3639
}
@@ -50,20 +53,29 @@ func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
5053
t.Fatalf("Expected\n%v\nin actual state of world, but got\n%v\n", pluginInfo, aswPlugins[0])
5154
}
5255

56+
// Check PluginExistsWithCorrectUUID returns true
57+
if !asw.PluginExistsWithCorrectUUID(pluginInfo) {
58+
t.Fatalf("PluginExistsWithCorrectUUID returns false for plugin that should be registered")
59+
}
60+
5361
// Check PluginExistsWithCorrectTimestamp returns true
54-
if !asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
62+
// Skipped on Windows. Time measurements are not as fine-grained on Windows and can lead to
63+
// 2 consecutive time.Now() calls returning identical timestamps.
64+
if goruntime.GOOS != "windows" && !asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
5565
t.Fatalf("PluginExistsWithCorrectTimestamp returns false for plugin that should be registered")
5666
}
5767
}
5868

5969
// Calls AddPlugin() to add an empty string for socket path
6070
// Verifies the plugin does not exist in GetRegisteredPlugins()
61-
// Verifies PluginExistsWithCorrectTimestamp returns false
71+
// Verifies PluginExistsWithCorrectUUID returns false
72+
// Verifies PluginExistsWithCorrectTimestamp returns false (excluded on Windows)
6273
func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
6374
asw := NewActualStateOfWorld()
6475
pluginInfo := PluginInfo{
6576
SocketPath: "",
6677
Timestamp: time.Now(),
78+
UUID: uuid.NewUUID(),
6779
Handler: nil,
6880
Name: "test",
6981
}
@@ -76,21 +88,30 @@ func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
7688
t.Fatalf("Actual state of world length should be zero but it's %d", len(aswPlugins))
7789
}
7890

91+
// Check PluginExistsWithCorrectUUID returns false
92+
if asw.PluginExistsWithCorrectUUID(pluginInfo) {
93+
t.Fatalf("PluginExistsWithCorrectUUID returns true for plugin that's not registered")
94+
}
95+
7996
// Check PluginExistsWithCorrectTimestamp returns false
80-
if asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
97+
// Skipped on Windows. Time measurements are not as fine-grained on Windows and can lead to
98+
// 2 consecutive time.Now() calls returning identical timestamps.
99+
if goruntime.GOOS != "windows" && asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
81100
t.Fatalf("PluginExistsWithCorrectTimestamp returns true for plugin that's not registered")
82101
}
83102
}
84103

85104
// Calls RemovePlugin() to remove a plugin
86105
// Verifies newly removed plugin no longer exists in GetRegisteredPlugins()
87-
// Verifies PluginExistsWithCorrectTimestamp returns false
106+
// Verifies PluginExistsWithCorrectUUID returns false
107+
// Verifies PluginExistsWithCorrectTimestamp returns false (excluded on Windows)
88108
func Test_ASW_RemovePlugin_Positive(t *testing.T) {
89109
// First, add a plugin
90110
asw := NewActualStateOfWorld()
91111
pluginInfo := PluginInfo{
92112
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
93113
Timestamp: time.Now(),
114+
UUID: uuid.NewUUID(),
94115
Handler: nil,
95116
Name: "test",
96117
}
@@ -109,25 +130,28 @@ func Test_ASW_RemovePlugin_Positive(t *testing.T) {
109130
t.Fatalf("Actual state of world length should be zero but it's %d", len(aswPlugins))
110131
}
111132

133+
// Check PluginExistsWithCorrectUUID returns false
134+
if asw.PluginExistsWithCorrectUUID(pluginInfo) {
135+
t.Fatalf("PluginExistsWithCorrectUUID returns true for the removed plugin")
136+
}
137+
112138
// Check PluginExistsWithCorrectTimestamp returns false
113-
if asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
139+
// Skipped on Windows. Time measurements are not as fine-grained on Windows and can lead to
140+
// 2 consecutive time.Now() calls returning identical timestamps.
141+
if goruntime.GOOS != "windows" && asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
114142
t.Fatalf("PluginExistsWithCorrectTimestamp returns true for the removed plugin")
115143
}
116144
}
117145

118-
// Verifies PluginExistsWithCorrectTimestamp returns false for an existing
119-
// plugin with the wrong timestamp
120-
func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testing.T) {
121-
// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
122-
if runtime.GOOS == "windows" {
123-
t.Skip("Skipping test that fails on Windows")
124-
}
125-
146+
// Verifies PluginExistsWithCorrectUUID returns false for an existing
147+
// plugin with the wrong UUID
148+
func Test_ASW_PluginExistsWithCorrectUUID_Negative_WrongUUID(t *testing.T) {
126149
// First, add a plugin
127150
asw := NewActualStateOfWorld()
128151
pluginInfo := PluginInfo{
129152
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
130153
Timestamp: time.Now(),
154+
UUID: uuid.NewUUID(),
131155
Handler: nil,
132156
Name: "test",
133157
}
@@ -140,9 +164,10 @@ func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testin
140164
newerPlugin := PluginInfo{
141165
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
142166
Timestamp: time.Now(),
167+
UUID: uuid.NewUUID(),
143168
}
144-
// Check PluginExistsWithCorrectTimestamp returns false
145-
if asw.PluginExistsWithCorrectTimestamp(newerPlugin) {
146-
t.Fatalf("PluginExistsWithCorrectTimestamp returns true for a plugin with newer timestamp")
169+
// Check PluginExistsWithCorrectUUID returns false
170+
if asw.PluginExistsWithCorrectUUID(newerPlugin) {
171+
t.Fatalf("PluginExistsWithCorrectUUID returns true for a plugin with a different UUID")
147172
}
148173
}

pkg/kubelet/pluginmanager/cache/desired_state_of_world.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"sync"
2626
"time"
2727

28+
"k8s.io/apimachinery/pkg/util/uuid"
2829
"k8s.io/klog/v2"
2930
)
3031

@@ -132,12 +133,13 @@ func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string) error {
132133
}
133134

134135
// Update the PluginInfo object.
135-
// Note that we only update the timestamp in the desired state of world, not the actual state of world
136+
// Note that we only update the timestamp and UUID in the desired state of world, not the actual state of world
136137
// because in the reconciler, we need to check if the plugin in the actual state of world is the same
137138
// version as the plugin in the desired state of world
138139
dsw.socketFileToInfo[socketPath] = PluginInfo{
139140
SocketPath: socketPath,
140141
Timestamp: time.Now(),
142+
UUID: uuid.NewUUID(),
141143
}
142144
return nil
143145
}

pkg/kubelet/pluginmanager/cache/desired_state_of_world_test.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package cache
1818

1919
import (
20-
"runtime"
2120
"testing"
2221

2322
"github.com/stretchr/testify/require"
@@ -54,11 +53,6 @@ func Test_DSW_AddOrUpdatePlugin_Positive_NewPlugin(t *testing.T) {
5453
// Verifies the UUID of the existing plugin is updated
5554
// Verifies newly added plugin returns true for PluginExists()
5655
func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
57-
// Skip tests that fail on Windows, as discussed during the SIG Testing meeting from January 10, 2023
58-
if runtime.GOOS == "windows" {
59-
t.Skip("Skipping test that fails on Windows")
60-
}
61-
6256
dsw := NewDesiredStateOfWorld()
6357
socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock"
6458
// Adding the plugin for the first time
@@ -75,7 +69,7 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
7569
if dswPlugins[0].SocketPath != socketPath {
7670
t.Fatalf("Expected\n%s\nin desired state of world, but got\n%v\n", socketPath, dswPlugins[0])
7771
}
78-
oldTimestamp := dswPlugins[0].Timestamp
72+
oldUUID := dswPlugins[0].UUID
7973

8074
// Adding the plugin again so that the timestamp and UUID will be updated
8175
err = dsw.AddOrUpdatePlugin(socketPath)
@@ -90,9 +84,9 @@ func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
9084
t.Fatalf("Expected\n%s\nin desired state of world, but got\n%v\n", socketPath, newDswPlugins[0])
9185
}
9286

93-
// Verify that the new timestamp is newer than the old timestamp
94-
if !newDswPlugins[0].Timestamp.After(oldTimestamp) {
95-
t.Fatal("New timestamp is not newer than the old timestamp", newDswPlugins[0].Timestamp, oldTimestamp)
87+
// Verify that the new UUID is different from the old UUID
88+
if newDswPlugins[0].UUID == oldUUID {
89+
t.Fatal("New UUID is not different from the old UUID", newDswPlugins[0].UUID, oldUUID)
9690
}
9791

9892
}

pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ limitations under the License.
2121
package operationexecutor
2222

2323
import (
24-
"time"
24+
"k8s.io/apimachinery/pkg/types"
2525

2626
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
2727
"k8s.io/kubernetes/pkg/util/goroutinemap"
@@ -45,7 +45,7 @@ import (
4545
type OperationExecutor interface {
4646
// RegisterPlugin registers the given plugin using a handler in the plugin handler map.
4747
// It then updates the actual state of the world to reflect that.
48-
RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
48+
RegisterPlugin(socketPath string, UUID types.UID, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
4949

5050
// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
5151
// It then updates the actual state of the world to reflect that.
@@ -94,11 +94,11 @@ func (oe *operationExecutor) IsOperationPending(socketPath string) bool {
9494

9595
func (oe *operationExecutor) RegisterPlugin(
9696
socketPath string,
97-
timestamp time.Time,
97+
pluginUUID types.UID,
9898
pluginHandlers map[string]cache.PluginHandler,
9999
actualStateOfWorld ActualStateOfWorldUpdater) error {
100100
generatedOperation :=
101-
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)
101+
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, pluginUUID, pluginHandlers, actualStateOfWorld)
102102

103103
return oe.pendingOperations.Run(
104104
socketPath, generatedOperation)

pkg/kubelet/pluginmanager/operationexecutor/operation_executor_test.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ import (
2323
"testing"
2424
"time"
2525

26+
"github.com/stretchr/testify/assert"
27+
"k8s.io/apimachinery/pkg/types"
28+
"k8s.io/apimachinery/pkg/util/uuid"
2629
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
2730
)
2831

@@ -46,7 +49,8 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T)
4649
ch, quit, oe := setup()
4750
for i := 0; i < numPluginsToRegister; i++ {
4851
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
49-
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
52+
err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
53+
assert.NoError(t, err)
5054
}
5155
if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
5256
t.Fatalf("Unable to start register operations in Concurrent for plugins")
@@ -56,8 +60,16 @@ func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T)
5660
func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
5761
ch, quit, oe := setup()
5862
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
59-
for i := 0; i < numPluginsToRegister; i++ {
60-
oe.RegisterPlugin(socketPath, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
63+
64+
// First registration should not fail.
65+
err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
66+
assert.NoError(t, err)
67+
68+
for i := 1; i < numPluginsToRegister; i++ {
69+
err := oe.RegisterPlugin(socketPath, uuid.NewUUID(), nil /* plugin handlers */, nil /* actual state of the world updator */)
70+
if err == nil {
71+
t.Fatalf("RegisterPlugin did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name is already executing.> Actual: <no error>", socketPath)
72+
}
6173

6274
}
6375
if !isOperationRunSerially(ch, quit) {
@@ -105,7 +117,7 @@ func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) Opera
105117

106118
func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
107119
socketPath string,
108-
timestamp time.Time,
120+
pluginUUID types.UID,
109121
pluginHandlers map[string]cache.PluginHandler,
110122
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
111123

pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"net"
2828
"time"
2929

30+
"k8s.io/apimachinery/pkg/types"
3031
"k8s.io/klog/v2"
3132

3233
"google.golang.org/grpc"
@@ -62,7 +63,7 @@ type OperationGenerator interface {
6263
// Generates the RegisterPlugin function needed to perform the registration of a plugin
6364
GenerateRegisterPluginFunc(
6465
socketPath string,
65-
timestamp time.Time,
66+
UUID types.UID,
6667
pluginHandlers map[string]cache.PluginHandler,
6768
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
6869

@@ -74,7 +75,7 @@ type OperationGenerator interface {
7475

7576
func (og *operationGenerator) GenerateRegisterPluginFunc(
7677
socketPath string,
77-
timestamp time.Time,
78+
pluginUUID types.UID,
7879
pluginHandlers map[string]cache.PluginHandler,
7980
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
8081

@@ -114,7 +115,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
114115
// so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call.
115116
err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
116117
SocketPath: socketPath,
117-
Timestamp: timestamp,
118+
UUID: pluginUUID,
118119
Handler: handler,
119120
Name: infoResp.Name,
120121
})

pkg/kubelet/pluginmanager/reconciler/reconciler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func (rc *reconciler) reconcile() {
122122
// Iterate through desired state of world plugins and see if there's any plugin
123123
// with the same socket path but a different UUID.
124124
for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
125-
if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.Timestamp != registeredPlugin.Timestamp {
125+
if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.UUID != registeredPlugin.UUID {
126126
klog.V(5).InfoS("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin)
127127
unregisterPlugin = true
128128
break
@@ -148,9 +148,9 @@ func (rc *reconciler) reconcile() {
148148

149149
// Ensure plugins that should be registered are registered
150150
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
151-
if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
151+
if !rc.actualStateOfWorld.PluginExistsWithCorrectUUID(pluginToRegister) {
152152
klog.V(5).InfoS("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister)
153-
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
153+
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.UUID, rc.getHandlers(), rc.actualStateOfWorld)
154154
if err != nil &&
155155
!goroutinemap.IsAlreadyExists(err) &&
156156
!exponentialbackoff.IsExponentialBackoff(err) {

0 commit comments

Comments
 (0)