@@ -10,166 +10,238 @@ import (
1010 "k8s.io/client-go/kubernetes"
1111)
1212
13+ type k8sClients []k8sClient
14+
1315type k8sClient interface {
16+ Stop (* kubernetes.Clientset ) error
1417 Restart (* kubernetes.Clientset ) error
1518}
1619
20+ type withNoStopClient struct {
21+ k8sClient
22+ }
23+
24+ type withNoRestartClient struct {
25+ k8sClient
26+ }
27+
28+ type pod struct {
29+ namespace string
30+ nodename string
31+ app string
32+ }
33+
1734// An operand is a GPU client that is controlled by a deploy label.
1835type operand struct {
19- app string
36+ pod
2037 deployLabel string
2138 lastValue string
2239}
2340
24- func stopK8sClients (clientset * kubernetes.Clientset ) ([]k8sClient , error ) {
25- // TODO: We need to add this namespace to the options.
26- gpuClientNamespace := "nvidia-gpu-operator"
27-
28- var k8sGPUClients []k8sClient
29-
30- // We first optionally stop the operands managed by the operator:
31- var operands = []* operand {
32- {
33- app : "nvidia-device-plugin-daemonset" ,
41+ func getK8sClients (opts * reconfigureMIGOptions ) k8sClients {
42+ k8sGPUClients := k8sClients {
43+ & operand {
44+ pod : pod {
45+ namespace : opts .GPUClientsNamespace ,
46+ nodename : opts .NodeName ,
47+ app : "nvidia-device-plugin-daemonset" ,
48+ },
3449 deployLabel : "nvidia.com/gpu.deploy.device-plugin" ,
3550 },
36- {
37- app : "gpu-feature-discovery" ,
51+ & operand {
52+ pod : pod {
53+ namespace : opts .GPUClientsNamespace ,
54+ nodename : opts .NodeName ,
55+ app : "gpu-feature-discovery" ,
56+ },
3857 deployLabel : "nvidia.com/gpu.deploy.gpu-feature-discovery" ,
3958 },
40- {
41- app : "nvidia-dcgm-exporter" ,
59+ & operand {
60+ pod : pod {
61+ namespace : opts .GPUClientsNamespace ,
62+ nodename : opts .NodeName ,
63+ app : "nvidia-dcgm-exporter" ,
64+ },
4265 deployLabel : "nvidia.com/gpu.deploy.dcgm-exporter" ,
4366 },
44- {
45- app : "nvidia-dcgm" ,
67+ & operand {
68+ pod : pod {
69+ namespace : opts .GPUClientsNamespace ,
70+ nodename : opts .NodeName ,
71+ app : "nvidia-dcgm" ,
72+ },
4673 deployLabel : "nvidia.com/gpu.deploy.dcgm" ,
4774 },
48- {
75+ & operand {
4976 // TODO: Why don't we wait for the following pd deletion.
50- app : "" ,
77+ pod : pod {
78+ namespace : opts .GPUClientsNamespace ,
79+ nodename : opts .NodeName ,
80+ app : "" ,
81+ },
5182 deployLabel : "nvidia.com/gpu.deploy.nvsm" ,
5283 },
84+ withNoRestart (& pod {
85+ namespace : opts .GPUClientsNamespace ,
86+ nodename : opts .NodeName ,
87+ app : "nvidia-cuda-validator" ,
88+ }),
89+ withNoRestart (& pod {
90+ namespace : opts .GPUClientsNamespace ,
91+ nodename : opts .NodeName ,
92+ app : "nvidia-device-plugin-validator" ,
94+ }),
95+ withNoStop (& pod {
96+ namespace : opts .GPUClientsNamespace ,
97+ nodename : opts .NodeName ,
98+ app : "nvidia-operator-validator" ,
99+ }),
53100 }
54101
55- pods := []* pod {
56- {
57- app : "nvidia-cuda-validator" ,
58- },
59- {
60- app : "nvidia-device-plugin-validator" ,
61- },
62- }
102+ return k8sGPUClients
103+ }
63104
64- for _ , o := range operands {
65- value , err := getNodeLabelValue ( clientset , o . deployLabel )
66- if err != nil {
67- return nil , fmt . Errorf ( "unable to get the value of the %q label: %w" , o . deployLabel , err )
105+ func ( o k8sClients ) Restart ( clientset * kubernetes. Clientset ) error {
106+ for _ , c := range o {
107+ if c == nil {
108+ continue
68109 }
69- o .lastValue = value
70- // Only set 'paused-*' if the current value is not 'false'.
71- // It should only be 'false' if some external entity has forced it to
72- // this value, at which point we want to honor it's existing value and
73- // not change it.
74- if value != "false" {
75- if err := setNodeLabelValue (clientset , o .deployLabel , "paused-for-mig-change" ); err != nil {
76- return nil , err
77- }
110+ if err := c .Restart (clientset ); err != nil {
111+ return err
78112 }
79- k8sGPUClients = append (k8sGPUClients , o )
80113 }
114+ return nil
115+ }
81116
82- for _ , o := range operands {
83- if o .app == "" {
117+ func (o k8sClients ) Stop (clientset * kubernetes.Clientset ) error {
118+ for _ , c := range o {
119+ if c == nil {
84120 continue
85121 }
86-
87- // Wait for the nvidia-device-plugin-daemonset pods to be deleted
88- timeout := 5 * time .Minute
89- ctx , cancel := context .WithTimeout (context .Background (), timeout )
90- defer cancel ()
91-
92- watcher , err := clientset .CoreV1 ().Pods (gpuClientNamespace ).Watch (ctx , metav1.ListOptions {
93- FieldSelector : fmt .Sprintf ("spec.nodeName=%s" , nodeNameFlag ),
94- LabelSelector : "app=nvidia-device-plugin-daemonset" ,
95- })
96- if err != nil {
97- return nil , fmt .Errorf ("unable to watch pods for deletion: %w" , err )
98- }
99- defer watcher .Stop ()
100-
101- // Wait for all matching pods to be deleted
102- for {
103- select {
104- case <- ctx .Done ():
105- return nil , fmt .Errorf ("timeout waiting for pod deletion: %w" , ctx .Err ())
106- case event := <- watcher .ResultChan ():
107- if event .Type == watch .Deleted {
108- // Check if there are any remaining pods matching our criteria
109- pods , err := clientset .CoreV1 ().Pods (gpuClientNamespace ).List (ctx , metav1.ListOptions {
110- FieldSelector : fmt .Sprintf ("spec.nodeName=%s" , nodeNameFlag ),
111- LabelSelector : "app=nvidia-device-plugin-daemonset" ,
112- })
113- if err != nil {
114- return nil , fmt .Errorf ("unable to list pods: %w" , err )
115- }
116- if len (pods .Items ) == 0 {
117- // All pods have been deleted
118- break
119- }
120- }
121- }
122- }
123-
124- }
125-
126- for _ , p := range pods {
127- err := clientset .CoreV1 ().Pods (gpuClientNamespace ).DeleteCollection (
128- context .TODO (),
129- metav1.DeleteOptions {},
130- metav1.ListOptions {
131- FieldSelector : fmt .Sprintf ("spec.nodeName=%s" , nodeNameFlag ),
132- LabelSelector : fmt .Sprintf ("app=%s" , p .app ),
133- },
134- )
135- if err != nil {
136- return nil , fmt .Errorf ("unable to delete pods for app %s: %w" , p .app , err )
122+ if err := c .Stop (clientset ); err != nil {
123+ return err
137124 }
138125 }
126+ return nil
127+ }
139128
140- k8sGPUClients = append (k8sGPUClients , & pod {app : "nvidia-operator-validator" })
129+ func withNoRestart (k k8sClient ) k8sClient {
130+ return & withNoRestartClient {k }
131+ }
141132
142- return k8sGPUClients , nil
133+ func withNoStop (k k8sClient ) k8sClient {
134+ return & withNoStopClient {k }
143135}
144136
145- func (c * operand ) Restart (clientset * kubernetes.Clientset ) error {
146- if c .deployLabel == "" || c .lastValue == "false" {
147- return nil
148- }
149- err := setNodeLabelValue (clientset , c .deployLabel , "true" )
150- if err != nil {
151- return fmt .Errorf ("unable to restart operand %q: %w" , c .app , err )
152- }
137+ func (o * withNoRestartClient ) Restart (_ * kubernetes.Clientset ) error {
153138 return nil
154139}
155140
156- type pod struct {
157- app string
141+ func ( o * withNoStopClient ) Stop ( _ * kubernetes. Clientset ) error {
142+ return nil
158143}
159144
160- func (p * pod ) Restart (clientset * kubernetes.Clientset ) error {
145+ func (o * pod ) Restart (clientset * kubernetes.Clientset ) error {
161146 // TODO: We need to add this namespace to the options.
162- gpuClientNamespace := "nvidia-gpu-operator"
163- err := clientset .CoreV1 ().Pods (gpuClientNamespace ).DeleteCollection (
147+ err := clientset .CoreV1 ().Pods (o .namespace ).DeleteCollection (
148+ context .TODO (),
149+ metav1.DeleteOptions {},
150+ metav1.ListOptions {
151+ FieldSelector : fmt .Sprintf ("spec.nodeName=%s" , o .nodename ),
152+ LabelSelector : fmt .Sprintf ("app=%s" , o .app ),
153+ },
154+ )
155+ if err != nil {
156+ return fmt .Errorf ("unable to delete pods for app %s: %w" , o .app , err )
157+ }
158+ return nil
159+ }
160+
161+ func (o * pod ) Stop (clientset * kubernetes.Clientset ) error {
162+ if o .app == "" {
163+ return nil
164+ }
165+ err := clientset .CoreV1 ().Pods (o .namespace ).DeleteCollection (
164166 context .TODO (),
165167 metav1.DeleteOptions {},
166168 metav1.ListOptions {
167- FieldSelector : fmt .Sprintf ("spec.nodeName=%s" , nodeNameFlag ),
168- LabelSelector : fmt .Sprintf ("app=%s" , p .app ),
169+ FieldSelector : fmt .Sprintf ("spec.nodeName=%s" , o . nodename ),
170+ LabelSelector : fmt .Sprintf ("app=%s" , o .app ),
169171 },
170172 )
171173 if err != nil {
172- return fmt .Errorf ("unable to delete pods for app %s: %w" , p .app , err )
174+ return fmt .Errorf ("unable to delete pods for app %s: %w" , o .app , err )
173175 }
174176 return nil
175177}
178+
179+ func (o * pod ) waitForDeletion (clientset * kubernetes.Clientset ) error {
180+ if o .app == "" {
181+ return nil
182+ }
183+ timeout := 5 * time .Minute
184+ ctx , cancel := context .WithTimeout (context .Background (), timeout )
185+ defer cancel ()
186+
187+ watcher , err := clientset .CoreV1 ().Pods (o .namespace ).Watch (ctx , metav1.ListOptions {
188+ FieldSelector : fmt .Sprintf ("spec.nodeName=%s" , o .nodename ),
189+ LabelSelector : fmt .Sprintf ("app=%s" , o .app ),
190+ })
191+ if err != nil {
192+ return fmt .Errorf ("unable to watch pods for deletion: %w" , err )
193+ }
194+ defer watcher .Stop ()
195+
196+ // Wait for all matching pods to be deleted
197+ for {
198+ select {
199+ case <- ctx .Done ():
200+ return fmt .Errorf ("timeout waiting for pod deletion: %w" , ctx .Err ())
201+ case event := <- watcher .ResultChan ():
202+ if event .Type == watch .Deleted {
203+ // Check if there are any remaining pods matching our criteria
204+ pods , err := clientset .CoreV1 ().Pods (o .namespace ).List (ctx , metav1.ListOptions {
205+ FieldSelector : fmt .Sprintf ("spec.nodeName=%s" , o .nodename ),
206+ LabelSelector : fmt .Sprintf ("app=%s" , o .app ),
207+ })
208+ if err != nil {
209+ return fmt .Errorf ("unable to list pods: %w" , err )
210+ }
211+ if len (pods .Items ) == 0 {
212+ // All pods have been deleted; a bare `break` here would only
213+ // exit the select, not the for loop, so return directly.
214+ return nil
215+ }
215+ }
216+ }
217+ }
218+ }
219+
220+ func (o * operand ) Restart (clientset * kubernetes.Clientset ) error {
221+ if o .deployLabel == "" || o .lastValue == "false" {
222+ return nil
223+ }
224+ err := setNodeLabelValue (clientset , o .deployLabel , "true" )
225+ if err != nil {
226+ return fmt .Errorf ("unable to restart operand %q: %w" , o .app , err )
227+ }
228+ return nil
229+ }
230+
231+ func (o * operand ) Stop (clientset * kubernetes.Clientset ) error {
232+ value , err := getNodeLabelValue (clientset , o .deployLabel )
233+ if err != nil {
234+ return fmt .Errorf ("unable to get the value of the %q label: %w" , o .deployLabel , err )
235+ }
236+ o .lastValue = value
237+ // Only set 'paused-*' if the current value is not 'false'.
238+ // It should only be 'false' if some external entity has forced it to
239+ // this value, at which point we want to honor it's existing value and
240+ // not change it.
241+ if value != "false" {
242+ if err := setNodeLabelValue (clientset , o .deployLabel , "paused-for-mig-change" ); err != nil {
243+ return err
244+ }
245+ }
246+ return o .waitForDeletion (clientset )
247+ }