Skip to content

Commit 232e731

Browse files
authored
Merge pull request #6674 from zhzhuang-zju/changebykarmada
fix the issue that the relevant fields in rb and pp are inconsistent
2 parents e7ecc93 + 5c2fbc2 commit 232e731

File tree

3 files changed

+146
-15
lines changed

3 files changed

+146
-15
lines changed

pkg/detector/detector.go

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -343,13 +343,26 @@ func (d *ResourceDetector) OnUpdate(oldObj, newObj interface{}) {
343343
return
344344
}
345345

346-
resourceChangeByKarmada := eventfilter.ResourceChangeByKarmada(unstructuredOldObj, unstructuredNewObj)
346+
isLazyActivation, err := d.isClaimedByLazyPolicy(unstructuredNewObj)
347+
if err != nil {
348+
// should never come here
349+
klog.Errorf("Failed to check if the object (kind=%s, %s/%s) is bound by lazy policy. err: %v", unstructuredNewObj.GetKind(), unstructuredNewObj.GetNamespace(), unstructuredNewObj.GetName(), err)
350+
}
347351

348-
resourceItem := ResourceItem{
349-
Obj: newRuntimeObj,
350-
ResourceChangeByKarmada: resourceChangeByKarmada,
352+
if isLazyActivation {
353+
resourceItem := ResourceItem{
354+
Obj: newRuntimeObj,
355+
ResourceChangeByKarmada: eventfilter.ResourceChangeByKarmada(unstructuredOldObj, unstructuredNewObj),
356+
}
357+
358+
d.Processor.Enqueue(resourceItem)
359+
return
351360
}
352361

362+
// For non-lazy policies, it is no need to distinguish whether the change is from Karmada or not.
363+
resourceItem := ResourceItem{
364+
Obj: newRuntimeObj,
365+
}
353366
d.Processor.Enqueue(resourceItem)
354367
}
355368

@@ -1208,7 +1221,7 @@ func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *polic
12081221
if err != nil {
12091222
return err
12101223
}
1211-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: resourceKey, ResourceChangeByKarmada: true})
1224+
d.enqueueResourceTemplateForPolicyChange(resourceKey, policy.Spec.ActivationPreference)
12121225
}
12131226

12141227
// check whether there are matched RT in waiting list, is so, add it to processor
@@ -1226,7 +1239,7 @@ func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *polic
12261239

12271240
for _, key := range matchedKeys {
12281241
d.RemoveWaiting(key)
1229-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
1242+
d.enqueueResourceTemplateForPolicyChange(key, policy.Spec.ActivationPreference)
12301243
}
12311244

12321245
// If preemption is enabled, handle the preemption process.
@@ -1275,14 +1288,14 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy
12751288
if err != nil {
12761289
return err
12771290
}
1278-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: resourceKey, ResourceChangeByKarmada: true})
1291+
d.enqueueResourceTemplateForPolicyChange(resourceKey, policy.Spec.ActivationPreference)
12791292
}
12801293
for _, crb := range clusterResourceBindings.Items {
12811294
resourceKey, err := helper.ConstructClusterWideKey(crb.Spec.Resource)
12821295
if err != nil {
12831296
return err
12841297
}
1285-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: resourceKey, ResourceChangeByKarmada: true})
1298+
d.enqueueResourceTemplateForPolicyChange(resourceKey, policy.Spec.ActivationPreference)
12861299
}
12871300

12881301
matchedKeys := d.GetMatching(policy.Spec.ResourceSelectors)
@@ -1299,7 +1312,7 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy
12991312

13001313
for _, key := range matchedKeys {
13011314
d.RemoveWaiting(key)
1302-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
1315+
d.enqueueResourceTemplateForPolicyChange(key, policy.Spec.ActivationPreference)
13031316
}
13041317

