Skip to content
This repository was archived by the owner on Jul 30, 2021. It is now read-only.

Commit 7cb123a

Browse files
committed
checkpointer: factor out checkpoint lib.
The monolithic main.go file was becoming too unwieldy to keep working on. This is a straightforward refactor: I moved functions into logical files and made the main() as lightweight as possible.
1 parent 7279a38 commit 7cb123a

File tree

13 files changed

+1686
-1532
lines changed

13 files changed

+1686
-1532
lines changed

cmd/checkpoint/main.go

Lines changed: 41 additions & 871 deletions
Large diffs are not rendered by default.

cmd/checkpoint/main_test.go

Lines changed: 0 additions & 661 deletions
This file was deleted.

pkg/checkpoint/apiserver.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package checkpoint
2+
3+
import (
4+
"github.com/golang/glog"
5+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
6+
"k8s.io/apimachinery/pkg/fields"
7+
"k8s.io/client-go/pkg/api"
8+
"k8s.io/client-go/pkg/api/v1"
9+
)
10+
11+
// getAPIParentPods will retrieve all pods from apiserver that are parents & should be checkpointed
12+
func (c *checkpointer) getAPIParentPods(nodeName string) map[string]*v1.Pod {
13+
opts := metav1.ListOptions{
14+
FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName).String(),
15+
}
16+
17+
podList, err := c.apiserver.CoreV1().Pods(api.NamespaceAll).List(opts)
18+
if err != nil {
19+
glog.Warningf("Unable to contact APIServer, skipping garbage collection: %v", err)
20+
return nil
21+
}
22+
return podListToParentPods(podList)
23+
}

pkg/checkpoint/checkpoint.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Package checkpoint provides libraries that are used by the pod-checkpointer utility to checkpoint
2+
// pods on a node. See cmd/checkpoint/README.md in this repository for more information.
3+
package checkpoint
4+
5+
import (
6+
"fmt"
7+
"time"
8+
9+
"k8s.io/client-go/kubernetes"
10+
restclient "k8s.io/client-go/rest"
11+
)
12+
13+
// On-disk locations used by the checkpointer.
const (
	// activeCheckpointPath is the directory read for active checkpoint manifests.
	activeCheckpointPath = "/etc/kubernetes/manifests"
	// inactiveCheckpointPath is the directory checkpoint manifests are written
	// to, and read back from as the inactive checkpoints.
	inactiveCheckpointPath = "/etc/kubernetes/inactive-manifests"
	// checkpointSecretPath is presumably the root for checkpointed secrets,
	// mirroring checkpointConfigMapPath -- TODO confirm against its users.
	checkpointSecretPath = "/etc/kubernetes/checkpoint-secrets"
	// checkpointConfigMapPath is the root under which configMap data is stored,
	// laid out as <namespace>/<podName>/<configMapName>/<key>.
	checkpointConfigMapPath = "/etc/kubernetes/checkpoint-configmaps"
)

// Pod annotation keys.
const (
	shouldCheckpointAnnotation = "checkpointer.alpha.coreos.com/checkpoint"    // = "true"
	checkpointParentAnnotation = "checkpointer.alpha.coreos.com/checkpoint-of" // = "podName"
	podSourceAnnotation        = "kubernetes.io/config.source"
)

// Annotation values.
const (
	// shouldCheckpoint is the value expected for shouldCheckpointAnnotation.
	shouldCheckpoint = "true"
	// podSourceFile is presumably the podSourceAnnotation value of pods that
	// were loaded from a manifest file -- TODO confirm against its users.
	podSourceFile = "file"
)

// Default timings.
const (
	// defaultPollingFrequency is the pause taken before every iteration of the
	// main checkpointing loop.
	defaultPollingFrequency  = 3 * time.Second
	defaultCheckpointTimeout = 1 * time.Minute
)

