Skip to content

Commit 5c2fbc2

Browse files
committed
fix the issue that the relevant fields in rb and pp are inconsistent
Signed-off-by: zhzhuang-zju <[email protected]>
1 parent 1c6f1db commit 5c2fbc2

File tree

3 files changed

+146
-15
lines changed

3 files changed

+146
-15
lines changed

pkg/detector/detector.go

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -343,13 +343,26 @@ func (d *ResourceDetector) OnUpdate(oldObj, newObj interface{}) {
343343
return
344344
}
345345

346-
resourceChangeByKarmada := eventfilter.ResourceChangeByKarmada(unstructuredOldObj, unstructuredNewObj)
346+
isLazyActivation, err := d.isClaimedByLazyPolicy(unstructuredNewObj)
347+
if err != nil {
348+
// should never come here
349+
klog.Errorf("Failed to check if the object (kind=%s, %s/%s) is bound by lazy policy. err: %v", unstructuredNewObj.GetKind(), unstructuredNewObj.GetNamespace(), unstructuredNewObj.GetName(), err)
350+
}
347351

348-
resourceItem := ResourceItem{
349-
Obj: newRuntimeObj,
350-
ResourceChangeByKarmada: resourceChangeByKarmada,
352+
if isLazyActivation {
353+
resourceItem := ResourceItem{
354+
Obj: newRuntimeObj,
355+
ResourceChangeByKarmada: eventfilter.ResourceChangeByKarmada(unstructuredOldObj, unstructuredNewObj),
356+
}
357+
358+
d.Processor.Enqueue(resourceItem)
359+
return
351360
}
352361

362+
// For non-lazy policies, it is no need to distinguish whether the change is from Karmada or not.
363+
resourceItem := ResourceItem{
364+
Obj: newRuntimeObj,
365+
}
353366
d.Processor.Enqueue(resourceItem)
354367
}
355368

@@ -1158,7 +1171,7 @@ func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *polic
11581171
if err != nil {
11591172
return err
11601173
}
1161-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: resourceKey, ResourceChangeByKarmada: true})
1174+
d.enqueueResourceTemplateForPolicyChange(resourceKey, policy.Spec.ActivationPreference)
11621175
}
11631176

11641177
// check whether there are matched RT in waiting list, is so, add it to processor
@@ -1176,7 +1189,7 @@ func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *polic
11761189

11771190
for _, key := range matchedKeys {
11781191
d.RemoveWaiting(key)
1179-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
1192+
d.enqueueResourceTemplateForPolicyChange(key, policy.Spec.ActivationPreference)
11801193
}
11811194

11821195
// If preemption is enabled, handle the preemption process.
@@ -1225,14 +1238,14 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy
12251238
if err != nil {
12261239
return err
12271240
}
1228-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: resourceKey, ResourceChangeByKarmada: true})
1241+
d.enqueueResourceTemplateForPolicyChange(resourceKey, policy.Spec.ActivationPreference)
12291242
}
12301243
for _, crb := range clusterResourceBindings.Items {
12311244
resourceKey, err := helper.ConstructClusterWideKey(crb.Spec.Resource)
12321245
if err != nil {
12331246
return err
12341247
}
1235-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: resourceKey, ResourceChangeByKarmada: true})
1248+
d.enqueueResourceTemplateForPolicyChange(resourceKey, policy.Spec.ActivationPreference)
12361249
}
12371250

12381251
matchedKeys := d.GetMatching(policy.Spec.ResourceSelectors)
@@ -1249,7 +1262,7 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy
12491262

12501263
for _, key := range matchedKeys {
12511264
d.RemoveWaiting(key)
1252-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
1265+
d.enqueueResourceTemplateForPolicyChange(key, policy.Spec.ActivationPreference)
12531266
}
12541267