13051318
// If preemption is enabled, handle the preemption process.
@@ -1472,3 +1485,21 @@ func (d *ResourceDetector) applyReplicaInterpretation(object *unstructured.Unstr
14721485

14731486
return nil
14741487
}
1488+
1489+
// enqueueResourceTemplateForPolicyChange enqueues a resource template key for reconciliation in response to a
1490+
// PropagationPolicy or ClusterPropagationPolicy change. If the policy's ActivationPreference is set to Lazy,
1491+
// the ResourceChangeByKarmada flag is set to true, indicating that the resource template is being enqueued
1492+
// due to a policy change and should not be propagated to member clusters. For non-lazy policies, this flag
1493+
// is omitted as the distinction is unnecessary.
1494+
//
1495+
// Note: Setting ResourceChangeByKarmada changes the effective queue key. Mixing both true/false for the same
1496+
// resource may result in two different queue keys being processed concurrently, which can cause race conditions.
1497+
// Therefore, only set ResourceChangeByKarmada in lazy activation mode.
1498+
// For more details, see: https://github.com/karmada-io/karmada/issues/5996.
1499+
func (d *ResourceDetector) enqueueResourceTemplateForPolicyChange(key keys.ClusterWideKey, pref policyv1alpha1.ActivationPreference) {
1500+
if util.IsLazyActivationEnabled(pref) {
1501+
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
1502+
return
1503+
}
1504+
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key})
1505+
}

pkg/detector/detector_test.go

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,6 @@ func TestOnUpdate(t *testing.T) {
429429
oldObj interface{}
430430
newObj interface{}
431431
expectedEnqueue bool
432-
expectedChangeByKarmada bool
433432
expectToUnstructuredError bool
434433
}{
435434
{
@@ -460,8 +459,7 @@ func TestOnUpdate(t *testing.T) {
460459
},
461460
},
462461
},
463-
expectedEnqueue: true,
464-
expectedChangeByKarmada: false,
462+
expectedEnqueue: true,
465463
},
466464
{
467465
name: "update without changes",
@@ -524,8 +522,7 @@ func TestOnUpdate(t *testing.T) {
524522
},
525523
},
526524
},
527-
expectedEnqueue: true,
528-
expectedChangeByKarmada: true,
525+
expectedEnqueue: true,
529526
},
530527
{
531528
name: "core v1 object",
@@ -573,7 +570,6 @@ func TestOnUpdate(t *testing.T) {
573570
assert.IsType(t, ResourceItem{}, mockProcessor.lastEnqueued, "Enqueued item should be of type ResourceItem")
574571
enqueued := mockProcessor.lastEnqueued.(ResourceItem)
575572
assert.Equal(t, tt.newObj, enqueued.Obj, "Enqueued object should match the new object")
576-
assert.Equal(t, tt.expectedChangeByKarmada, enqueued.ResourceChangeByKarmada, "ResourceChangeByKarmada flag should match expected value")
577573
} else {
578574
assert.Equal(t, 0, mockProcessor.enqueueCount, "Object should not be enqueued")
579575
}
@@ -967,6 +963,71 @@ func TestApplyClusterPolicy(t *testing.T) {
967963
}
968964
}
969965