// lastCheckpoint is package-level state; NOTE(review): presumably the time of
// the most recent successful checkpoint -- confirm against its users.
var lastCheckpoint time.Time
33+
34+
// Options defines the parameters that are required to start the checkpointer.
35+
type Options struct {
36+
// CheckpointerPod holds information about this checkpointer pod.
37+
CheckpointerPod CheckpointerPod
38+
// KubeConfig is a valid kubeconfig for communicating with the APIServer.
39+
KubeConfig *restclient.Config
40+
// RemoteRuntimeEndpoint is the location of the CRI GRPC endpoint.
41+
RemoteRuntimeEndpoint string
42+
// RuntimeRequestTimeout is the timeout that is used for requests to the RemoteRuntimeEndpoint.
43+
RuntimeRequestTimeout time.Duration
44+
}
45+
46+
// CheckpointerPod describes the pod that is running this checkpointer.
type CheckpointerPod struct {
	// NodeName is the name of the node the checkpointer is running on.
	NodeName string
	// PodName is the name of the pod running the checkpointer.
	PodName string
	// PodNamespace is the namespace of the pod running the checkpointer.
	PodNamespace string
}
55+
56+
// checkpointer holds state used by the checkpointer to perform its duties.
57+
type checkpointer struct {
58+
apiserver kubernetes.Interface
59+
kubelet *kubeletClient
60+
cri *remoteRuntimeService
61+
checkpointerPod CheckpointerPod
62+
}
63+
64+
// Run instantiates and starts a new checkpointer. Returns error if there was a problem creating
65+
// the checkpointer, otherwise never returns.
66+
func Run(opts Options) error {
67+
apiserver := kubernetes.NewForConfigOrDie(opts.KubeConfig)
68+
69+
kubelet, err := newKubeletClient(opts.KubeConfig)
70+
if err != nil {
71+
return fmt.Errorf("failed to load kubelet client: %v", err)
72+
}
73+
74+
// Open a GRPC connection to the CRI shim
75+
cri, err := newRemoteRuntimeService(opts.RemoteRuntimeEndpoint, opts.RuntimeRequestTimeout)
76+
if err != nil {
77+
return fmt.Errorf("failed to connect to CRI server: %v", err)
78+
}
79+
80+
cp := &checkpointer{
81+
apiserver: apiserver,
82+
kubelet: kubelet,
83+
cri: cri,
84+
checkpointerPod: opts.CheckpointerPod,
85+
}
86+
cp.run()
87+
88+
return nil
89+
}
90+
91+
// run is the main checkpointing loop.
92+
func (c *checkpointer) run() {
93+
for {
94+
time.Sleep(defaultPollingFrequency)
95+
96+
// We must use both the :10255/pods endpoint and CRI shim, because /pods
97+
// endpoint could have stale data. The /pods endpoint will only show the last cached
98+
// status which has successfully been written to an apiserver. However, if there is
99+
// no apiserver, we may get stale state (e.g. saying pod is running, when it really is
100+
// not).
101+
localParentPods := c.kubelet.localParentPods()
102+
localRunningPods := c.cri.localRunningPods()
103+
104+
c.createCheckpointsForValidParents(localParentPods)
105+
106+
// Try to get scheduled pods from the apiserver.
107+
// These will be used to GC checkpoints for parents no longer scheduled to this node.
108+
// A return value of nil is assumed to be "could not contact apiserver"
109+
// TODO(aaron): only check this every 30 seconds or so
110+
apiParentPods := c.getAPIParentPods(c.checkpointerPod.NodeName)
111+
112+
// Get on disk copies of (in)active checkpoints
113+
//TODO(aaron): Could be racy to load from disk each time, but much easier than trying to keep in-memory state in sync.
114+
activeCheckpoints := getFileCheckpoints(activeCheckpointPath)
115+
inactiveCheckpoints := getFileCheckpoints(inactiveCheckpointPath)
116+
117+
start, stop, remove := process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoints, c.checkpointerPod)
118+
119+
// Handle remove at last because we may still have some work to do
120+
// before removing the checkpointer itself.
121+
handleStop(stop)
122+
handleStart(start)
123+
handleRemove(remove)
124+
}
125+
}

