Skip to content

Commit 2c4a863

Browse files
authored
Merge pull request kubernetes#126620 from yunwang0911/master
[InPlacePodVerticalScaling] fix restore checkpoint bug: failed to verify pod status checkpoint checksum because of different behaviors of func Quantity.Marshal and Quantity.Unmarshal
2 parents 6bc0768 + f428881 commit 2c4a863

File tree

3 files changed

+231
-43
lines changed

3 files changed

+231
-43
lines changed

pkg/kubelet/status/state/checkpoint.go

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,48 +18,64 @@ package state
1818

1919
import (
2020
"encoding/json"
21+
"fmt"
2122

2223
v1 "k8s.io/api/core/v1"
2324
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
2425
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
2526
)
2627

27-
var _ checkpointmanager.Checkpoint = &PodResourceAllocationCheckpoint{}
28+
// Compile-time assertion that *Checkpoint satisfies checkpointmanager.Checkpoint.
var _ checkpointmanager.Checkpoint = &Checkpoint{}
2829

29-
// PodResourceAllocationCheckpoint is used to store resources allocated to a pod in checkpoint
30-
type PodResourceAllocationCheckpoint struct {
30+
type PodResourceAllocationInfo struct {
3131
AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"`
3232
ResizeStatusEntries map[string]v1.PodResizeStatus `json:"resizeStatusEntries,omitempty"`
33-
Checksum checksum.Checksum `json:"checksum"`
3433
}
3534

36-
// NewPodResourceAllocationCheckpoint returns an instance of Checkpoint
37-
func NewPodResourceAllocationCheckpoint() *PodResourceAllocationCheckpoint {
38-
//lint:ignore unexported-type-in-api user-facing error message
39-
return &PodResourceAllocationCheckpoint{
40-
AllocationEntries: make(map[string]map[string]v1.ResourceRequirements),
41-
ResizeStatusEntries: make(map[string]v1.PodResizeStatus),
35+
// Checkpoint represents a structure to store pod resource allocation checkpoint data
36+
type Checkpoint struct {
37+
// Data is a serialized PodResourceAllocationInfo
38+
Data string `json:"data"`
39+
// Checksum is a checksum of Data
40+
Checksum checksum.Checksum `json:"checksum"`
41+
}
42+
43+
// NewCheckpoint creates a new checkpoint from a list of claim info states
44+
func NewCheckpoint(allocations *PodResourceAllocationInfo) (*Checkpoint, error) {
45+
46+
serializedAllocations, err := json.Marshal(allocations)
47+
if err != nil {
48+
return nil, fmt.Errorf("failed to serialize allocations for checkpointing: %w", err)
49+
}
50+
51+
cp := &Checkpoint{
52+
Data: string(serializedAllocations),
4253
}
54+
cp.Checksum = checksum.New(cp.Data)
55+
return cp, nil
4356
}
4457

45-
// MarshalCheckpoint returns marshalled checkpoint
46-
func (prc *PodResourceAllocationCheckpoint) MarshalCheckpoint() ([]byte, error) {
47-
// make sure checksum wasn't set before so it doesn't affect output checksum
48-
prc.Checksum = 0
49-
prc.Checksum = checksum.New(prc)
50-
return json.Marshal(*prc)
58+
func (cp *Checkpoint) MarshalCheckpoint() ([]byte, error) {
59+
return json.Marshal(cp)
5160
}
5261

53-
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint
54-
func (prc *PodResourceAllocationCheckpoint) UnmarshalCheckpoint(blob []byte) error {
55-
return json.Unmarshal(blob, prc)
62+
// UnmarshalCheckpoint unmarshals checkpoint from JSON
63+
func (cp *Checkpoint) UnmarshalCheckpoint(blob []byte) error {
64+
return json.Unmarshal(blob, cp)
5665
}
5766

58-
// VerifyChecksum verifies that current checksum of checkpoint is valid
59-
func (prc *PodResourceAllocationCheckpoint) VerifyChecksum() error {
60-
ck := prc.Checksum
61-
prc.Checksum = 0
62-
err := ck.Verify(prc)
63-
prc.Checksum = ck
64-
return err
67+
// VerifyChecksum verifies that current checksum
68+
// of checkpointed Data is valid
69+
func (cp *Checkpoint) VerifyChecksum() error {
70+
return cp.Checksum.Verify(cp.Data)
71+
}
72+
73+
// GetPodResourceAllocationInfo returns Pod Resource Allocation info states from checkpoint
74+
func (cp *Checkpoint) GetPodResourceAllocationInfo() (*PodResourceAllocationInfo, error) {
75+
var data PodResourceAllocationInfo
76+
if err := json.Unmarshal([]byte(cp.Data), &data); err != nil {
77+
return nil, err
78+
}
79+
80+
return &data, nil
6581
}

pkg/kubelet/status/state/state_checkpoint.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,40 +61,46 @@ func (sc *stateCheckpoint) restoreState() error {
6161
defer sc.mux.Unlock()
6262
var err error
6363

64-
checkpoint := NewPodResourceAllocationCheckpoint()
64+
checkpoint, err := NewCheckpoint(nil)
65+
if err != nil {
66+
return fmt.Errorf("failed to create new checkpoint: %w", err)
67+
}
6568

6669
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
6770
if err == errors.ErrCheckpointNotFound {
6871
return sc.storeState()
6972
}
7073
return err
7174
}
72-
73-
sc.cache.SetPodResourceAllocation(checkpoint.AllocationEntries)
74-
sc.cache.SetResizeStatus(checkpoint.ResizeStatusEntries)
75+
praInfo, err := checkpoint.GetPodResourceAllocationInfo()
76+
if err != nil {
77+
return fmt.Errorf("failed to get pod resource allocation info: %w", err)
78+
}
79+
err = sc.cache.SetPodResourceAllocation(praInfo.AllocationEntries)
80+
if err != nil {
81+
return fmt.Errorf("failed to set pod resource allocation: %w", err)
82+
}
83+
err = sc.cache.SetResizeStatus(praInfo.ResizeStatusEntries)
84+
if err != nil {
85+
return fmt.Errorf("failed to set resize status: %w", err)
86+
}
7587
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
7688
return nil
7789
}
7890

7991
// saves state to a checkpoint, caller is responsible for locking
8092
func (sc *stateCheckpoint) storeState() error {
81-
checkpoint := NewPodResourceAllocationCheckpoint()
82-
8393
podAllocation := sc.cache.GetPodResourceAllocation()
84-
for pod := range podAllocation {
85-
checkpoint.AllocationEntries[pod] = make(map[string]v1.ResourceRequirements)
86-
for container, alloc := range podAllocation[pod] {
87-
checkpoint.AllocationEntries[pod][container] = alloc
88-
}
89-
}
9094

9195
podResizeStatus := sc.cache.GetResizeStatus()
92-
checkpoint.ResizeStatusEntries = make(map[string]v1.PodResizeStatus)
93-
for pUID, rStatus := range podResizeStatus {
94-
checkpoint.ResizeStatusEntries[pUID] = rStatus
96+
checkpoint, err := NewCheckpoint(&PodResourceAllocationInfo{
97+
AllocationEntries: podAllocation,
98+
ResizeStatusEntries: podResizeStatus,
99+
})
100+
if err != nil {
101+
return fmt.Errorf("failed to create checkpoint: %w", err)
95102
}
96-
97-
err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
103+
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
98104
if err != nil {
99105
klog.ErrorS(err, "Failed to save pod allocation checkpoint")
100106
return err
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package state
18+
19+
import (
20+
"fmt"
21+
"os"
22+
"testing"
23+
24+
"github.com/stretchr/testify/require"
25+
26+
v1 "k8s.io/api/core/v1"
27+
"k8s.io/apimachinery/pkg/api/resource"
28+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
29+
)
30+
31+
// testCheckpoint is the checkpoint name passed to NewStateCheckpoint in these tests.
const testCheckpoint = "pod_status_manager_state"
32+
33+
func newTestStateCheckpoint(t *testing.T) *stateCheckpoint {
34+
testingDir := getTestDir(t)
35+
cache := NewStateMemory()
36+
checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir)
37+
require.NoError(t, err, "failed to create checkpoint manager")
38+
checkpointName := "pod_state_checkpoint"
39+
sc := &stateCheckpoint{
40+
cache: cache,
41+
checkpointManager: checkpointManager,
42+
checkpointName: checkpointName,
43+
}
44+
return sc
45+
}
46+
47+
func getTestDir(t *testing.T) string {
48+
testingDir, err := os.MkdirTemp("", "pod_resource_allocation_state_test")
49+
require.NoError(t, err, "failed to create temp dir")
50+
t.Cleanup(func() {
51+
if err := os.RemoveAll(testingDir); err != nil {
52+
t.Fatal(err)
53+
}
54+
})
55+
return testingDir
56+
}
57+
58+
func verifyPodResourceAllocation(t *testing.T, expected, actual *PodResourceAllocation, msgAndArgs string) {
59+
for podUID, containerResourceList := range *expected {
60+
require.Equal(t, len(containerResourceList), len((*actual)[podUID]), msgAndArgs)
61+
for containerName, resourceList := range containerResourceList {
62+
for name, quantity := range resourceList.Requests {
63+
require.True(t, quantity.Equal((*actual)[podUID][containerName].Requests[name]), msgAndArgs)
64+
}
65+
}
66+
}
67+
}
68+
69+
func Test_stateCheckpoint_storeState(t *testing.T) {
70+
type args struct {
71+
podResourceAllocation PodResourceAllocation
72+
}
73+
74+
tests := []struct {
75+
name string
76+
args args
77+
}{}
78+
suffix := []string{"Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "n", "u", "m", "k", "M", "G", "T", "P", "E", ""}
79+
factor := []string{"1", "0.1", "0.03", "10", "100", "512", "1000", "1024", "700", "10000"}
80+
for _, fact := range factor {
81+
for _, suf := range suffix {
82+
if (suf == "E" || suf == "Ei") && (fact == "1000" || fact == "10000") {
83+
// when fact is 1000 or 10000, suffix "E" or "Ei", the quantity value is overflow
84+
// see detail https://github.com/kubernetes/apimachinery/blob/95b78024e3feada7739b40426690b4f287933fd8/pkg/api/resource/quantity.go#L301
85+
continue
86+
}
87+
tests = append(tests, struct {
88+
name string
89+
args args
90+
}{
91+
name: fmt.Sprintf("resource - %s%s", fact, suf),
92+
args: args{
93+
podResourceAllocation: PodResourceAllocation{
94+
"pod1": {
95+
"container1": {
96+
Requests: v1.ResourceList{
97+
v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)),
98+
v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)),
99+
},
100+
},
101+
},
102+
},
103+
},
104+
})
105+
}
106+
}
107+
for _, tt := range tests {
108+
t.Run(tt.name, func(t *testing.T) {
109+
testDir := getTestDir(t)
110+
originalSC, err := NewStateCheckpoint(testDir, testCheckpoint)
111+
require.NoError(t, err)
112+
113+
err = originalSC.SetPodResourceAllocation(tt.args.podResourceAllocation)
114+
require.NoError(t, err)
115+
116+
actual := originalSC.GetPodResourceAllocation()
117+
verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "stored pod resource allocation is not equal to original pod resource allocation")
118+
119+
newSC, err := NewStateCheckpoint(testDir, testCheckpoint)
120+
require.NoError(t, err)
121+
122+
actual = newSC.GetPodResourceAllocation()
123+
verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "restored pod resource allocation is not equal to original pod resource allocation")
124+
})
125+
}
126+
}
127+
128+
func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
129+
// Based on the PodResourceAllocationInfo struct, it's mostly possible that new field will be added
130+
// in struct PodResourceAllocationInfo, rather than in struct PodResourceAllocationInfo.AllocationEntries.
131+
// Emulate upgrade scenario by pretending that `ResizeStatusEntries` is a new field.
132+
// The checkpoint content doesn't have it and that shouldn't prevent the checkpoint from being loaded.
133+
sc := newTestStateCheckpoint(t)
134+
135+
// prepare old checkpoint, ResizeStatusEntries is unset,
136+
// pretend that the old checkpoint is unaware for the field ResizeStatusEntries
137+
const checkpointContent = `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}`
138+
expectedPodResourceAllocationInfo := &PodResourceAllocationInfo{
139+
AllocationEntries: map[string]map[string]v1.ResourceRequirements{
140+
"pod1": {
141+
"container1": {
142+
Requests: v1.ResourceList{
143+
v1.ResourceCPU: resource.MustParse("1Ki"),
144+
v1.ResourceMemory: resource.MustParse("1Ki"),
145+
},
146+
},
147+
},
148+
},
149+
ResizeStatusEntries: map[string]v1.PodResizeStatus{},
150+
}
151+
checkpoint := &Checkpoint{}
152+
err := checkpoint.UnmarshalCheckpoint([]byte(checkpointContent))
153+
require.NoError(t, err, "failed to unmarshal checkpoint")
154+
155+
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
156+
require.NoError(t, err, "failed to create old checkpoint")
157+
158+
err = sc.restoreState()
159+
require.NoError(t, err, "failed to restore state")
160+
161+
actualPodResourceAllocationInfo := &PodResourceAllocationInfo{}
162+
actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation()
163+
actualPodResourceAllocationInfo.ResizeStatusEntries = sc.cache.GetResizeStatus()
164+
require.NoError(t, err, "failed to get pod resource allocation info")
165+
require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal")
166+
}

0 commit comments

Comments
 (0)