Skip to content

Commit 4932adf

Browse files
authored
Merge pull request kubernetes#125296 from jsturtevant/windows-numa-support
Support CPU and Topology manager on Windows
2 parents 19d5629 + ac174f5 commit 4932adf

14 files changed

+930
-38
lines changed

pkg/features/kube_features.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,12 @@ const (
715715
// Allows kube-proxy to run in Overlay mode for Windows
716716
WinOverlay featuregate.Feature = "WinOverlay"
717717

718+
// owner: @jsturtevant
719+
// kep: https://kep.k8s.io/4888
720+
//
721+
// Adds CPU and memory affinity support to Windows nodes via the CPU Manager, Memory Manager, and Topology Manager
722+
WindowsCPUAndMemoryAffinity featuregate.Feature = "WindowsCPUAndMemoryAffinity"
723+
718724
// owner: @marosset
719725
// kep: https://kep.k8s.io/3503
720726
//

pkg/features/versioned_kube_features.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
774774
{Version: version.MustParse("1.20"), Default: true, PreRelease: featuregate.Beta},
775775
},
776776

777+
WindowsCPUAndMemoryAffinity: {
778+
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
779+
},
780+
777781
WindowsHostNetwork: {
778782
{Version: version.MustParse("1.26"), Default: true, PreRelease: featuregate.Alpha},
779783
},

pkg/kubelet/cm/container_manager.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,14 @@ type Status struct {
198198
SoftRequirements error
199199
}
200200

201+
// int64Slice returns a newly allocated slice with the same length as in,
// where each element is the corresponding element of in converted to int64.
// The input slice is not modified. Because the result is built with make,
// a nil or empty input yields an empty, non-nil slice.
func int64Slice(in []int) []int64 {
202+
out := make([]int64, len(in))
203+
for i := range in {
204+
out[i] = int64(in[i])
205+
}
206+
return out
207+
}
208+
201209
// parsePercentage parses the percentage string to numeric value.
202210
func parsePercentage(v string) (int64, error) {
203211
if !strings.HasSuffix(v, "%") {

pkg/kubelet/cm/container_manager_linux.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -920,14 +920,6 @@ func (cm *containerManagerImpl) GetAllocatableDevices() []*podresourcesapi.Conta
920920
return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetAllocatableDevices())
921921
}
922922

