Skip to content
This repository was archived by the owner on Jul 30, 2021. It is now read-only.

Commit a936ee1

Browse files
author
Yifan Gu
authored
Merge pull request #366 from yifan-gu/gc_checkpointer
Checkpoint and activate itself.
2 parents c4b581c + c4ec0bb commit a936ee1

File tree

8 files changed

+357
-50
lines changed

8 files changed

+357
-50
lines changed

cmd/checkpoint/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,8 @@ ConfigMaps are stored using a path of:
7979
```
8080
/etc/kubernetes/checkpoint-configmaps/<namespace>/<pod-name>/<configmap-name>
8181
```
82+
### Self Checkpointing
83+
84+
The pod checkpointer will also checkpoint itself to disk to handle the absence of the API server.
85+
After a node reboot, the on-disk pod-checkpointer will take over the responsibility.
86+
Once it reaches the API server and finds out that it's no longer being scheduled, it will clean itself up.

cmd/checkpoint/main.go

Lines changed: 149 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"path"
1313
"path/filepath"
1414
"strings"
15+
"syscall"
1516
"time"
1617

1718
"github.com/golang/glog"
@@ -26,7 +27,9 @@ import (
2627
)
2728

2829
const (
29-
nodeNameEnv = "NODE_NAME"
30+
nodeNameEnv = "NODE_NAME"
31+
podNameEnv = "POD_NAME"
32+
podNamespaceEnv = "POD_NAMESPACE"
3033

3134
// We must use both the :10255/pods and :10250/runningpods/ endpoints, because /pods endpoint could have stale data.
3235
// The /pods endpoint will only show the last cached status which has successfully been written to an apiserver.
@@ -48,17 +51,63 @@ const (
4851
podSourceFile = "file"
4952
)
5053

51-
//TODO(aaron): The checkpointer should know how to GC itself because it runs as a static pod.
54+
var (
55+
lockfilePath string
5256

53-
func main() {
57+
// TODO(yifan): Put these into a struct when necessary.
58+
nodeName string
59+
podName string
60+
podNamespace string
61+
)
62+
63+
func init() {
64+
flag.StringVar(&lockfilePath, "lock-file", "/var/run/lock/pod-checkpointer.lock", "The path to lock file for checkpointer to use")
5465
flag.Set("logtostderr", "true")
55-
flag.Parse()
56-
defer glog.Flush()
66+
}
67+
68+
// flock tries to grab a flock on the given path.
69+
// If the lock is already acquired by other process, the function will block.
70+
// TODO(yifan): Maybe replace this with kubernetes/pkg/util/flock.Acquire() once
71+
// https://github.com/kubernetes/kubernetes/issues/42929 is solved, or maybe not.
72+
func flock(path string) error {
73+
fd, err := syscall.Open(path, syscall.O_CREAT|syscall.O_RDWR, 0600)
74+
if err != nil {
75+
return err
76+
}
5777

58-
nodeName := os.Getenv(nodeNameEnv)
78+
// We don't need to close the fd since we should hold
79+
// it until the process exits.
80+
81+
return syscall.Flock(fd, syscall.LOCK_EX)
82+
}
83+
84+
// readDownwardAPI fills the node name, pod name, and pod namespace.
85+
func readDownwardAPI() {
86+
nodeName = os.Getenv(nodeNameEnv)
5987
if nodeName == "" {
6088
glog.Fatalf("Missing required environment variable: %s", nodeNameEnv)
6189
}
90+
podName = os.Getenv(podNameEnv)
91+
if podName == "" {
92+
glog.Fatalf("Missing required environment variable: %s", podNameEnv)
93+
}
94+
podNamespace = os.Getenv(podNamespaceEnv)
95+
if podNamespace == "" {
96+
glog.Fatalf("Missing required environment variable: %s", podNamespaceEnv)
97+
}
98+
}
99+
100+
func main() {
101+
flag.Parse()
102+
defer glog.Flush()
103+
104+
readDownwardAPI()
105+
106+
glog.Infof("Trying to acquire the flock at %q", lockfilePath)
107+
108+
if err := flock(lockfilePath); err != nil {
109+
glog.Fatalf("Error when acquiring the flock: %v", err)
110+
}
62111

63112
glog.Infof("Starting checkpointer for node: %s", nodeName)
64113
// This is run as a static pod, so we can't use InClusterConfig (no secrets/service account)
@@ -104,10 +153,10 @@ func main() {
104153
},
105154
}
106155

107-
run(client, kubelet, nodeName)
156+
run(client, kubelet)
108157
}
109158

110-
func run(client clientset.Interface, kubelet *kubeletClient, nodeName string) {
159+
func run(client clientset.Interface, kubelet *kubeletClient) {
111160
for {
112161
time.Sleep(3 * time.Second)
113162

@@ -138,9 +187,12 @@ func run(client clientset.Interface, kubelet *kubeletClient, nodeName string) {
138187
inactiveCheckpoints := getFileCheckpoints(inactiveCheckpointPath)
139188

140189
start, stop, remove := process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints)
141-
handleRemove(remove)
190+
191+
// Handle remove at last because we may still have some work to do
192+
// before removing the checkpointer itself.
142193
handleStop(stop)
143194
handleStart(start)
195+
handleRemove(remove)
144196
}
145197
}
146198

@@ -157,6 +209,10 @@ func run(client clientset.Interface, kubelet *kubeletClient, nodeName string) {
157209
// The removal of a checkpoint means its parent is no longer scheduled to this node, and we need to GC active / inactive
158210
// checkpoints as well as any secrets / configMaps which are no longer necessary.
159211
func process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints map[string]*v1.Pod) (start, stop, remove []string) {
212+
// If this variable is filled, then it means we need to remove the pod-checkpointer's checkpoint.
213+
// We treat the pod-checkpointer's checkpoint specially because we want to put it at the end of
214+
// the remove queue.
215+
var podCheckpointerID string
160216

161217
// We can only make some GC decisions if we've successfully contacted an apiserver.
162218
// When apiParentPods == nil, that means we were not able to get an updated list of pods.
@@ -183,7 +239,13 @@ func process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints
183239
// However, we don't know this, and as far as the checkpointer is concerned - that pod is no longer scheduled to this node.
184240
if _, ok := apiParentPods[id]; !ok {
185241
glog.V(4).Infof("API GC: should remove inactive checkpoint %s", id)
242+
186243
removeMap[id] = struct{}{}
244+
if isPodCheckpointer(inactiveCheckpoints[id]) {
245+
podCheckpointerID = id
246+
break
247+
}
248+
187249
delete(inactiveCheckpoints, id)
188250
}
189251
}
@@ -193,35 +255,65 @@ func process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints
193255
// If the active checkpoint does not have a parent in the api-server, we must assume it should no longer be running on this node.
194256
if _, ok := apiParentPods[id]; !ok {
195257
glog.V(4).Infof("API GC: should remove active checkpoint %s", id)
258+
196259
removeMap[id] = struct{}{}
260+
if isPodCheckpointer(activeCheckpoints[id]) {
261+
podCheckpointerID = id
262+
break
263+
}
264+
197265
delete(activeCheckpoints, id)
198266
}
199267
}
200268
}
201269

