Skip to content

Commit 925ee98

Browse files
authored
Merge pull request kubernetes-sigs#428 from kuba-wolf/master
Add prestop hook for graceful termination
2 parents 43954a0 + e6a71c9 commit 925ee98

17 files changed

+1428
-77
lines changed

Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,4 +138,9 @@ generate-kustomize: bin/helm
138138
cd charts/aws-fsx-csi-driver && ../../bin/helm template kustomize . -s templates/clusterrole-csi-node.yaml > ../../deploy/kubernetes/base/clusterrole-csi-node.yaml
139139
cd charts/aws-fsx-csi-driver && ../../bin/helm template kustomize . -s templates/clusterrolebinding-csi-node.yaml > ../../deploy/kubernetes/base/clusterrolebinding-csi-node.yaml
140140

141+
$(MAKE) remove-namespace-kustomize-files
142+
143+
.PHONY: remove-namespace-kustomize-files
# Strip "namespace:" lines from every generated base manifest except
# kustomization.yaml, so kustomize (not the rendered chart) controls the
# target namespace. Edits the files in place with sed -i.
remove-namespace-kustomize-files:
	ls deploy/kubernetes/base/* | grep -v 'kustomization\.yaml' | xargs sed -i '/namespace:/d'
141146

charts/aws-fsx-csi-driver/templates/clusterrole-csi-node.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,12 @@ rules:
99
- apiGroups: [""]
1010
resources: ["nodes"]
1111
verbs: ["get", "patch"]
12+
- apiGroups: [ "" ]
13+
resources: [ "pods" ]
14+
verbs: [ "get", "list", "watch" ]
15+
- apiGroups: [ "" ]
16+
resources: [ "persistentvolumes" ]
17+
verbs: [ "get" ]
18+
- apiGroups: [ "" ]
19+
resources: [ "persistentvolumeclaims" ]
20+
verbs: [ "get"]

charts/aws-fsx-csi-driver/templates/clusterrolebinding-csi-node.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ roleRef:
1313
kind: ClusterRole
1414
name: fsx-csi-node-role
1515
apiGroup: rbac.authorization.k8s.io
16+

charts/aws-fsx-csi-driver/templates/node-daemonset.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ spec:
3535
dnsConfig: {{- toYaml . | nindent 8 }}
3636
{{- end }}
3737
serviceAccountName: {{ .Values.node.serviceAccount.name }}
38+
terminationGracePeriodSeconds: {{ .Values.node.terminationGracePeriodSeconds }}
3839
priorityClassName: system-node-critical
3940
tolerations:
4041
{{- if .Values.node.tolerateAllTaints }}
@@ -85,6 +86,10 @@ spec:
8586
timeoutSeconds: 3
8687
periodSeconds: 2
8788
failureThreshold: 5
89+
lifecycle:
90+
preStop:
91+
exec:
92+
command: [ "/bin/aws-fsx-csi-driver", "pre-stop-hook" ]
8893
{{- with .Values.node.resources }}
8994
resources:
9095
{{- toYaml . | nindent 12 }}

charts/aws-fsx-csi-driver/values.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ node:
8888
name: fsx-csi-node-sa
8989
annotations: {}
9090
podAnnotations: {}
91+
terminationGracePeriodSeconds: 30
9192
tolerateAllTaints: true
9293
tolerations:
9394
- operator: Exists

cmd/hooks/prestop.go

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
// Copyright 2025 The Kubernetes Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the 'License');
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an 'AS IS' BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package hooks
16+
17+
import (
18+
"context"
19+
"fmt"
20+
v1 "k8s.io/api/core/v1"
21+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22+
"k8s.io/client-go/informers"
23+
"k8s.io/client-go/kubernetes"
24+
"k8s.io/client-go/tools/cache"
25+
"k8s.io/klog/v2"
26+
"os"
27+
"sigs.k8s.io/aws-fsx-csi-driver/pkg/driver"
28+
)
29+
30+
/*
31+
When a node is terminated without first unmounting clients, this leaves stranded clients server side, which in turn need to be
32+
evicted by the Lustre filesystem.
33+
34+
This PreStop lifecycle hook aims to ensure that before the node (and the CSI driver node pod running on it) is shut down,
35+
there are no more pods with PVCs on the node, thereby indicating that all volumes have been successfully unmounted and detached.
36+
37+
No unnecessary delay is added to the termination workflow, as the PreStop hook logic is only executed when the node is being drained
38+
(thus preventing delays in termination where the node pod is killed due to a rolling restart, or during driver upgrades, but the workload pods are expected to be running).
39+
If the PreStop hook hangs during its execution, the driver node pod will be forcefully terminated after terminationGracePeriodSeconds.
40+
*/
41+
42+
const clusterAutoscalerTaint = "ToBeDeletedByClusterAutoscaler"
43+
const v1KarpenterTaint = "karpenter.sh/disrupted"
44+
const v1beta1KarpenterTaint = "karpenter.sh/disruption"
45+
46+
// drainTaints includes taints used by K8s or autoscalers that signify node draining or pod eviction
47+
var drainTaints = map[string]struct{}{
48+
v1.TaintNodeUnschedulable: {}, // Kubernetes common eviction taint (kubectl drain)
49+
clusterAutoscalerTaint: {},
50+
v1KarpenterTaint: {},
51+
v1beta1KarpenterTaint: {},
52+
}
53+
54+
func PreStop(clientset kubernetes.Interface) error {
55+
klog.InfoS("PreStop: executing PreStop lifecycle hook")
56+
57+
nodeName := os.Getenv("CSI_NODE_NAME")
58+
if nodeName == "" {
59+
return fmt.Errorf("PreStop: CSI_NODE_NAME missing")
60+
}
61+
62+
node, err := fetchNode(clientset, nodeName)
63+
if err != nil {
64+
return err
65+
}
66+
67+
if isNodeBeingDrained(node) {
68+
klog.InfoS("PreStop: node is being drained, checking for remaining pods with PVCs", "node", nodeName)
69+
return waitForPodShutdowns(clientset, nodeName)
70+
}
71+
72+
klog.InfoS("PreStop: node is not being drained, skipping pods check", "node", nodeName)
73+
return nil
74+
}
75+
76+
func fetchNode(clientset kubernetes.Interface, nodeName string) (*v1.Node, error) {
77+
node, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
78+
if err != nil {
79+
return nil, fmt.Errorf("fetchNode: failed to retrieve node information: %w", err)
80+
}
81+
return node, nil
82+
}
83+
84+
// isNodeBeingDrained returns true if node resource has a known drain/eviction taint.
85+
func isNodeBeingDrained(node *v1.Node) bool {
86+
for _, taint := range node.Spec.Taints {
87+
if _, isDrainTaint := drainTaints[taint.Key]; isDrainTaint {
88+
return true
89+
}
90+
}
91+
return false
92+
}
93+
94+
func waitForPodShutdowns(clientset kubernetes.Interface, nodeName string) error {
95+
allVolumesUnmounted := make(chan struct{})
96+
97+
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0,
98+
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
99+
options.FieldSelector = fmt.Sprintf("spec.nodeName=%s", nodeName)
100+
}))
101+
informer := factory.Core().V1().Pods().Informer()
102+
103+
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
104+
DeleteFunc: func(obj interface{}) {
105+
klog.V(5).InfoS("DeleteFunc: Pod deleted", "node", nodeName)
106+
if err := checkActivePods(clientset, nodeName, allVolumesUnmounted); err != nil {
107+
klog.ErrorS(err, "DeleteFunc: error checking active pods on the node")
108+
}
109+
110+
},
111+
UpdateFunc: func(oldObj, newObj interface{}) {
112+
klog.V(5).InfoS("UpdateFunc: Pod updated", "node", nodeName)
113+
if err := checkActivePods(clientset, nodeName, allVolumesUnmounted); err != nil {
114+
klog.ErrorS(err, "UpdateFunc: error checking active pods on the node")
115+
}
116+
},
117+
})
118+
if err != nil {
119+
return fmt.Errorf("failed to add event handler to Node informer: %w", err)
120+
}
121+
122+
go informer.Run(allVolumesUnmounted)
123+
124+
if err := checkActivePods(clientset, nodeName, allVolumesUnmounted); err != nil {
125+
klog.ErrorS(err, "waitForPodShutdowns: error checking active pods on the node")
126+
}
127+
128+
<-allVolumesUnmounted
129+
klog.InfoS("waitForPodShutdowns: finished waiting for active pods on the node. preStopHook completed")
130+
return nil
131+
}
132+
133+
func checkActivePods(clientset kubernetes.Interface, nodeName string, allVolumesUnmounted chan struct{}) error {
134+
podList, err := clientset.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{
135+
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
136+
})
137+
if err != nil {
138+
return fmt.Errorf("checkActivePods: failed to get podList: %w", err)
139+
}
140+
for _, pod := range podList.Items {
141+
// Temporary workaround until FieldSelector filters properly: https://github.com/kubernetes/client-go/issues/1350
142+
if pod.Spec.NodeName == nodeName {
143+
for _, vol := range pod.Spec.Volumes {
144+
145+
if vol.PersistentVolumeClaim == nil {
146+
continue
147+
}
148+
149+
pvcName := vol.PersistentVolumeClaim.ClaimName
150+
pvc, err := clientset.CoreV1().PersistentVolumeClaims(pod.Namespace).Get(context.Background(), pvcName, metav1.GetOptions{})
151+
if err != nil {
152+
return fmt.Errorf("checkActivePods: failed to get pv %s: %w", pvcName, err)
153+
}
154+
pvName := pvc.Spec.VolumeName
155+
pv, err := clientset.CoreV1().PersistentVolumes().Get(context.Background(), pvName, metav1.GetOptions{})
156+
157+
if err != nil {
158+
return fmt.Errorf("checkActivePods: failed to get pv %s: %w", pvName, err)
159+
}
160+
161+
if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == driver.DriverName {
162+
klog.InfoS("checkActivePods: not ready to exit, found PV associated with pod", "PV", pvName, "node", nodeName)
163+
return nil
164+
}
165+
}
166+
}
167+
}
168+
169+
close(allVolumesUnmounted)
170+
klog.V(5).Info("checkActivePods: no pods associated with PVCs identified", "node", nodeName)
171+
return nil
172+
}

0 commit comments

Comments
 (0)