Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions pkg/controllers/workapplier/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,20 @@ func (r *Reconciler) serverSideApply(
inMemberClusterObj.SetResourceVersion("")
}

// Check if forced server-side apply is needed even if it is not turned on by the user.
//
// Note (chenyu1): This is added to address cases where Kubernetes might register
// Fleet (the member agent) as an Update typed field manager for the object, which blocks
// the same agent itself from performing a server-side apply due to conflicts,
// as Kubernetes considers Update typed and Apply typed field managers to be different
// entities, despite having the same identifier. In these cases, users will see their
// first apply attempt being successful, yet any subsequent update would fail due to
// conflicts. There are also a few other similar cases that are solved by this check;
// see the inner comments for specifics.
if shouldUseForcedServerSideApply(inMemberClusterObj) {
force = true
}

// Use server-side apply to apply the manifest object.
//
// See the Kubernetes documentation on structured merged diff for the exact behaviors.
Expand Down Expand Up @@ -586,3 +600,39 @@ func shouldEnableOptimisticLock(applyStrategy *fleetv1beta1.ApplyStrategy) bool
// Optimistic lock is enabled if the apply strategy is set to IfNotDrifted.
return applyStrategy.WhenToApply == fleetv1beta1.WhenToApplyTypeIfNotDrifted
}

// shouldUseForcedServerSideApply checks if forced server-side apply should be used even if
// the force option is not turned on.
//
// Forced apply is enabled only when every field manager registered on the object is one
// that Fleet can safely take ownership from; see the inline comments for specifics.
func shouldUseForcedServerSideApply(inMemberClusterObj *unstructured.Unstructured) bool {
	for _, entry := range inMemberClusterObj.GetManagedFields() {
		// Two manager names are considered safe:
		//
		// * workFieldManagerName, the field manager name used by Fleet; its presence
		//   suggests that some (not necessarily all) fields are managed by Fleet.
		//
		// * `before-first-apply`, a field manager name used by Kubernetes to "properly"
		//   track field managers between non-apply and apply ops. Specifically, this
		//   manager is added when an object is being applied, but Kubernetes finds
		//   that the object does not have any managed field specified.
		//
		//   Note (chenyu1): unfortunately this name is not exposed as a public variable;
		//   see the Kubernetes source code for more information.
		switch entry.Manager {
		case workFieldManagerName, "before-first-apply":
			// A safe manager; keep scanning the remaining managed field entries.
		default:
			// There exists a field manager that is neither Fleet nor the
			// `before-first-apply` field manager, which suggests that the object (or at
			// least some of its fields) is managed by another entity. Fleet will not
			// enable forced server-side apply in this case and let user decide if
			// forced apply is needed.
			klog.V(2).InfoS("Found a field manager that is neither Fleet nor the `before-first-apply` field manager; Fleet will not enable forced server-side apply unless explicitly requested",
				"fieldManager", entry.Manager,
				"GVK", inMemberClusterObj.GroupVersionKind(), "inMemberClusterObj", klog.KObj(inMemberClusterObj))
			return false
		}
	}

	// All field managers are either Fleet or the `before-first-apply` field manager;
	// use forced server-side apply to avoid confusing self-conflicts. This would
	// allow Fleet to (correctly) assume ownership of managed fields.
	klog.V(2).InfoS("All field managers are either Fleet or the `before-first-apply` field manager; Fleet will enable forced server-side apply",
		"GVK", inMemberClusterObj.GroupVersionKind(), "inMemberClusterObj", klog.KObj(inMemberClusterObj))
	return true
}
91 changes: 91 additions & 0 deletions pkg/controllers/workapplier/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
)
Expand Down Expand Up @@ -459,3 +460,93 @@ func TestSetFleetLastAppliedAnnotation(t *testing.T) {
})
}
}