202-
// Can make decisions about starting/stopping checkpoints just with local state.
270+
// Remove all checkpoints if we need to remove the pod checkpointer itself.
271+
if podCheckpointerID != "" {
272+
for id := range inactiveCheckpoints {
273+
removeMap[id] = struct{}{}
274+
delete(inactiveCheckpoints, id)
275+
}
276+
for id := range activeCheckpoints {
277+
removeMap[id] = struct{}{}
278+
delete(activeCheckpoints, id)
279+
}
280+
}
203281

204-
// If there is an inactive checkpoint, and no parent is running, start the checkpoint
282+
// Can make decisions about starting/stopping checkpoints just with local state.
283+
//
284+
// If there is an inactive checkpoint, and no parent pod is running, or the checkpoint
285+
// is the pod-checkpointer, start the checkpoint.
205286
for id := range inactiveCheckpoints {
206-
if _, ok := localRunningPods[id]; !ok {
287+
_, ok := localRunningPods[id]
288+
if !ok || isPodCheckpointer(inactiveCheckpoints[id]) {
207289
glog.V(4).Infof("Should start checkpoint %s", id)
208290
start = append(start, id)
209291
}
210292
}
211293

212-
// If there is an active checkpoint and a running pod, stop the active checkpoint
213-
// The parent may not be in a running state, but the kubelet is trying to start it so we should get out of the way.
294+
// If there is an active checkpoint and a running parent pod, stop the active checkpoint
295+
// unless this is the pod-checkpointer.
296+
// The parent may not be in a running state, but the kubelet is trying to start it
297+
// so we should get out of the way.
214298
for id := range activeCheckpoints {
215-
if _, ok := localRunningPods[id]; ok {
299+
_, ok := localRunningPods[id]
300+
if ok && !isPodCheckpointer(activeCheckpoints[id]) {
216301
glog.V(4).Infof("Should stop checkpoint %s", id)
217302
stop = append(stop, id)
218303
}
219304
}
220305

221306
// De-duped checkpoints to remove. If we decide to GC a checkpoint, we will clean up both inactive/active.
222307
for k := range removeMap {
308+
if k == podCheckpointerID {
309+
continue
310+
}
223311
remove = append(remove, k)
224312
}
313+
// Put pod checkpoint at the last of the queue.
314+
if podCheckpointerID != "" {
315+
remove = append(remove, podCheckpointerID)
316+
}
225317

226318
return start, stop, remove
227319
}
@@ -283,6 +375,11 @@ func writeCheckpointManifest(pod *v1.Pod) error {
283375
return err
284376
}
285377
path := filepath.Join(inactiveCheckpointPath, pod.Namespace+"-"+pod.Name+".json")
378+
// Make sure the inactive checkpoint path exists.
379+
if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
380+
return err
381+
}
382+
286383
oldb, err := ioutil.ReadFile(path)
287384
if err != nil && !os.IsNotExist(err) {
288385
return err
@@ -295,6 +392,17 @@ func writeCheckpointManifest(pod *v1.Pod) error {
295392
return writeAndAtomicRename(path, b, 0644)
296393
}
297394

395+
// isPodCheckpointer returns true if the manifest is the pod checkpointer (has the same name as the parent).
396+
// For example, the pod.Name would be "pod-checkpointer".
397+
// The podName would be "pod-checkpointer" or "pod-checkpointer-172.17.4.201" where
398+
// "172.17.4.201" is the nodeName.
399+
func isPodCheckpointer(pod *v1.Pod) bool {
400+
if pod.Namespace != podNamespace {
401+
return false
402+
}
403+
return pod.Name == strings.TrimSuffix(podName, "-"+nodeName)
404+
}
405+
298406
func sanitizeCheckpointPod(cp *v1.Pod) (*v1.Pod, error) {
299407
// Clear ObjectMeta except for name/namespace
300408
// NOTE(aaron): If we want to keep labels, we need to add a new label so the static pod
@@ -497,29 +605,42 @@ func checkpointConfigMap(client clientset.Interface, namespace, podName, configM
497605

498606
func handleRemove(remove []string) {
499607
for _, id := range remove {
500-
// Remove inactive checkpoints
501608
glog.Infof("Removing checkpoint of: %s", id)
502-
p := PodFullNameToInactiveCheckpointPath(id)
503-
if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
504-
glog.Errorf("Failed to remove inactive checkpoint %s: %v", p, err)
505-
continue
506-
}
507-
// Remove active checkpoints
508-
p = PodFullNameToActiveCheckpointPath(id)
509-
if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
510-
glog.Errorf("Failed to remove active checkpoint %s: %v", p, err)
511-
continue
512-
}
609+
513610
// Remove Secrets
514-
p = PodFullNameToSecretPath(id)
611+
p := PodFullNameToSecretPath(id)
515612
if err := os.RemoveAll(p); err != nil {
516613
glog.Errorf("Failed to remove pod secrets from %s: %s", p, err)
517614
}
615+
518616
// Remove ConfigMaps
519617
p = PodFullNameToConfigMapPath(id)
520618
if err := os.RemoveAll(p); err != nil {
521619
glog.Errorf("Failed to remove pod configMaps from %s: %s", p, err)
522620
}
621+
622+
// Remove inactive checkpoints
623+
p = PodFullNameToInactiveCheckpointPath(id)
624+
if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
625+
glog.Errorf("Failed to remove inactive checkpoint %s: %v", p, err)
626+
continue
627+
}
628+
629+
// Remove active checkpoints.
630+
// We do this as the last step because we want to clean up
631+
// resources before the checkpointer itself exits.
632+
//
633+
// TODO(yifan): Removing the pods after removing the secrets/configmaps
634+
// might disturb other pods since they might want to use the configmap
635+
// or secrets during termination.
636+
//
637+
// However, since we are not waiting for them to terminate anyway, it's
638+
// ok to just leave as is for now. We can handle this more gracefully later.
639+
p = PodFullNameToActiveCheckpointPath(id)
640+
if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
641+
glog.Errorf("Failed to remove active checkpoint %s: %v", p, err)
642+
continue
643+
}
523644
}
524645
}
525646

0 commit comments

Comments
 (0)