pkg/checkpoint/config_map.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package checkpoint
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path"
7+
"path/filepath"
8+
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/client-go/pkg/api/v1"
11+
)
12+
13+
// checkpointConfigMapVolumes ensures that all pod configMaps are checkpointed locally, then converts the configMap volume to a hostpath.
14+
func (c *checkpointer) checkpointConfigMapVolumes(pod *v1.Pod) (*v1.Pod, error) {
15+
for i := range pod.Spec.Volumes {
16+
v := &pod.Spec.Volumes[i]
17+
if v.ConfigMap == nil {
18+
continue
19+
}
20+
21+
_, err := c.checkpointConfigMap(pod.Namespace, pod.Name, v.ConfigMap.Name)
22+
if err != nil {
23+
return nil, fmt.Errorf("failed to checkpoint configMap for pod %s/%s: %v", pod.Namespace, pod.Name, err)
24+
}
25+
}
26+
return pod, nil
27+
}
28+
29+
// checkpointConfigMap will locally store configMap data.
30+
// The path to the configMap data becomes: checkpointConfigMapPath/namespace/podname/configMapName/configMap.file
31+
// Where each "configMap.file" is a key from the configMap.Data field.
32+
func (c *checkpointer) checkpointConfigMap(namespace, podName, configMapName string) (string, error) {
33+
configMap, err := c.apiserver.Core().ConfigMaps(namespace).Get(configMapName, metav1.GetOptions{})
34+
if err != nil {
35+
return "", fmt.Errorf("failed to retrieve configMap %s/%s: %v", namespace, configMapName, err)
36+
}
37+
38+
basePath := configMapPath(namespace, podName, configMapName)
39+
if err := os.MkdirAll(basePath, 0700); err != nil {
40+
return "", fmt.Errorf("failed to create configMap checkpoint path %s: %v", basePath, err)
41+
}
42+
// TODO(aaron): No need to store if already exists
43+
for f, d := range configMap.Data {
44+
if err := writeAndAtomicRename(filepath.Join(basePath, f), []byte(d), 0600); err != nil {
45+
return "", fmt.Errorf("failed to write configMap %s: %v", configMap.Name, err)
46+
}
47+
}
48+
return basePath, nil
49+
}
50+
51+
func configMapPath(namespace, podName, configMapName string) string {
52+
return filepath.Join(checkpointConfigMapPath, namespace, podName, configMapName)
53+
}
54+
55+
func podFullNameToConfigMapPath(id string) string {
56+
namespace, podname := path.Split(id)
57+
return filepath.Join(checkpointConfigMapPath, namespace, podname)
58+
}

pkg/checkpoint/kubelet.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package checkpoint
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/golang/glog"
7+
"k8s.io/apimachinery/pkg/runtime/serializer"
8+
"k8s.io/client-go/kubernetes/scheme"
9+
"k8s.io/client-go/pkg/api/v1"
10+
"k8s.io/client-go/rest"
11+
)
12+
13+
// A minimal kubelet client. It assumes the kubelet can be reached the kubelet's insecure API at
14+
// 127.0.0.1:10255 and the secure API at 127.0.0.1:10250.
15+
type kubeletClient struct {
16+
insecureClient *rest.RESTClient
17+
secureClient *rest.RESTClient
18+
}
19+
20+
func newKubeletClient(config *rest.Config) (*kubeletClient, error) {
21+
// Use the core API group serializer. Same logic as client-go.
22+
// https://github.com/kubernetes/client-go/blob/v3.0.0/kubernetes/typed/core/v1/core_client.go#L147
23+
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}
24+
25+
// Kubelet is using a self-signed cert.
26+
config.TLSClientConfig.Insecure = true
27+
config.TLSClientConfig.CAFile = ""
28+
config.TLSClientConfig.CAData = nil
29+
30+
// Shallow copy.
31+
insecureConfig := *config
32+
secureConfig := *config
33+
34+
insecureConfig.Host = "http://127.0.0.1:10255"
35+
secureConfig.Host = "https://127.0.0.1:10250"
36+
37+
client := new(kubeletClient)
38+
var err error
39+
if client.insecureClient, err = rest.UnversionedRESTClientFor(&insecureConfig); err != nil {
40+
return nil, fmt.Errorf("failed creating kubelet client for debug endpoints: %v", err)
41+
}
42+
if client.secureClient, err = rest.UnversionedRESTClientFor(&secureConfig); err != nil {
43+
return nil, fmt.Errorf("failed creating kubelet client: %v", err)
44+
}
45+
46+
return client, nil
47+
}
48+
49+
// localParentPods will retrieve all pods from kubelet api that are parents & should be checkpointed
50+
func (k *kubeletClient) localParentPods() map[string]*v1.Pod {
51+
podList := new(v1.PodList)
52+
if err := k.insecureClient.Get().AbsPath("/pods/").Do().Into(podList); err != nil {
53+
// Assume there are no local parent pods.
54+
glog.Errorf("failed to list local parent pods, assuming none are running: %v", err)
55+
}
56+
return podListToParentPods(podList)
57+
}