923-
func int64Slice(in []int) []int64 {
924-
out := make([]int64, len(in))
925-
for i := range in {
926-
out[i] = int64(in[i])
927-
}
928-
return out
929-
}
930-
931923
func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
932924
if cm.cpuManager != nil {
933925
return int64Slice(cm.cpuManager.GetExclusiveCPUs(podUID, containerName).UnsortedList())

pkg/kubelet/cm/container_manager_windows.go

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ package cm
2525
import (
2626
"context"
2727
"fmt"
28+
utilfeature "k8s.io/apiserver/pkg/util/feature"
29+
kubefeatures "k8s.io/kubernetes/pkg/features"
30+
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
31+
"sync"
2832

2933
"k8s.io/klog/v2"
3034
"k8s.io/mount-utils"
@@ -38,10 +42,8 @@ import (
3842
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
3943
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
4044
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
41-
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
4245
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
4346
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
44-
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
4547
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
4648
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
4749
"k8s.io/kubernetes/pkg/kubelet/config"
@@ -63,12 +65,9 @@ type containerManagerImpl struct {
6365
deviceManager devicemanager.Manager
6466
// Interface for Topology resource co-ordination
6567
topologyManager topologymanager.Manager
66-
}
67-
68-
type noopWindowsResourceAllocator struct{}
69-
70-
func (ra *noopWindowsResourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
71-
return admission.GetPodAdmitResult(nil)
68+
cpuManager cpumanager.Manager
69+
nodeInfo *v1.Node
70+
sync.RWMutex
7271
}
7372

7473
func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
@@ -80,6 +79,8 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
8079
localStorageCapacityIsolation bool) error {
8180
klog.V(2).InfoS("Starting Windows container manager")
8281

82+
cm.nodeInfo = node
83+
8384
if localStorageCapacityIsolation {
8485
rootfs, err := cm.cadvisorInterface.RootFsInfo()
8586
if err != nil {
@@ -92,6 +93,14 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
9293

9394
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
9495

96+
// Initialize CPU manager
97+
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
98+
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
99+
if err != nil {
100+
return fmt.Errorf("start cpu manager error: %v", err)
101+
}
102+
}
103+
95104
// Starts device manager.
96105
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
97106
return err
@@ -117,7 +126,37 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
117126
cadvisorInterface: cadvisorInterface,
118127
}
119128

120-
cm.topologyManager = topologymanager.NewFakeManager()
129+
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
130+
klog.InfoS("Creating topology manager")
131+
cm.topologyManager, err = topologymanager.NewManager(machineInfo.Topology,
132+
nodeConfig.TopologyManagerPolicy,
133+
nodeConfig.TopologyManagerScope,
134+
nodeConfig.TopologyManagerPolicyOptions)
135+
if err != nil {
136+
klog.ErrorS(err, "Failed to initialize topology manager")
137+
return nil, err
138+
}
139+
140+
klog.InfoS("Creating cpu manager")
141+
cm.cpuManager, err = cpumanager.NewManager(
142+
nodeConfig.CPUManagerPolicy,
143+
nodeConfig.CPUManagerPolicyOptions,
144+
nodeConfig.CPUManagerReconcilePeriod,
145+
machineInfo,
146+
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
147+
cm.GetNodeAllocatableReservation(),
148+
nodeConfig.KubeletRootDir,
149+
cm.topologyManager,
150+
)
151+
if err != nil {
152+
klog.ErrorS(err, "Failed to initialize cpu manager")
153+
return nil, err
154+
}
155+
cm.topologyManager.AddHintProvider(cm.cpuManager)
156+
} else {
157+
cm.topologyManager = topologymanager.NewFakeManager()
158+
cm.cpuManager = cpumanager.NewFakeManager()
159+
}
121160

122161
klog.InfoS("Creating device plugin manager")
123162
cm.deviceManager, err = devicemanager.NewManagerImpl(nil, cm.topologyManager)
@@ -134,7 +173,9 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
134173
}
135174

136175
// GetNodeConfig returns the node configuration stored on the container manager.
// In this diff the removed line returned an empty NodeConfig; the added lines
// take the read lock on cm (RLock/RUnlock) and return cm.nodeConfig instead.
func (cm *containerManagerImpl) GetNodeConfig() NodeConfig {
137-
return NodeConfig{}
176+
cm.RLock()
177+
defer cm.RUnlock()
178+
return cm.nodeConfig
138179
}
139180

140181
func (cm *containerManagerImpl) GetMountedSubsystems() *CgroupSubsystems {
@@ -226,7 +267,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.N
226267
}
227268

228269
// InternalContainerLifecycle returns a new internalContainerLifecycleImpl.
// The removed line built it from three fake managers; the added line passes
// cm.cpuManager and cm.topologyManager, while the memory manager argument is
// still memorymanager.NewFakeManager().
func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
229-
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), memorymanager.NewFakeManager(), topologymanager.NewFakeManager()}
270+
return &internalContainerLifecycleImpl{cm.cpuManager, memorymanager.NewFakeManager(), cm.topologyManager}
230271
}
231272

232273
func (cm *containerManagerImpl) GetPodCgroupRoot() string {
@@ -246,18 +287,30 @@ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
246287
}
247288

248289
// GetAllocateResourcesPodAdmitHandler returns the handler consulted when
// admitting pods. The removed line returned a noopWindowsResourceAllocator;
// the added line returns cm.topologyManager as the admit handler.
func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
249-
return &noopWindowsResourceAllocator{}
290+
return cm.topologyManager
250291
}
251292

252293
// UpdateAllocatedDevices is a no-op in this container manager: it returns
// immediately without touching any state.
func (cm *containerManagerImpl) UpdateAllocatedDevices() {
253294
return
254295
}
255296

