@@ -12,6 +12,7 @@ import (
1212 "path"
1313 "path/filepath"
1414 "strings"
15+ "syscall"
1516 "time"
1617
1718 "github.com/golang/glog"
@@ -26,7 +27,9 @@ import (
2627)
2728
2829const (
29- nodeNameEnv = "NODE_NAME"
30+ nodeNameEnv = "NODE_NAME"
31+ podNameEnv = "POD_NAME"
32+ podNamespaceEnv = "POD_NAMESPACE"
3033
3134 // We must use both the :10255/pods and :10250/runningpods/ endpoints, because /pods endpoint could have stale data.
3235 // The /pods endpoint will only show the last cached status which has successfully been written to an apiserver.
@@ -48,17 +51,63 @@ const (
4851 podSourceFile = "file"
4952)
5053
51- //TODO(aaron): The checkpointer should know how to GC itself because it runs as a static pod.
var (
	// lockfilePath is the location of the lock file the checkpointer flocks
	// at startup. It is bound to the "lock-file" command-line flag in init.
	lockfilePath string

	// nodeName, podName and podNamespace identify this checkpointer pod.
	// They are populated from the environment by readDownwardAPI.
	// TODO(yifan): Put these into a struct when necessary.
	nodeName, podName, podNamespace string
)
62+
63+ func init () {
64+ flag .StringVar (& lockfilePath , "lock-file" , "/var/run/lock/pod-checkpointer.lock" , "The path to lock file for checkpointer to use" )
5465 flag .Set ("logtostderr" , "true" )
55- flag .Parse ()
56- defer glog .Flush ()
66+ }
67+
// flock acquires an exclusive advisory lock (LOCK_EX) on the file at path,
// creating the file with mode 0600 if it does not exist yet.
// If the lock is already held by another process, the call blocks until the
// lock can be acquired.
//
// On success the file descriptor is intentionally left open: the lock lives
// as long as the descriptor does, and it must be held until the process
// exits. The descriptor is opened with O_CLOEXEC so that a program exec'd by
// this process does not inherit it and keep the lock held by accident.
//
// On failure the descriptor (if any) is closed and a *os.PathError naming the
// failed operation ("open" or "flock") and the path is returned.
//
// TODO(yifan): Maybe replace this with kubernetes/pkg/util/flock.Acquire() once
// https://github.com/kubernetes/kubernetes/issues/42929 is solved, or maybe not.
func flock(path string) error {
	fd, err := syscall.Open(path, syscall.O_CREAT|syscall.O_RDWR|syscall.O_CLOEXEC, 0600)
	if err != nil {
		return &os.PathError{Op: "open", Path: path, Err: err}
	}

	// Retry if the blocking call is interrupted by a signal.
	for {
		err = syscall.Flock(fd, syscall.LOCK_EX)
		if err != syscall.EINTR {
			break
		}
	}
	if err != nil {
		// The lock was not acquired, so there is no reason to keep the
		// descriptor; a close error would add nothing to the flock error.
		_ = syscall.Close(fd)
		return &os.PathError{Op: "flock", Path: path, Err: err}
	}

	return nil
}
5783
58- nodeName := os .Getenv (nodeNameEnv )
84+ // readDownwardAPI fills the node name, pod name, and pod namespace.
85+ func readDownwardAPI () {
86+ nodeName = os .Getenv (nodeNameEnv )
5987 if nodeName == "" {
6088 glog .Fatalf ("Missing required environment variable: %s" , nodeNameEnv )
6189 }
90+ podName = os .Getenv (podNameEnv )
91+ if podName == "" {
92+ glog .Fatalf ("Missing required environment variable: %s" , podNameEnv )
93+ }
94+ podNamespace = os .Getenv (podNamespaceEnv )
95+ if podNamespace == "" {
96+ glog .Fatalf ("Missing required environment variable: %s" , podNamespaceEnv )
97+ }
98+ }
99+
100+ func main () {
101+ flag .Parse ()
102+ defer glog .Flush ()
103+
104+ readDownwardAPI ()
105+
106+ glog .Infof ("Trying to acquire the flock at %q" , lockfilePath )
107+
108+ if err := flock (lockfilePath ); err != nil {
109+ glog .Fatalf ("Error when acquiring the flock: %v" , err )
110+ }
62111
63112 glog .Infof ("Starting checkpointer for node: %s" , nodeName )
64113 // This is run as a static pod, so we can't use InClusterConfig (no secrets/service account)
@@ -104,10 +153,10 @@ func main() {
104153 },
105154 }
106155
107- run (client , kubelet , nodeName )
156+ run (client , kubelet )
108157}
109158
110- func run (client clientset.Interface , kubelet * kubeletClient , nodeName string ) {
159+ func run (client clientset.Interface , kubelet * kubeletClient ) {
111160 for {
112161 time .Sleep (3 * time .Second )
113162
@@ -138,9 +187,12 @@ func run(client clientset.Interface, kubelet *kubeletClient, nodeName string) {
138187 inactiveCheckpoints := getFileCheckpoints (inactiveCheckpointPath )
139188
140189 start , stop , remove := process (localRunningPods , localParentPods , apiParentPods , activeCheckpoints , inactiveCheckpoints )
141- handleRemove (remove )
190+
191+ // Handle removals last because we may still have some work to do
192+ // before removing the checkpointer itself.
142193 handleStop (stop )
143194 handleStart (start )
195+ handleRemove (remove )
144196 }
145197}
146198
@@ -157,6 +209,10 @@ func run(client clientset.Interface, kubelet *kubeletClient, nodeName string) {
157209// The removal of a checkpoint means its parent is no longer scheduled to this node, and we need to GC active / inactive
158210// checkpoints as well as any secrets / configMaps which are no longer necessary.
159211func process (localRunningPods , localParentPods , apiParentPods , activeCheckpoints , inactiveCheckpoints map [string ]* v1.Pod ) (start , stop , remove []string ) {
212+ // If this variable is filled, then it means we need to remove the pod-checkpointer's checkpoint.
213+ // We treat the pod-checkpointer's checkpoint specially because we want to put it at the end of
214+ // the remove queue.
215+ var podCheckpointerID string
160216
161217 // We can only make some GC decisions if we've successfully contacted an apiserver.
162218 // When apiParentPods == nil, that means we were not able to get an updated list of pods.
@@ -183,7 +239,13 @@ func process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints
183239 // However, we don't know this, and as far as the checkpointer is concerned - that pod is no longer scheduled to this node.
184240 if _ , ok := apiParentPods [id ]; ! ok {
185241 glog .V (4 ).Infof ("API GC: should remove inactive checkpoint %s" , id )
186- removeMap [id ] = struct {}{}
242+
243+ if isPodCheckpointer (inactiveCheckpoints [id ]) {
244+ podCheckpointerID = id
245+ } else {
246+ removeMap [id ] = struct {}{}
247+ }
248+
187249 delete (inactiveCheckpoints , id )
188250 }
189251 }
@@ -193,26 +255,37 @@ func process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints
193255 // If the active checkpoint does not have a parent in the api-server, we must assume it should no longer be running on this node.
194256 if _ , ok := apiParentPods [id ]; ! ok {
195257 glog .V (4 ).Infof ("API GC: should remove active checkpoint %s" , id )
196- removeMap [id ] = struct {}{}
258+
259+ if isPodCheckpointer (activeCheckpoints [id ]) {
260+ podCheckpointerID = id
261+ } else {
262+ removeMap [id ] = struct {}{}
263+ }
264+
197265 delete (activeCheckpoints , id )
198266 }
199267 }
200268 }
201269
202270 // Can make decisions about starting/stopping checkpoints just with local state.
203-
204- // If there is an inactive checkpoint, and no parent is running, start the checkpoint
271+ //
272+ // If there is an inactive checkpoint, and no parent pod is running, or the checkpoint
273+ // is the pod-checkpointer, start the checkpoint.
205274 for id := range inactiveCheckpoints {
206- if _ , ok := localRunningPods [id ]; ! ok {
275+ _ , ok := localRunningPods [id ]
276+ if ! ok || isPodCheckpointer (inactiveCheckpoints [id ]) {
207277 glog .V (4 ).Infof ("Should start checkpoint %s" , id )
208278 start = append (start , id )
209279 }
210280 }
211281
212- // If there is an active checkpoint and a running pod, stop the active checkpoint
213- // The parent may not be in a running state, but the kubelet is trying to start it so we should get out of the way.
282+ // If there is an active checkpoint and a running parent pod, stop the active checkpoint
283+ // unless this is the pod-checkpointer.
284+ // The parent may not be in a running state, but the kubelet is trying to start it
285+ // so we should get out of the way.
214286 for id := range activeCheckpoints {
215- if _ , ok := localRunningPods [id ]; ok {
287+ _ , ok := localRunningPods [id ]
288+ if ok && ! isPodCheckpointer (activeCheckpoints [id ]) {
216289 glog .V (4 ).Infof ("Should stop checkpoint %s" , id )
217290 stop = append (stop , id )
218291 }
@@ -222,6 +295,9 @@ func process(localRunningPods, localParentPods, apiParentPods, activeCheckpoints
222295 for k := range removeMap {
223296 remove = append (remove , k )
224297 }
298+ if podCheckpointerID != "" {
299+ remove = append (remove , podCheckpointerID )
300+ }
225301
226302 return start , stop , remove
227303}
@@ -295,6 +371,17 @@ func writeCheckpointManifest(pod *v1.Pod) error {
295371 return writeAndAtomicRename (path , b , 0644 )
296372}
297373
374+ // isPodCheckpointer returns true if the manifest is the pod checkpointer (has the same name as the parent).
375+ // For example, the pod.Name would be "pod-checkpointer".
376+ // The podName would be "pod-checkpointer" or "pod-checkpointer-172.17.4.201" where
377+ // "172.17.4.201" is the nodeName.
378+ func isPodCheckpointer (pod * v1.Pod ) bool {
379+ if pod .Namespace != podNamespace {
380+ return false
381+ }
382+ return pod .Name == strings .TrimSuffix (podName , "-" + nodeName )
383+ }
384+
298385func sanitizeCheckpointPod (cp * v1.Pod ) (* v1.Pod , error ) {
299386 // Clear ObjectMeta except for name/namespace
300387 // NOTE(aaron): If we want to keep labels, we need to add a new label so the static pod
@@ -497,29 +584,35 @@ func checkpointConfigMap(client clientset.Interface, namespace, podName, configM
497584
498585func handleRemove (remove []string ) {
499586 for _ , id := range remove {
500- // Remove inactive checkpoints
501587 glog .Infof ("Removing checkpoint of: %s" , id )
502- p := PodFullNameToInactiveCheckpointPath (id )
503- if err := os .Remove (p ); err != nil && ! os .IsNotExist (err ) {
504- glog .Errorf ("Failed to remove inactive checkpoint %s: %v" , p , err )
505- continue
506- }
507- // Remove active checkpoints
508- p = PodFullNameToActiveCheckpointPath (id )
509- if err := os .Remove (p ); err != nil && ! os .IsNotExist (err ) {
510- glog .Errorf ("Failed to remove active checkpoint %s: %v" , p , err )
511- continue
512- }
588+
513589 // Remove Secrets
514- p = PodFullNameToSecretPath (id )
590+ p : = PodFullNameToSecretPath (id )
515591 if err := os .RemoveAll (p ); err != nil {
516592 glog .Errorf ("Failed to remove pod secrets from %s: %s" , p , err )
517593 }
594+
518595 // Remove ConfipMaps
519596 p = PodFullNameToConfigMapPath (id )
520597 if err := os .RemoveAll (p ); err != nil {
521598 glog .Errorf ("Failed to remove pod configMaps from %s: %s" , p , err )
522599 }
600+
601+ // Remove inactive checkpoints
602+ p = PodFullNameToInactiveCheckpointPath (id )
603+ if err := os .Remove (p ); err != nil && ! os .IsNotExist (err ) {
604+ glog .Errorf ("Failed to remove inactive checkpoint %s: %v" , p , err )
605+ continue
606+ }
607+
608+ // Remove active checkpoints.
609+ // We do this as the last step because we want to clean up
610+ // resources before the checkpointer itself exits.
611+ p = PodFullNameToActiveCheckpointPath (id )
612+ if err := os .Remove (p ); err != nil && ! os .IsNotExist (err ) {
613+ glog .Errorf ("Failed to remove active checkpoint %s: %v" , p , err )
614+ continue
615+ }
523616 }
524617}
525618
0 commit comments