Skip to content

Commit 8a738df

Browse files
committed
update code for recover stale volume
Signed-off-by: Ashima-Ashima1 <[email protected]>
1 parent 8b2242a commit 8a738df

File tree

3 files changed

+185
-100
lines changed

3 files changed

+185
-100
lines changed

controllers/recoverstalevolume_controller.go

Lines changed: 156 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,17 @@ import (
2222
"context"
2323
"io"
2424
"strings"
25-
"time"
2625

2726
objectdriverv1alpha1 "github.com/IBM/ibm-object-csi-driver-operator/api/v1alpha1"
28-
appsv1 "k8s.io/api/apps/v1"
27+
"github.com/IBM/ibm-object-csi-driver-operator/controllers/internal/crutils"
28+
"github.com/IBM/ibm-object-csi-driver-operator/controllers/syncer"
29+
"github.com/IBM/ibm-object-csi-driver-operator/controllers/util"
30+
"github.com/IBM/ibm-object-csi-driver-operator/pkg/config"
31+
"github.com/go-logr/logr"
32+
"golang.org/x/exp/maps"
2933
corev1 "k8s.io/api/core/v1"
3034
k8serr "k8s.io/apimachinery/pkg/api/errors"
3135
"k8s.io/apimachinery/pkg/runtime"
32-
"k8s.io/apimachinery/pkg/types"
3336
"k8s.io/client-go/kubernetes"
3437
"k8s.io/client-go/rest"
3538
ctrl "sigs.k8s.io/controller-runtime"
@@ -50,9 +53,6 @@ type KubernetesClient struct {
5053
}
5154

5255
var staleVolLog = logf.Log.WithName("recoverstalevolume_controller")
53-
var reconcileTime = 2 * time.Minute
54-
var csiNodePodPrefix = "ibm-object-csi-node"
55-
var transportEndpointError = "transport endpoint is not connected"
5656
var kubeClient = createK8sClient
5757

5858
//+kubebuilder:rbac:groups=objectdriver.csi.ibm.com,resources=recoverstalevolumes,verbs=get;list;watch;create;update;patch;delete
@@ -88,37 +88,63 @@ func (r *RecoverStaleVolumeReconciler) Reconcile(ctx context.Context, req ctrl.R
8888

8989
var logTailLines = instance.Spec.NoOfLogLines
9090
if logTailLines == 0 {
91-
logTailLines = int64(300)
91+
logTailLines = int64(config.DefaultLogTailLines)
9292
}
9393
reqLogger.Info("Tail Log Lines to fetch", "number", logTailLines)
9494

95-
for _, data := range instance.Spec.Deployment {
96-
deploymentName := data.DeploymentName
97-
deploymentNamespace := data.DeploymentNamespace
98-
reqLogger.Info("Requested Workload to watch", "deployment-name", deploymentName, "deployment-namespace", deploymentNamespace)
95+
nsData := instance.Spec.Data
96+
if len(nsData) == 0 {
97+
reqLogger.Info("No Data Found")
98+
}
9999

100-
// If workload not found, reconcile
101-
if deploymentName == "" {
102-
return ctrl.Result{RequeueAfter: reconcileTime}, nil
100+
for _, data := range nsData {
101+
namespace := data.Namespace
102+
deployments := util.Remove(data.Deployments, "")
103+
// If namespace is not set, use `default` ns
104+
if namespace == "" {
105+
namespace = config.DefaultNamespace
103106
}
107+
reqLogger.Info("Data Requested", "namespace", namespace, "deployments", deployments)
104108

105-
// If namespace is not set, use `default` ns
106-
if deploymentNamespace == "" {
107-
deploymentNamespace = "default"
109+
k8sOps := &crutils.K8sResourceOps{
110+
Client: r.Client,
111+
Ctx: ctx,
108112
}
109113

110-
// Fetch deployment
111-
deployment := &appsv1.Deployment{}
112-
err = r.Get(ctx, types.NamespacedName{Name: deploymentName, Namespace: deploymentNamespace}, deployment)
113-
if err != nil {
114-
if k8serr.IsNotFound(err) {
115-
reqLogger.Info("Deployment not found.")
116-
continue
114+
// If applications are not set, then fetch all deployments from given ns
115+
if len(deployments) == 0 {
116+
deploymentsList, err := k8sOps.ListDeployment()
117+
if err != nil {
118+
reqLogger.Error(err, "failed to get deployment list")
119+
return ctrl.Result{}, err
117120
}
118-
reqLogger.Error(err, "failed to get deployment")
121+
122+
depNames := []string{}
123+
for _, dep := range deploymentsList.Items {
124+
depNames = append(depNames, dep.Name)
125+
}
126+
deployments = depNames
127+
}
128+
129+
pvcAndPVNamesMap, err := fetchCSIPVCAndPVNames(k8sOps, reqLogger)
130+
if err != nil {
131+
return ctrl.Result{}, err
132+
}
133+
reqLogger.Info("PVCs using CSI StorageClasses", "pvc:pv-names", pvcAndPVNamesMap)
134+
135+
pvcNames := maps.Keys(pvcAndPVNamesMap)
136+
137+
csiApplications, err := fetchDeploymentsUsingCSIVolumes(k8sOps, reqLogger, deployments, pvcNames)
138+
if err != nil {
119139
return ctrl.Result{}, err
120140
}
121-
reqLogger.Info("Successfully fetched workload", "deployment", deployment)
141+
reqLogger.Info("Deployments which are using CSI Volumes in given namespace", "namespace", namespace,
142+
"applications", csiApplications)
143+
144+
if len(csiApplications) == 0 {
145+
reqLogger.Info("No Deployment found in requested namespace")
146+
return ctrl.Result{RequeueAfter: config.ReconcilationTime}, nil
147+
}
122148

123149
var csiNodeServerPods = map[string]string{} // {nodeName1: csiNodePod1, nodeName2: csiNodePod2, ...}
124150
var nodeVolumePodMapping = map[string]map[string][]string{}
@@ -135,123 +161,100 @@ func (r *RecoverStaleVolumeReconciler) Reconcile(ctx context.Context, req ctrl.R
135161
*/
136162
var deploymentPods = map[string]corev1.Pod{} // {pod1: corev1.Pod{}, pod2: corev1.Pod}
137163

138-
// Fetch Pods in given namespace
139-
var listOptions = &client.ListOptions{Namespace: deploymentNamespace}
140-
podsList := &corev1.PodList{}
141-
err = r.List(ctx, podsList, listOptions)
164+
// Fetch all Pods in given namespace
165+
podsList, err := k8sOps.ListPod()
142166
if err != nil {
143167
reqLogger.Error(err, "failed to get pods list")
144168
return ctrl.Result{}, err
145169
}
146-
reqLogger.Info("Successfully fetched pods in workload namespace", "number-of-pods", len(podsList.Items))
170+
reqLogger.Info("Successfully fetched pods in given namespace", "namespace", namespace, "number-of-pods",
171+
len(podsList.Items))
147172

148-
for ind := range podsList.Items {
149-
appPod := podsList.Items[ind]
150-
if strings.Contains(appPod.Name, deploymentName) {
151-
reqLogger.Info("Deployment Pod", "name", appPod.Name)
173+
for ind, podData := range podsList.Items {
174+
podName := podData.Name
175+
if util.MatchesPrefix(csiApplications, podName) {
176+
reqLogger.Info("Application Pod found", "name", podName)
152177

153-
volumesUsed := appPod.Spec.Volumes
154-
for _, volume := range volumesUsed {
178+
volumesUsedByPod := podData.Spec.Volumes
179+
for _, volume := range volumesUsedByPod {
155180
pvcDetails := volume.VolumeSource.PersistentVolumeClaim
156181
if pvcDetails != nil {
157182
pvcName := pvcDetails.ClaimName
158-
reqLogger.Info("PVC In Use", "pod-name", appPod.Name, "pvc-name", pvcName)
159183

160-
pvc := &corev1.PersistentVolumeClaim{}
161-
err = r.Get(ctx, types.NamespacedName{Name: pvcName, Namespace: deploymentNamespace}, pvc)
162-
if err != nil {
163-
if k8serr.IsNotFound(err) {
164-
reqLogger.Info("PVC not found.")
165-
continue
166-
}
167-
reqLogger.Error(err, "failed to get pvc")
168-
return ctrl.Result{}, err
169-
}
170-
171-
storageClass := pvc.Spec.StorageClassName
172-
scName := ""
173-
if storageClass != nil {
174-
scName = *storageClass
175-
}
176-
reqLogger.Info("PVC using Storage-class", "pvc-name", pvcName, "sc-name", scName)
177-
// Check if the volume is using csi storage-class
178-
if strings.Contains(scName, "csi") {
179-
nodeName := appPod.Spec.NodeName
184+
if util.Contains(pvcNames, pvcName) {
185+
reqLogger.Info("Pod details", "pod-name", podName, "pvc-name")
186+
nodeName := podData.Spec.NodeName
180187
volumeData, ok := nodeVolumePodMapping[nodeName]
181188
if !ok {
182189
volumeData = map[string][]string{}
183190
}
184191

185-
volumeName := pvc.Spec.VolumeName
192+
volumeName := pvcAndPVNamesMap[pvcName]
186193
podData, ok := volumeData[volumeName]
187194
if !ok {
188195
podData = []string{}
189196
}
190197

191-
if !contains(podData, appPod.Name) {
192-
podData = append(podData, appPod.Name)
198+
if !util.Contains(podData, podName) {
199+
podData = append(podData, podName)
193200
}
194201

195202
volumeData[volumeName] = podData
196203
nodeVolumePodMapping[nodeName] = volumeData
197-
deploymentPods[appPod.Name] = appPod
204+
205+
deploymentPods[podName] = podsList.Items[ind]
198206
}
199207
}
200208
}
201209
}
202210
}
203211
reqLogger.Info("node-names maped with volumes and deployment pods", "nodeVolumeMap", nodeVolumePodMapping)
204212

205-
// Get Pods in operator ns
206-
var listOptions2 = &client.ListOptions{Namespace: req.Namespace}
207-
csiPodsList := &corev1.PodList{}
208-
err = r.List(ctx, csiPodsList, listOptions2)
213+
// Get Pods in ibm-object-csi-driver-operator ns
214+
k8sOps.Namespace = req.Namespace
215+
podsInOpNs, err := k8sOps.ListPod()
209216
if err != nil {
210-
reqLogger.Error(err, "failed to fetch csi pods")
217+
reqLogger.Error(err, "failed to fetch pods in namespace: "+req.Namespace)
211218
return ctrl.Result{}, err
212219
}
213-
reqLogger.Info("Successfully fetched pods in operator ns", "number-of-pods", len(csiPodsList.Items))
220+
reqLogger.Info("Successfully fetched pods in namespace", "number-of-pods", len(podsInOpNs.Items))
214221

215-
for ind := range csiPodsList.Items {
216-
pod := csiPodsList.Items[ind]
217-
if strings.HasPrefix(pod.Name, csiNodePodPrefix) {
218-
reqLogger.Info("NodeServer Pod", "name", pod.Name)
222+
for ind := range podsInOpNs.Items {
223+
pod := podsInOpNs.Items[ind]
224+
if strings.HasPrefix(pod.Name, config.CSIDaemonSetName) {
219225
csiNodeServerPods[pod.Spec.NodeName] = pod.Name
220226
}
221227
}
222-
reqLogger.Info("node-names maped with node-server pods", "csiNodeServerPods", csiNodeServerPods)
228+
reqLogger.Info("node-names maped with csi node-server pods", "csiNodeServerPods", csiNodeServerPods)
223229

224230
// If CSI Driver Node Pods not found, reconcile
225231
if len(csiNodeServerPods) == 0 {
226-
return ctrl.Result{RequeueAfter: reconcileTime}, nil
232+
return ctrl.Result{RequeueAfter: config.ReconcilationTime}, nil
227233
}
228234

229235
for nodeName, volumesData := range nodeVolumePodMapping {
230236
// Fetch volume stats from Logs of the Node Server Pod
231-
getVolStatsFromLogs, err := fetchVolumeStatsFromNodeServerLogs(ctx, csiNodeServerPods[nodeName], req.Namespace, logTailLines, r.IsTest)
237+
getVolStatsFromLogs, err := fetchVolumeStatsFromNodeServerPodLogs(ctx, csiNodeServerPods[nodeName],
238+
req.Namespace, logTailLines, r.IsTest)
232239
if err != nil {
233240
return ctrl.Result{}, err
234241
}
235242
reqLogger.Info("Volume Stats from NodeServer Pod Logs", "volume-stas", getVolStatsFromLogs)
236243

237244
for volume, podData := range volumesData {
238-
volStats, ok := getVolStatsFromLogs[volume]
245+
getVolStatsFromLogs, ok := getVolStatsFromLogs[volume]
239246
if !ok {
240247
continue
241248
}
242249

243-
if strings.Contains(volStats, transportEndpointError) {
250+
if strings.Contains(getVolStatsFromLogs, config.TransportEndpointError) {
244251
reqLogger.Info("Stale Volume Found", "volume", volume)
245252

246-
reqLogger.Info("Restarting Pods!!")
247253
for _, podName := range podData {
248254
reqLogger.Info("Pod using stale volume", "volume-name", volume, "pod-name", podName)
249255

250-
var zero int64
251-
var deleteOptions = &client.DeleteOptions{GracePeriodSeconds: &zero}
252-
253256
pod := deploymentPods[podName]
254-
err = r.Delete(ctx, &pod, deleteOptions)
257+
err = k8sOps.DeletePod(&pod)
255258
if err != nil {
256259
if k8serr.IsNotFound(err) {
257260
reqLogger.Info("Pod not found.")
@@ -262,13 +265,12 @@ func (r *RecoverStaleVolumeReconciler) Reconcile(ctx context.Context, req ctrl.R
262265
}
263266
reqLogger.Info("Pod deleted.")
264267
}
265-
reqLogger.Info("Pods Restarted!!")
266268
}
267269
}
268270
}
269271
}
270272

271-
return ctrl.Result{RequeueAfter: reconcileTime}, nil
273+
return ctrl.Result{RequeueAfter: config.ReconcilationTime}, nil
272274
}
273275

274276
// SetupWithManager sets up the controller with the Manager.
@@ -278,13 +280,76 @@ func (r *RecoverStaleVolumeReconciler) SetupWithManager(mgr ctrl.Manager) error
278280
Complete(r)
279281
}
280282

281-
func contains(slice []string, value string) bool {
282-
for _, val := range slice {
283-
if val == value {
284-
return true
283+
// fetchCSIPVCAndPVNames builds a map of PVC name -> bound PV name
// (pvc.Spec.VolumeName) for every PVC whose storage class name starts with
// config.StorageClassPrefix and ends with config.StorageClassSuffix.
//
// PVCs are obtained through k8sOps.ListPVC and then fetched again one by one
// through k8sOps.GetPVC. PVCs that disappear between the two calls, or that
// have no storage class name, are logged and skipped. Any other API error is
// logged and returned together with a nil map.
//
// NOTE(review): which namespace(s) ListPVC/GetPVC operate on is decided inside
// crutils.K8sResourceOps and is not visible here — confirm against the caller.
func fetchCSIPVCAndPVNames(k8sOps *crutils.K8sResourceOps, log logr.Logger) (map[string]string, error) {
284+
pvcList, err := k8sOps.ListPVC()
285+
if err != nil {
286+
log.Error(err, "failed to get pvc list")
287+
return nil, err
288+
}
289+
290+
// Result map: {pvcName: pvName}.
reqData := map[string]string{}
291+
292+
for _, pvcData := range pvcList.Items {
293+
pvcName := pvcData.Name
294+
// NOTE(review): the list item already carries a Spec; this extra lookup by
// name looks redundant — confirm whether GetPVC is needed here.
pvc, err := k8sOps.GetPVC(pvcName)
295+
if err != nil {
296+
// A PVC that is gone by the time it is fetched is not an error; skip it.
if k8serr.IsNotFound(err) {
297+
log.Info("PVC not found.")
298+
continue
299+
}
300+
log.Error(err, "failed to get pvc")
301+
return nil, err
302+
}
303+
304+
// StorageClassName is a pointer; nil and "" are both treated as
// "no storage class" and the PVC is skipped.
storageClassName := pvc.Spec.StorageClassName
305+
if storageClassName == nil {
306+
log.Info("PVC does not have any storageClass", "pvc-name", pvcName)
307+
continue
308+
}
309+
310+
scName := *storageClassName
311+
if scName == "" {
312+
log.Info("PVC does not have any storageClass", "pvc-name", pvcName)
313+
continue
314+
}
315+
316+
// Keep only PVCs whose storage class name matches both the configured
// prefix and the configured suffix.
if strings.HasPrefix(scName, config.StorageClassPrefix) && strings.HasSuffix(scName, config.StorageClassSuffix) {
317+
reqData[pvcName] = pvc.Spec.VolumeName
285318
}
286319
}
287-
return false
320+
321+
return reqData, nil
322+
}
323+
324+
// fetchDeploymentsUsingCSIVolumes filters depNames down to the deployments
// whose pod template mounts at least one PVC listed in reqPVCNames.
//
// Each deployment is fetched through k8sOps.GetDeployment. Deployments that
// are not found are logged and skipped; any other error is logged and returned
// with a nil slice. Each matching deployment name is appended once, in the
// order of depNames. The result is a nil slice when nothing matches.
func fetchDeploymentsUsingCSIVolumes(k8sOps *crutils.K8sResourceOps, log logr.Logger, depNames []string,
325+
reqPVCNames []string) ([]string, error) {
326+
var reqDeploymentNames []string
327+
328+
for _, name := range depNames {
329+
deployment, err := k8sOps.GetDeployment(name)
330+
if err != nil {
331+
// A missing deployment is not fatal; move on to the next name.
if k8serr.IsNotFound(err) {
332+
log.Info("Deployment not found.")
333+
continue
334+
}
335+
log.Error(err, "failed to get deployment")
336+
return nil, err
337+
}
338+
339+
// Inspect the volumes declared in the deployment's pod template.
volumes := deployment.Spec.Template.Spec.Volumes
340+
for _, vol := range volumes {
341+
// Only PVC-backed volumes are relevant; other volume sources leave this nil.
pvcDetails := vol.VolumeSource.PersistentVolumeClaim
342+
if pvcDetails != nil {
343+
pvcName := pvcDetails.ClaimName
344+
if util.Contains(reqPVCNames, pvcName) {
345+
reqDeploymentNames = append(reqDeploymentNames, name)
346+
// One matching volume is enough; stop so the name is added only once.
break
347+
}
348+
}
349+
}
350+
}
351+
352+
return reqDeploymentNames, nil
288353
}
289354

290355
func createK8sClient() (*KubernetesClient, error) {
@@ -303,11 +368,11 @@ func createK8sClient() (*KubernetesClient, error) {
303368
}, nil
304369
}
305370

306-
func fetchVolumeStatsFromNodeServerLogs(ctx context.Context, nodeServerPod, namespace string, logTailLines int64,
371+
func fetchVolumeStatsFromNodeServerPodLogs(ctx context.Context, nodeServerPod, namespace string, logTailLines int64,
307372
isTest bool) (map[string]string, error) {
308373
staleVolLog.Info("Input Parameters: ", "nodeServerPod", nodeServerPod, "namespace", namespace, "isTest", isTest)
309374
podLogOpts := &corev1.PodLogOptions{
310-
Container: csiNodePodPrefix,
375+
Container: syncer.NodeContainerName,
311376
TailLines: &logTailLines,
312377
}
313378

0 commit comments

Comments
 (0)