Skip to content
This repository was archived by the owner on Jul 30, 2021. It is now read-only.

Commit 69f99ef

Browse files
authored
Merge pull request #755 from diegs/cp-cooldown
checkpointer: implement state machine.
2 parents b2295ab + 02785c9 commit 69f99ef

File tree

9 files changed

+780
-204
lines changed

9 files changed

+780
-204
lines changed

pkg/checkpoint/apiserver.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,16 @@ import (
99
)
1010

1111
// getAPIParentPods will retrieve all pods from apiserver that are parents & should be checkpointed
12-
func (c *checkpointer) getAPIParentPods(nodeName string) map[string]*v1.Pod {
12+
// Returns false if we could not contact the apiserver.
13+
func (c *checkpointer) getAPIParentPods(nodeName string) (bool, map[string]*v1.Pod) {
1314
opts := metav1.ListOptions{
1415
FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName).String(),
1516
}
1617

1718
podList, err := c.apiserver.CoreV1().Pods(api.NamespaceAll).List(opts)
1819
if err != nil {
1920
glog.Warningf("Unable to contact APIServer, skipping garbage collection: %v", err)
20-
return nil
21+
return false, nil
2122
}
22-
return podListToParentPods(podList)
23+
return true, podListToParentPods(podList)
2324
}

pkg/checkpoint/checkpoint.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ package checkpoint
44

55
import (
66
"fmt"
7+
"os"
78
"time"
89

10+
"github.com/golang/glog"
911
"k8s.io/client-go/kubernetes"
1012
restclient "k8s.io/client-go/rest"
1113
)
@@ -23,12 +25,14 @@ const (
2325
shouldCheckpoint = "true"
2426
podSourceFile = "file"
2527

26-
defaultPollingFrequency = 3 * time.Second
27-
defaultCheckpointTimeout = 1 * time.Minute
28+
defaultPollingFrequency = 5 * time.Second
29+
defaultCheckpointTimeout = 1 * time.Minute
30+
defaultCheckpointGracePeriod = 1 * time.Minute
2831
)
2932

3033
var (
31-
lastCheckpoint time.Time
34+
lastCheckpoint time.Time
35+
checkpointGracePeriod = defaultCheckpointGracePeriod
3236
)
3337

3438
// Options defines the parameters that are required to start the checkpointer.
@@ -59,6 +63,7 @@ type checkpointer struct {
5963
kubelet *kubeletClient
6064
cri *remoteRuntimeService
6165
checkpointerPod CheckpointerPod
66+
checkpoints checkpoints
6267
}
6368

