@@ -2,6 +2,7 @@ package main
22
33import (
44 "bytes"
5+ "crypto/tls"
56 "encoding/json"
67 "flag"
78 "fmt"
@@ -20,15 +21,18 @@ import (
2021 "k8s.io/kubernetes/pkg/api/v1"
2122 clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
2223 "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
23- clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
2424 "k8s.io/kubernetes/pkg/fields"
2525 "k8s.io/kubernetes/pkg/runtime"
2626)
2727
2828const (
2929 nodeNameEnv = "NODE_NAME"
3030
31- kubeletAPIPodsURL = "http://127.0.0.1:10255/pods"
31+ // We must use both the :10255/pods and :10250/runningpods/ endpoints, because /pods endpoint could have stale data.
32+ // The /pods endpoint will only show the last cached status which has successfully been written to an apiserver.
33+ // However, if there is no apiserver, we may get stale state (e.g. saying pod is running, when it really is not).
34+ kubeletAPIPodsURL = "http://127.0.0.1:10255/pods"
35+ kubeletAPIRunningPodsURL = "https://127.0.0.1:10250/runningpods/"
3236
3337 activeCheckpointPath = "/etc/kubernetes/manifests"
3438 inactiveCheckpointPath = "/srv/kubernetes/manifests"
@@ -70,6 +74,12 @@ func run(client clientset.Interface, nodeName string) {
7074 continue
7175 }
7276
77+ localRunningPods , err := getLocalRunningPods ()
78+ if err != nil {
79+ glog .Errorf ("Failed to retrieve running pods from kubelet api: %v" , err )
80+ continue
81+ }
82+
7383 createCheckpointsForValidParents (client , localParentPods )
7484
7585 // Try to get scheduled pods from the apiserver.
@@ -83,7 +93,7 @@ func run(client clientset.Interface, nodeName string) {
8393 activeCheckpoints := getFileCheckpoints (activeCheckpointPath )
8494 inactiveCheckpoints := getFileCheckpoints (inactiveCheckpointPath )
8595
86- start , stop , remove := process (localParentPods , apiParentPods , activeCheckpoints , inactiveCheckpoints )
96+ start , stop , remove := process (localRunningPods , localParentPods , apiParentPods , activeCheckpoints , inactiveCheckpoints )
8797 handleRemove (remove )
8898 handleStop (stop )
8999 handleStart (start )
@@ -93,17 +103,18 @@ func run(client clientset.Interface, nodeName string) {
93103// process() makes decisions on which checkpoints need to be started, stopped, or removed.
94104// It makes this decision based on inspecting the states from kubelet, apiserver, active/inactive checkpoints.
95105//
96- // - localParentPods: pod state from kubelet api for all "to be checkpointed" pods
106+ // - localRunningPods: running pods retrieved from kubelet api. Minimal amount of info (no podStatus) as it is extracted from container runtime.
107+ // - localParentPods: pod state from kubelet api for all "to be checkpointed" pods - podStatus may be stale (only as recent as last apiserver contact)
97108// - apiParentPods: pod state from the api server for all "to be checkpointed" pods
98109// - activeCheckpoints: checkpoint pod manifests which are currently active & in the static pod manifest
99- // - inactiveCheckpoints: checkpoint pod manifets which are stored in an inactive directory, but are ready to be activated
110+ // - inactiveCheckpoints: checkpoint pod manifest which are stored in an inactive directory, but are ready to be activated
100111//
101- // The return values are checkpoints which should be started or stopped, and checkpoints which need to be removed alltogether .
112+ // The return values are checkpoints which should be started or stopped, and checkpoints which need to be removed altogether .
102113// The removal of a checkpoint means its parent is no longer scheduled to this node, and we need to GC active / inactive
103114// checkpoints as well as any secrets / configMaps which are no longer necessary.
104- func process (localParentPods , apiParentPods , activeCheckpoints , inactiveCheckpoints map [string ]* v1.Pod ) (start , stop , remove []string ) {
115+ func process (localRunningPods , localParentPods , apiParentPods , activeCheckpoints , inactiveCheckpoints map [string ]* v1.Pod ) (start , stop , remove []string ) {
105116
106- // We can only make some GC decisions if we've successfuly contacted an apiserver.
117+ // We can only make some GC decisions if we've successfully contacted an apiserver.
107118 // When apiParentPods == nil, that means we were not able to get an updated list of pods.
108119 removeMap := make (map [string ]struct {})
109120 if apiParentPods != nil {
@@ -113,6 +124,10 @@ func process(localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoi
113124 // If the inactive checkpoint still has a parent pod, do nothing.
114125 // This means the kubelet thinks it should still be running, which has the same scheduling info that we do --
115126 // so we won't make any decisions about its checkpoint.
127+ // TODO(aaron): This is a safety check, and may not be necessary -- question is do we trust that the api state we received
128+ // is accurate -- and that we should ignore our local state (or assume it could be inaccurate). For example,
129+ // local kubelet pod state will be innacurate in the case that we can't contact apiserver (kubelet only keeps
130+ // cached responses from api) -- however, we're assuming we've been able to contact api, so this likely is moot.
116131 if _ , ok := localParentPods [id ]; ok {
117132 glog .V (4 ).Infof ("API GC: skipping inactive checkpoint %s" , id )
118133 continue
@@ -144,16 +159,16 @@ func process(localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoi
144159
145160 // If there is an inactive checkpoint, and no parent is running, start the checkpoint
146161 for id := range inactiveCheckpoints {
147- if _ , ok := localParentPods [id ]; ! ok {
162+ if _ , ok := localRunningPods [id ]; ! ok {
148163 glog .V (4 ).Infof ("Should start checkpoint %s" , id )
149164 start = append (start , id )
150165 }
151166 }
152167
153- // If there is an active checkpoint and a parent pod, stop the active checkpoint
168+ // If there is an active checkpoint and a running pod, stop the active checkpoint
154169 // The parent may not be in a running state, but the kubelet is trying to start it so we should get out of the way.
155170 for id := range activeCheckpoints {
156- if _ , ok := localParentPods [id ]; ok {
171+ if _ , ok := localRunningPods [id ]; ok {
157172 glog .V (4 ).Infof ("Should stop checkpoint %s" , id )
158173 stop = append (stop , id )
159174 }
@@ -167,13 +182,19 @@ func process(localParentPods, apiParentPods, activeCheckpoints, inactiveCheckpoi
167182 return start , stop , remove
168183}
169184
170- // createCheckpointsForValidParents will iterate through pods which are candidates for checkpoing, and if they should be :
185+ // createCheckpointsForValidParents will iterate through pods which are candidates for checkpointing, then :
171186// - checkpoint any remote assets they need (e.g. secrets)
172187// - sanitize their podSpec, removing unnecessary information
173188// - store the manifest on disk in an "inactive" checkpoint location
174189//TODO(aaron): Add support for checkpointing configMaps
175190func createCheckpointsForValidParents (client clientset.Interface , pods map [string ]* v1.Pod ) {
176191 for _ , pod := range pods {
192+ // This merely check that the last kubelet pod state thinks this pod was running. It's possible that
193+ // state is actually stale (only as recent as last successful contact with api-server). However, this
194+ // does contain the full podSpec -- so we can still attempt to checkpoint with this "last known good state".
195+ //
196+ // We do not use the `localPodRunning` state, because while the runtime may think the pod/containers are running -
197+ // they may actually be in a failing state - and we've not successfully sent that podStatus to any api-server.
177198 if ! isRunning (pod ) {
178199 continue
179200 }
@@ -187,6 +208,8 @@ func createCheckpointsForValidParents(client clientset.Interface, pods map[strin
187208
188209 cp , err = checkpointSecretVolumes (client , cp )
189210 if err != nil {
211+ //TODO(aaron): This can end up spamming logs at times when api-server is unavailable. To reduce spam
212+ // we could only log error if api-server can't be contacted and existing secret doesn't exist.
190213 glog .Errorf ("Failed to checkpoint secrets for pod %s: %v" , id , err )
191214 continue
192215 }
@@ -311,6 +334,28 @@ func getLocalParentPods() (map[string]*v1.Pod, error) {
311334 return podListToParentPods (& podList ), nil
312335}
313336
337+ // getLocalRunningPods uses the /runningpods/ kubelet api to retrieve the local container runtime pod state
338+ func getLocalRunningPods () (map [string ]* v1.Pod , error ) {
339+ // TODO(aaron): The kubelet api is currently secured by a self-signed cert. We should update this to actually verify at some point
340+ client := & http.Client {Transport : & http.Transport {TLSClientConfig : & tls.Config {InsecureSkipVerify : true }}}
341+ resp , err := client .Get (kubeletAPIRunningPodsURL )
342+ if err != nil {
343+ return nil , fmt .Errorf ("failed to contact kubelet pod api: %v" , err )
344+ }
345+
346+ defer resp .Body .Close ()
347+ b , err := ioutil .ReadAll (resp .Body )
348+ if err != nil {
349+ return nil , fmt .Errorf ("failed to read response: %v" , err )
350+ }
351+
352+ var podList v1.PodList
353+ if err := json .Unmarshal (b , & podList ); err != nil {
354+ return nil , fmt .Errorf ("failed to unmarshal podlist: %v" , err )
355+ }
356+ return podListToMap (& podList , filterNone ), nil
357+ }
358+
314359// checkpointSecretVolumes ensures that all pod secrets are checkpointed locally, then converts the secret volume to a hostpath.
315360func checkpointSecretVolumes (client clientset.Interface , pod * v1.Pod ) (* v1.Pod , error ) {
316361 for i := range pod .Spec .Volumes {
@@ -382,7 +427,7 @@ func handleStop(stop []string) {
382427 glog .Infof ("Stopping active checkpoint: %s" , id )
383428 p := PodFullNameToActiveCheckpointPath (id )
384429 if err := os .Remove (p ); err != nil {
385- if os .IsNotExist (err ) { // Sanity check (it's fine - just want to surface this if it's occuring )
430+ if os .IsNotExist (err ) { // Sanity check (it's fine - just want to surface this if it's occurring )
386431 glog .Warningf ("Attempted to remove active checkpoint, but manifest no longer exists: %s" , p )
387432 } else {
388433 glog .Errorf ("Failed to stop active checkpoint %s: %v" , p , err )
@@ -420,23 +465,31 @@ func isRunning(pod *v1.Pod) bool {
420465}
421466
422467func podListToParentPods (pl * v1.PodList ) map [string ]* v1.Pod {
468+ return podListToMap (pl , isValidParent )
469+ }
470+
471+ func filterNone (p * v1.Pod ) bool {
472+ return true
473+ }
474+
475+ type filterFn func (* v1.Pod ) bool
476+
477+ func podListToMap (pl * v1.PodList , filter filterFn ) map [string ]* v1.Pod {
423478 pods := make (map [string ]* v1.Pod )
424479 for i := range pl .Items {
425- pod , err := copyPod (& pl .Items [i ])
426- if err != nil {
427- glog .Errorf ("Failed to copy parent pod from podlist %s: %v" , PodFullName (& pl .Items [i ]), err )
480+ if ! filter (& pl .Items [i ]) {
428481 continue
429482 }
430483
484+ pod := & pl .Items [i ]
431485 id := PodFullName (pod )
432- if ! isValidParent (pod ) {
433- continue
434- }
435- if _ , ok := pods [id ]; ok { // sanity check (shouldn't ever happen)
486+
487+ if _ , ok := pods [id ]; ok { // TODO(aaron): likely not be necessary (shouldn't ever happen) - but sanity check
436488 glog .Warningf ("Found multiple local parent pods with same id: %s" , id )
437489 }
438- pods [ id ] = pod
490+
439491 // Pods from Kubelet API do not have TypeMeta populated - set it here either way.
492+ pods [id ] = pod
440493 pods [id ].TypeMeta = unversioned.TypeMeta {
441494 APIVersion : pl .APIVersion ,
442495 Kind : "Pod" ,
@@ -497,11 +550,10 @@ func PodFullNameToSecretPath(id string) string {
497550
498551func newClient () clientset.Interface {
499552 // This is run as a static pod, so we can't use InClusterConfig (no secrets/service account)
500- // Use the same kubeconfig as the kubelet for auth - but use the apiserver address populated as pod environment variables.
501- apiServerAddr := fmt .Sprintf ("https://%s:%s" , os .Getenv ("KUBERNETES_SERVICE_HOST" ), os .Getenv ("KUBERNETES_SERVICE_PORT_HTTPS" ))
553+ // Use the same kubeconfig as the kubelet for auth and api-server location.
502554 kubeConfig , err := clientcmd .NewNonInteractiveDeferredLoadingClientConfig (
503555 & clientcmd.ClientConfigLoadingRules {ExplicitPath : kubeconfigPath },
504- & clientcmd.ConfigOverrides {ClusterInfo : clientcmdapi. Cluster { Server : apiServerAddr } }).ClientConfig ()
556+ & clientcmd.ConfigOverrides {}).ClientConfig ()
505557 if err != nil {
506558 glog .Fatalf ("Failed to load kubeconfig: %v" , err )
507559 }
0 commit comments