Skip to content

Commit 3b896ff

Browse files
authored
Wait until topology annotation gets added on supervisor PVCs while adding node affinity rules on PVs in guest cluster (#3588)
1 parent 9bc9208 commit 3b896ff

File tree

3 files changed

+246
-17
lines changed

3 files changed

+246
-17
lines changed

pkg/syncer/pvcsi_fullsync.go

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package syncer
1919
import (
2020
"context"
2121
"encoding/json"
22+
"errors"
2223
"fmt"
2324
"reflect"
2425
"time"
@@ -42,6 +43,12 @@ import (
4243
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/logger"
4344
)
4445

46+
var (
47+
cnsconfigGetSupervisorNamespace = cnsconfig.GetSupervisorNamespace
48+
k8sNewClient = k8s.NewClient
49+
timeoutAddNodeAffinityOnPVs = 300 * time.Second
50+
)
51+
4552
// PvcsiFullSync reconciles PV/PVC/Pod metadata on the guest cluster with
4653
// cnsvolumemetadata objects on the supervisor cluster for the guest cluster.
4754
func PvcsiFullSync(ctx context.Context, metadataSyncer *metadataSyncInformer) error {
@@ -61,7 +68,7 @@ func PvcsiFullSync(ctx context.Context, metadataSyncer *metadataSyncInformer) er
6168
isWorkloadDomainIsolationEnabledInPVCSI := metadataSyncer.coCommonInterface.IsFSSEnabled(
6269
ctx, common.WorkloadDomainIsolationFSS)
6370
if isWorkloadDomainIsolationEnabledInPVCSI {
64-
AddNodeAffinityRulesOnPV(ctx, metadataSyncer)
71+
go AddNodeAffinityRulesOnPV(ctx, metadataSyncer)
6572
}
6673
// guestCnsVolumeMetadataList is an in-memory list of cnsvolumemetadata
6774
// objects that represents PV/PVC/Pod objects in the guest cluster API server.
@@ -310,36 +317,47 @@ func AddNodeAffinityRulesOnPV(ctx context.Context, metadataSyncer *metadataSyncI
310317
return
311318
}
312319
// Get the supervisor namespace in which the guest cluster is deployed.
313-
supervisorNamespace, err := cnsconfig.GetSupervisorNamespace(ctx)
320+
supervisorNamespace, err := cnsconfigGetSupervisorNamespace(ctx)
314321
if err != nil {
315322
log.Errorf("FullSync: could not get supervisor namespace in which guest cluster was deployed. Err: %v", err)
316323
return
317324
}
318325

319326
// Create the kubernetes client from config.
320-
k8sClient, err := k8s.NewClient(ctx)
327+
k8sClient, err := k8sNewClient(ctx)
321328
if err != nil {
322329
log.Errorf("creating Kubernetes client failed. Err: %v", err)
323330
return
324331
}
325332

326-
for _, pv := range pvList {
333+
pvsWithoutSupervisorPvcTopologyAnnotation := make(map[string]*v1.PersistentVolume)
334+
addNodeAffinityOnPVInternal := func(pv *v1.PersistentVolume) error {
327335
if pv.Spec.NodeAffinity == nil {
328336
supervisorPVClaim, err := metadataSyncer.supervisorClient.CoreV1().
329337
PersistentVolumeClaims(supervisorNamespace).Get(ctx, pv.Spec.CSI.VolumeHandle, metav1.GetOptions{})
330338
if err != nil {
331339
log.Errorf("AddNodeAffinityRulesOnPV: failed to get supervisor PVC: %v "+
332340
"for the TKG PV %v in the supervisor namespace: %v. Err: %v",
333341
pv.Spec.CSI.VolumeHandle, pv.Name, supervisorNamespace, err)
334-
continue
342+
return err
343+
}
344+
// If the volume accessible topology annotation is not yet available on the supervisor PVC, add
345+
// this PV to the pvsWithoutSupervisorPvcTopologyAnnotation map so that we can check again later
346+
// whether the annotation has been added.
347+
if supervisorPVClaim.Annotations[common.AnnVolumeAccessibleTopology] == "" {
348+
errstr := fmt.Sprintf("Annotation %q is not set on the PVC: %q, namespace: %q",
349+
common.AnnVolumeAccessibleTopology, supervisorPVClaim.Name, supervisorPVClaim.Namespace)
350+
log.Errorf(errstr)
351+
pvsWithoutSupervisorPvcTopologyAnnotation[pv.Name] = pv
352+
return errors.New(errstr)
335353
}
336354
accessibleTopologies, err := generateVolumeAccessibleTopologyFromPVCAnnotation(supervisorPVClaim)
337355
if err != nil {
338356
log.Errorf("failed to generate volume accessibleTopologies "+
339-
"from supervisor PVC: %v for csi.vsphere.volume-accessible-topology annoation: %v "+
357+
"from supervisor PVC: %v for csi.vsphere.volume-accessible-topology annoation: %v, "+
340358
"Err: %v", supervisorPVClaim.Name,
341-
supervisorPVClaim.Annotations["csi.vsphere.volume-accessible-topology"], err)
342-
continue
359+
supervisorPVClaim.Annotations[common.AnnVolumeAccessibleTopology], err)
360+
return err
343361
}
344362
var csiAccessibleTopology []*csi.Topology
345363
for _, topoSegments := range accessibleTopologies {
@@ -351,29 +369,62 @@ func AddNodeAffinityRulesOnPV(ctx context.Context, metadataSyncer *metadataSyncI
351369
oldData, err := json.Marshal(pv)
352370
if err != nil {
353371
log.Errorf("failed to marshal pv: %v, Error: %v", pv, err)
354-
continue
372+
return err
355373
}
356374
newPV := pv.DeepCopy()
357375
newPV.Spec.NodeAffinity = GenerateVolumeNodeAffinity(csiAccessibleTopology)
358376
newData, err := json.Marshal(newPV)
359377
if err != nil {
360378
log.Errorf("failed to marshal updated PV with node affinity rules: %v, Error: %v", newPV, err)
361-
continue
379+
return err
362380
}
363381

364382
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pv)
365383
if err != nil {
366384
log.Errorf("error Creating two way merge patch for PV %q with error : %v", pv.Name, err)
367-
continue
385+
return err
368386
}
369387
_, err = k8sClient.CoreV1().PersistentVolumes().Patch(
370388
context.TODO(), pv.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
371389
if err != nil {
372390
log.Errorf("error patching PV %q error : %v", pv.Name, err)
373-
continue
391+
return err
374392
}
375393
log.Infof("patched PV: %v with node affinity %v", pv.Name, newPV.Spec.NodeAffinity)
376394
}
395+
return nil
396+
}
397+
398+
for _, pv := range pvList {
399+
_ = addNodeAffinityOnPVInternal(pv)
400+
}
401+
402+
// Check whether there are any PVs on which node affinity rules have not been added yet because the
403+
// topology annotation was missing from the associated supervisor PVC. Iterate over all such PVs again
404+
// and check whether the PVC annotation is present now. Retry until every PV has node affinity rules or
405+
// until the timeoutAddNodeAffinityOnPVs timeout (5 minutes by default) is reached.
406+
timeoutCtx, cancel := context.WithTimeout(context.Background(), timeoutAddNodeAffinityOnPVs)
407+
defer cancel()
408+
for len(pvsWithoutSupervisorPvcTopologyAnnotation) != 0 {
409+
select {
410+
case <-timeoutCtx.Done():
411+
log.Infof("Timeout exceeded for adding node affinity rules on PVs. Waited 5 minutes to get " +
412+
"volume accessibility topology annotation added on supervisor PVCs.")
413+
// Set pvsWithoutSupervisorPvcTopologyAnnotation to nil once the timeout is hit
414+
// so that the outer for loop exits.
415+
pvsWithoutSupervisorPvcTopologyAnnotation = nil
416+
default:
417+
for pvName, pv := range pvsWithoutSupervisorPvcTopologyAnnotation {
418+
err := addNodeAffinityOnPVInternal(pv)
419+
if err == nil {
420+
delete(pvsWithoutSupervisorPvcTopologyAnnotation, pvName)
421+
}
422+
}
423+
if len(pvsWithoutSupervisorPvcTopologyAnnotation) != 0 {
424+
// Sleep for some time before retrying
425+
time.Sleep(10 * time.Second)
426+
}
427+
}
377428
}
378429
log.Info("AddNodeAffinityRulesOnPV End.")
379430
}

pkg/syncer/pvcsi_fullsync_test.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,18 @@ limitations under the License.
1717
package syncer
1818

1919
import (
20+
"context"
2021
"testing"
22+
"time"
2123

2224
"github.com/container-storage-interface/spec/lib/go/csi"
2325
"github.com/stretchr/testify/assert"
2426
v1 "k8s.io/api/core/v1"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
clientset "k8s.io/client-go/kubernetes"
29+
k8sfake "k8s.io/client-go/kubernetes/fake"
30+
cnsconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config"
31+
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common"
2532
)
2633

2734
func TestGenerateVolumeNodeAffinity(t *testing.T) {
@@ -107,3 +114,178 @@ func TestGenerateVolumeNodeAffinity(t *testing.T) {
107114
})
108115
}
109116
}
117+
118+
func TestAddNodeAffinityRulesOnPVTopologyAnnotationPresent(t *testing.T) {
119+
ctx := context.Background()
120+
121+
// Create supervisor PVC with topology annotation
122+
supPVC := &v1.PersistentVolumeClaim{
123+
ObjectMeta: metav1.ObjectMeta{
124+
Name: "volume-1",
125+
Namespace: "sv-namespace",
126+
Annotations: map[string]string{
127+
common.AnnVolumeAccessibleTopology: `[{"zone":"zone-a"}]`,
128+
},
129+
},
130+
}
131+
// Create guest PV without node affinity
132+
guestPV := &v1.PersistentVolume{
133+
ObjectMeta: metav1.ObjectMeta{
134+
Name: "pv-1",
135+
},
136+
Spec: v1.PersistentVolumeSpec{
137+
PersistentVolumeSource: v1.PersistentVolumeSource{
138+
CSI: &v1.CSIPersistentVolumeSource{
139+
VolumeHandle: "volume-1",
140+
},
141+
},
142+
},
143+
}
144+
145+
// Setup supervisor client with PVC
146+
supervisorClient := k8sfake.NewSimpleClientset(supPVC)
147+
// Setup guest client with PV
148+
guestClient := k8sfake.NewSimpleClientset(guestPV)
149+
150+
// Setup metadataSyncer
151+
metadataSyncer := &metadataSyncInformer{
152+
supervisorClient: supervisorClient,
153+
}
154+
metadataSyncer.configInfo = &cnsconfig.ConfigurationInfo{
155+
Cfg: &cnsconfig.Config{
156+
GC: cnsconfig.GCConfig{
157+
Endpoint: "endpoint",
158+
Port: "443",
159+
},
160+
},
161+
}
162+
163+
// Patch k8sNewClient to return our guestClient
164+
origK8sClient := k8sNewClient
165+
defer func() {
166+
k8sNewClient = origK8sClient
167+
}()
168+
k8sNewClient = func(ctx context.Context) (clientset.Interface, error) {
169+
return guestClient, nil
170+
}
171+
172+
// Patch getPVsInBoundAvailableOrReleased to return our PV
173+
origGetPVs := getPVsInBoundAvailableOrReleased
174+
defer func() {
175+
getPVsInBoundAvailableOrReleased = origGetPVs
176+
}()
177+
getPVsInBoundAvailableOrReleased = func(ctx context.Context,
178+
syncer *metadataSyncInformer) ([]*v1.PersistentVolume, error) {
179+
return []*v1.PersistentVolume{guestPV}, nil
180+
}
181+
182+
// Patch cnsconfig.GetSupervisorNamespace to return our namespace
183+
origGetSuperNS := cnsconfigGetSupervisorNamespace
184+
defer func() {
185+
cnsconfigGetSupervisorNamespace = origGetSuperNS
186+
}()
187+
cnsconfigGetSupervisorNamespace = func(ctx context.Context) (string, error) {
188+
return "sv-namespace", nil
189+
}
190+
191+
// Run function
192+
AddNodeAffinityRulesOnPV(ctx, metadataSyncer)
193+
194+
// Verify node affinity added
195+
gotPV, err := guestClient.CoreV1().PersistentVolumes().Get(ctx, "pv-1", metav1.GetOptions{})
196+
if err != nil {
197+
t.Fatalf("PV not found: %v", err)
198+
}
199+
if gotPV.Spec.NodeAffinity == nil || len(gotPV.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
200+
t.Errorf("Expected node affinity to be set on PV when supervisor PVC has topology annotation")
201+
}
202+
}
203+
204+
func TestAddNodeAffinityRulesOnPVTopologyAnnotationAbsent(t *testing.T) {
205+
ctx := context.Background()
206+
207+
// Create supervisor PVC without annotation
208+
supPVC := &v1.PersistentVolumeClaim{
209+
ObjectMeta: metav1.ObjectMeta{
210+
Name: "volume-2",
211+
Namespace: "sv-namespace",
212+
Annotations: map[string]string{},
213+
},
214+
}
215+
// Create guest PV without node affinity
216+
guestPV := &v1.PersistentVolume{
217+
ObjectMeta: metav1.ObjectMeta{
218+
Name: "pv-2",
219+
},
220+
Spec: v1.PersistentVolumeSpec{
221+
PersistentVolumeSource: v1.PersistentVolumeSource{
222+
CSI: &v1.CSIPersistentVolumeSource{
223+
VolumeHandle: "volume-2",
224+
},
225+
},
226+
},
227+
}
228+
229+
supervisorClient := k8sfake.NewSimpleClientset(supPVC)
230+
guestClient := k8sfake.NewSimpleClientset(guestPV)
231+
232+
// Setup metadataSyncer
233+
metadataSyncer := &metadataSyncInformer{
234+
supervisorClient: supervisorClient,
235+
}
236+
metadataSyncer.configInfo = &cnsconfig.ConfigurationInfo{
237+
Cfg: &cnsconfig.Config{
238+
GC: cnsconfig.GCConfig{
239+
Endpoint: "endpoint",
240+
Port: "443",
241+
},
242+
},
243+
}
244+
245+
// Patch k8sNewClient to return our guestClient
246+
origK8sClient := k8sNewClient
247+
defer func() {
248+
k8sNewClient = origK8sClient
249+
}()
250+
k8sNewClient = func(ctx context.Context) (clientset.Interface, error) {
251+
return guestClient, nil
252+
}
253+
254+
// Patch getPVsInBoundAvailableOrReleased to return our PV
255+
origGetPVs := getPVsInBoundAvailableOrReleased
256+
defer func() {
257+
getPVsInBoundAvailableOrReleased = origGetPVs
258+
}()
259+
getPVsInBoundAvailableOrReleased = func(ctx context.Context,
260+
syncer *metadataSyncInformer) ([]*v1.PersistentVolume, error) {
261+
return []*v1.PersistentVolume{guestPV}, nil
262+
}
263+
264+
// Patch cnsconfig.GetSupervisorNamespace to return our namespace
265+
origGetSuperNS := cnsconfigGetSupervisorNamespace
266+
defer func() {
267+
cnsconfigGetSupervisorNamespace = origGetSuperNS
268+
}()
269+
cnsconfigGetSupervisorNamespace = func(ctx context.Context) (string, error) {
270+
return "sv-namespace", nil
271+
}
272+
273+
// Reduce timeout value used in code for testing
274+
origTimeout := timeoutAddNodeAffinityOnPVs
275+
defer func() {
276+
timeoutAddNodeAffinityOnPVs = origTimeout
277+
}()
278+
timeoutAddNodeAffinityOnPVs = 15 * time.Second
279+
280+
// Run function
281+
AddNodeAffinityRulesOnPV(ctx, metadataSyncer)
282+
283+
// Verify node affinity NOT added as supervisor PVC doesn't have topology annotation
284+
gotPV, err := guestClient.CoreV1().PersistentVolumes().Get(ctx, "pv-2", metav1.GetOptions{})
285+
if err != nil {
286+
t.Fatalf("PV not found: %v", err)
287+
}
288+
if gotPV.Spec.NodeAffinity != nil {
289+
t.Errorf("Expected node affinity NOT to be set on PV when supervisor PVC has no topology annotation")
290+
}
291+
}

pkg/syncer/util.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ const (
5555

5656
// getPVsInBoundAvailableOrReleased returns PVs in Bound, Available or Released
5757
// state.
58-
func getPVsInBoundAvailableOrReleased(ctx context.Context,
58+
var getPVsInBoundAvailableOrReleased = func(ctx context.Context,
5959
metadataSyncer *metadataSyncInformer) ([]*v1.PersistentVolume, error) {
6060
log := logger.GetLogger(ctx)
6161
var pvsInDesiredState []*v1.PersistentVolume
@@ -961,10 +961,6 @@ func hasClusterDistributionSet(ctx context.Context, volume cnstypes.CnsVolume,
961961
func generateVolumeAccessibleTopologyFromPVCAnnotation(claim *v1.PersistentVolumeClaim) (
962962
[]map[string]string, error) {
963963
volumeAccessibleTopology := claim.Annotations[common.AnnVolumeAccessibleTopology]
964-
if volumeAccessibleTopology == "" {
965-
return nil, fmt.Errorf("annotation %q is not set for the claim: %q, namespace: %q",
966-
common.AnnVolumeAccessibleTopology, claim.Name, claim.Namespace)
967-
}
968964
volumeAccessibleTopologyArray := make([]map[string]string, 0)
969965
err := json.Unmarshal([]byte(volumeAccessibleTopology), &volumeAccessibleTopologyArray)
970966
if err != nil {

0 commit comments

Comments
 (0)