256-
// GetCPUs reports the CPUs returned by the CPU manager's GetExclusiveCPUs for
// the container identified by podUID and containerName, converted to int64.
//
// The removed signature ignored both arguments; the added one names them so
// they can be forwarded to the CPU manager. Results by case:
//   - WindowsCPUAndMemoryAffinity gate disabled: nil.
//   - Gate enabled and cm.cpuManager set: the exclusive CPU set, in the order
//     produced by UnsortedList (no ordering is imposed here).
//   - Gate enabled and cm.cpuManager nil: an empty, non-nil slice.
func (cm *containerManagerImpl) GetCPUs(_, _ string) []int64 {
297+
func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
298+
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
299+
if cm.cpuManager != nil {
300+
return int64Slice(cm.cpuManager.GetExclusiveCPUs(podUID, containerName).UnsortedList())
301+
}
302+
return []int64{}
303+
}
257304
return nil
258305
}
259306

260307
// GetAllocatableCPUs reports the CPUs returned by the CPU manager's
// GetAllocatableCPUs, converted to int64. Results by case:
//   - WindowsCPUAndMemoryAffinity gate disabled: nil.
//   - Gate enabled and cm.cpuManager set: the allocatable CPU set, in the
//     order produced by UnsortedList (no ordering is imposed here).
//   - Gate enabled and cm.cpuManager nil: an empty, non-nil slice.
func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
308+
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WindowsCPUAndMemoryAffinity) {
309+
if cm.cpuManager != nil {
310+
return int64Slice(cm.cpuManager.GetAllocatableCPUs().UnsortedList())
311+
}
312+
return []int64{}
313+
}
261314
return nil
262315
}
263316

pkg/kubelet/cm/cpumanager/cpu_manager.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -512,21 +512,6 @@ func findContainerStatusByName(status *v1.PodStatus, name string) (*v1.Container
512512
return nil, fmt.Errorf("unable to find status for container with name %v in pod status (it may not be running)", name)
513513
}
514514

