Skip to content

Commit efdd6be

Browse files
committed
kubelet checkpoint: refactor state memory
refactor state mem constructor to accept the state as parameter and SetPodAllocation to update a single pod.
1 parent 8e87297 commit efdd6be

File tree

6 files changed

+35
-23
lines changed

6 files changed

+35
-23
lines changed

pkg/kubelet/status/fake_status_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,6 @@ func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodRe
9393
// NewFakeManager creates empty/fake memory manager
9494
func NewFakeManager() Manager {
9595
return &fakeManager{
96-
state: state.NewStateMemory(),
96+
state: state.NewStateMemory(state.PodResourceAllocation{}, state.PodResizeStatus{}),
9797
}
9898
}

pkg/kubelet/status/state/state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type Reader interface {
4747

4848
type writer interface {
4949
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
50-
SetPodResourceAllocation(PodResourceAllocation) error
50+
SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error
5151
SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus)
5252
Delete(podUID string, containerName string) error
5353
ClearState() error

pkg/kubelet/status/state/state_checkpoint.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
4343
return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err)
4444
}
4545
stateCheckpoint := &stateCheckpoint{
46-
cache: NewStateMemory(),
46+
cache: NewStateMemory(PodResourceAllocation{}, PodResizeStatus{}),
4747
checkpointManager: checkpointManager,
4848
checkpointName: checkpointName,
4949
}
@@ -76,10 +76,14 @@ func (sc *stateCheckpoint) restoreState() error {
7676
if err != nil {
7777
return fmt.Errorf("failed to get pod resource allocation info: %w", err)
7878
}
79-
err = sc.cache.SetPodResourceAllocation(praInfo.AllocationEntries)
80-
if err != nil {
81-
return fmt.Errorf("failed to set pod resource allocation: %w", err)
79+
80+
for podUID, alloc := range praInfo.AllocationEntries {
81+
err = sc.cache.SetPodResourceAllocation(podUID, alloc)
82+
if err != nil {
83+
klog.ErrorS(err, "failed to set pod resource allocation")
84+
}
8285
}
86+
8387
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
8488
return nil
8589
}
@@ -132,10 +136,15 @@ func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, contain
132136
}
133137

134138
// SetPodResourceAllocation sets pod resource allocation
135-
func (sc *stateCheckpoint) SetPodResourceAllocation(a PodResourceAllocation) error {
139+
func (sc *stateCheckpoint) SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error {
136140
sc.mux.Lock()
137141
defer sc.mux.Unlock()
138-
sc.cache.SetPodResourceAllocation(a)
142+
143+
err := sc.cache.SetPodResourceAllocation(podUID, alloc)
144+
if err != nil {
145+
return err
146+
}
147+
139148
return sc.storeState()
140149
}
141150

@@ -185,7 +194,7 @@ func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string
185194
return nil
186195
}
187196

188-
func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ PodResourceAllocation) error {
197+
func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ string, _ map[string]v1.ResourceRequirements) error {
189198
return nil
190199
}
191200

pkg/kubelet/status/state/state_checkpoint_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const testCheckpoint = "pod_status_manager_state"
3232

3333
func newTestStateCheckpoint(t *testing.T) *stateCheckpoint {
3434
testingDir := getTestDir(t)
35-
cache := NewStateMemory()
35+
cache := NewStateMemory(PodResourceAllocation{}, PodResizeStatus{})
3636
checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir)
3737
require.NoError(t, err, "failed to create checkpoint manager")
3838
checkpointName := "pod_state_checkpoint"
@@ -110,8 +110,10 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
110110
originalSC, err := NewStateCheckpoint(testDir, testCheckpoint)
111111
require.NoError(t, err)
112112

113-
err = originalSC.SetPodResourceAllocation(tt.args.podResourceAllocation)
114-
require.NoError(t, err)
113+
for podUID, alloc := range tt.args.podResourceAllocation {
114+
err = originalSC.SetPodResourceAllocation(podUID, alloc)
115+
require.NoError(t, err)
116+
}
115117

116118
actual := originalSC.GetPodResourceAllocation()
117119
verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "stored pod resource allocation is not equal to original pod resource allocation")

pkg/kubelet/status/state/state_mem.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ type stateMemory struct {
3232
var _ State = &stateMemory{}
3333

3434
// NewStateMemory creates new State to track resources allocated to pods
35-
func NewStateMemory() State {
35+
func NewStateMemory(alloc PodResourceAllocation, stats PodResizeStatus) State {
3636
klog.V(2).InfoS("Initialized new in-memory state store for pod resource allocation tracking")
3737
return &stateMemory{
38-
podAllocation: PodResourceAllocation{},
39-
podResizeStatus: PodResizeStatus{},
38+
podAllocation: alloc,
39+
podResizeStatus: stats,
4040
}
4141
}
4242

@@ -74,12 +74,15 @@ func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerNam
7474
return nil
7575
}
7676

77-
func (s *stateMemory) SetPodResourceAllocation(a PodResourceAllocation) error {
77+
func (s *stateMemory) SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error {
7878
s.Lock()
7979
defer s.Unlock()
8080

81-
s.podAllocation = a.Clone()
82-
klog.V(3).InfoS("Updated pod resource allocation", "allocation", a)
81+
for containerName, containerAlloc := range alloc {
82+
s.podAllocation[podUID][containerName] = containerAlloc
83+
}
84+
85+
klog.V(3).InfoS("Updated pod resource allocation", "podUID", podUID, "allocation", alloc)
8386
return nil
8487
}
8588

pkg/kubelet/status/status_manager.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -297,16 +297,14 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error {
297297
m.podStatusesLock.RLock()
298298
defer m.podStatusesLock.RUnlock()
299299

300-
podUID := string(pod.UID)
301-
podAlloc := state.PodResourceAllocation{}
302-
podAlloc[podUID] = make(map[string]v1.ResourceRequirements)
300+
podAlloc := make(map[string]v1.ResourceRequirements)
303301

304302
for _, container := range pod.Spec.Containers {
305303
alloc := *container.Resources.DeepCopy()
306-
podAlloc[podUID][container.Name] = alloc
304+
podAlloc[container.Name] = alloc
307305
}
308306

309-
return m.state.SetPodResourceAllocation(podUID, podAlloc)
307+
return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc)
310308
}
311309

312310
// SetPodResizeStatus checkpoints the last resizing decision for the pod.

0 commit comments

Comments
 (0)