Skip to content
This repository was archived by the owner on Jul 30, 2021. It is now read-only.

Commit 74d2cff

Browse files
committed
checkpointer: implement state machine.
This implements an explicit state machine to capture the various states a checkpoint can be in. It recovers those states from disk upon startup (if applicable) and then reconciles them with the states that are fetched from the various apiservers. It also adds two new states to work around an issue in Kubernetes 1.8, in which daemonset pods are deleted before being recreated. For single-master clusters, this can lead to a total outage during apiserver upgrades, since the checkpointer will aggressively retire the checkpoint for the deleted apiserver pod before the replacement pod has a chance to be scheduled.
1 parent d3f812d commit 74d2cff

File tree

7 files changed: +699 -142 lines changed

pkg/checkpoint/apiserver.go

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -9,15 +9,16 @@ import (
99
)
1010

1111
// getAPIParentPods will retrieve all pods from apiserver that are parents & should be checkpointed
12-
func (c *checkpointer) getAPIParentPods(nodeName string) map[string]*v1.Pod {
12+
// Returns false if we could not contact the apiserver.
13+
func (c *checkpointer) getAPIParentPods(nodeName string) (bool, map[string]*v1.Pod) {
1314
opts := metav1.ListOptions{
1415
FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName).String(),
1516
}
1617

1718
podList, err := c.apiserver.CoreV1().Pods(api.NamespaceAll).List(opts)
1819
if err != nil {
1920
glog.Warningf("Unable to contact APIServer, skipping garbage collection: %v", err)
20-
return nil
21+
return false, nil
2122
}
22-
return podListToParentPods(podList)
23+
return true, podListToParentPods(podList)
2324
}

pkg/checkpoint/checkpoint.go

Lines changed: 22 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -4,8 +4,10 @@ package checkpoint
44

55
import (
66
"fmt"
7+
"os"
78
"time"
89

10+
"github.com/golang/glog"
911
"k8s.io/client-go/kubernetes"
1012
restclient "k8s.io/client-go/rest"
1113
)
@@ -23,12 +25,14 @@ const (
2325
shouldCheckpoint = "true"
2426
podSourceFile = "file"
2527

26-
defaultPollingFrequency = 3 * time.Second
27-
defaultCheckpointTimeout = 1 * time.Minute
28+
defaultPollingFrequency = 5 * time.Second
29+
defaultCheckpointTimeout = 1 * time.Minute
30+
defaultCheckpointGracePeriod = 1 * time.Minute
2831
)
2932

3033
var (
31-
lastCheckpoint time.Time
34+
lastCheckpoint time.Time
35+
checkpointGracePeriod = defaultCheckpointGracePeriod
3236
)
3337

3438
// Options defines the parameters that are required to start the checkpointer.
@@ -59,6 +63,7 @@ type checkpointer struct {
5963
kubelet *kubeletClient
6064
cri *remoteRuntimeService
6165
checkpointerPod CheckpointerPod
66+
checkpoints checkpoints
6267
}
6368

6469
// Run instantiates and starts a new checkpointer. Returns error if there was a problem creating
@@ -90,6 +95,11 @@ func Run(opts Options) error {
9095

9196
// run is the main checkpointing loop.
9297
func (c *checkpointer) run() {
98+
// Make sure the inactive checkpoint path exists.
99+
if err := os.MkdirAll(inactiveCheckpointPath, 0700); err != nil {
100+
glog.Fatalf("Could not create inactive checkpoint path: %v", err)
101+
}
102+
93103
for {
94104
time.Sleep(defaultPollingFrequency)
95105

@@ -101,20 +111,24 @@ func (c *checkpointer) run() {
101111
localParentPods := c.kubelet.localParentPods()
102112
localRunningPods := c.cri.localRunningPods()
103113

104-
c.createCheckpointsForValidParents(localParentPods)
105-
106114
// Try to get scheduled pods from the apiserver.
107115
// These will be used to GC checkpoints for parents no longer scheduled to this node.
108-
// A return value of nil is assumed to be "could not contact apiserver"
109116
// TODO(aaron): only check this every 30 seconds or so
110-
apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName)
117+
apiAvailable, apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName)
111118

112119
// Get on disk copies of (in)active checkpoints
113120
//TODO(aaron): Could be racy to load from disk each time, but much easier than trying to keep in-memory state in sync.
114121
activeCheckpoints := getFileCheckpoints(activeCheckpointPath)
115122
inactiveCheckpoints := getFileCheckpoints(inactiveCheckpointPath)
116123

117-
start, stop, remove := process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod)
124+
// Update checkpoints using the latest information from the APIs.
125+
c.checkpoints.update(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod)
126+
127+
// Update on-disk manifests based on updated checkpoint state.
128+
c.createCheckpointsForValidParents()
129+
130+
// Update checkpoint states and determine which checkpoints to start, stop, or remove.
131+
start, stop, remove := c.checkpoints.process(time.Now(), apiAvailable, localRunningPods, localParentPods, apiParentPods)
118132

119133
// Handle remove at last because we may still have some work to do
120134
// before removing the checkpointer itself.

pkg/checkpoint/manifest.go

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -52,10 +52,6 @@ func writeCheckpointManifest(pod *v1.Pod) (bool, error) {
5252
return false, err
5353
}
5454
path := filepath.Join(inactiveCheckpointPath, pod.Namespace+"-"+pod.Name+".json")
55-
// Make sure the inactive checkpoint path exists.
56-
if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
57-
return false, err
58-
}
5955
return writeManifestIfDifferent(path, podFullName(pod), buff.Bytes())
6056
}
6157

0 commit comments

Comments
 (0)