Skip to content

Commit d97cf3a

Browse files
authored
Merge pull request kubernetes#126303 from bart0sh/PR150-dra-refactor-checkpoint-upstream
DRA: refactor checkpointing
2 parents 39a8079 + c0d922e commit d97cf3a

File tree

7 files changed

+589
-464
lines changed

7 files changed

+589
-464
lines changed

pkg/kubelet/cm/dra/claiminfo.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ type ClaimInfo struct {
4040
// claimInfoCache is a cache of processed resource claims keyed by namespace/claimname.
4141
type claimInfoCache struct {
4242
sync.RWMutex
43-
state state.CheckpointState
44-
claimInfo map[string]*ClaimInfo
43+
checkpointer state.Checkpointer
44+
claimInfo map[string]*ClaimInfo
4545
}
4646

4747
// newClaimInfoFromClaim creates a new claim info from a resource claim.
@@ -77,12 +77,12 @@ func newClaimInfoFromState(state *state.ClaimInfoState) *ClaimInfo {
7777
}
7878

7979
// addDevice appends a device to the claim info's driver state for the given driver.
80-
func (info *ClaimInfo) addDevice(driverName string, device state.Device) {
80+
func (info *ClaimInfo) addDevice(driverName string, deviceState state.Device) {
8181
if info.DriverState == nil {
8282
info.DriverState = make(map[string]state.DriverState)
8383
}
8484
driverState := info.DriverState[driverName]
85-
driverState.Devices = append(driverState.Devices, device)
85+
driverState.Devices = append(driverState.Devices, deviceState)
8686
info.DriverState[driverName] = driverState
8787
}
8888

@@ -113,22 +113,27 @@ func (info *ClaimInfo) isPrepared() bool {
113113

114114
// newClaimInfoCache creates a new claim info cache object, pre-populated from a checkpoint (if present).
115115
func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
116-
stateImpl, err := state.NewCheckpointState(stateDir, checkpointName)
116+
checkpointer, err := state.NewCheckpointer(stateDir, checkpointName)
117117
if err != nil {
118118
return nil, fmt.Errorf("could not initialize checkpoint manager, please drain node and remove dra state file, err: %+v", err)
119119
}
120120

121-
curState, err := stateImpl.GetOrCreate()
121+
checkpoint, err := checkpointer.GetOrCreate()
122122
if err != nil {
123123
return nil, fmt.Errorf("error calling GetOrCreate() on checkpoint state: %v", err)
124124
}
125125

126126
cache := &claimInfoCache{
127-
state: stateImpl,
128-
claimInfo: make(map[string]*ClaimInfo),
127+
checkpointer: checkpointer,
128+
claimInfo: make(map[string]*ClaimInfo),
129129
}
130130

131-
for _, entry := range curState {
131+
entries, err := checkpoint.GetClaimInfoStateList()
132+
if err != nil {
133+
return nil, fmt.Errorf("error calling GetEntries() on checkpoint: %w", err)
134+
135+
}
136+
for _, entry := range entries {
132137
info := newClaimInfoFromState(&entry)
133138
cache.claimInfo[info.Namespace+"/"+info.ClaimName] = info
134139
}
@@ -192,7 +197,11 @@ func (cache *claimInfoCache) syncToCheckpoint() error {
192197
for _, infoClaim := range cache.claimInfo {
193198
claimInfoStateList = append(claimInfoStateList, infoClaim.ClaimInfoState)
194199
}
195-
return cache.state.Store(claimInfoStateList)
200+
checkpoint, err := state.NewCheckpoint(claimInfoStateList)
201+
if err != nil {
202+
return err
203+
}
204+
return cache.checkpointer.Store(checkpoint)
196205
}
197206

198207
// cdiDevicesAsList returns a list of CDIDevices from the provided claim info.

pkg/kubelet/cm/dra/state/checkpoint.go

Lines changed: 71 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,51 +18,90 @@ package state
1818

1919
import (
2020
"encoding/json"
21+
"hash/crc32"
2122

22-
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
23-
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
2425
)
2526

26-
var _ checkpointmanager.Checkpoint = &DRAManagerCheckpoint{}
27+
const (
28+
CheckpointAPIGroup = "checkpoint.dra.kubelet.k8s.io"
29+
CheckpointKind = "DRACheckpoint"
30+
CheckpointAPIVersion = CheckpointAPIGroup + "/v1"
31+
)
2732

28-
const checkpointVersion = "v1"
33+
// Checkpoint represents a structure to store DRA checkpoint data
34+
type Checkpoint struct {
35+
// Data is the JSON-serialized checkpoint data
36+
Data string
37+
// Checksum is a checksum of Data
38+
Checksum uint32
39+
}
2940

30-
// DRAManagerCheckpoint struct is used to store pod dynamic resources assignments in a checkpoint
31-
type DRAManagerCheckpoint struct {
32-
Version string `json:"version"`
33-
Entries ClaimInfoStateList `json:"entries,omitempty"`
34-
Checksum checksum.Checksum `json:"checksum"`
41+
type CheckpointData struct {
42+
metav1.TypeMeta
43+
ClaimInfoStateList ClaimInfoStateList
3544
}
3645

37-
// List of claim info to store in checkpoint
38-
type ClaimInfoStateList []ClaimInfoState
46+
// NewCheckpoint creates a new checkpoint from a list of claim info states
47+
func NewCheckpoint(data ClaimInfoStateList) (*Checkpoint, error) {
48+
cpData := &CheckpointData{
49+
TypeMeta: metav1.TypeMeta{
50+
Kind: CheckpointKind,
51+
APIVersion: CheckpointAPIVersion,
52+
},
53+
ClaimInfoStateList: data,
54+
}
55+
56+
cpDataBytes, err := json.Marshal(cpData)
57+
if err != nil {
58+
return nil, err
59+
}
3960

40-
// NewDRAManagerCheckpoint returns an instance of Checkpoint
41-
func NewDRAManagerCheckpoint() *DRAManagerCheckpoint {
42-
return &DRAManagerCheckpoint{
43-
Version: checkpointVersion,
44-
Entries: ClaimInfoStateList{},
61+
cp := &Checkpoint{
62+
Data: string(cpDataBytes),
63+
Checksum: crc32.ChecksumIEEE(cpDataBytes),
4564
}
65+
66+
return cp, nil
4667
}
4768

48-
// MarshalCheckpoint returns marshalled checkpoint
49-
func (dc *DRAManagerCheckpoint) MarshalCheckpoint() ([]byte, error) {
50-
// make sure checksum wasn't set before so it doesn't affect output checksum
51-
dc.Checksum = 0
52-
dc.Checksum = checksum.New(dc)
53-
return json.Marshal(*dc)
69+
// MarshalCheckpoint marshals checkpoint to JSON
70+
func (cp *Checkpoint) MarshalCheckpoint() ([]byte, error) {
71+
return json.Marshal(cp)
5472
}
5573

56-
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint
57-
func (dc *DRAManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error {
58-
return json.Unmarshal(blob, dc)
74+
// UnmarshalCheckpoint unmarshals checkpoint from JSON
75+
// and verifies its data checksum
76+
func (cp *Checkpoint) UnmarshalCheckpoint(blob []byte) error {
77+
if err := json.Unmarshal(blob, cp); err != nil {
78+
return err
79+
}
80+
81+
// verify checksum
82+
if err := cp.VerifyChecksum(); err != nil {
83+
return err
84+
}
85+
86+
return nil
5987
}
6088

61-
// VerifyChecksum verifies that current checksum of checkpoint is valid
62-
func (dc *DRAManagerCheckpoint) VerifyChecksum() error {
63-
ck := dc.Checksum
64-
dc.Checksum = 0
65-
err := ck.Verify(dc)
66-
dc.Checksum = ck
67-
return err
89+
// VerifyChecksum verifies that current checksum
90+
// of checkpointed Data is valid
91+
func (cp *Checkpoint) VerifyChecksum() error {
92+
expectedCS := crc32.ChecksumIEEE([]byte(cp.Data))
93+
if expectedCS != cp.Checksum {
94+
return &errors.CorruptCheckpointError{ActualCS: uint64(cp.Checksum), ExpectedCS: uint64(expectedCS)}
95+
}
96+
return nil
97+
}
98+
99+
// GetClaimInfoStateList returns list of claim info states from checkpoint
100+
func (cp *Checkpoint) GetClaimInfoStateList() (ClaimInfoStateList, error) {
101+
var data CheckpointData
102+
if err := json.Unmarshal([]byte(cp.Data), &data); err != nil {
103+
return nil, err
104+
}
105+
106+
return data.ClaimInfoStateList, nil
68107
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package state
18+
19+
import (
20+
"errors"
21+
"fmt"
22+
"sync"
23+
24+
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
25+
checkpointerrors "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
26+
)
27+
28+
type Checkpointer interface {
29+
GetOrCreate() (*Checkpoint, error)
30+
Store(*Checkpoint) error
31+
}
32+
33+
type checkpointer struct {
34+
sync.RWMutex
35+
checkpointManager checkpointmanager.CheckpointManager
36+
checkpointName string
37+
}
38+
39+
// NewCheckpointer creates a new checkpointer that keeps track of claim info using a checkpoint backend
40+
func NewCheckpointer(stateDir, checkpointName string) (Checkpointer, error) {
41+
if len(checkpointName) == 0 {
42+
return nil, fmt.Errorf("received empty string instead of checkpointName")
43+
}
44+
45+
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
46+
if err != nil {
47+
return nil, fmt.Errorf("failed to initialize checkpoint manager: %w", err)
48+
}
49+
50+
checkpointer := &checkpointer{
51+
checkpointManager: checkpointManager,
52+
checkpointName: checkpointName,
53+
}
54+
55+
return checkpointer, nil
56+
}
57+
58+
// GetOrCreate returns the stored checkpoint, or creates and stores
59+
// an empty checkpoint if no checkpoint exists yet
60+
func (sc *checkpointer) GetOrCreate() (*Checkpoint, error) {
61+
sc.Lock()
62+
defer sc.Unlock()
63+
64+
checkpoint, err := NewCheckpoint(nil)
65+
if err != nil {
66+
return nil, fmt.Errorf("failed to create new checkpoint: %w", err)
67+
}
68+
69+
err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint)
70+
if errors.Is(err, checkpointerrors.ErrCheckpointNotFound) {
71+
err = sc.store(checkpoint)
72+
if err != nil {
73+
return nil, fmt.Errorf("failed to store checkpoint %v: %w", sc.checkpointName, err)
74+
}
75+
return checkpoint, nil
76+
}
77+
if err != nil {
78+
return nil, fmt.Errorf("failed to get checkpoint %v: %w", sc.checkpointName, err)
79+
}
80+
81+
return checkpoint, nil
82+
}
83+
84+
// Store stores checkpoint to the file
85+
func (sc *checkpointer) Store(checkpoint *Checkpoint) error {
86+
sc.Lock()
87+
defer sc.Unlock()
88+
89+
return sc.store(checkpoint)
90+
}
91+
92+
// store saves state to a checkpoint, caller is responsible for locking
93+
func (sc *checkpointer) store(checkpoint *Checkpoint) error {
94+
if err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint); err != nil {
95+
return fmt.Errorf("could not save checkpoint %s: %w", sc.checkpointName, err)
96+
}
97+
return nil
98+
}

0 commit comments

Comments
 (0)