Skip to content

Commit 3560950

Browse files
authored
Merge pull request kubernetes#130254 from tallclair/allocation-manager-2
[FG:InPlacePodVerticalScaling] Move pod resource allocation management out of the status manager
2 parents ef1c659 + 9024140 commit 3560950

18 files changed

+409
-349
lines changed
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package allocation
18+
19+
import (
20+
v1 "k8s.io/api/core/v1"
21+
apiequality "k8s.io/apimachinery/pkg/api/equality"
22+
"k8s.io/apimachinery/pkg/types"
23+
"k8s.io/apimachinery/pkg/util/sets"
24+
utilfeature "k8s.io/apiserver/pkg/util/feature"
25+
"k8s.io/klog/v2"
26+
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
27+
"k8s.io/kubernetes/pkg/features"
28+
"k8s.io/kubernetes/pkg/kubelet/allocation/state"
29+
)
30+
31+
// podStatusManagerStateFile is the file name where status manager stores its state
32+
const podStatusManagerStateFile = "pod_status_manager_state"
33+
34+
// AllocationManager tracks pod resource allocations.
35+
type Manager interface {
36+
// GetContainerResourceAllocation returns the AllocatedResources value for the container
37+
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
38+
39+
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
40+
// This function does a deep copy only if updates are needed.
41+
// Returns the updated (or original) pod, and whether there was an allocation stored.
42+
UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)
43+
44+
// SetPodAllocation checkpoints the resources allocated to a pod's containers.
45+
SetPodAllocation(pod *v1.Pod) error
46+
47+
// DeletePodAllocation removes any stored state for the given pod UID.
48+
DeletePodAllocation(uid types.UID)
49+
50+
// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
51+
RemoveOrphanedPods(remainingPods sets.Set[types.UID])
52+
}
53+
54+
type manager struct {
55+
state state.State
56+
}
57+
58+
func NewManager(checkpointDirectory string) Manager {
59+
m := &manager{}
60+
61+
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
62+
stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, podStatusManagerStateFile)
63+
if err != nil {
64+
// This is a crictical, non-recoverable failure.
65+
klog.ErrorS(err, "Failed to initialize allocation checkpoint manager")
66+
panic(err)
67+
}
68+
m.state = stateImpl
69+
} else {
70+
m.state = state.NewNoopStateCheckpoint()
71+
}
72+
73+
return m
74+
}
75+
76+
// NewInMemoryManager returns an allocation manager that doesn't persist state.
77+
// For testing purposes only!
78+
func NewInMemoryManager() Manager {
79+
return &manager{
80+
state: state.NewStateMemory(nil),
81+
}
82+
}
83+
84+
// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
85+
// If checkpoint manager has not been initialized, it returns nil, false
86+
func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
87+
return m.state.GetContainerResourceAllocation(podUID, containerName)
88+
}
89+
90+
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
91+
// This function does a deep copy only if updates are needed.
92+
func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
93+
// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
94+
allocs := m.state.GetPodResourceAllocation()
95+
return updatePodFromAllocation(pod, allocs)
96+
}
97+
98+
func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
99+
allocated, found := allocs[string(pod.UID)]
100+
if !found {
101+
return pod, false
102+
}
103+
104+
updated := false
105+
containerAlloc := func(c v1.Container) (v1.ResourceRequirements, bool) {
106+
if cAlloc, ok := allocated[c.Name]; ok {
107+
if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
108+
// Allocation differs from pod spec, retrieve the allocation
109+
if !updated {
110+
// If this is the first update to be performed, copy the pod
111+
pod = pod.DeepCopy()
112+
updated = true
113+
}
114+
return cAlloc, true
115+
}
116+
}
117+
return v1.ResourceRequirements{}, false
118+
}
119+
120+
for i, c := range pod.Spec.Containers {
121+
if cAlloc, found := containerAlloc(c); found {
122+
// Allocation differs from pod spec, update
123+
pod.Spec.Containers[i].Resources = cAlloc
124+
}
125+
}
126+
for i, c := range pod.Spec.InitContainers {
127+
if cAlloc, found := containerAlloc(c); found {
128+
// Allocation differs from pod spec, update
129+
pod.Spec.InitContainers[i].Resources = cAlloc
130+
}
131+
}
132+
return pod, updated
133+
}
134+
135+
// SetPodAllocation checkpoints the resources allocated to a pod's containers
136+
func (m *manager) SetPodAllocation(pod *v1.Pod) error {
137+
podAlloc := make(map[string]v1.ResourceRequirements)
138+
for _, container := range pod.Spec.Containers {
139+
alloc := *container.Resources.DeepCopy()
140+
podAlloc[container.Name] = alloc
141+
}
142+
143+
if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
144+
for _, container := range pod.Spec.InitContainers {
145+
if podutil.IsRestartableInitContainer(&container) {
146+
alloc := *container.Resources.DeepCopy()
147+
podAlloc[container.Name] = alloc
148+
}
149+
}
150+
}
151+
152+
return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc)
153+
}
154+
155+
func (m *manager) DeletePodAllocation(uid types.UID) {
156+
if err := m.state.Delete(string(uid), ""); err != nil {
157+
// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
158+
klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
159+
}
160+
}
161+
162+
func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
163+
m.state.RemoveOrphanedPods(remainingPods)
164+
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package allocation
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
24+
v1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/api/resource"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/kubernetes/pkg/kubelet/allocation/state"
28+
)
29+
30+
func TestUpdatePodFromAllocation(t *testing.T) {
31+
containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways
32+
pod := &v1.Pod{
33+
ObjectMeta: metav1.ObjectMeta{
34+
UID: "12345",
35+
Name: "test",
36+
Namespace: "default",
37+
},
38+
Spec: v1.PodSpec{
39+
Containers: []v1.Container{
40+
{
41+
Name: "c1",
42+
Resources: v1.ResourceRequirements{
43+
Requests: v1.ResourceList{
44+
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
45+
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI),
46+
},
47+
Limits: v1.ResourceList{
48+
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
49+
v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI),
50+
},
51+
},
52+
},
53+
{
54+
Name: "c2",
55+
Resources: v1.ResourceRequirements{
56+
Requests: v1.ResourceList{
57+
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
58+
v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
59+
},
60+
Limits: v1.ResourceList{
61+
v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI),
62+
v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
63+
},
64+
},
65+
},
66+
},
67+
InitContainers: []v1.Container{
68+
{
69+
Name: "c1-restartable-init",
70+
Resources: v1.ResourceRequirements{
71+
Requests: v1.ResourceList{
72+
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
73+
v1.ResourceMemory: *resource.NewQuantity(300, resource.DecimalSI),
74+
},
75+
Limits: v1.ResourceList{
76+
v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
77+
v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
78+
},
79+
},
80+
RestartPolicy: &containerRestartPolicyAlways,
81+
},
82+
{
83+
Name: "c1-init",
84+
Resources: v1.ResourceRequirements{
85+
Requests: v1.ResourceList{
86+
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
87+
v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
88+
},
89+
Limits: v1.ResourceList{
90+
v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI),
91+
v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
92+
},
93+
},
94+
},
95+
},
96+
},
97+
}
98+
99+
resizedPod := pod.DeepCopy()
100+
resizedPod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(200, resource.DecimalSI)
101+
resizedPod.Spec.InitContainers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(300, resource.DecimalSI)
102+
103+
tests := []struct {
104+
name string
105+
pod *v1.Pod
106+
allocs state.PodResourceAllocation
107+
expectPod *v1.Pod
108+
expectUpdate bool
109+
}{{
110+
name: "steady state",
111+
pod: pod,
112+
allocs: state.PodResourceAllocation{
113+
string(pod.UID): map[string]v1.ResourceRequirements{
114+
"c1": *pod.Spec.Containers[0].Resources.DeepCopy(),
115+
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
116+
"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
117+
"c1-init": *pod.Spec.InitContainers[1].Resources.DeepCopy(),
118+
},
119+
},
120+
expectUpdate: false,
121+
}, {
122+
name: "no allocations",
123+
pod: pod,
124+
allocs: state.PodResourceAllocation{},
125+
expectUpdate: false,
126+
}, {
127+
name: "missing container allocation",
128+
pod: pod,
129+
allocs: state.PodResourceAllocation{
130+
string(pod.UID): map[string]v1.ResourceRequirements{
131+
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
132+
},
133+
},
134+
expectUpdate: false,
135+
}, {
136+
name: "resized container",
137+
pod: pod,
138+
allocs: state.PodResourceAllocation{
139+
string(pod.UID): map[string]v1.ResourceRequirements{
140+
"c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
141+
"c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
142+
"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
143+
"c1-init": *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
144+
},
145+
},
146+
expectUpdate: true,
147+
expectPod: resizedPod,
148+
}}
149+
150+
for _, test := range tests {
151+
t.Run(test.name, func(t *testing.T) {
152+
pod := test.pod.DeepCopy()
153+
allocatedPod, updated := updatePodFromAllocation(pod, test.allocs)
154+
155+
if test.expectUpdate {
156+
assert.True(t, updated, "updated")
157+
assert.Equal(t, test.expectPod, allocatedPod)
158+
assert.NotEqual(t, pod, allocatedPod)
159+
} else {
160+
assert.False(t, updated, "updated")
161+
assert.Same(t, pod, allocatedPod)
162+
}
163+
})
164+
}
165+
}