// TestShouldUseForcedServerSideApply tests the shouldUseForcedServerSideApply function.
func TestShouldUseForcedServerSideApply(t *testing.T) {
	testCases := []struct {
		name                               string
		inMemberClusterObj                 client.Object
		wantShouldUseForcedServerSideApply bool
	}{
		{
			// An object with no managed field entries at all has no foreign manager,
			// so forced server-side apply is enabled.
			name: "object with no managed fields",
			inMemberClusterObj: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name: configMapName,
				},
			},
			wantShouldUseForcedServerSideApply: true,
		},
		{
			name: "object under Fleet's management",
			inMemberClusterObj: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name: configMapName,
					ManagedFields: []metav1.ManagedFieldsEntry{
						{
							Manager:   workFieldManagerName,
							Operation: metav1.ManagedFieldsOperationUpdate,
						},
					},
				},
			},
			wantShouldUseForcedServerSideApply: true,
		},
		{
			name: "object under before-first-apply field manager's management",
			inMemberClusterObj: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name: configMapName,
					ManagedFields: []metav1.ManagedFieldsEntry{
						{
							Manager:   "before-first-apply",
							Operation: metav1.ManagedFieldsOperationUpdate,
						},
					},
				},
			},
			wantShouldUseForcedServerSideApply: true,
		},
		{
			name: "object under both Fleet and before-first-apply field managers' management",
			inMemberClusterObj: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name: configMapName,
					ManagedFields: []metav1.ManagedFieldsEntry{
						{
							Manager:   "before-first-apply",
							Operation: metav1.ManagedFieldsOperationUpdate,
						},
						{
							Manager:   workFieldManagerName,
							Operation: metav1.ManagedFieldsOperationUpdate,
						},
					},
				},
			},
			wantShouldUseForcedServerSideApply: true,
		},
		{
			name: "object under other field manager's management",
			inMemberClusterObj: &corev1.ConfigMap{
				ObjectMeta: metav1.ObjectMeta{
					Name: configMapName,
					ManagedFields: []metav1.ManagedFieldsEntry{
						{
							Manager:   "before-first-apply",
							Operation: metav1.ManagedFieldsOperationUpdate,
						},
						{
							Manager:   "3rd-party-manager",
							Operation: metav1.ManagedFieldsOperationApply,
						},
						{
							Manager:   workFieldManagerName,
							Operation: metav1.ManagedFieldsOperationUpdate,
						},
					},
				},
			},
			// A third-party field manager is present; forced apply must stay off.
			// Set the expectation explicitly (rather than relying on the zero value)
			// for consistency with the other cases.
			wantShouldUseForcedServerSideApply: false,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			got := shouldUseForcedServerSideApply(toUnstructured(t, tc.inMemberClusterObj))
			if got != tc.wantShouldUseForcedServerSideApply {
				t.Errorf("shouldUseForcedServerSideApply() = %t, want %t", got, tc.wantShouldUseForcedServerSideApply)
			}
		})
	}
}
117 changes: 117 additions & 0 deletions test/e2e/placement_apply_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,123 @@ var _ = Describe("validating CRP when resources exists", Ordered, func() {
})
})

