Skip to content

Commit c9f1b57

Browse files
committed
Reset extended resources only when node is recreated.
1 parent 2fd1556 commit c9f1b57

11 files changed

+150
-20
lines changed

pkg/kubelet/cm/container_manager.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121

2222
"k8s.io/apimachinery/pkg/util/sets"
2323
// TODO: Migrate kubelet to either use its own internal objects or client library.
24-
"k8s.io/api/core/v1"
24+
v1 "k8s.io/api/core/v1"
2525
internalapi "k8s.io/cri-api/pkg/apis"
2626
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
2727
"k8s.io/kubernetes/pkg/kubelet/config"
@@ -104,6 +104,10 @@ type ContainerManager interface {
104104

105105
// GetDevices returns information about the devices assigned to pods and containers
106106
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
107+
108+
// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
109+
// due to node recreation.
110+
ShouldResetExtendedResourceCapacity() bool
107111
}
108112

109113
type NodeConfig struct {

pkg/kubelet/cm/container_manager_linux.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434
"github.com/opencontainers/runc/libcontainer/configs"
3535
"k8s.io/klog"
3636

37-
"k8s.io/api/core/v1"
37+
v1 "k8s.io/api/core/v1"
3838
"k8s.io/apimachinery/pkg/api/resource"
3939
utilerrors "k8s.io/apimachinery/pkg/util/errors"
4040
"k8s.io/apimachinery/pkg/util/sets"
@@ -897,3 +897,7 @@ func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceLi
897897
func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
898898
return cm.deviceManager.GetDevices(podUID, containerName)
899899
}
900+
901+
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
902+
return cm.deviceManager.ShouldResetExtendedResourceCapacity()
903+
}

pkg/kubelet/cm/container_manager_stub.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ limitations under the License.
1717
package cm
1818

1919
import (
20-
"k8s.io/api/core/v1"
20+
v1 "k8s.io/api/core/v1"
2121
"k8s.io/klog"
2222

2323
"k8s.io/apimachinery/pkg/api/resource"
@@ -32,7 +32,9 @@ import (
3232
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
3333
)
3434

35-
type containerManagerStub struct{}
35+
type containerManagerStub struct {
36+
shouldResetExtendedResourceCapacity bool
37+
}
3638

3739
var _ ContainerManager = &containerManagerStub{}
3840

@@ -110,6 +112,14 @@ func (cm *containerManagerStub) GetDevices(_, _ string) []*podresourcesapi.Conta
110112
return nil
111113
}
112114

115+
func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool {
116+
return cm.shouldResetExtendedResourceCapacity
117+
}
118+
113119
func NewStubContainerManager() ContainerManager {
114-
return &containerManagerStub{}
120+
return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
121+
}
122+
123+
func NewStubContainerManagerWithExtendedResource(shouldResetExtendedResourceCapacity bool) ContainerManager {
124+
return &containerManagerStub{shouldResetExtendedResourceCapacity: shouldResetExtendedResourceCapacity}
115125
}

pkg/kubelet/cm/container_manager_windows.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ package cm
2424
import (
2525
"fmt"
2626

27-
"k8s.io/api/core/v1"
27+
v1 "k8s.io/api/core/v1"
2828
"k8s.io/apimachinery/pkg/api/resource"
2929
utilfeature "k8s.io/apiserver/pkg/util/feature"
3030
"k8s.io/client-go/tools/record"
@@ -171,3 +171,7 @@ func (cm *containerManagerImpl) GetPodCgroupRoot() string {
171171
func (cm *containerManagerImpl) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
172172
return nil
173173
}
174+
175+
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
176+
return false
177+
}

pkg/kubelet/cm/devicemanager/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_library(
1414
visibility = ["//visibility:public"],
1515
deps = [
1616
"//pkg/apis/core/v1/helper:go_default_library",
17+
"//pkg/features:go_default_library",
1718
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
1819
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
1920
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
@@ -30,6 +31,7 @@ go_library(
3031
"//staging/src/k8s.io/api/core/v1:go_default_library",
3132
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
3233
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
34+
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
3335
"//vendor/google.golang.org/grpc:go_default_library",
3436
"//vendor/k8s.io/klog:go_default_library",
3537
],

pkg/kubelet/cm/devicemanager/manager.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ import (
2828
"google.golang.org/grpc"
2929
"k8s.io/klog"
3030

31-
"k8s.io/api/core/v1"
31+
v1 "k8s.io/api/core/v1"
3232
"k8s.io/apimachinery/pkg/api/resource"
3333
"k8s.io/apimachinery/pkg/util/sets"
34+
utilfeature "k8s.io/apiserver/pkg/util/feature"
3435
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
36+
"k8s.io/kubernetes/pkg/features"
3537
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
3638
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
3739
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
@@ -838,3 +840,17 @@ func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesap
838840
defer m.mutex.Unlock()
839841
return m.podDevices.getContainerDevices(podUID, containerName)
840842
}
843+
844+
// ShouldResetExtendedResourceCapacity returns whether the extended resources should be zeroed or not,
845+
// depending on whether the node has been recreated. Absence of the checkpoint file strongly indicates the node
846+
// has been recreated.
847+
func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
848+
if utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) {
849+
checkpoints, err := m.checkpointManager.ListCheckpoints()
850+
if err != nil {
851+
return false
852+
}
853+
return len(checkpoints) == 0
854+
}
855+
return false
856+
}