pkg/kubelet/allocation/doc.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package allocation handles tracking pod resource allocations.
18+
package allocation
File renamed without changes.

pkg/kubelet/status/state/state.go renamed to pkg/kubelet/allocation/state/state.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package state
1818

1919
import (
2020
v1 "k8s.io/api/core/v1"
21+
"k8s.io/apimachinery/pkg/types"
22+
"k8s.io/apimachinery/pkg/util/sets"
2123
)
2224

2325
// PodResourceAllocation type is used in tracking resources allocated to pod's containers
@@ -48,6 +50,8 @@ type writer interface {
4850
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
4951
SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error
5052
Delete(podUID string, containerName string) error
53+
// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
54+
RemoveOrphanedPods(remainingPods sets.Set[types.UID])
5155
}
5256

5357
// State interface provides methods for tracking and setting pod resource allocation

pkg/kubelet/status/state/state_checkpoint.go renamed to pkg/kubelet/allocation/state/state_checkpoint.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"sync"
2323

2424
v1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/types"
26+
"k8s.io/apimachinery/pkg/util/sets"
2527
"k8s.io/klog/v2"
2628
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
2729
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
@@ -139,6 +141,12 @@ func (sc *stateCheckpoint) Delete(podUID string, containerName string) error {
139141
return sc.storeState()
140142
}
141143

144+
func (sc *stateCheckpoint) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
145+
sc.cache.RemoveOrphanedPods(remainingPods)
146+
// Don't bother updating the stored state. If Kubelet is restarted before the cache is written,
147+
// the orphaned pods will be removed the next time this method is called.
148+
}
149+
142150
type noopStateCheckpoint struct{}
143151

144152
// NewNoopStateCheckpoint creates a dummy state checkpoint manager
@@ -165,3 +173,5 @@ func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ string, _ map[string]v
165173
func (sc *noopStateCheckpoint) Delete(_ string, _ string) error {
166174
return nil
167175
}
176+
177+
func (sc *noopStateCheckpoint) RemoveOrphanedPods(_ sets.Set[types.UID]) {}
File renamed without changes.

0 commit comments

Comments
 (0)