12551268
// If preemption is enabled, handle the preemption process.
@@ -1373,3 +1386,21 @@ func (d *ResourceDetector) applyReplicaInterpretation(object *unstructured.Unstr
13731386

13741387
return nil
13751388
}
1389+
1390+
// enqueueResourceTemplateForPolicyChange enqueues a resource template key for reconciliation in response to a
1391+
// PropagationPolicy or ClusterPropagationPolicy change. If the policy's ActivationPreference is set to Lazy,
1392+
// the ResourceChangeByKarmada flag is set to true, indicating that the resource template is being enqueued
1393+
// due to a policy change and should not be propagated to member clusters. For non-lazy policies, this flag
1394+
// is omitted as the distinction is unnecessary.
1395+
//
1396+
// Note: Setting ResourceChangeByKarmada changes the effective queue key. Mixing both true/false for the same
1397+
// resource may result in two different queue keys being processed concurrently, which can cause race conditions.
1398+
// Therefore, only set ResourceChangeByKarmada in lazy activation mode.
1399+
// For more details, see: https://github.com/karmada-io/karmada/issues/5996.
1400+
func (d *ResourceDetector) enqueueResourceTemplateForPolicyChange(key keys.ClusterWideKey, pref policyv1alpha1.ActivationPreference) {
1401+
if util.IsLazyActivationEnabled(pref) {
1402+
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
1403+
return
1404+
}
1405+
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key})
1406+
}

pkg/detector/detector_test.go

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,6 @@ func TestOnUpdate(t *testing.T) {
429429
oldObj interface{}
430430
newObj interface{}
431431
expectedEnqueue bool
432-
expectedChangeByKarmada bool
433432
expectToUnstructuredError bool
434433
}{
435434
{
@@ -460,8 +459,7 @@ func TestOnUpdate(t *testing.T) {
460459
},
461460
},
462461
},
463-
expectedEnqueue: true,
464-
expectedChangeByKarmada: false,
462+
expectedEnqueue: true,
465463
},
466464
{
467465
name: "update without changes",
@@ -524,8 +522,7 @@ func TestOnUpdate(t *testing.T) {
524522
},
525523
},
526524
},
527-
expectedEnqueue: true,
528-
expectedChangeByKarmada: true,
525+
expectedEnqueue: true,
529526
},
530527
{
531528
name: "core v1 object",
@@ -573,7 +570,6 @@ func TestOnUpdate(t *testing.T) {
573570
assert.IsType(t, ResourceItem{}, mockProcessor.lastEnqueued, "Enqueued item should be of type ResourceItem")
574571
enqueued := mockProcessor.lastEnqueued.(ResourceItem)
575572
assert.Equal(t, tt.newObj, enqueued.Obj, "Enqueued object should match the new object")
576-
assert.Equal(t, tt.expectedChangeByKarmada, enqueued.ResourceChangeByKarmada, "ResourceChangeByKarmada flag should match expected value")
577573
} else {
578574
assert.Equal(t, 0, mockProcessor.enqueueCount, "Object should not be enqueued")
579575
}
@@ -967,6 +963,71 @@ func TestApplyClusterPolicy(t *testing.T) {
967963
}
968964
}
969965