pkg/kubelet/cm/devicemanager/manager_stub.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ limitations under the License.
1717
package devicemanager
1818

1919
import (
20-
"k8s.io/api/core/v1"
20+
v1 "k8s.io/api/core/v1"
2121
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
2222
"k8s.io/kubernetes/pkg/kubelet/config"
2323
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -67,3 +67,8 @@ func (h *ManagerStub) GetWatcherHandler() pluginwatcher.PluginHandler {
6767
func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
6868
return nil
6969
}
70+
71+
// ShouldResetExtendedResourceCapacity returns false
72+
func (h *ManagerStub) ShouldResetExtendedResourceCapacity() bool {
73+
return false
74+
}

pkg/kubelet/cm/devicemanager/manager_test.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727

2828
"github.com/stretchr/testify/assert"
2929
"github.com/stretchr/testify/require"
30-
"k8s.io/api/core/v1"
30+
v1 "k8s.io/api/core/v1"
3131
"k8s.io/apimachinery/pkg/api/resource"
3232
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3333
"k8s.io/apimachinery/pkg/util/sets"
@@ -946,6 +946,45 @@ func TestDevicePreStartContainer(t *testing.T) {
946946
as.Equal(len(runContainerOpts.Envs), len(expectedResp.Envs))
947947
}
948948

949+
func TestResetExtendedResource(t *testing.T) {
950+
as := assert.New(t)
951+
tmpDir, err := ioutil.TempDir("", "checkpoint")
952+
as.Nil(err)
953+
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
954+
as.Nil(err)
955+
testManager := &ManagerImpl{
956+
endpoints: make(map[string]endpointInfo),
957+
healthyDevices: make(map[string]sets.String),
958+
unhealthyDevices: make(map[string]sets.String),
959+
allocatedDevices: make(map[string]sets.String),
960+
podDevices: make(podDevices),
961+
checkpointManager: ckm,
962+
}
963+
964+
extendedResourceName := "domain.com/resource"
965+
testManager.podDevices.insert("pod", "con", extendedResourceName,
966+
constructDevices([]string{"dev1"}),
967+
constructAllocResp(map[string]string{"/dev/dev1": "/dev/dev1"},
968+
map[string]string{"/home/lib1": "/usr/lib1"}, map[string]string{}))
969+
970+
testManager.healthyDevices[extendedResourceName] = sets.NewString()
971+
testManager.healthyDevices[extendedResourceName].Insert("dev1")
972+
// checkpoint is present, indicating node hasn't been recreated
973+
err = testManager.writeCheckpoint()
974+
as.Nil(err)
975+
976+
as.False(testManager.ShouldResetExtendedResourceCapacity())
977+
978+
// checkpoint is absent, representing node recreation
979+
ckpts, err := ckm.ListCheckpoints()
980+
as.Nil(err)
981+
for _, ckpt := range ckpts {
982+
err = ckm.RemoveCheckpoint(ckpt)
983+
as.Nil(err)
984+
}
985+
as.True(testManager.ShouldResetExtendedResourceCapacity())
986+
}
987+
949988
func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) {
950989
return func(devs []string) (*pluginapi.AllocateResponse, error) {
951990
resp := new(pluginapi.ContainerAllocateResponse)

pkg/kubelet/cm/devicemanager/types.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package devicemanager
1919
import (
2020
"time"
2121

22-
"k8s.io/api/core/v1"
22+
v1 "k8s.io/api/core/v1"
2323
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
2424
"k8s.io/kubernetes/pkg/kubelet/config"
2525
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -58,6 +58,11 @@ type Manager interface {
5858

5959
// GetDevices returns information about the devices assigned to pods and containers
6060
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
61+
62+
// ShouldResetExtendedResourceCapacity returns whether the extended resources should be reset or not,
63+
// depending on the checkpoint file availability. Absence of the checkpoint file strongly indicates
64+
// the node has been recreated.
65+
ShouldResetExtendedResourceCapacity() bool
6166
}
6267

6368
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.

pkg/kubelet/kubelet_node_status.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626

2727
"k8s.io/klog"
2828

29-
"k8s.io/api/core/v1"
29+
v1 "k8s.io/api/core/v1"
3030
apiequality "k8s.io/apimachinery/pkg/api/equality"
3131
apierrors "k8s.io/apimachinery/pkg/api/errors"
3232
"k8s.io/apimachinery/pkg/api/resource"
@@ -132,12 +132,15 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
132132
// Zeros out extended resource capacity during reconciliation.
133133
func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
134134
requiresUpdate := false
135-
for k := range node.Status.Capacity {
136-
if v1helper.IsExtendedResourceName(k) {
137-
klog.Infof("Zero out resource %s capacity in existing node.", k)
138-
node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
139-
node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
140-
requiresUpdate = true
135+
// Check with the device manager to see if node has been recreated, in which case extended resources should be zeroed until they are available
136+
if kl.containerManager.ShouldResetExtendedResourceCapacity() {
137+
for k := range node.Status.Capacity {
138+
if v1helper.IsExtendedResourceName(k) {
139+
klog.Infof("Zero out resource %s capacity in existing node.", k)
140+
node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
141+
node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
142+
requiresUpdate = true
143+
}
141144
}
142145
}
143146
return requiresUpdate

0 commit comments

Comments
 (0)