Skip to content

Commit b658aa1

Browse files
authored
Merge pull request kubernetes#130796 from ndixita/pod-level-resources-ippr
Replace PodResourceAllocation with PodResourceInfoMap type and cleanup
2 parents 381ccf0 + 091b450 commit b658aa1

File tree

8 files changed

+228
-173
lines changed

8 files changed

+228
-173
lines changed

pkg/kubelet/allocation/allocation_manager.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -103,26 +103,26 @@ func NewInMemoryManager() Manager {
103103
// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
104104
// If checkpoint manager has not been initialized, it returns nil, false
105105
func (m *manager) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
106-
return m.allocated.GetContainerResourceAllocation(podUID, containerName)
106+
return m.allocated.GetContainerResources(podUID, containerName)
107107
}
108108

109109
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
110110
// This function does a deep copy only if updates are needed.
111111
func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
112112
// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
113-
allocs := m.allocated.GetPodResourceAllocation()
113+
allocs := m.allocated.GetPodResourceInfoMap()
114114
return updatePodFromAllocation(pod, allocs)
115115
}
116116

117-
func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
117+
func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceInfoMap) (*v1.Pod, bool) {
118118
allocated, found := allocs[pod.UID]
119119
if !found {
120120
return pod, false
121121
}
122122

123123
updated := false
124124
containerAlloc := func(c v1.Container) (v1.ResourceRequirements, bool) {
125-
if cAlloc, ok := allocated[c.Name]; ok {
125+
if cAlloc, ok := allocated.ContainerResources[c.Name]; ok {
126126
if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
127127
// Allocation differs from pod spec, retrieve the allocation
128128
if !updated {
@@ -153,21 +153,22 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*
153153

154154
// SetAllocatedResources checkpoints the resources allocated to a pod's containers
155155
func (m *manager) SetAllocatedResources(pod *v1.Pod) error {
156-
return m.allocated.SetPodResourceAllocation(pod.UID, allocationFromPod(pod))
156+
return m.allocated.SetPodResourceInfo(pod.UID, allocationFromPod(pod))
157157
}
158158

159-
func allocationFromPod(pod *v1.Pod) map[string]v1.ResourceRequirements {
160-
podAlloc := make(map[string]v1.ResourceRequirements)
159+
func allocationFromPod(pod *v1.Pod) state.PodResourceInfo {
160+
var podAlloc state.PodResourceInfo
161+
podAlloc.ContainerResources = make(map[string]v1.ResourceRequirements)
161162
for _, container := range pod.Spec.Containers {
162163
alloc := *container.Resources.DeepCopy()
163-
podAlloc[container.Name] = alloc
164+
podAlloc.ContainerResources[container.Name] = alloc
164165
}
165166

166167
if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
167168
for _, container := range pod.Spec.InitContainers {
168169
if podutil.IsRestartableInitContainer(&container) {
169170
alloc := *container.Resources.DeepCopy()
170-
podAlloc[container.Name] = alloc
171+
podAlloc.ContainerResources[container.Name] = alloc
171172
}
172173
}
173174
}
@@ -195,12 +196,12 @@ func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
195196
func (m *manager) SetActuatedResources(allocatedPod *v1.Pod, actuatedContainer *v1.Container) error {
196197
if actuatedContainer == nil {
197198
alloc := allocationFromPod(allocatedPod)
198-
return m.actuated.SetPodResourceAllocation(allocatedPod.UID, alloc)
199+
return m.actuated.SetPodResourceInfo(allocatedPod.UID, alloc)
199200
}
200201

201-
return m.actuated.SetContainerResourceAllocation(allocatedPod.UID, actuatedContainer.Name, actuatedContainer.Resources)
202+
return m.actuated.SetContainerResources(allocatedPod.UID, actuatedContainer.Name, actuatedContainer.Resources)
202203
}
203204

204205
func (m *manager) GetActuatedResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
205-
return m.actuated.GetContainerResourceAllocation(podUID, containerName)
206+
return m.actuated.GetContainerResources(podUID, containerName)
206207
}

pkg/kubelet/allocation/allocation_manager_test.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -103,44 +103,50 @@ func TestUpdatePodFromAllocation(t *testing.T) {
103103
tests := []struct {
104104
name string
105105
pod *v1.Pod
106-
allocs state.PodResourceAllocation
106+
allocs state.PodResourceInfoMap
107107
expectPod *v1.Pod
108108
expectUpdate bool
109109
}{{
110110
name: "steady state",
111111
pod: pod,
112-
allocs: state.PodResourceAllocation{
113-
pod.UID: map[string]v1.ResourceRequirements{
114-
"c1": *pod.Spec.Containers[0].Resources.DeepCopy(),
115-
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
116-
"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
117-
"c1-init": *pod.Spec.InitContainers[1].Resources.DeepCopy(),
112+
allocs: state.PodResourceInfoMap{
113+
pod.UID: state.PodResourceInfo{
114+
ContainerResources: map[string]v1.ResourceRequirements{
115+
"c1": *pod.Spec.Containers[0].Resources.DeepCopy(),
116+
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
117+
"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
118+
"c1-init": *pod.Spec.InitContainers[1].Resources.DeepCopy(),
119+
},
118120
},
119121
},
120122
expectUpdate: false,
121123
}, {
122124
name: "no allocations",
123125
pod: pod,
124-
allocs: state.PodResourceAllocation{},
126+
allocs: state.PodResourceInfoMap{},
125127
expectUpdate: false,
126128
}, {
127129
name: "missing container allocation",
128130
pod: pod,
129-
allocs: state.PodResourceAllocation{
130-
pod.UID: map[string]v1.ResourceRequirements{
131-
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
131+
allocs: state.PodResourceInfoMap{
132+
pod.UID: state.PodResourceInfo{
133+
ContainerResources: map[string]v1.ResourceRequirements{
134+
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
135+
},
132136
},
133137
},
134138
expectUpdate: false,
135139
}, {
136140
name: "resized container",
137141
pod: pod,
138-
allocs: state.PodResourceAllocation{
139-
pod.UID: map[string]v1.ResourceRequirements{
140-
"c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
141-
"c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
142-
"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
143-
"c1-init": *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
142+
allocs: state.PodResourceInfoMap{
143+
pod.UID: state.PodResourceInfo{
144+
ContainerResources: map[string]v1.ResourceRequirements{
145+
"c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
146+
"c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
147+
"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
148+
"c1-init": *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
149+
},
144150
},
145151
},
146152
expectUpdate: true,

pkg/kubelet/allocation/state/checkpoint.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,14 @@ import (
2020
"encoding/json"
2121
"fmt"
2222

23-
v1 "k8s.io/api/core/v1"
24-
"k8s.io/apimachinery/pkg/types"
2523
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
2624
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
2725
)
2826

2927
var _ checkpointmanager.Checkpoint = &Checkpoint{}
3028

31-
type PodResourceAllocationInfo struct {
32-
AllocationEntries map[types.UID]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"`
29+
type PodResourceCheckpointInfo struct {
30+
Entries PodResourceInfoMap `json:"entries,omitempty"`
3331
}
3432

3533
// Checkpoint represents a structure to store pod resource allocation checkpoint data
@@ -41,7 +39,7 @@ type Checkpoint struct {
4139
}
4240

4341
// NewCheckpoint creates a new checkpoint from the given pod resource information
44-
func NewCheckpoint(allocations *PodResourceAllocationInfo) (*Checkpoint, error) {
42+
func NewCheckpoint(allocations *PodResourceCheckpointInfo) (*Checkpoint, error) {
4543

4644
serializedAllocations, err := json.Marshal(allocations)
4745
if err != nil {
@@ -70,9 +68,9 @@ func (cp *Checkpoint) VerifyChecksum() error {
7068
return cp.Checksum.Verify(cp.Data)
7169
}
7270

73-
// GetPodResourceAllocationInfo returns Pod Resource Allocation info states from checkpoint
74-
func (cp *Checkpoint) GetPodResourceAllocationInfo() (*PodResourceAllocationInfo, error) {
75-
var data PodResourceAllocationInfo
71+
// GetPodResourceCheckpointInfo returns the pod resource information stored in the checkpoint
72+
func (cp *Checkpoint) GetPodResourceCheckpointInfo() (*PodResourceCheckpointInfo, error) {
73+
var data PodResourceCheckpointInfo
7674
if err := json.Unmarshal([]byte(cp.Data), &data); err != nil {
7775
return nil, err
7876
}

pkg/kubelet/allocation/state/state.go

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,36 +22,45 @@ import (
2222
"k8s.io/apimachinery/pkg/util/sets"
2323
)
2424

25-
// PodResourceAllocation type is used in tracking resources allocated to pod's containers
26-
type PodResourceAllocation map[types.UID]map[string]v1.ResourceRequirements
27-
28-
// Clone returns a copy of PodResourceAllocation
29-
func (pr PodResourceAllocation) Clone() PodResourceAllocation {
30-
prCopy := make(PodResourceAllocation)
31-
for pod := range pr {
32-
prCopy[pod] = make(map[string]v1.ResourceRequirements)
33-
for container, alloc := range pr[pod] {
34-
prCopy[pod][container] = *alloc.DeepCopy()
25+
// PodResourceInfo stores resource requirements for containers within a pod.
26+
type PodResourceInfo struct {
27+
// ContainerResources maps container names to their respective ResourceRequirements.
28+
ContainerResources map[string]v1.ResourceRequirements
29+
}
30+
31+
// PodResourceInfoMap maps pod UIDs to their corresponding PodResourceInfo,
32+
// tracking resource requirements for all containers within each pod.
33+
type PodResourceInfoMap map[types.UID]PodResourceInfo
34+
35+
// Clone returns a copy of PodResourceInfoMap
36+
func (pr PodResourceInfoMap) Clone() PodResourceInfoMap {
37+
prCopy := make(PodResourceInfoMap)
38+
for podUID, podInfo := range pr {
39+
prCopy[podUID] = PodResourceInfo{
40+
ContainerResources: make(map[string]v1.ResourceRequirements),
41+
}
42+
for containerName, containerInfo := range podInfo.ContainerResources {
43+
prCopy[podUID].ContainerResources[containerName] = *containerInfo.DeepCopy()
3544
}
3645
}
3746
return prCopy
3847
}
3948

40-
// Reader interface used to read current pod resource allocation state
49+
// Reader interface used to read current pod resource state
4150
type Reader interface {
42-
GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool)
43-
GetPodResourceAllocation() PodResourceAllocation
51+
GetContainerResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool)
52+
GetPodResourceInfoMap() PodResourceInfoMap
4453
}
4554

4655
type writer interface {
47-
SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error
48-
SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error
56+
SetContainerResources(podUID types.UID, containerName string, resources v1.ResourceRequirements) error
57+
SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error
4958
RemovePod(podUID types.UID) error
5059
// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
5160
RemoveOrphanedPods(remainingPods sets.Set[types.UID])
5261
}
5362

54-
// State interface provides methods for tracking and setting pod resource allocation
63+
// State interface provides methods for tracking and setting pod resources
5564
type State interface {
5665
Reader
5766
writer

pkg/kubelet/allocation/state/state_checkpoint.go

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,17 @@ type stateCheckpoint struct {
4040
lastChecksum checksum.Checksum
4141
}
4242

43-
// NewStateCheckpoint creates new State for keeping track of pod resource allocations with checkpoint backend
43+
// NewStateCheckpoint creates a new State for keeping track of pod resource information with a checkpoint backend
4444
func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
4545
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
4646
if err != nil {
47-
return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err)
47+
return nil, fmt.Errorf("failed to initialize checkpoint manager for pod resource information tracking: %w", err)
4848
}
4949

5050
pra, checksum, err := restoreState(checkpointManager, checkpointName)
5151
if err != nil {
5252
//lint:ignore ST1005 user-facing error message
53-
return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet",
53+
return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod resource information checkpoint file %q before restarting Kubelet",
5454
err, path.Join(stateDir, checkpointName))
5555
}
5656

@@ -64,7 +64,7 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
6464
}
6565

6666
// restores state from a checkpoint and creates it if it doesn't exist
67-
func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (PodResourceAllocation, checksum.Checksum, error) {
67+
func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (PodResourceInfoMap, checksum.Checksum, error) {
6868
checkpoint := &Checkpoint{}
6969
if err := checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil {
7070
if err == errors.ErrCheckpointNotFound {
@@ -73,21 +73,21 @@ func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpo
7373
return nil, 0, err
7474
}
7575

76-
praInfo, err := checkpoint.GetPodResourceAllocationInfo()
76+
praInfo, err := checkpoint.GetPodResourceCheckpointInfo()
7777
if err != nil {
78-
return nil, 0, fmt.Errorf("failed to get pod resource allocation info: %w", err)
78+
return nil, 0, fmt.Errorf("failed to get pod resource information: %w", err)
7979
}
8080

81-
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
82-
return praInfo.AllocationEntries, checkpoint.Checksum, nil
81+
klog.V(2).InfoS("State checkpoint: restored pod resource state from checkpoint")
82+
return praInfo.Entries, checkpoint.Checksum, nil
8383
}
8484

8585
// saves state to a checkpoint, caller is responsible for locking
8686
func (sc *stateCheckpoint) storeState() error {
87-
podAllocation := sc.cache.GetPodResourceAllocation()
87+
resourceInfo := sc.cache.GetPodResourceInfoMap()
8888

89-
checkpoint, err := NewCheckpoint(&PodResourceAllocationInfo{
90-
AllocationEntries: podAllocation,
89+
checkpoint, err := NewCheckpoint(&PodResourceCheckpointInfo{
90+
Entries: resourceInfo,
9191
})
9292
if err != nil {
9393
return fmt.Errorf("failed to create checkpoint: %w", err)
@@ -98,47 +98,50 @@ func (sc *stateCheckpoint) storeState() error {
9898
}
9999
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
100100
if err != nil {
101-
klog.ErrorS(err, "Failed to save pod allocation checkpoint")
101+
klog.ErrorS(err, "Failed to save pod resource information checkpoint")
102102
return err
103103
}
104104
sc.lastChecksum = checkpoint.Checksum
105105
return nil
106106
}
107107

108-
// GetContainerResourceAllocation returns current resources allocated to a pod's container
109-
func (sc *stateCheckpoint) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
108+
// GetContainerResources returns the current resource information for a pod's container
109+
func (sc *stateCheckpoint) GetContainerResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
110110
sc.mux.RLock()
111111
defer sc.mux.RUnlock()
112-
return sc.cache.GetContainerResourceAllocation(podUID, containerName)
112+
return sc.cache.GetContainerResources(podUID, containerName)
113113
}
114114

115-
// GetPodResourceAllocation returns current pod resource allocation
116-
func (sc *stateCheckpoint) GetPodResourceAllocation() PodResourceAllocation {
115+
// GetPodResourceInfoMap returns current pod resource information
116+
func (sc *stateCheckpoint) GetPodResourceInfoMap() PodResourceInfoMap {
117117
sc.mux.RLock()
118118
defer sc.mux.RUnlock()
119-
return sc.cache.GetPodResourceAllocation()
119+
return sc.cache.GetPodResourceInfoMap()
120120
}
121121

122-
// SetContainerResourceAllocation sets resources allocated to a pod's container
123-
func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID types.UID, containerName string, alloc v1.ResourceRequirements) error {
122+
// SetContainerResources sets resource information for a pod's container
123+
func (sc *stateCheckpoint) SetContainerResources(podUID types.UID, containerName string, resources v1.ResourceRequirements) error {
124124
sc.mux.Lock()
125125
defer sc.mux.Unlock()
126-
sc.cache.SetContainerResourceAllocation(podUID, containerName, alloc)
126+
err := sc.cache.SetContainerResources(podUID, containerName, resources)
127+
if err != nil {
128+
return err
129+
}
127130
return sc.storeState()
128131
}
129132

130-
// SetPodResourceAllocation sets pod resource allocation
131-
func (sc *stateCheckpoint) SetPodResourceAllocation(podUID types.UID, alloc map[string]v1.ResourceRequirements) error {
133+
// SetPodResourceInfo sets pod resource information
134+
func (sc *stateCheckpoint) SetPodResourceInfo(podUID types.UID, resourceInfo PodResourceInfo) error {
132135
sc.mux.Lock()
133136
defer sc.mux.Unlock()
134-
err := sc.cache.SetPodResourceAllocation(podUID, alloc)
137+
err := sc.cache.SetPodResourceInfo(podUID, resourceInfo)
135138
if err != nil {
136139
return err
137140
}
138141
return sc.storeState()
139142
}
140143

141-
// Delete deletes allocations for specified pod
144+
// RemovePod deletes resource information for the specified pod
142145
func (sc *stateCheckpoint) RemovePod(podUID types.UID) error {
143146
sc.mux.Lock()
144147
defer sc.mux.Unlock()
@@ -161,19 +164,19 @@ func NewNoopStateCheckpoint() State {
161164
return &noopStateCheckpoint{}
162165
}
163166

164-
func (sc *noopStateCheckpoint) GetContainerResourceAllocation(_ types.UID, _ string) (v1.ResourceRequirements, bool) {
167+
func (sc *noopStateCheckpoint) GetContainerResources(_ types.UID, _ string) (v1.ResourceRequirements, bool) {
165168
return v1.ResourceRequirements{}, false
166169
}
167170

168-
func (sc *noopStateCheckpoint) GetPodResourceAllocation() PodResourceAllocation {
171+
func (sc *noopStateCheckpoint) GetPodResourceInfoMap() PodResourceInfoMap {
169172
return nil
170173
}
171174

172-
func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ types.UID, _ string, _ v1.ResourceRequirements) error {
175+
func (sc *noopStateCheckpoint) SetContainerResources(_ types.UID, _ string, _ v1.ResourceRequirements) error {
173176
return nil
174177
}
175178

176-
func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ types.UID, _ map[string]v1.ResourceRequirements) error {
179+
func (sc *noopStateCheckpoint) SetPodResourceInfo(_ types.UID, _ PodResourceInfo) error {
177180
return nil
178181
}
179182

0 commit comments

Comments
 (0)