Skip to content

Commit c6ea102

Browse files
authored
Merge pull request kubernetes#128298 from SergeyKanzhelev/convergePluginRegistrationForDRAAndDP
converge DRA and Device Plugin plugins registration
2 parents b845137 + 1297d0c commit c6ea102

10 files changed

+56
-23
lines changed

pkg/kubelet/cm/container_manager.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,14 @@ const (
5757

5858
type ActivePodsFunc func() []*v1.Pod
5959

60+
type GetNodeFunc func() (*v1.Node, error)
61+
6062
// Manages the containers running on a machine.
6163
type ContainerManager interface {
6264
// Runs the container manager's housekeeping.
6365
// - Ensures that the Docker daemon is in a container.
6466
// - Creates the system container where all non-containerized processes run.
65-
Start(context.Context, *v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService, bool) error
67+
Start(context.Context, *v1.Node, ActivePodsFunc, GetNodeFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService, bool) error
6668

6769
// SystemCgroupsLimit returns resources allocated to system cgroups in the machine.
6870
// These cgroups include the system and Kubernetes services.
@@ -115,10 +117,10 @@ type ContainerManager interface {
115117
// GetPodCgroupRoot returns the cgroup which contains all pods.
116118
GetPodCgroupRoot() string
117119

118-
// GetPluginRegistrationHandler returns a plugin registration handler
120+
// GetPluginRegistrationHandlers returns the plugin registration handlers, keyed by the name each is registered under.
119121
// The pluginwatcher's Handlers allow a single module to handle
120122
// registration.
121-
GetPluginRegistrationHandler() cache.PluginHandler
123+
GetPluginRegistrationHandlers() map[string]cache.PluginHandler
122124

123125
// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
124126
// due to node recreation.

pkg/kubelet/cm/container_manager_linux.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"k8s.io/client-go/tools/record"
4747
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
4848
internalapi "k8s.io/cri-api/pkg/apis"
49+
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
4950
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
5051
kubefeatures "k8s.io/kubernetes/pkg/features"
5152
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
@@ -132,6 +133,8 @@ type containerManagerImpl struct {
132133
topologyManager topologymanager.Manager
133134
// Interface for Dynamic Resource Allocation management.
134135
draManager dra.Manager
136+
// kubeClient is the interface to the Kubernetes API server. May be nil if the kubelet is running in standalone mode.
137+
kubeClient clientset.Interface
135138
}
136139

137140
type features struct {
@@ -312,6 +315,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
312315
return nil, err
313316
}
314317
}
318+
cm.kubeClient = kubeClient
315319

316320
// Initialize CPU manager
317321
cm.cpuManager, err = cpumanager.NewManager(
@@ -555,6 +559,7 @@ func (cm *containerManagerImpl) Status() Status {
555559

556560
func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
557561
activePods ActivePodsFunc,
562+
getNode GetNodeFunc,
558563
sourcesReady config.SourcesReady,
559564
podStatusProvider status.PodStatusProvider,
560565
runtimeService internalapi.RuntimeService,
@@ -564,7 +569,7 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
564569

565570
// Initialize DRA manager
566571
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
567-
err := cm.draManager.Start(ctx, dra.ActivePodsFunc(activePods), sourcesReady)
572+
err := cm.draManager.Start(ctx, dra.ActivePodsFunc(activePods), dra.GetNodeFunc(getNode), sourcesReady)
568573
if err != nil {
569574
return fmt.Errorf("start dra manager error: %w", err)
570575
}
@@ -649,8 +654,16 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
649654
return nil
650655
}
651656

652-
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
653-
return cm.deviceManager.GetWatcherHandler()
657+
func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
658+
res := map[string]cache.PluginHandler{
659+
pluginwatcherapi.DevicePlugin: cm.deviceManager.GetWatcherHandler(),
660+
}
661+
662+
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
663+
res[pluginwatcherapi.DRAPlugin] = cm.draManager.GetWatcherHandler()
664+
}
665+
666+
return res
654667
}
655668

656669
// TODO: move the GetResources logic to PodContainerManager.

pkg/kubelet/cm/container_manager_stub.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type containerManagerStub struct {
4646

4747
var _ ContainerManager = &containerManagerStub{}
4848

49-
func (cm *containerManagerStub) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
49+
func (cm *containerManagerStub) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
5050
klog.V(2).InfoS("Starting stub container manager")
5151
return nil
5252
}
@@ -91,7 +91,7 @@ func (cm *containerManagerStub) GetCapacity(localStorageCapacityIsolation bool)
9191
return c
9292
}
9393

