Skip to content

Commit 84ec78e

Browse files
committed
Extract pod allocation manager from status manager
1 parent 81e54a2 commit 84ec78e

File tree

10 files changed

+379
-280
lines changed

10 files changed

+379
-280
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package allocation
18+
19+
import (
20+
v1 "k8s.io/api/core/v1"
21+
apiequality "k8s.io/apimachinery/pkg/api/equality"
22+
"k8s.io/apimachinery/pkg/types"
23+
utilfeature "k8s.io/apiserver/pkg/util/feature"
24+
"k8s.io/klog/v2"
25+
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
26+
"k8s.io/kubernetes/pkg/features"
27+
"k8s.io/kubernetes/pkg/kubelet/allocation/state"
28+
)
29+
30+
// podStatusManagerStateFile is the file name where status manager stores its state
31+
const podStatusManagerStateFile = "pod_status_manager_state"
32+
33+
// AllocationManager tracks pod resource allocations.
34+
type Manager interface {
35+
// GetContainerResourceAllocation returns the AllocatedResources value for the container
36+
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
37+
38+
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
39+
// This function does a deep copy only if updates are needed.
40+
// Returns the updated (or original) pod, and whether there was an allocation stored.
41+
UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)
42+
43+
// SetPodAllocation checkpoints the resources allocated to a pod's containers.
44+
SetPodAllocation(pod *v1.Pod) error
45+
46+
// DeletePodAllocation removes any stored state for the given pod UID.
47+
DeletePodAllocation(uid types.UID) error
48+
49+
// RemoveOrphanedPods removes the stored state for any pods not included in the list of remaining pod UIDs.
50+
RemoveOrphanedPods(remainingPods map[types.UID]bool)
51+
}
52+
53+
type manager struct {
54+
state state.State
55+
}
56+
57+
func NewManager(checkpointDirectory string) Manager {
58+
m := &manager{}
59+
60+
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
61+
stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, podStatusManagerStateFile)
62+
if err != nil {
63+
// This is a crictical, non-recoverable failure.
64+
klog.ErrorS(err, "Could not initialize pod allocation checkpoint manager, please drain node and remove policy state file")
65+
panic(err)
66+
}
67+
m.state = stateImpl
68+
} else {
69+
m.state = state.NewNoopStateCheckpoint()
70+
}
71+
72+
return m
73+
}
74+
75+
// NewInMemoryManager returns an allocation manager that doesn't persist state.
76+
// For testing purposes only!
77+
func NewInMemoryManager() Manager {
78+
return &manager{
79+
state: state.NewStateMemory(nil),
80+
}
81+
}
82+
83+
// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
84+
// If checkpoint manager has not been initialized, it returns nil, false
85+
func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
86+
return m.state.GetContainerResourceAllocation(podUID, containerName)
87+
}
88+
89+
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
90+
// This function does a deep copy only if updates are needed.
91+
func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
92+
// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
93+
allocs := m.state.GetPodResourceAllocation()
94+
return updatePodFromAllocation(pod, allocs)
95+
}
96+
97+
func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
98+
allocated, found := allocs[string(pod.UID)]
99+
if !found {
100+
return pod, false
101+
}
102+
103+
updated := false
104+
containerAlloc := func(c v1.Container) (v1.ResourceRequirements, bool) {
105+
if cAlloc, ok := allocated[c.Name]; ok {
106+
if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
107+
// Allocation differs from pod spec, retrieve the allocation
108+
if !updated {
109+
// If this is the first update to be performed, copy the pod
110+
pod = pod.DeepCopy()
111+
updated = true
112+
}
113+
return cAlloc, true
114+
}
115+
}
116+
return v1.ResourceRequirements{}, false
117+
}
118+
119+
for i, c := range pod.Spec.Containers {
120+
if cAlloc, found := containerAlloc(c); found {
121+
// Allocation differs from pod spec, update
122+
pod.Spec.Containers[i].Resources = cAlloc
123+
}
124+
}
125+
for i, c := range pod.Spec.InitContainers {
126+
if cAlloc, found := containerAlloc(c); found {
127+
// Allocation differs from pod spec, update
128+
pod.Spec.InitContainers[i].Resources = cAlloc
129+
}
130+
}
131+
return pod, updated
132+
}
133+
134+
// SetPodAllocation checkpoints the resources allocated to a pod's containers
135+
func (m *manager) SetPodAllocation(pod *v1.Pod) error {
136+
podAlloc := make(map[string]v1.ResourceRequirements)
137+
for _, container := range pod.Spec.Containers {
138+
alloc := *container.Resources.DeepCopy()
139+
podAlloc[container.Name] = alloc
140+
}
141+
142+
if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
143+
for _, container := range pod.Spec.InitContainers {
144+
if podutil.IsRestartableInitContainer(&container) {
145+
alloc := *container.Resources.DeepCopy()
146+
podAlloc[container.Name] = alloc
147+
}
148+
}
149+
}
150+
151+
return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc)
152+
}
153+
154+
func (m *manager) DeletePodAllocation(uid types.UID) error {
155+
return m.state.Delete(string(uid), "")
156+
}
157+
158+
func (m *manager) RemoveOrphanedPods(remainingPods map[types.UID]bool) {
159+
m.state.RemoveOrphanedPods(remainingPods)
160+
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package allocation
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
24+
v1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/api/resource"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/kubernetes/pkg/kubelet/allocation/state"
28+
)
29+
30+
func TestUpdatePodFromAllocation(t *testing.T) {
31+
containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways
32+
pod := &v1.Pod{
33+
ObjectMeta: metav1.ObjectMeta{
34+
UID: "12345",
35+
Name: "test",
36+
Namespace: "default",
37+
},
38+
Spec: v1.PodSpec{
39+
Containers: []v1.Container{
40+
{
41+
Name: "c1",
42+
Resources: v1.ResourceRequirements{
43+
Requests: v1.ResourceList{
44+
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
45+
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI),
46+
},
47+
Limits: v1.ResourceList{
48+
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
49+
v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI),
50+
},
51+
},
52+
},
53+
{
54+
Name: "c2",
55+
Resources: v1.ResourceRequirements{
56+
Requests: v1.ResourceList{
57+
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
58+
v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
59+
},
60+
Limits: v1.ResourceList{
61+
v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI),
62+
v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
63+
},
64+
},
65+
},
66+
},
67+
InitContainers: []v1.Container{
68+
{
69+
Name: "c1-restartable-init",
70+
Resources: v1.ResourceRequirements{
71+
Requests: v1.ResourceList{
72+
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
73+
v1.ResourceMemory: *resource.NewQuantity(300, resource.DecimalSI),
74+
},
75+
Limits: v1.ResourceList{
76+
v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
77+
v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
78+
},
79+
},
80+
RestartPolicy: &containerRestartPolicyAlways,
81+
},
82+
{
83+
Name: "c1-init",
84+
Resources: v1.ResourceRequirements{
85+
Requests: v1.ResourceList{
86+
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
87+
v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
88+
},
89+
Limits: v1.ResourceList{
90+
v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI),
91+
v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
92+
},
93+
},
94+
},
95+
},
96+
},
97+
}
98+
99+
resizedPod := pod.DeepCopy()
100+
resizedPod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(200, resource.DecimalSI)
101+
resizedPod.Spec.InitContainers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(300, resource.DecimalSI)
102+
103+
tests := []struct {
104+
name string
105+
pod *v1.Pod
106+
allocs state.PodResourceAllocation
107+
expectPod *v1.Pod
108+
expectUpdate bool
109+
}{{
110+
name: "steady state",
111+
pod: pod,
112+
allocs: state.PodResourceAllocation{
113+
string(pod.UID): map[string]v1.ResourceRequirements{
114+
"c1": *pod.Spec.Containers[0].Resources.DeepCopy(),
115+
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
116+
"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
117+
"c1-init": *pod.Spec.InitContainers[1].Resources.DeepCopy(),
118+
},
119+
},
120+
expectUpdate: false,
121+
}, {
122+
name: "no allocations",
123+
pod: pod,
124+
allocs: state.PodResourceAllocation{},
125+
expectUpdate: false,
126+
}, {
127+
name: "missing container allocation",
128+
pod: pod,
129+
allocs: state.PodResourceAllocation{
130+
string(pod.UID): map[string]v1.ResourceRequirements{
131+
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
132+
},
133+
},
134+
expectUpdate: false,
135+
}, {
136+
name: "resized container",
137+
pod: pod,
138+
allocs: state.PodResourceAllocation{
139+
string(pod.UID): map[string]v1.ResourceRequirements{
140+
"c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
141+
"c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
142+
"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
143+
"c1-init": *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
144+
},
145+
},
146+
expectUpdate: true,
147+
expectPod: resizedPod,
148+
}}
149+
150+
for _, test := range tests {
151+
t.Run(test.name, func(t *testing.T) {
152+
pod := test.pod.DeepCopy()
153+
allocatedPod, updated := updatePodFromAllocation(pod, test.allocs)
154+
155+
if test.expectUpdate {
156+
assert.True(t, updated, "updated")
157+
assert.Equal(t, test.expectPod, allocatedPod)
158+
assert.NotEqual(t, pod, allocatedPod)
159+
} else {
160+
assert.False(t, updated, "updated")
161+
assert.Same(t, pod, allocatedPod)
162+
}
163+
})
164+
}
165+
}