515-
func (m *manager) updateContainerCPUSet(ctx context.Context, containerID string, cpus cpuset.CPUSet) error {
516-
// TODO: Consider adding a `ResourceConfigForContainer` helper in
517-
// helpers_linux.go similar to what exists for pods.
518-
// It would be better to pass the full container resources here instead of
519-
// this patch-like partial resources.
520-
return m.containerRuntime.UpdateContainerResources(
521-
ctx,
522-
containerID,
523-
&runtimeapi.ContainerResources{
524-
Linux: &runtimeapi.LinuxContainerResources{
525-
CpusetCpus: cpus.String(),
526-
},
527-
})
528-
}
529-
530515
func (m *manager) GetExclusiveCPUs(podUID, containerName string) cpuset.CPUSet {
531516
if result, ok := m.state.GetCPUSet(podUID, containerName); ok {
532517
return result
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//go:build !windows
2+
// +build !windows
3+
4+
/*
5+
Copyright 2017 The Kubernetes Authors.
6+
7+
Licensed under the Apache License, Version 2.0 (the "License");
8+
you may not use this file except in compliance with the License.
9+
You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
*/
19+
20+
package cpumanager
21+
22+
import (
23+
"context"
24+
25+
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
26+
"k8s.io/utils/cpuset"
27+
)
28+
29+
// updateContainerCPUSet asks the container runtime to update the resources of
// the container identified by containerID so that its Linux cpuset is the
// string form of cpus. Only the CpusetCpus field is populated in the request.
// It returns whatever error UpdateContainerResources returns.
//
// This variant lives in a file built only for non-Windows targets (see the
// !windows build constraint at the top of the file).
func (m *manager) updateContainerCPUSet(ctx context.Context, containerID string, cpus cpuset.CPUSet) error {
30+
// TODO: Consider adding a `ResourceConfigForContainer` helper in
31+
// helpers_linux.go similar to what exists for pods.
32+
// It would be better to pass the full container resources here instead of
33+
// this patch-like partial resources.
34+
35+
return m.containerRuntime.UpdateContainerResources(
36+
ctx,
37+
containerID,
38+
&runtimeapi.ContainerResources{
39+
Linux: &runtimeapi.LinuxContainerResources{
40+
CpusetCpus: cpus.String(),
41+
},
42+
})
43+
}

pkg/kubelet/cm/cpumanager/cpu_manager_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,16 @@ import (
2121
"fmt"
2222
"os"
2323
"reflect"
24+
"runtime"
2425
"strconv"
2526
"strings"
2627
"testing"
2728
"time"
2829

30+
utilfeature "k8s.io/apiserver/pkg/util/feature"
31+
featuregatetesting "k8s.io/component-base/featuregate/testing"
32+
"k8s.io/kubernetes/pkg/features"
33+
2934
cadvisorapi "github.com/google/cadvisor/info/v1"
3035
v1 "k8s.io/api/core/v1"
3136
"k8s.io/apimachinery/pkg/api/resource"
@@ -263,6 +268,10 @@ func makeMultiContainerPodWithOptions(initCPUs, appCPUs []*containerOptions) *v1
263268
}
264269

265270
func TestCPUManagerAdd(t *testing.T) {
271+
if runtime.GOOS == "windows" {
272+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
273+
}
274+
266275
testPolicy, _ := NewStaticPolicy(
267276
&topology.CPUTopology{
268277
NumCPUs: 4,
@@ -347,6 +356,10 @@ func TestCPUManagerAdd(t *testing.T) {
347356
}
348357

349358
func TestCPUManagerAddWithInitContainers(t *testing.T) {
359+
if runtime.GOOS == "windows" {
360+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
361+
}
362+
350363
testCases := []struct {
351364
description string
352365
topo *topology.CPUTopology
@@ -598,6 +611,10 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
598611
}
599612

600613
func TestCPUManagerGenerate(t *testing.T) {
614+
if runtime.GOOS == "windows" {
615+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
616+
}
617+
601618
testCases := []struct {
602619
description string
603620
cpuPolicyName string
@@ -703,6 +720,10 @@ func TestCPUManagerGenerate(t *testing.T) {
703720
}
704721

705722
func TestCPUManagerRemove(t *testing.T) {
723+
if runtime.GOOS == "windows" {
724+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
725+
}
726+
706727
containerID := "fakeID"
707728
containerMap := containermap.NewContainerMap()
708729

@@ -746,6 +767,10 @@ func TestCPUManagerRemove(t *testing.T) {
746767
}
747768

748769
func TestReconcileState(t *testing.T) {
770+
if runtime.GOOS == "windows" {
771+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
772+
}
773+
749774
testPolicy, _ := NewStaticPolicy(
750775
&topology.CPUTopology{
751776
NumCPUs: 8,
@@ -1269,6 +1294,10 @@ func TestReconcileState(t *testing.T) {
12691294
// above test cases are without kubelet --reserved-cpus cmd option
12701295
// the following tests are with --reserved-cpus configured
12711296
func TestCPUManagerAddWithResvList(t *testing.T) {
1297+
if runtime.GOOS == "windows" {
1298+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
1299+
}
1300+
12721301
testPolicy, _ := NewStaticPolicy(
12731302
&topology.CPUTopology{
12741303
NumCPUs: 4,
@@ -1343,6 +1372,10 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
13431372
}
13441373

13451374
func TestCPUManagerHandlePolicyOptions(t *testing.T) {
1375+
if runtime.GOOS == "windows" {
1376+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
1377+
}
1378+
13461379
testCases := []struct {
13471380
description string
13481381
cpuPolicyName string
@@ -1409,6 +1442,10 @@ func TestCPUManagerHandlePolicyOptions(t *testing.T) {
14091442
}
14101443

14111444
func TestCPUManagerGetAllocatableCPUs(t *testing.T) {
1445+
if runtime.GOOS == "windows" {
1446+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WindowsCPUAndMemoryAffinity, true)
1447+
}
1448+
14121449
nonePolicy, _ := NewNonePolicy(nil)
14131450
staticPolicy, _ := NewStaticPolicy(
14141451
&topology.CPUTopology{

0 commit comments

Comments
 (0)