94-
func (cm *containerManagerStub) GetPluginRegistrationHandler() cache.PluginHandler {
94+
func (cm *containerManagerStub) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
9595
return nil
9696
}
9797

pkg/kubelet/cm/container_manager_unsupported.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type unsupportedContainerManager struct {
4040

4141
var _ ContainerManager = &unsupportedContainerManager{}
4242

43-
func (unsupportedContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
43+
func (unsupportedContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
4444
return fmt.Errorf("Container Manager is unsupported in this build")
4545
}
4646

pkg/kubelet/cm/container_manager_windows.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
clientset "k8s.io/client-go/kubernetes"
3636
"k8s.io/client-go/tools/record"
3737
internalapi "k8s.io/cri-api/pkg/apis"
38+
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
3839
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
3940
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
4041
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
@@ -72,6 +73,7 @@ func (ra *noopWindowsResourceAllocator) Admit(attrs *lifecycle.PodAdmitAttribute
7273

7374
func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
7475
activePods ActivePodsFunc,
76+
getNode GetNodeFunc,
7577
sourcesReady config.SourcesReady,
7678
podStatusProvider status.PodStatusProvider,
7779
runtimeService internalapi.RuntimeService,
@@ -176,8 +178,9 @@ func (cm *containerManagerImpl) GetCapacity(localStorageCapacityIsolation bool)
176178
return cm.capacity
177179
}
178180

179-
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
180-
return cm.deviceManager.GetWatcherHandler()
181+
func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
182+
// DRA is not supported on Windows, only device plugin is supported
183+
return map[string]cache.PluginHandler{pluginwatcherapi.DevicePlugin: cm.deviceManager.GetWatcherHandler()}
181184
}
182185

183186
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {

pkg/kubelet/cm/devicemanager/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type Manager interface {
5959
// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
6060
// and inactive device plugin resources previously registered on the node.
6161
GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
62+
63+
// GetWatcherHandler returns the plugin handler for the device manager.
6264
GetWatcherHandler() cache.PluginHandler
6365

6466
// GetDevices returns information about the devices assigned to pods and containers

pkg/kubelet/cm/dra/manager.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"k8s.io/kubernetes/pkg/kubelet/config"
3838
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
3939
"k8s.io/kubernetes/pkg/kubelet/metrics"
40+
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
4041
)
4142

4243
// draManagerStateFileName is the file name where dra manager stores its state
@@ -48,6 +49,9 @@ const defaultReconcilePeriod = 60 * time.Second
4849
// ActivePodsFunc is a function that returns a list of pods to reconcile.
4950
type ActivePodsFunc func() []*v1.Pod
5051

52+
// GetNodeFunc is a function that returns the node object using the kubelet's node lister.
53+
type GetNodeFunc func() (*v1.Node, error)
54+
5155
// ManagerImpl is the structure in charge of managing DRA drivers.
5256
type ManagerImpl struct {
5357
// cache contains cached claim info
@@ -66,6 +70,9 @@ type ManagerImpl struct {
6670

6771
// KubeClient reference
6872
kubeClient clientset.Interface
73+
74+
// getNode is a function that returns the node object using the kubelet's node lister.
75+
getNode GetNodeFunc
6976
}
7077

7178
// NewManagerImpl creates a new manager.
@@ -90,9 +97,14 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
9097
return manager, nil
9198
}
9299

100+
func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
101+
return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode))
102+
}
103+
93104
// Start starts the reconcile loop of the manager.
94-
func (m *ManagerImpl) Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
105+
func (m *ManagerImpl) Start(ctx context.Context, activePods ActivePodsFunc, getNode GetNodeFunc, sourcesReady config.SourcesReady) error {
95106
m.activePods = activePods
107+
m.getNode = getNode
96108
m.sourcesReady = sourcesReady
97109
go wait.UntilWithContext(ctx, func(ctx context.Context) { m.reconcileLoop(ctx) }, m.reconcilePeriod)
98110
return nil

pkg/kubelet/cm/dra/types.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,17 @@ import (
2323
"k8s.io/apimachinery/pkg/types"
2424
"k8s.io/kubernetes/pkg/kubelet/config"
2525
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
26+
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
2627
)
2728

2829
// Manager manages all the DRA resource plugins running on a node.
2930
type Manager interface {
31+
// GetWatcherHandler returns the plugin handler for the DRA manager.
32+
GetWatcherHandler() cache.PluginHandler
33+
3034
// Start starts the reconcile loop of the manager.
3135
// This will ensure that all claims are unprepared even if pods get deleted unexpectedly.
32-
Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
36+
Start(ctx context.Context, activePods ActivePodsFunc, getNode GetNodeFunc, sourcesReady config.SourcesReady) error
3337

3438
// PrepareResources prepares resources for a pod.
3539
// It communicates with the DRA resource plugin to prepare resources.

pkg/kubelet/cm/fake_container_manager.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func NewFakeContainerManager() *FakeContainerManager {
5353
}
5454
}
5555

56-
func (cm *FakeContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
56+
func (cm *FakeContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
5757
cm.Lock()
5858
defer cm.Unlock()
5959
cm.CalledFunctions = append(cm.CalledFunctions, "Start")
@@ -124,10 +124,10 @@ func (cm *FakeContainerManager) GetCapacity(localStorageCapacityIsolation bool)
124124
return c
125125
}
126126

127-
func (cm *FakeContainerManager) GetPluginRegistrationHandler() cache.PluginHandler {
127+
func (cm *FakeContainerManager) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
128128
cm.Lock()
129129
defer cm.Unlock()
130-
cm.CalledFunctions = append(cm.CalledFunctions, "GetPluginRegistrationHandler")
130+
cm.CalledFunctions = append(cm.CalledFunctions, "GetPluginRegistrationHandlers")
131131
return nil
132132
}
133133

pkg/kubelet/kubelet.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ import (
8181
"k8s.io/kubernetes/pkg/kubelet/cloudresource"
8282
"k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
8383
"k8s.io/kubernetes/pkg/kubelet/cm"
84-
draplugin "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
8584
"k8s.io/kubernetes/pkg/kubelet/config"
8685
"k8s.io/kubernetes/pkg/kubelet/configmap"
8786
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -1575,7 +1574,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
15751574
os.Exit(1)
15761575
}
15771576
// containerManager must start after cAdvisor because it needs filesystem capacity information
1578-
if err := kl.containerManager.Start(context.TODO(), node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil {
1577+
if err := kl.containerManager.Start(context.TODO(), node, kl.GetActivePods, kl.getNodeAnyWay, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil {
15791578
// Fail kubelet and rely on the babysitter to retry starting kubelet.
15801579
klog.ErrorS(err, "Failed to start ContainerManager")
15811580
os.Exit(1)
@@ -1589,12 +1588,10 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
15891588
kl.containerLogManager.Start()
15901589
// Adding Registration Callback function for CSI Driver
15911590
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
1592-
// Adding Registration Callback function for DRA Plugin
1593-
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
1594-
kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.getNodeAnyWay)))
1591+
// Adding Registration Callback function for DRA Plugin and Device Plugin
1592+
for name, handler := range kl.containerManager.GetPluginRegistrationHandlers() {
1593+
kl.pluginManager.AddHandler(name, handler)
15951594
}
1596-
// Adding Registration Callback function for Device Manager
1597-
kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
15981595

15991596
// Start the plugin manager
16001597
klog.V(4).InfoS("Starting plugin manager")

0 commit comments

Comments
 (0)