pkg/kubelet/allocation/doc.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package allocation handles tracking pod resource allocations.
18+
package allocation

pkg/kubelet/allocation/state/state.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package state
1818

1919
import (
2020
v1 "k8s.io/api/core/v1"
21+
"k8s.io/apimachinery/pkg/types"
2122
)
2223

2324
// PodResourceAllocation type is used in tracking resources allocated to pod's containers
@@ -48,6 +49,8 @@ type writer interface {
4849
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
4950
SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error
5051
Delete(podUID string, containerName string) error
52+
// RemoveOrphanedPods removes the stored state for any pods not included in the list of remaining pod UIDs.
53+
RemoveOrphanedPods(remainingPods map[types.UID]bool)
5154
}
5255

5356
// State interface provides methods for tracking and setting pod resource allocation

pkg/kubelet/allocation/state/state_checkpoint.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"sync"
2323

2424
v1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/types"
2526
"k8s.io/klog/v2"
2627
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
2728
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
@@ -139,6 +140,12 @@ func (sc *stateCheckpoint) Delete(podUID string, containerName string) error {
139140
return sc.storeState()
140141
}
141142

143+
func (sc *stateCheckpoint) RemoveOrphanedPods(remainingPods map[types.UID]bool) {
144+
sc.cache.RemoveOrphanedPods(remainingPods)
145+
// Don't bother updating the stored state. If Kubelet is restarted before the cache is written,
146+
// the orphaned pods will be removed the next time this method is called.
147+
}
148+
142149
type noopStateCheckpoint struct{}
143150

144151
// NewNoopStateCheckpoint creates a dummy state checkpoint manager
@@ -165,3 +172,5 @@ func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ string, _ map[string]v
165172
func (sc *noopStateCheckpoint) Delete(_ string, _ string) error {
166173
return nil
167174
}
175+
176+
func (sc *noopStateCheckpoint) RemoveOrphanedPods(_ map[types.UID]bool) {}

0 commit comments

Comments
 (0)