6469
// Run instantiates and starts a new checkpointer. Returns error if there was a problem creating
@@ -90,6 +95,11 @@ func Run(opts Options) error {
9095

9196
// run is the main checkpointing loop.
9297
func (c *checkpointer) run() {
98+
// Make sure the inactive checkpoint path exists.
99+
if err := os.MkdirAll(inactiveCheckpointPath, 0700); err != nil {
100+
glog.Fatalf("Could not create inactive checkpoint path: %v", err)
101+
}
102+
93103
for {
94104
time.Sleep(defaultPollingFrequency)
95105

@@ -101,20 +111,24 @@ func (c *checkpointer) run() {
101111
localParentPods := c.kubelet.localParentPods()
102112
localRunningPods := c.cri.localRunningPods()
103113

104-
c.createCheckpointsForValidParents(localParentPods)
105-
106114
// Try to get scheduled pods from the apiserver.
107115
// These will be used to GC checkpoints for parents no longer scheduled to this node.
108-
// A return value of nil is assumed to be "could not contact apiserver"
109116
// TODO(aaron): only check this every 30 seconds or so
110-
apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName)
117+
apiAvailable, apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName)
111118

112119
// Get on disk copies of (in)active checkpoints
113120
//TODO(aaron): Could be racy to load from disk each time, but much easier than trying to keep in-memory state in sync.
114121
activeCheckpoints := getFileCheckpoints(activeCheckpointPath)
115122
inactiveCheckpoints := getFileCheckpoints(inactiveCheckpointPath)
116123

117-
start, stop, remove := process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod)
124+
// Update checkpoints using the latest information from the APIs.
125+
c.checkpoints.update(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod)
126+
127+
// Update on-disk manifests based on updated checkpoint state.
128+
c.createCheckpointsForValidParents()
129+
130+
// Update checkpoint states and determine which checkpoints to start, stop, or remove.
131+
start, stop, remove := c.checkpoints.process(time.Now(), apiAvailable, localRunningPods, localParentPods, apiParentPods)
118132

119133
// Handle remove at last because we may still have some work to do
120134
// before removing the checkpointer itself.

pkg/checkpoint/manifest.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"io/ioutil"
66
"os"
77
"path/filepath"
8+
"strings"
89

910
"github.com/golang/glog"
1011
"k8s.io/apimachinery/pkg/runtime"
@@ -23,6 +24,16 @@ func getFileCheckpoints(path string) map[string]*v1.Pod {
2324

2425
for _, f := range fi {
2526
manifest := filepath.Join(path, f.Name())
27+
28+
// Check for leftover temporary checkpoints.
29+
if strings.HasPrefix(filepath.Base(manifest), ".") {
30+
glog.V(4).Infof("Found temporary checkpoint %s, removing.", manifest)
31+
if err := os.Remove(manifest); err != nil {
32+
glog.V(4).Infof("Error removing temporary checkpoint %s: %v.", manifest, err)
33+
}
34+
continue
35+
}
36+
2637
b, err := ioutil.ReadFile(manifest)
2738
if err != nil {
2839
glog.Errorf("Error reading manifest: %v", err)
@@ -52,10 +63,6 @@ func writeCheckpointManifest(pod *v1.Pod) (bool, error) {
5263
return false, err
5364
}
5465
path := filepath.Join(inactiveCheckpointPath, pod.Namespace+"-"+pod.Name+".json")
55-
// Make sure the inactive checkpoint path exists.
56-
if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
57-
return false, err
58-
}
5966
return writeManifestIfDifferent(path, podFullName(pod), buff.Bytes())
6067
}
6168

@@ -73,3 +80,21 @@ func writeManifestIfDifferent(path, name string, data []byte) (bool, error) {
7380
glog.Infof("Writing manifest for %q to %q", name, path)
7481
return true, writeAndAtomicRename(path, data, 0644)
7582
}
83+
84+
// writeAndAtomicRename writes data to path so that readers never observe a
// partially written file: the bytes go to a temporary file in the same
// directory, which is then renamed over path.
//
// perm is applied to the temporary file before the rename, so the final file
// carries exactly that mode. On any failure the temporary file is removed and
// the error is returned; an existing file at path is left untouched.
func writeAndAtomicRename(path string, data []byte, perm os.FileMode) error {
	// Create the temporary file next to the target so that both live on the
	// same filesystem and os.Rename() does not error. The "." prefix marks it
	// as a temporary (hidden) file.
	tmpfile, err := ioutil.TempFile(filepath.Dir(path), ".")
	if err != nil {
		return err
	}
	tmpname := tmpfile.Name()

	_, err = tmpfile.Write(data)
	if err == nil {
		// Chmod must happen while the file is still open; calling it on a
		// closed *os.File always fails.
		err = tmpfile.Chmod(perm)
	}
	// Always close, even after a failed write, so the descriptor is not leaked.
	// Keep the first error encountered.
	if cerr := tmpfile.Close(); err == nil {
		err = cerr
	}
	if err == nil {
		err = os.Rename(tmpname, path)
	}
	if err != nil {
		// Best-effort cleanup: the original error is what the caller needs,
		// so a failure to remove the temporary file is deliberately ignored.
		_ = os.Remove(tmpname)
	}
	return err
}

pkg/checkpoint/pod.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package checkpoint
22

33
import (
4-
"io/ioutil"
5-
"os"
64
"path/filepath"
75
"strings"
86

@@ -163,11 +161,3 @@ func podFullNameToInactiveCheckpointPath(id string) string {
163161
func podFullNameToActiveCheckpointPath(id string) string {
164162
return filepath.Join(activeCheckpointPath, strings.Replace(id, "/", "-", -1)+".json")
165163
}
166-
167-
func writeAndAtomicRename(path string, data []byte, perm os.FileMode) error {
168-
tmpfile := filepath.Join(filepath.Dir(path), "."+filepath.Base(path))
169-
if err := ioutil.WriteFile(tmpfile, data, perm); err != nil {
170-
return err
171-
}
172-
return os.Rename(tmpfile, path)
173-
}

0 commit comments

Comments
 (0)