pkg/checkpoint/manifest.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package checkpoint
2+
3+
import (
4+
"bytes"
5+
"io/ioutil"
6+
"os"
7+
"path/filepath"
8+
9+
"github.com/golang/glog"
10+
"k8s.io/apimachinery/pkg/runtime"
11+
"k8s.io/client-go/pkg/api"
12+
"k8s.io/client-go/pkg/api/v1"
13+
)
14+
15+
// getFileCheckpoints will retrieve all checkpoint manifests from a given filepath.
16+
func getFileCheckpoints(path string) map[string]*v1.Pod {
17+
checkpoints := make(map[string]*v1.Pod)
18+
19+
fi, err := ioutil.ReadDir(path)
20+
if err != nil {
21+
glog.Fatalf("Failed to read checkpoint manifest path: %v", err)
22+
}
23+
24+
for _, f := range fi {
25+
manifest := filepath.Join(path, f.Name())
26+
b, err := ioutil.ReadFile(manifest)
27+
if err != nil {
28+
glog.Errorf("Error reading manifest: %v", err)
29+
continue
30+
}
31+
32+
cp := &v1.Pod{}
33+
if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), b, cp); err != nil {
34+
glog.Errorf("Error unmarshalling manifest from %s: %v", filepath.Join(path, f.Name()), err)
35+
continue
36+
}
37+
38+
if isCheckpoint(cp) {
39+
if _, ok := checkpoints[podFullName(cp)]; ok { // sanity check
40+
glog.Warningf("Found multiple checkpoint pods in %s with same id: %s", path, podFullName(cp))
41+
}
42+
checkpoints[podFullName(cp)] = cp
43+
}
44+
}
45+
return checkpoints
46+
}
47+
48+
// writeCheckpointManifest will save the pod to the inactive checkpoint location if it doesn't already exist.
49+
func writeCheckpointManifest(pod *v1.Pod) (bool, error) {
50+
buff := &bytes.Buffer{}
51+
if err := podSerializer.Encode(pod, buff); err != nil {
52+
return false, err
53+
}
54+
path := filepath.Join(inactiveCheckpointPath, pod.Namespace+"-"+pod.Name+".json")
55+
// Make sure the inactive checkpoint path exists.
56+
if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
57+
return false, err
58+
}
59+
return writeManifestIfDifferent(path, podFullName(pod), buff.Bytes())
60+
}
61+
62+
// writeManifestIfDifferent writes `data` to `path` if data is different from the existing content.
63+
// The `name` parameter is used for debug output.
64+
func writeManifestIfDifferent(path, name string, data []byte) (bool, error) {
65+
existing, err := ioutil.ReadFile(path)
66+
if err != nil && !os.IsNotExist(err) {
67+
return false, err
68+
}
69+
if bytes.Equal(existing, data) {
70+
glog.V(4).Infof("Checkpoint manifest for %q already exists. Skipping", name)
71+
return false, nil
72+
}
73+
glog.Infof("Writing manifest for %q to %q", name, path)
74+
return true, writeAndAtomicRename(path, data, 0644)
75+
}

0 commit comments

Comments
 (0)