Skip to content

Commit 0688c4d

Browse files
authored
feat: enable the new work applier (#1042)
1 parent 7952705 commit 0688c4d

20 files changed

+3208
-352
lines changed

cmd/memberagent/main.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,14 @@ import (
4343
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
4444
imcv1alpha1 "go.goms.io/fleet/pkg/controllers/internalmembercluster/v1alpha1"
4545
imcv1beta1 "go.goms.io/fleet/pkg/controllers/internalmembercluster/v1beta1"
46-
"go.goms.io/fleet/pkg/controllers/work"
46+
"go.goms.io/fleet/pkg/controllers/workapplier"
4747
workv1alpha1controller "go.goms.io/fleet/pkg/controllers/workv1alpha1"
4848
fleetmetrics "go.goms.io/fleet/pkg/metrics"
4949
"go.goms.io/fleet/pkg/propertyprovider"
5050
"go.goms.io/fleet/pkg/propertyprovider/azure"
5151
"go.goms.io/fleet/pkg/utils"
5252
"go.goms.io/fleet/pkg/utils/httpclient"
53+
"go.goms.io/fleet/pkg/utils/parallelizer"
5354
//+kubebuilder:scaffold:imports
5455
)
5556

@@ -68,12 +69,14 @@ var (
6869
metricsAddr = flag.String("metrics-bind-address", ":8090", "The address the metric endpoint binds to.")
6970
enableLeaderElection = flag.Bool("leader-elect", false,
7071
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
71-
leaderElectionNamespace = flag.String("leader-election-namespace", "kube-system", "The namespace in which the leader election resource will be created.")
72-
enableV1Alpha1APIs = flag.Bool("enable-v1alpha1-apis", true, "If set, the agents will watch for the v1alpha1 APIs.")
73-
enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", false, "If set, the agents will watch for the v1beta1 APIs.")
74-
propertyProvider = flag.String("property-provider", "none", "The property provider to use for the agent.")
75-
region = flag.String("region", "", "The region where the member cluster resides.")
76-
cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/config.json", "The path to the cloud cloudconfig file.")
72+
leaderElectionNamespace = flag.String("leader-election-namespace", "kube-system", "The namespace in which the leader election resource will be created.")
73+
enableV1Alpha1APIs = flag.Bool("enable-v1alpha1-apis", true, "If set, the agents will watch for the v1alpha1 APIs.")
74+
enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", false, "If set, the agents will watch for the v1beta1 APIs.")
75+
propertyProvider = flag.String("property-provider", "none", "The property provider to use for the agent.")
76+
region = flag.String("region", "", "The region where the member cluster resides.")
77+
cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/config.json", "The path to the cloud cloudconfig file.")
78+
availabilityCheckInterval = flag.Int("availability-check-interval", 5, "The interval in seconds between attempts to check for resource availability when resources are not yet available.")
79+
driftDetectionInterval = flag.Int("drift-detection-interval", 15, "The interval in seconds between attempts to detect configuration drifts in the cluster.")
7780
)
7881

7982
func init() {
@@ -349,11 +352,20 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
349352
return err
350353
}
351354
// create the work controller, so we can pass it to the internal member cluster reconciler
352-
workController := work.NewApplyWorkReconciler(
355+
workController := workapplier.NewReconciler(
353356
hubMgr.GetClient(),
357+
targetNS,
354358
spokeDynamicClient,
355359
memberMgr.GetClient(),
356-
restMapper, hubMgr.GetEventRecorderFor("work_controller"), 5, targetNS)
360+
restMapper,
361+
hubMgr.GetEventRecorderFor("work_applier"),
362+
// The number of concurrent reconciliations. This is set to 5 to boost performance in
363+
// resource processing.
364+
5,
365+
// Use the default worker count (4) for parallelized manifest processing.
366+
parallelizer.DefaultNumOfWorkers,
367+
time.Second*time.Duration(*availabilityCheckInterval),
368+
time.Second*time.Duration(*driftDetectionInterval))
357369

358370
if err = workController.SetupWithManager(hubMgr); err != nil {
359371
klog.ErrorS(err, "Failed to create v1beta1 controller", "controller", "work")

pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"path/filepath"
1111
"sync"
1212
"testing"
13+
"time"
1314

1415
. "github.com/onsi/ginkgo/v2"
1516
. "github.com/onsi/gomega"
@@ -30,7 +31,7 @@ import (
3031
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
3132

3233
clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
33-
"go.goms.io/fleet/pkg/controllers/work"
34+
"go.goms.io/fleet/pkg/controllers/workapplier"
3435
"go.goms.io/fleet/pkg/propertyprovider"
3536
)
3637

@@ -60,8 +61,8 @@ var (
6061
hubClient client.Client
6162
member1Client client.Client
6263
member2Client client.Client
63-
workApplier1 *work.ApplyWorkReconciler
64-
workApplier2 *work.ApplyWorkReconciler
64+
workApplier1 *workapplier.Reconciler
65+
workApplier2 *workapplier.Reconciler
6566
propertyProvider1 *manuallyUpdatedProvider
6667

6768
ctx context.Context
@@ -367,7 +368,7 @@ var _ = BeforeSuite(func() {
367368

368369
// This controller is created for testing purposes only; no reconciliation loop is actually
369370
// run.
370-
workApplier1 = work.NewApplyWorkReconciler(hubClient, nil, nil, nil, nil, 0, "")
371+
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, 1, time.Second*5, time.Second*5)
371372

372373
propertyProvider1 = &manuallyUpdatedProvider{}
373374
member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1)
@@ -390,7 +391,7 @@ var _ = BeforeSuite(func() {
390391

391392
// This controller is created for testing purposes only; no reconciliation loop is actually
392393
// run.
393-
workApplier2 = work.NewApplyWorkReconciler(hubClient, nil, nil, nil, nil, 0, "")
394+
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, 1, time.Second*5, time.Second*5)
394395

395396
member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil)
396397
Expect(err).NotTo(HaveOccurred())

pkg/controllers/rollout/controller.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030

3131
fleetv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1"
3232
fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
33-
"go.goms.io/fleet/pkg/controllers/work"
33+
"go.goms.io/fleet/pkg/controllers/workapplier"
3434
bindingutils "go.goms.io/fleet/pkg/utils/binding"
3535
"go.goms.io/fleet/pkg/utils/condition"
3636
"go.goms.io/fleet/pkg/utils/controller"
@@ -626,7 +626,10 @@ func isBindingReady(binding *fleetv1beta1.ClusterResourceBinding, readyTimeCutOf
626626
// find the latest available condition that has the same generation as the binding
627627
availableCondition := binding.GetCondition(string(fleetv1beta1.ResourceBindingAvailable))
628628
if condition.IsConditionStatusTrue(availableCondition, binding.GetGeneration()) {
629-
if availableCondition.Reason != work.WorkNotTrackableReason {
629+
// TO-DO (chenyu1): currently it checks for both the new and the old reason
630+
// (as set previously by the work generator) to avoid compatibility issues.
631+
// The check for the old reason can be removed once the rollout completes successfully.
632+
if availableCondition.Reason != condition.WorkNotAvailabilityTrackableReason && availableCondition.Reason != workapplier.WorkNotAllManifestsTrackableReason {
630633
return 0, true
631634
}
632635

@@ -1035,7 +1038,7 @@ func (r *Reconciler) processApplyStrategyUpdates(
10351038
// Verify if the binding has the latest apply strategy set.
10361039
if equality.Semantic.DeepEqual(binding.Spec.ApplyStrategy, applyStrategy) {
10371040
// The binding already has the latest apply strategy set; no need to push the update.
1038-
klog.V(3).InfoS("The binding already has the latest apply strategy; skip the apply strategy update", "clusterResourceBinding", klog.KObj(binding))
1041+
klog.V(2).InfoS("The binding already has the latest apply strategy; skip the apply strategy update", "clusterResourceBinding", klog.KObj(binding), "bindingGeneration", binding.Generation)
10391042
continue
10401043
}
10411044

@@ -1052,7 +1055,7 @@ func (r *Reconciler) processApplyStrategyUpdates(
10521055
klog.ErrorS(err, "Failed to update binding with new apply strategy", "clusterResourceBinding", klog.KObj(binding))
10531056
return controller.NewAPIServerError(false, err)
10541057
}
1055-
klog.V(2).InfoS("Updated binding with new apply strategy", "clusterResourceBinding", klog.KObj(binding))
1058+
klog.V(2).InfoS("Updated binding with new apply strategy", "clusterResourceBinding", klog.KObj(binding), "beforeUpdateBindingGeneration", binding.Generation, "afterUpdateBindingGeneration", updatedBinding.Generation)
10561059
return nil
10571060
})
10581061
}

pkg/controllers/rollout/controller_integration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
"k8s.io/utils/ptr"
2323

2424
fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
25-
"go.goms.io/fleet/pkg/controllers/work"
25+
"go.goms.io/fleet/pkg/controllers/workapplier"
2626
"go.goms.io/fleet/pkg/utils"
2727
"go.goms.io/fleet/pkg/utils/condition"
2828
)
@@ -1045,7 +1045,7 @@ func markBindingAvailable(binding *fleetv1beta1.ClusterResourceBinding, trackabl
10451045
Eventually(func() error {
10461046
reason := "trackable"
10471047
if !trackable {
1048-
reason = work.WorkNotTrackableReason
1048+
reason = workapplier.WorkNotAllManifestsTrackableReason
10491049
}
10501050
binding.SetConditions(metav1.Condition{
10511051
Type: string(fleetv1beta1.ResourceBindingAvailable),

pkg/controllers/rollout/controller_test.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
2727
fleetv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1"
2828
fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
29-
"go.goms.io/fleet/pkg/controllers/work"
29+
"go.goms.io/fleet/pkg/controllers/workapplier"
3030
"go.goms.io/fleet/pkg/utils/condition"
3131
"go.goms.io/fleet/pkg/utils/controller"
3232
)
@@ -668,7 +668,7 @@ func TestIsBindingReady(t *testing.T) {
668668
LastTransitionTime: metav1.Time{
669669
Time: now.Add(-time.Millisecond),
670670
},
671-
Reason: work.WorkNotTrackableReason,
671+
Reason: condition.WorkNotAvailabilityTrackableReason,
672672
},
673673
},
674674
},
@@ -691,7 +691,30 @@ func TestIsBindingReady(t *testing.T) {
691691
LastTransitionTime: metav1.Time{
692692
Time: now.Add(time.Millisecond),
693693
},
694-
Reason: work.WorkNotTrackableReason,
694+
Reason: condition.WorkNotAvailabilityTrackableReason,
695+
},
696+
},
697+
},
698+
},
699+
readyTimeCutOff: now,
700+
wantReady: false,
701+
wantWaitTime: time.Millisecond,
702+
},
703+
"binding available (not trackable with old reason) after the ready time cut off should return not ready with a wait time": {
704+
binding: &fleetv1beta1.ClusterResourceBinding{
705+
ObjectMeta: metav1.ObjectMeta{
706+
Generation: 10,
707+
},
708+
Status: fleetv1beta1.ResourceBindingStatus{
709+
Conditions: []metav1.Condition{
710+
{
711+
Type: string(fleetv1beta1.ResourceBindingAvailable),
712+
Status: metav1.ConditionTrue,
713+
ObservedGeneration: 10,
714+
LastTransitionTime: metav1.Time{
715+
Time: now.Add(time.Millisecond),
716+
},
717+
Reason: workapplier.WorkNotAllManifestsTrackableReason,
695718
},
696719
},
697720
},
@@ -804,7 +827,7 @@ func TestIsBindingReady(t *testing.T) {
804827
Type: string(fleetv1beta1.ResourceBindingAvailable),
805828
Status: metav1.ConditionTrue,
806829
LastTransitionTime: metav1.NewTime(now.Add(-5 * time.Second)),
807-
Reason: work.WorkAvailableReason,
830+
Reason: workapplier.WorkAllManifestsAvailableReason,
808831
ObservedGeneration: 10,
809832
},
810833
{
@@ -2275,7 +2298,7 @@ func generateReadyClusterResourceBinding(state fleetv1beta1.BindingState, resour
22752298
{
22762299
Type: string(fleetv1beta1.ResourceBindingAvailable),
22772300
Status: metav1.ConditionTrue,
2278-
Reason: work.WorkAvailableReason, // Make it ready
2301+
Reason: workapplier.WorkAllManifestsAvailableReason, // Make it ready
22792302
},
22802303
}
22812304
return binding
@@ -2292,7 +2315,7 @@ func generateNotTrackableClusterResourceBinding(state fleetv1beta1.BindingState,
22922315
Type: string(fleetv1beta1.ResourceBindingAvailable),
22932316
Status: metav1.ConditionTrue,
22942317
LastTransitionTime: lastTransitionTime,
2295-
Reason: work.WorkNotTrackableReason, // Make it not ready
2318+
Reason: condition.WorkNotAvailabilityTrackableReason, // Make it not ready
22962319
},
22972320
}
22982321
return binding

pkg/controllers/workapplier/controller.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"sigs.k8s.io/controller-runtime/pkg/predicate"
4545

4646
fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
47+
"go.goms.io/fleet/pkg/controllers/work"
4748
"go.goms.io/fleet/pkg/utils/controller"
4849
"go.goms.io/fleet/pkg/utils/defaulter"
4950
"go.goms.io/fleet/pkg/utils/parallelizer"
@@ -113,7 +114,18 @@ func NewReconciler(
113114

114115
var (
115116
// Some exported reasons for Work object conditions. Currently only the untrackable reason is being actively used.
116-
WorkNotAllManifestsTrackableReason = "SomeManifestsAreNotAvailabilityTrackable"
117+
118+
// This is a new reason for the Availability condition when the manifests are not
119+
// trackable for availability. This value is currently unused.
120+
//
121+
// TO-DO (chenyu1): switch to the new reason after proper rollout.
122+
WorkNotAllManifestsTrackableReasonNew = "SomeManifestsAreNotAvailabilityTrackable"
123+
// This reason uses the exact same value as the one kept in the old work applier for
124+
// compatibility reasons. It helps guard the case where the member agent is upgraded
125+
// before the hub agent.
126+
//
127+
// TO-DO (chenyu1): switch off the old reason after proper rollout.
128+
WorkNotAllManifestsTrackableReason = work.WorkNotTrackableReason
117129
WorkAllManifestsAppliedReason = "AllManifestsApplied"
118130
WorkAllManifestsAvailableReason = "AllManifestsAvailable"
119131
WorkAllManifestsDiffReportedReason = "AllManifestsDiffReported"
@@ -143,7 +155,8 @@ const (
143155
ManifestProcessingApplyResultTypeNotTakenOver manifestProcessingAppliedResultType = "NotTakenOver"
144156
ManifestProcessingApplyResultTypeFailedToRunDriftDetection manifestProcessingAppliedResultType = "FailedToRunDriftDetection"
145157
ManifestProcessingApplyResultTypeFoundDrifts manifestProcessingAppliedResultType = "FoundDrifts"
146-
ManifestProcessingApplyResultTypeFailedToApply manifestProcessingAppliedResultType = "FailedToApply"
158+
// Note that the reason string below uses the same value as kept in the old work applier.
159+
ManifestProcessingApplyResultTypeFailedToApply manifestProcessingAppliedResultType = "ManifestApplyFailed"
147160

148161
// The result type and description for partially successfully processing attempts.
149162
ManifestProcessingApplyResultTypeAppliedWithFailedDriftDetection manifestProcessingAppliedResultType = "AppliedWithFailedDriftDetection"
@@ -168,13 +181,17 @@ const (
168181
// The result type for availability check failures.
169182
ManifestProcessingAvailabilityResultTypeFailed ManifestProcessingAvailabilityResultType = "Failed"
170183

184+
// The description for availability check failures.
171185
ManifestProcessingAvailabilityResultTypeFailedDescription = "Failed to track the availability of the applied manifest (error = %s)"
172186

173187
// The result types for completed availability checks.
174-
ManifestProcessingAvailabilityResultTypeAvailable ManifestProcessingAvailabilityResultType = "Available"
175-
ManifestProcessingAvailabilityResultTypeNotYetAvailable ManifestProcessingAvailabilityResultType = "NotYetAvailable"
176-
ManifestProcessingAvailabilityResultTypeNotTrackable ManifestProcessingAvailabilityResultType = "NotTrackable"
188+
ManifestProcessingAvailabilityResultTypeAvailable ManifestProcessingAvailabilityResultType = "Available"
189+
// Note that the reason string below uses the same value as kept in the old work applier.
190+
ManifestProcessingAvailabilityResultTypeNotYetAvailable ManifestProcessingAvailabilityResultType = "ManifestNotAvailableYet"
191+
192+
ManifestProcessingAvailabilityResultTypeNotTrackable ManifestProcessingAvailabilityResultType = "NotTrackable"
177193

194+
// The descriptions for completed availability checks.
178195
ManifestProcessingAvailabilityResultTypeAvailableDescription = "Manifest is available"
179196
ManifestProcessingAvailabilityResultTypeNotYetAvailableDescription = "Manifest is not yet available; Fleet will check again later"
180197
ManifestProcessingAvailabilityResultTypeNotTrackableDescription = "Manifest's availability is not trackable; Fleet assumes that the applied manifest is available"

pkg/controllers/workgenerator/controller.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import (
4444

4545
clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
4646
fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
47-
"go.goms.io/fleet/pkg/controllers/work"
47+
"go.goms.io/fleet/pkg/controllers/workapplier"
4848
"go.goms.io/fleet/pkg/utils"
4949
"go.goms.io/fleet/pkg/utils/condition"
5050
"go.goms.io/fleet/pkg/utils/controller"
@@ -1159,8 +1159,21 @@ func setAllWorkAvailableCondition(works map[string]*fleetv1beta1.Work, binding *
11591159
for _, w := range works {
11601160
availableCond := meta.FindStatusCondition(w.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
11611161
switch {
1162-
case condition.IsConditionStatusTrue(availableCond, w.GetGeneration()) && availableCond.Reason == work.WorkNotTrackableReason:
1163-
// The Work object has completed the availability check successfully, due to the resources being untrackable.
1162+
case condition.IsConditionStatusTrue(availableCond, w.GetGeneration()) && availableCond.Reason == workapplier.WorkNotAllManifestsTrackableReasonNew:
1163+
// The Work object has completed the availability check successfully, due to the
1164+
// resources being untrackable.
1165+
//
1166+
// This branch is currently never visited as the work applier would still populate
1167+
// the Available condition using the old reason string for compatibility reasons.
1168+
if firstWorkWithSuccessfulAvailabilityCheckDueToUntrackableRes == nil {
1169+
firstWorkWithSuccessfulAvailabilityCheckDueToUntrackableRes = w
1170+
}
1171+
case condition.IsConditionStatusTrue(availableCond, w.GetGeneration()) && availableCond.Reason == workapplier.WorkNotAllManifestsTrackableReason:
1172+
// The Work object has completed the availability check successfully, due to the
1173+
// resources being untrackable. This is the same branch as the one above but checks
1174+
// for the old reason string; it is kept for compatibility reasons.
1175+
//
1176+
// TO-DO (chenyu1): drop this branch after the rollout completes.
11641177
if firstWorkWithSuccessfulAvailabilityCheckDueToUntrackableRes == nil {
11651178
firstWorkWithSuccessfulAvailabilityCheckDueToUntrackableRes = w
11661179
}
@@ -1218,7 +1231,7 @@ func setAllWorkAvailableCondition(works map[string]*fleetv1beta1.Work, binding *
12181231
meta.SetStatusCondition(&binding.Status.Conditions, metav1.Condition{
12191232
Status: metav1.ConditionTrue,
12201233
Type: string(fleetv1beta1.ResourceBindingAvailable),
1221-
Reason: work.WorkNotTrackableReason,
1234+
Reason: condition.WorkNotAvailabilityTrackableReason,
12221235
Message: fmt.Sprintf("The availability of work object %s is not trackable", firstWorkWithSuccessfulAvailabilityCheckDueToUntrackableRes.Name),
12231236
ObservedGeneration: binding.GetGeneration(),
12241237
})

pkg/controllers/workgenerator/controller_integration_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
3030
placementv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1"
3131
placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
32-
fleetwork "go.goms.io/fleet/pkg/controllers/work"
3332
"go.goms.io/fleet/pkg/controllers/workapplier"
3433
"go.goms.io/fleet/pkg/utils"
3534
"go.goms.io/fleet/pkg/utils/condition"
@@ -2839,7 +2838,7 @@ var _ = Describe("Test Work Generator Controller", func() {
28392838
// actually handles resources; ConfigMap objects are considered to be
28402839
// trackable (available immediately after placement). Here it is set
28412840
// to NotTrackable for testing purposes.
2842-
Reason: fleetwork.WorkNotTrackableReason,
2841+
Reason: workapplier.WorkNotAllManifestsTrackableReason,
28432842
ObservedGeneration: work.GetGeneration(),
28442843
}
28452844

@@ -3209,7 +3208,7 @@ var _ = Describe("Test Work Generator Controller", func() {
32093208
availableCond := metav1.Condition{
32103209
Type: string(placementv1beta1.ResourceBindingAvailable),
32113210
Status: metav1.ConditionTrue,
3212-
Reason: fleetwork.WorkNotTrackableReason,
3211+
Reason: condition.WorkNotAvailabilityTrackableReason,
32133212
ObservedGeneration: clusterResourceBinding.GetGeneration(),
32143213
}
32153214

0 commit comments

Comments
 (0)