Skip to content

Commit 9d4ed85

Browse files
authored
Enhancement - Optimize Reconciler Triggers Using Predicates (#3633)
Updated reconcilers to ignore updates that do not modify the spec Refactored the logic to get the worker threads Added unit tests wherever applicable
1 parent 5982436 commit 9d4ed85

File tree

14 files changed

+230
-404
lines changed

14 files changed

+230
-404
lines changed

pkg/syncer/cnsoperator/controller/cnsfileaccessconfig/cnsfileaccessconfig_controller.go

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
vmoperatortypes "github.com/vmware-tanzu/vm-operator/api/v1alpha5"
2929
cnstypes "github.com/vmware/govmomi/cns/types"
30+
vsanfstypes "github.com/vmware/govmomi/vsan/vsanfs/types"
3031
v1 "k8s.io/api/core/v1"
3132
apierrors "k8s.io/apimachinery/pkg/api/errors"
3233
"k8s.io/apimachinery/pkg/runtime"
@@ -39,13 +40,10 @@ import (
3940
"sigs.k8s.io/controller-runtime/pkg/client/config"
4041
"sigs.k8s.io/controller-runtime/pkg/controller"
4142
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
42-
4343
"sigs.k8s.io/controller-runtime/pkg/handler"
4444
"sigs.k8s.io/controller-runtime/pkg/manager"
4545
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4646
"sigs.k8s.io/controller-runtime/pkg/source"
47-
48-
vsanfstypes "github.com/vmware/govmomi/vsan/vsanfs/types"
4947
cnsoperatorapis "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator"
5048
cnsfileaccessconfigv1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/cnsfileaccessconfig/v1alpha1"
5149
volumes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/volume"
@@ -57,13 +55,14 @@ import (
5755
k8s "sigs.k8s.io/vsphere-csi-driver/v3/pkg/kubernetes"
5856
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer"
5957
cnsoperatortypes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/types"
60-
cnsoperatorutil "sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/util"
58+
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/util"
6159
)
6260

6361
const (
64-
defaultMaxWorkerThreadsForFileAccessConfig = 10
65-
capvVmLabelKey = "capv.vmware.com"
66-
devopsUserLabelKey = "cns.vmware.com/user-created"
62+
workerThreadsEnvVar = "WORKER_THREADS_CNS_FILE_ACCESS_CONFIG"
63+
defaultMaxWorkerThreads = 10
64+
capvVmLabelKey = "capv.vmware.com"
65+
devopsUserLabelKey = "cns.vmware.com/user-created"
6766
)
6867

6968
// backOffDuration is a map of cnsfileaccessconfig name's to the time after
@@ -87,18 +86,16 @@ func Add(mgr manager.Manager, clusterFlavor cnstypes.CnsClusterFlavor,
8786
return nil
8887
}
8988

90-
if clusterFlavor == cnstypes.CnsClusterFlavorWorkload {
91-
if commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.TKGsHA) {
92-
clusterComputeResourceMoIds, _, err := common.GetClusterComputeResourceMoIds(ctx)
93-
if err != nil {
94-
log.Errorf("failed to get clusterComputeResourceMoIds. err: %v", err)
95-
return err
96-
}
97-
if len(clusterComputeResourceMoIds) > 1 &&
98-
!commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.WorkloadDomainIsolation) {
99-
log.Infof("Not initializing the CnsFileAccessConfig Controller as stretched supervisor is detected.")
100-
return nil
101-
}
89+
if commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.TKGsHA) {
90+
clusterComputeResourceMoIds, _, err := common.GetClusterComputeResourceMoIds(ctx)
91+
if err != nil {
92+
log.Errorf("failed to get clusterComputeResourceMoIds. err: %v", err)
93+
return err
94+
}
95+
if len(clusterComputeResourceMoIds) > 1 &&
96+
!commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.WorkloadDomainIsolation) {
97+
log.Infof("Not initializing the CnsFileAccessConfig Controller as stretched supervisor is detected.")
98+
return nil
10299
}
103100
}
104101
volumePermissionLockMap = &sync.Map{}
@@ -164,17 +161,23 @@ func Add(mgr manager.Manager, clusterFlavor cnstypes.CnsClusterFlavor,
164161
func newReconciler(mgr manager.Manager, configInfo *commonconfig.ConfigurationInfo,
165162
volumeManager volumes.Manager, vmOperatorClient client.Client, dynamicClient dynamic.Interface,
166163
recorder record.EventRecorder) reconcile.Reconciler {
167-
return &ReconcileCnsFileAccessConfig{client: mgr.GetClient(), scheme: mgr.GetScheme(),
168-
configInfo: configInfo, volumeManager: volumeManager, vmOperatorClient: vmOperatorClient,
169-
dynamicClient: dynamicClient, recorder: recorder}
164+
return &ReconcileCnsFileAccessConfig{
165+
client: mgr.GetClient(),
166+
scheme: mgr.GetScheme(),
167+
configInfo: configInfo,
168+
volumeManager: volumeManager,
169+
vmOperatorClient: vmOperatorClient,
170+
dynamicClient: dynamicClient,
171+
recorder: recorder,
172+
}
170173
}
171174

172175
// add adds a new Controller to mgr with r as the reconcile.Reconciler.
173176
func add(mgr manager.Manager, r reconcile.Reconciler) error {
174177
ctx, log := logger.GetNewContextWithLogger()
175178

176-
maxWorkerThreads := getMaxWorkerThreadsToReconcileCnsFileAccessConfig(ctx)
177-
179+
maxWorkerThreads := util.GetMaxWorkerThreads(ctx,
180+
workerThreadsEnvVar, defaultMaxWorkerThreads)
178181
// Create a new controller.
179182
c, err := controller.New("cnsfileaccessconfig-controller", mgr,
180183
controller.Options{Reconciler: r, MaxConcurrentReconciles: maxWorkerThreads})
@@ -261,7 +264,7 @@ func (r *ReconcileCnsFileAccessConfig) Reconcile(ctx context.Context,
261264
instance.Name, instance.Spec.VMName)
262265
// Fetch the PVC and PV instance and get volume ID
263266
skipConfigureVolumeACL := false
264-
volumeID, err := cnsoperatorutil.GetVolumeID(ctx, r.client, instance.Spec.PvcName, instance.Namespace)
267+
volumeID, err := util.GetVolumeID(ctx, r.client, instance.Spec.PvcName, instance.Namespace)
265268
if err != nil {
266269
if apierrors.IsNotFound(err) {
267270
// If PVC instance is NotFound (deleted), then there is no need to configure ACL on file volume.
@@ -330,7 +333,7 @@ func (r *ReconcileCnsFileAccessConfig) Reconcile(ctx context.Context,
330333
if instance.DeletionTimestamp != nil {
331334
log.Infof("CnsFileAccessConfig instance %q has deletion timestamp set", instance.Name)
332335
volumeExists := true
333-
volumeID, err := cnsoperatorutil.GetVolumeID(ctx, r.client, instance.Spec.PvcName, instance.Namespace)
336+
volumeID, err := util.GetVolumeID(ctx, r.client, instance.Spec.PvcName, instance.Namespace)
334337
if err != nil {
335338
if ifFileVolumesWithVmserviceVmsSupported &&
336339
apierrors.IsNotFound(err) {
@@ -447,7 +450,7 @@ func (r *ReconcileCnsFileAccessConfig) Reconcile(ctx context.Context,
447450
log.Infof("Reconciling CnsFileAccessConfig with instance: %q from namespace: %q. timeout %q seconds",
448451
instance.Name, instance.Namespace, timeout)
449452
if !instance.Status.Done {
450-
volumeID, err := cnsoperatorutil.GetVolumeID(ctx, r.client, instance.Spec.PvcName, instance.Namespace)
453+
volumeID, err := util.GetVolumeID(ctx, r.client, instance.Spec.PvcName, instance.Namespace)
451454
if err != nil {
452455
msg := fmt.Sprintf("Failed to get volumeID from pvcName: %q. Error: %+v", instance.Spec.PvcName, err)
453456
log.Error(msg)
@@ -780,7 +783,7 @@ func (r *ReconcileCnsFileAccessConfig) configureVolumeACLs(ctx context.Context,
780783
func (r *ReconcileCnsFileAccessConfig) getVMExternalIP(ctx context.Context,
781784
vm *vmoperatortypes.VirtualMachine) (string, error) {
782785
log := logger.GetLogger(ctx)
783-
networkProvider, err := cnsoperatorutil.GetNetworkProvider(ctx)
786+
networkProvider, err := util.GetNetworkProvider(ctx)
784787
if err != nil {
785788
return "", logger.LogNewErrorf(log, "Failed to identify the network provider. Error: %+v", err)
786789
}
@@ -789,11 +792,11 @@ func (r *ReconcileCnsFileAccessConfig) getVMExternalIP(ctx context.Context,
789792
return "", logger.LogNewError(log, "unable to find network provider information")
790793
}
791794

792-
networkTypes := []string{cnsoperatorutil.NSXTNetworkProvider, cnsoperatorutil.
795+
networkTypes := []string{util.NSXTNetworkProvider, util.
793796
VDSNetworkProvider}
794797

795798
if commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.VPCCapabilitySupervisor) {
796-
networkTypes = append(networkTypes, cnsoperatorutil.VPCNetworkProvider)
799+
networkTypes = append(networkTypes, util.VPCNetworkProvider)
797800
}
798801

799802
supported_found := false
@@ -808,7 +811,7 @@ func (r *ReconcileCnsFileAccessConfig) getVMExternalIP(ctx context.Context,
808811
return "", logger.LogNewErrorf(log, "Unknown network provider. Error: %+v", err)
809812
}
810813

811-
tkgVMIP, err := cnsoperatorutil.GetTKGVMIP(ctx, r.vmOperatorClient,
814+
tkgVMIP, err := util.GetTKGVMIP(ctx, r.vmOperatorClient,
812815
r.dynamicClient, vm.Namespace, vm.Name, networkProvider)
813816
if err != nil {
814817
return "", logger.LogNewErrorf(log, "Failed to get external facing IP address for VM %q/%q. Err: %+v",

pkg/syncer/cnsoperator/controller/cnsfileaccessconfig/util.go

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@ package cnsfileaccessconfig
1919
import (
2020
"context"
2121
"fmt"
22-
"os"
2322
"reflect"
24-
"strconv"
2523

2624
vmoperatortypes "github.com/vmware-tanzu/vm-operator/api/v1alpha5"
2725
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -70,38 +68,3 @@ func setInstanceOwnerRef(instance *cnsfileaccessconfigv1alpha1.CnsFileAccessConf
7068
},
7169
}
7270
}
73-
74-
// getMaxWorkerThreadsToReconcileCnsFileAccessConfig returns the maximum number
75-
// of worker threads which can be run to reconcile CnsFileAccessConfig instances.
76-
// If environment variable WORKER_THREADS_FILE_ACCESS_CONFIG is set and valid,
77-
// return the value read from environment variable otherwise, use the default
78-
// value.
79-
func getMaxWorkerThreadsToReconcileCnsFileAccessConfig(ctx context.Context) int {
80-
log := logger.GetLogger(ctx)
81-
workerThreads := defaultMaxWorkerThreadsForFileAccessConfig
82-
if v := os.Getenv("WORKER_THREADS_FILE_ACCESS_CONFIG"); v != "" {
83-
if value, err := strconv.Atoi(v); err == nil {
84-
if value <= 0 {
85-
log.Warnf("Maximum number of worker threads to run set in env variable "+
86-
"WORKER_THREADS_FILE_ACCESS_CONFIG %s is less than 1, will use the default value %d",
87-
v, defaultMaxWorkerThreadsForFileAccessConfig)
88-
} else if value > defaultMaxWorkerThreadsForFileAccessConfig {
89-
log.Warnf("Maximum number of worker threads to run set in env variable "+
90-
"WORKER_THREADS_FILE_ACCESS_CONFIG %s is greater than %d, will use the default value %d",
91-
v, defaultMaxWorkerThreadsForFileAccessConfig, defaultMaxWorkerThreadsForFileAccessConfig)
92-
} else {
93-
workerThreads = value
94-
log.Debugf("Maximum number of worker threads to run to reconcile "+
95-
"CnsFileAccessConfig instances is set to %d", workerThreads)
96-
}
97-
} else {
98-
log.Warnf("Maximum number of worker threads to run set in env variable "+
99-
"WORKER_THREADS_FILE_ACCESS_CONFIG %s is invalid, will use the default value %d",
100-
v, defaultMaxWorkerThreadsForFileAccessConfig)
101-
}
102-
} else {
103-
log.Debugf("WORKER_THREADS_FILE_ACCESS_CONFIG is not set. Picking the default value %d",
104-
defaultMaxWorkerThreadsForFileAccessConfig)
105-
}
106-
return workerThreads
107-
}

pkg/syncer/cnsoperator/controller/cnsnodevmattachment/cnsnodevmattachment_controller.go

Lines changed: 7 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23-
"os"
24-
"strconv"
2523
"strings"
2624
"sync"
2725
"time"
@@ -55,10 +53,12 @@ import (
5553
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/logger"
5654
k8s "sigs.k8s.io/vsphere-csi-driver/v3/pkg/kubernetes"
5755
cnsoptypes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/types"
56+
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/util"
5857
)
5958

6059
const (
61-
defaultMaxWorkerThreadsForNodeVMAttach = 10
60+
workerThreadsEnvVar = "WORKER_THREADS_NODEVM_ATTACH"
61+
defaultMaxWorkerThreads = 10
6262
)
6363

6464
// backOffDuration is a map of cnsnodevmattachment name's to the time after
@@ -135,15 +135,17 @@ func newReconciler(mgr manager.Manager, configInfo *config.ConfigurationInfo,
135135
// add adds a new Controller to mgr with r as the reconcile.Reconciler.
136136
func add(mgr manager.Manager, r reconcile.Reconciler) error {
137137
ctx, log := logger.GetNewContextWithLogger()
138-
maxWorkerThreads := getMaxWorkerThreadsToReconcileCnsNodeVmAttachment(ctx)
138+
139+
maxWorkerThreads := util.GetMaxWorkerThreads(ctx,
140+
workerThreadsEnvVar, defaultMaxWorkerThreads)
139141
// Create a new controller.
140142
err := ctrl.NewControllerManagedBy(mgr).Named("cnsnodevmattachment-controller").
141143
For(&v1a1.CnsNodeVmAttachment{}).
142144
WithEventFilter(predicate.GenerationChangedPredicate{}).
143145
WithOptions(controller.Options{MaxConcurrentReconciles: maxWorkerThreads}).
144146
Complete(r)
145147
if err != nil {
146-
log.Errorf("failed to create new CnsNodeVmAttachment controller with error: %+v", err)
148+
log.Errorf("Failed to build application controller. Err: %v", err)
147149
return err
148150
}
149151

@@ -869,41 +871,6 @@ func updateCnsNodeVMAttachment(ctx context.Context, client client.Client,
869871
return err
870872
}
871873

872-
// getMaxWorkerThreadsToReconcileCnsNodeVmAttachment returns the maximum
873-
// number of worker threads which can be run to reconcile CnsNodeVmAttachment
874-
// instances. If environment variable WORKER_THREADS_NODEVM_ATTACH is set and
875-
// valid, return the value read from environment variable otherwise, use the
876-
// default value.
877-
func getMaxWorkerThreadsToReconcileCnsNodeVmAttachment(ctx context.Context) int {
878-
log := logger.GetLogger(ctx)
879-
workerThreads := defaultMaxWorkerThreadsForNodeVMAttach
880-
if v := os.Getenv("WORKER_THREADS_NODEVM_ATTACH"); v != "" {
881-
if value, err := strconv.Atoi(v); err == nil {
882-
if value <= 0 {
883-
log.Warnf("Maximum number of worker threads to run set in env variable "+
884-
"WORKER_THREADS_NODEVM_ATTACH %s is less than 1, will use the default value %d",
885-
v, defaultMaxWorkerThreadsForNodeVMAttach)
886-
} else if value > defaultMaxWorkerThreadsForNodeVMAttach {
887-
log.Warnf("Maximum number of worker threads to run set in env variable "+
888-
"WORKER_THREADS_NODEVM_ATTACH %s is greater than %d, will use the default value %d",
889-
v, defaultMaxWorkerThreadsForNodeVMAttach, defaultMaxWorkerThreadsForNodeVMAttach)
890-
} else {
891-
workerThreads = value
892-
log.Debugf("Maximum number of worker threads to run to reconcile CnsNodeVmAttachment "+
893-
"instances is set to %d", workerThreads)
894-
}
895-
} else {
896-
log.Warnf("Maximum number of worker threads to run set in env variable "+
897-
"WORKER_THREADS_NODEVM_ATTACH %s is invalid, will use the default value %d",
898-
v, defaultMaxWorkerThreadsForNodeVMAttach)
899-
}
900-
} else {
901-
log.Debugf("WORKER_THREADS_NODEVM_ATTACH is not set. Picking the default value %d",
902-
defaultMaxWorkerThreadsForNodeVMAttach)
903-
}
904-
return workerThreads
905-
}
906-
907874
// recordEvent records the event, sets the backOffDuration for the instance
908875
// appropriately and logs the message.
909876
// backOffDuration is reset to 1 second on success and doubled on failure.

pkg/syncer/cnsoperator/controller/cnsnodevmbatchattachment/cnsnodevmbatchattachment_controller.go

Lines changed: 8 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23-
"os"
24-
"strconv"
2523
"strings"
2624
"sync"
2725
"time"
2826

2927
ctrl "sigs.k8s.io/controller-runtime"
28+
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/util"
3029

3130
cnsvsphere "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/vsphere"
3231
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common/commonco"
@@ -50,7 +49,7 @@ import (
5049
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
5150

5251
cnsoperatorapis "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator"
53-
v1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/cnsnodevmbatchattachment/v1alpha1"
52+
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/cnsnodevmbatchattachment/v1alpha1"
5453
volumes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/volume"
5554
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config"
5655
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common"
@@ -69,6 +68,7 @@ var (
6968
)
7069

7170
const (
71+
workerThreadsEnvVar = "WORKER_THREADS_NODEVM_BATCH_ATTACH"
7272
defaultMaxWorkerThreads = 10
7373
)
7474

@@ -132,10 +132,9 @@ func newReconciler(mgr manager.Manager, configInfo *config.ConfigurationInfo,
132132
// add adds this package's controller to the provided manager.
133133
func add(mgr manager.Manager, r reconcile.Reconciler) error {
134134
ctx, log := logger.GetNewContextWithLogger()
135-
maxWorkerThreads := getMaxWorkerThreads(ctx)
136-
137-
backOffDuration = make(map[types.NamespacedName]time.Duration)
138135

136+
maxWorkerThreads := util.GetMaxWorkerThreads(ctx,
137+
workerThreadsEnvVar, defaultMaxWorkerThreads)
139138
// Create a new controller.
140139
err := ctrl.NewControllerManagedBy(mgr).
141140
Named("cnsnodevmbatchattachment-controller").
@@ -145,11 +144,12 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
145144
MaxConcurrentReconciles: maxWorkerThreads,
146145
}).
147146
Complete(r)
148-
149147
if err != nil {
150-
log.Errorf("failed to watch for changes to CnsNodeVmBatchAttachment resource with error: %+v", err)
148+
log.Errorf("Failed to build application controller. Err: %v", err)
151149
return err
152150
}
151+
152+
backOffDuration = make(map[types.NamespacedName]time.Duration)
153153
return nil
154154
}
155155

@@ -168,42 +168,6 @@ type Reconciler struct {
168168
instanceLock sync.Map
169169
}
170170

171-
// getMaxWorkerThreads returns the maximum
172-
// number of worker threads which can be run to reconcile CnsNodeVmBatchAttachment
173-
// instances. If environment variable WORKER_THREADS_NODEVM_BATCH_ATTACH is set and
174-
// valid, return the value read from environment variable otherwise, use the
175-
// default value.
176-
func getMaxWorkerThreads(ctx context.Context) int {
177-
log := logger.GetLogger(ctx)
178-
179-
workerThreads := defaultMaxWorkerThreads
180-
envVal := os.Getenv("WORKER_THREADS_NODEVM_BATCH_ATTACH")
181-
if envVal == "" {
182-
log.Debugf("WORKER_THREADS_NODEVM_BATCH_ATTACH is not set. Picking the default value %d",
183-
defaultMaxWorkerThreads)
184-
return workerThreads
185-
}
186-
187-
value, err := strconv.Atoi(envVal)
188-
if err != nil {
189-
log.Warnf("Invalid value for WORKER_THREADS_NODEVM_BATCH_ATTACH: %s. Using default value %d",
190-
envVal, defaultMaxWorkerThreads)
191-
return workerThreads
192-
}
193-
194-
switch {
195-
case value <= 0 || value > defaultMaxWorkerThreads:
196-
log.Warnf("Value %s for WORKER_THREADS_NODEVM_BATCH_ATTACH is invalid. Using default value %d",
197-
envVal, defaultMaxWorkerThreads)
198-
default:
199-
workerThreads = value
200-
log.Debugf("Maximum number of worker threads to reconcile CnsNodeVmBatchAttachment is set to %d",
201-
workerThreads)
202-
203-
}
204-
return workerThreads
205-
}
206-
207171
// Reconcile over CnsNodeVmBatchAttachment CR.
208172
// Reconcile stops when all volumes have been attached or detached successfully.
209173
func (r *Reconciler) Reconcile(ctx context.Context,

0 commit comments

Comments
 (0)