966+
func TestEnqueueResourceKeyWithActivationPref(t *testing.T) {
967+
testClusterWideKey := keys.ClusterWideKey{
968+
Group: "foo",
969+
Version: "foo",
970+
Kind: "foo",
971+
Namespace: "foo",
972+
Name: "foo",
973+
}
974+
tests := []struct {
975+
name string
976+
key keys.ClusterWideKey
977+
pref policyv1alpha1.ActivationPreference
978+
want keys.ClusterWideKeyWithConfig
979+
}{
980+
{
981+
name: "lazy pp and resourceChangeByKarmada is true",
982+
key: testClusterWideKey,
983+
pref: policyv1alpha1.LazyActivation,
984+
want: keys.ClusterWideKeyWithConfig{
985+
ClusterWideKey: testClusterWideKey,
986+
ResourceChangeByKarmada: true,
987+
},
988+
},
989+
{
990+
name: "non-lazy ignores ResourceChangeByKarmada",
991+
key: testClusterWideKey,
992+
pref: "",
993+
want: keys.ClusterWideKeyWithConfig{
994+
ClusterWideKey: testClusterWideKey,
995+
ResourceChangeByKarmada: false,
996+
},
997+
},
998+
}
999+
for _, tt := range tests {
1000+
t.Run(tt.name, func(t *testing.T) {
1001+
ctx, cancel := context.WithCancel(context.Background())
1002+
detector := ResourceDetector{
1003+
Processor: util.NewAsyncWorker(util.Options{
1004+
Name: "resource detector",
1005+
KeyFunc: ResourceItemKeyFunc,
1006+
ReconcileFunc: func(key util.QueueKey) (err error) {
1007+
defer cancel()
1008+
defer func() {
1009+
assert.NoError(t, err)
1010+
}()
1011+
clusterWideKeyWithConfig, ok := key.(keys.ClusterWideKeyWithConfig)
1012+
if !ok {
1013+
err = fmt.Errorf("invalid key")
1014+
return err
1015+
}
1016+
if clusterWideKeyWithConfig != tt.want {
1017+
err = fmt.Errorf("unexpected key. want:%+v, got:%+v", tt.want, clusterWideKeyWithConfig)
1018+
return err
1019+
}
1020+
return nil
1021+
},
1022+
}),
1023+
}
1024+
detector.Processor.Run(ctx, 1)
1025+
detector.enqueueResourceTemplateForPolicyChange(tt.key, tt.pref)
1026+
<-ctx.Done()
1027+
})
1028+
}
1029+
}
1030+
9701031
// Helper Functions
9711032

9721033
// setupTestScheme creates a runtime scheme with necessary types for testing

pkg/detector/policy.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,45 @@ func (d *ResourceDetector) listCPPDerivedCRBs(policyID, policyName string) (*wor
332332
return bindings, nil
333333
}
334334

335+
func (d *ResourceDetector) isClaimedByLazyPolicy(obj *unstructured.Unstructured) (bool, error) {
336+
policyAnnotations := obj.GetAnnotations()
337+
policyLabels := obj.GetLabels()
338+
policyNamespace := util.GetAnnotationValue(policyAnnotations, policyv1alpha1.PropagationPolicyNamespaceAnnotation)
339+
policyName := util.GetAnnotationValue(policyAnnotations, policyv1alpha1.PropagationPolicyNameAnnotation)
340+
claimedID := util.GetLabelValue(policyLabels, policyv1alpha1.PropagationPolicyPermanentIDLabel)
341+
if policyNamespace != "" && policyName != "" && claimedID != "" {
342+
matchedPropagationPolicy := &policyv1alpha1.PropagationPolicy{}
343+
err := d.Client.Get(context.TODO(), client.ObjectKey{Namespace: policyNamespace, Name: policyName}, matchedPropagationPolicy)
344+
if err != nil {
345+
if apierrors.IsNotFound(err) {
346+
return false, nil
347+
}
348+
349+
return false, err
350+
}
351+
352+
return util.IsLazyActivationEnabled(matchedPropagationPolicy.Spec.ActivationPreference), nil
353+
}
354+
355+
policyName = util.GetAnnotationValue(policyAnnotations, policyv1alpha1.ClusterPropagationPolicyAnnotation)
356+
claimedID = util.GetLabelValue(policyLabels, policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel)
357+
if policyName != "" && claimedID != "" {
358+
matchedClusterPropagationPolicy := &policyv1alpha1.ClusterPropagationPolicy{}
359+
err := d.Client.Get(context.TODO(), client.ObjectKey{Name: policyName}, matchedClusterPropagationPolicy)
360+
if err != nil {
361+
if apierrors.IsNotFound(err) {
362+
return false, nil
363+
}
364+
365+
return false, err
366+
}
367+
368+
return util.IsLazyActivationEnabled(matchedClusterPropagationPolicy.Spec.ActivationPreference), nil
369+
}
370+
371+
return false, nil
372+
}
373+
335374
// excludeClusterPolicy excludes cluster propagation policy.
336375
// If propagation policy was claimed, cluster propagation policy should not exist.
337376
func excludeClusterPolicy(obj metav1.Object) (hasClaimedClusterPolicy bool) {

0 commit comments

Comments
 (0)