// This ordered e2e spec verifies that a CRP configured with the ServerSideApply
// strategy can place resources on all member clusters and then propagate a
// subsequent edit to the source config map without the apply getting stuck
// (e.g. on field-manager conflicts).
var _ = Describe("SSA", Ordered, func() {
	Context("use server-side apply to place resources (with changes)", func() {
		// Parallel-process-scoped names keep concurrently running specs isolated.
		crpName := fmt.Sprintf(crpNameTemplate, GinkgoParallelProcess())
		nsName := fmt.Sprintf(workNamespaceNameTemplate, GinkgoParallelProcess())
		cmName := fmt.Sprintf(appConfigMapNameTemplate, GinkgoParallelProcess())

		// The key here should match the one used in the default config map.
		cmDataKey := "data"
		cmDataVal1 := "foobar"

		BeforeAll(func() {
			// Create the resources on the hub cluster.
			createWorkResources()

			// Create the CRP.
			crp := &placementv1beta1.ClusterResourcePlacement{
				ObjectMeta: metav1.ObjectMeta{
					Name: crpName,
					// Add a custom finalizer; this would allow us to better observe
					// the behavior of the controllers.
					Finalizers: []string{customDeletionBlockerFinalizer},
				},
				Spec: placementv1beta1.ClusterResourcePlacementSpec{
					ResourceSelectors: workResourceSelector(),
					Policy: &placementv1beta1.PlacementPolicy{
						PlacementType: placementv1beta1.PickAllPlacementType,
					},
					Strategy: placementv1beta1.RolloutStrategy{
						Type: placementv1beta1.RollingUpdateRolloutStrategyType,
						RollingUpdate: &placementv1beta1.RollingUpdateConfig{
							MaxUnavailable:           ptr.To(intstr.FromInt(1)),
							MaxSurge:                 ptr.To(intstr.FromInt(1)),
							UnavailablePeriodSeconds: ptr.To(2),
						},
						// Use server-side apply (rather than the default client-side
						// apply) to place the resources.
						ApplyStrategy: &placementv1beta1.ApplyStrategy{
							Type: placementv1beta1.ApplyStrategyTypeServerSideApply,
						},
					},
				},
			}
			Expect(hubClient.Create(ctx, crp)).To(Succeed())
		})

		It("should update CRP status as expected", func() {
			// NOTE(review): "0" here appears to be the expected observed resource
			// index/generation for the initial placement — confirm against the
			// crpStatusUpdatedActual helper.
			crpStatusUpdatedActual := crpStatusUpdatedActual(workResourceIdentifiers(), allMemberClusterNames, nil, "0")
			Eventually(crpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP status as expected")
		})

		It("should place the resources on all member clusters", checkIfPlacedWorkResourcesOnAllMemberClusters)

		It("can update the manifests", func() {
			// Edit the source config map on the hub; the change should roll out to
			// the member clusters via a second (server-side) apply.
			Eventually(func() error {
				cm := &corev1.ConfigMap{}
				if err := hubClient.Get(ctx, client.ObjectKey{Name: cmName, Namespace: nsName}, cm); err != nil {
					return fmt.Errorf("failed to get configmap: %w", err)
				}

				if cm.Data == nil {
					cm.Data = make(map[string]string)
				}
				cm.Data[cmDataKey] = cmDataVal1

				if err := hubClient.Update(ctx, cm); err != nil {
					return fmt.Errorf("failed to update configmap: %w", err)
				}
				return nil
			}, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update the manifests")
		})

		It("should update CRP status as expected", func() {
			crpStatusUpdatedActual := crpStatusUpdatedActual(workResourceIdentifiers(), allMemberClusterNames, nil, "1")
			// Longer timeout is used to allow full rollouts.
			Eventually(crpStatusUpdatedActual, workloadEventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP status as expected")
		})

		It("should refresh the resources on all member clusters", func() {
			for idx := range allMemberClusters {
				memberClient := allMemberClusters[idx].KubeClient

				Eventually(func() error {
					cm := &corev1.ConfigMap{}
					if err := memberClient.Get(ctx, client.ObjectKey{Name: cmName, Namespace: nsName}, cm); err != nil {
						return fmt.Errorf("failed to get configmap: %w", err)
					}

					// To keep things simple, here the config map for comparison is
					// rebuilt from the retrieved data. This strips server-populated
					// metadata (resource version, managed fields, etc.) so that only
					// identity and data are compared.
					rebuiltCM := &corev1.ConfigMap{
						ObjectMeta: metav1.ObjectMeta{
							Name:      cm.Name,
							Namespace: cm.Namespace,
						},
						Data: cm.Data,
					}
					wantCM := &corev1.ConfigMap{
						ObjectMeta: metav1.ObjectMeta{
							Name:      cmName,
							Namespace: nsName,
						},
						Data: map[string]string{
							cmDataKey: cmDataVal1,
						},
					}
					if diff := cmp.Diff(rebuiltCM, wantCM); diff != "" {
						return fmt.Errorf("configMap diff (-got, +want):\n%s", diff)
					}
					return nil
				}, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to refresh the resources on %s", allMemberClusters[idx].ClusterName)
			}
		})

		AfterAll(func() {
			// Clean up the CRP and all placed resources on the member clusters.
			ensureCRPAndRelatedResourcesDeleted(crpName, allMemberClusters)
		})
	})
})

var _ = Describe("switching apply strategies", func() {
Context("switch from client-side apply to report diff", Ordered, func() {
crpName := fmt.Sprintf(crpNameTemplate, GinkgoParallelProcess())
Expand Down
Loading