966+
func TestEnqueueResourceKeyWithActivationPref(t *testing.T) {
967+
testClusterWideKey := keys.ClusterWideKey{
968+
Group: "foo",
969+
Version: "foo",
970+
Kind: "foo",
971+
Namespace: "foo",
972+
Name: "foo",
973+
}
974+
tests := []struct {
975+
name string
976+
key keys.ClusterWideKey
977+
pref policyv1alpha1.ActivationPreference
978+
want keys.ClusterWideKeyWithConfig
979+
}{
980+
{
981+
name: "lazy pp and resourceChangeByKarmada is true",
982+
key: testClusterWideKey,
983+
pref: policyv1alpha1.LazyActivation,
984+
want: keys.ClusterWideKeyWithConfig{
985+
ClusterWideKey: testClusterWideKey,
986+
ResourceChangeByKarmada: true,
987+
},
988+
},
989+
{
990+
name: "non-lazy ignores ResourceChangeByKarmada",
991+
key: testClusterWideKey,
992+
pref: "",
993+
want: keys.ClusterWideKeyWithConfig{
994+
ClusterWideKey: testClusterWideKey,
995+
ResourceChangeByKarmada: false,
996+
},
997+
},
998+
}
999+
for _, tt := range tests {
1000+
t.Run(tt.name, func(t *testing.T) {
1001+
ctx, cancel := context.WithCancel(context.Background())
1002+
detector := ResourceDetector{
1003+
Processor: util.NewAsyncWorker(util.Options{
1004+
Name: "resource detector",
1005+
KeyFunc: ResourceItemKeyFunc,
1006+
ReconcileFunc: func(key util.QueueKey) (err error) {
1007+
defer cancel()
1008+
defer func() {
1009+
assert.NoError(t, err)
1010+
}()
1011+
clusterWideKeyWithConfig, ok := key.(keys.ClusterWideKeyWithConfig)
1012+
if !ok {
1013+
err = fmt.Errorf("invalid key")
1014+
return err
1015+
}
1016+
if clusterWideKeyWithConfig != tt.want {
1017+
err = fmt.Errorf("unexpected key. want:%+v, got:%+v", tt.want, clusterWideKeyWithConfig)
1018+
return err
1019+
}
1020+
return nil
1021+
},
1022+
}),
1023+
}
1024+
detector.Processor.Run(ctx, 1)
1025+
detector.enqueueResourceTemplateForPolicyChange(tt.key, tt.pref)
1026+
<-ctx.Done()
1027+
})
1028+
}
1029+
}
1030+
9701031
// Helper Functions
9711032

9721033
// setupTestScheme creates a runtime scheme with necessary types for testing

pkg/detector/policy.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,45 @@ func (d *ResourceDetector) listCPPDerivedCRBs(policyID, policyName string) (*wor
369369
return bindings, nil
370370
}
371371

372+
func (d *ResourceDetector) isClaimedByLazyPolicy(obj *unstructured.Unstructured) (bool, error) {
373+
policyAnnotations := obj.GetAnnotations()
374+
policyLabels := obj.GetLabels()
375+
policyNamespace := util.GetAnnotationValue(policyAnnotations, policyv1alpha1.PropagationPolicyNamespaceAnnotation)
376+
policyName := util.GetAnnotationValue(policyAnnotations, policyv1alpha1.PropagationPolicyNameAnnotation)
377+
claimedID := util.GetLabelValue(policyLabels, policyv1alpha1.PropagationPolicyPermanentIDLabel)
378+
if policyNamespace != "" && policyName != "" && claimedID != "" {
379+
matchedPropagationPolicy := &policyv1alpha1.PropagationPolicy{}
380+
err := d.Client.Get(context.TODO(), client.ObjectKey{Namespace: policyNamespace, Name: policyName}, matchedPropagationPolicy)
381+
if err != nil {
382+
if apierrors.IsNotFound(err) {
383+
return false, nil
384+
}
385+
386+
return false, err
387+
}
388+
389+
return util.IsLazyActivationEnabled(matchedPropagationPolicy.Spec.ActivationPreference), nil
390+
}
391+
392+
policyName = util.GetAnnotationValue(policyAnnotations, policyv1alpha1.ClusterPropagationPolicyAnnotation)
393+
claimedID = util.GetLabelValue(policyLabels, policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel)
394+
if policyName != "" && claimedID != "" {
395+
matchedClusterPropagationPolicy := &policyv1alpha1.ClusterPropagationPolicy{}
396+
err := d.Client.Get(context.TODO(), client.ObjectKey{Name: policyName}, matchedClusterPropagationPolicy)
397+
if err != nil {
398+
if apierrors.IsNotFound(err) {
399+
return false, nil
400+
}
401+
402+
return false, err
403+
}
404+
405+
return util.IsLazyActivationEnabled(matchedClusterPropagationPolicy.Spec.ActivationPreference), nil
406+
}
407+
408+
return false, nil
409+
}
410+
372411
// excludeClusterPolicy excludes cluster propagation policy.
373412
// If propagation policy was claimed, cluster propagation policy should not exist.
374413
func excludeClusterPolicy(obj metav1.Object) (hasClaimedClusterPolicy bool) {

0 commit comments

Comments
 (0)