@@ -26,6 +26,7 @@ import (
26
26
"k8s.io/apimachinery/pkg/util/sets"
27
27
"k8s.io/klog/v2"
28
28
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
29
+ "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
29
30
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
30
31
)
31
32
@@ -36,6 +37,7 @@ type stateCheckpoint struct {
36
37
cache State
37
38
checkpointManager checkpointmanager.CheckpointManager
38
39
checkpointName string
40
+ lastChecksum checksum.Checksum
39
41
}
40
42
41
43
// NewStateCheckpoint creates new State for keeping track of pod resource allocations with checkpoint backend
@@ -45,41 +47,39 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
45
47
return nil , fmt .Errorf ("failed to initialize checkpoint manager for pod allocation tracking: %v" , err )
46
48
}
47
49
48
- praInfo , err := restoreState (checkpointManager , checkpointName )
50
+ pra , checksum , err := restoreState (checkpointManager , checkpointName )
49
51
if err != nil {
50
52
//lint:ignore ST1005 user-facing error message
51
53
return nil , fmt .Errorf ("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet" ,
52
54
err , path .Join (stateDir , checkpointName ))
53
55
}
54
56
55
57
stateCheckpoint := & stateCheckpoint {
56
- cache : NewStateMemory (praInfo . AllocationEntries ),
58
+ cache : NewStateMemory (pra ),
57
59
checkpointManager : checkpointManager ,
58
60
checkpointName : checkpointName ,
61
+ lastChecksum : checksum ,
59
62
}
60
63
return stateCheckpoint , nil
61
64
}
62
65
63
66
// restoreState loads the checkpoint named checkpointName through checkpointManager.
// In the updated (+) version it returns the allocation entries together with the
// checksum read from the loaded checkpoint. A missing checkpoint
// (errors.ErrCheckpointNotFound) is not treated as an error: the function returns
// (nil, 0, nil) and does not create a checkpoint itself. Any other load error is
// returned unchanged, and a failure to decode the allocation info is wrapped.
64
- func restoreState (checkpointManager checkpointmanager.CheckpointManager , checkpointName string ) (* PodResourceAllocationInfo , error ) {
65
- var err error
67
+ func restoreState (checkpointManager checkpointmanager.CheckpointManager , checkpointName string ) (PodResourceAllocation , checksum.Checksum , error ) {
66
68
checkpoint := & Checkpoint {}
67
-
68
- if err = checkpointManager .GetCheckpoint (checkpointName , checkpoint ); err != nil {
69
+ if err := checkpointManager .GetCheckpoint (checkpointName , checkpoint ); err != nil {
69
70
if err == errors .ErrCheckpointNotFound {
70
- return & PodResourceAllocationInfo {
71
- AllocationEntries : make (map [types.UID ]map [string ]v1.ResourceRequirements ),
72
- }, nil
71
// No checkpoint yet: report no allocations and a zero checksum instead of failing.
+ return nil , 0 , nil
73
72
}
74
- return nil , err
73
+ return nil , 0 , err
75
74
}
76
- klog . V ( 2 ). InfoS ( "State checkpoint: restored pod resource allocation state from checkpoint" )
75
+
77
76
praInfo , err := checkpoint .GetPodResourceAllocationInfo ()
78
77
if err != nil {
79
- return nil , fmt .Errorf ("failed to get pod resource allocation info: %w" , err )
78
+ return nil , 0 , fmt .Errorf ("failed to get pod resource allocation info: %w" , err )
80
79
}
81
80
82
- return praInfo , nil
81
// The success message is now logged only after the allocation info was decoded without error.
+ klog .V (2 ).InfoS ("State checkpoint: restored pod resource allocation state from checkpoint" )
82
+ return praInfo .AllocationEntries , checkpoint .Checksum , nil
83
83
}
84
84
85
85
// saves state to a checkpoint, caller is responsible for locking
@@ -92,11 +92,16 @@ func (sc *stateCheckpoint) storeState() error {
92
92
if err != nil {
93
93
return fmt .Errorf ("failed to create checkpoint: %w" , err )
94
94
}
95
+ if checkpoint .Checksum == sc .lastChecksum {
96
+ // No changes to the checkpoint => no need to re-write it.
97
+ return nil
98
+ }
95
99
err = sc .checkpointManager .CreateCheckpoint (sc .checkpointName , checkpoint )
96
100
if err != nil {
97
101
klog .ErrorS (err , "Failed to save pod allocation checkpoint" )
98
102
return err
99
103
}
104
+ sc .lastChecksum = checkpoint .Checksum
100
105
return nil
101
106
}
102
107
@@ -134,11 +139,13 @@ func (sc *stateCheckpoint) SetPodResourceAllocation(podUID types.UID, alloc map[
134
139
}
135
140
136
141
// RemovePod deletes the allocations for the specified pod from the in-memory cache
// while holding sc.mux, and returns the result of sc.cache.RemovePod. Unlike the
// removed Delete variant, it takes no container name and does not call storeState.
137
- func (sc * stateCheckpoint ) Delete (podUID types.UID , containerName string ) error {
142
+ func (sc * stateCheckpoint ) RemovePod (podUID types.UID ) error {
138
143
sc .mux .Lock ()
139
144
defer sc .mux .Unlock ()
140
- sc .cache .Delete (podUID , containerName )
141
- return sc .storeState ()
145
// Skip writing the checkpoint for pod deletion, since there is no side effect to
146
// keeping a deleted pod. Deleted pods will eventually be cleaned up by RemoveOrphanedPods.
147
// The deletion will be stored the next time a non-delete update is made.
148
+ return sc .cache .RemovePod (podUID )
142
149
}
143
150
144
151
func (sc * stateCheckpoint ) RemoveOrphanedPods (remainingPods sets.Set [types.UID ]) {
@@ -170,7 +177,7 @@ func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ types.UID, _ map[strin
170
177
return nil
171
178
}
172
179
173
// RemovePod is the no-op counterpart of the checkpoint-backed implementation:
// it ignores the pod UID and always returns nil.
- func (sc * noopStateCheckpoint ) Delete (_ types.UID , _ string ) error {
180
+ func (sc * noopStateCheckpoint ) RemovePod (_ types.UID ) error {
174
181
return nil
175
182
}
176
183
0 commit comments