Skip to content

Commit da45d21

Browse files
committed
Minor fixes
Signed-off-by: michaelawyu <[email protected]>
1 parent 28fe672 commit da45d21

File tree

6 files changed

+249
-161
lines changed

6 files changed

+249
-161
lines changed

cmd/memberagent/main.go

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -77,21 +77,19 @@ var (
7777
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
7878
leaderElectionNamespace = flag.String("leader-election-namespace", "kube-system", "The namespace in which the leader election resource will be created.")
7979
// TODO(weiweng): only keep enableV1Alpha1APIs for backward compatibility with helm charts. Remove soon.
80-
enableV1Alpha1APIs = flag.Bool("enable-v1alpha1-apis", false, "If set, the agents will watch for the v1alpha1 APIs. This is deprecated and will be removed soon.")
81-
enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.")
82-
propertyProvider = flag.String("property-provider", "none", "The property provider to use for the agent.")
83-
region = flag.String("region", "", "The region where the member cluster resides.")
84-
cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/config.json", "The path to the cloud cloudconfig file.")
85-
watchWorkWithPriorityQueue = flag.Bool("enable-watch-work-with-priority-queue", false, "If set, the apply_work controller will watch/reconcile work objects that are created new or have recent updates")
86-
watchWorkReconcileAgeMinutes = flag.Int("watch-work-reconcile-age", 60, "maximum age (in minutes) of work objects for apply_work controller to watch/reconcile")
87-
deletionWaitTime = flag.Int("deletion-wait-time", 5, "The time the work-applier will wait for work object to be deleted before updating the applied work owner reference")
88-
enablePprof = flag.Bool("enable-pprof", false, "enable pprof profiling")
89-
pprofPort = flag.Int("pprof-port", 6065, "port for pprof profiling")
90-
hubPprofPort = flag.Int("hub-pprof-port", 6066, "port for hub pprof profiling")
91-
hubQPS = flag.Float64("hub-api-qps", 50, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
92-
hubBurst = flag.Int("hub-api-burst", 500, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
93-
memberQPS = flag.Float64("member-api-qps", 250, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
94-
memberBurst = flag.Int("member-api-burst", 1000, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
80+
enableV1Alpha1APIs = flag.Bool("enable-v1alpha1-apis", false, "If set, the agents will watch for the v1alpha1 APIs. This is deprecated and will be removed soon.")
81+
enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.")
82+
propertyProvider = flag.String("property-provider", "none", "The property provider to use for the agent.")
83+
region = flag.String("region", "", "The region where the member cluster resides.")
84+
cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/config.json", "The path to the cloud cloudconfig file.")
85+
deletionWaitTime = flag.Int("deletion-wait-time", 5, "The time the work-applier will wait for work object to be deleted before updating the applied work owner reference")
86+
enablePprof = flag.Bool("enable-pprof", false, "enable pprof profiling")
87+
pprofPort = flag.Int("pprof-port", 6065, "port for pprof profiling")
88+
hubPprofPort = flag.Int("hub-pprof-port", 6066, "port for hub pprof profiling")
89+
hubQPS = flag.Float64("hub-api-qps", 50, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
90+
hubBurst = flag.Int("hub-api-burst", 500, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
91+
memberQPS = flag.Float64("member-api-qps", 250, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
92+
memberBurst = flag.Int("member-api-burst", 1000, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
9593

9694
// Work applier requeue rate limiter settings.
9795
workApplierRequeueRateLimiterAttemptsWithFixedDelay = flag.Int("work-applier-requeue-rate-limiter-attempts-with-fixed-delay", 1, "If set, the work applier will requeue work objects with a fixed delay for the specified number of attempts before switching to exponential backoff.")
@@ -102,6 +100,12 @@ var (
102100
workApplierRequeueRateLimiterExponentialBaseForFastBackoff = flag.Float64("work-applier-requeue-rate-limiter-exponential-base-for-fast-backoff", 1.5, "If set, the work applier will start to back off fast at this factor after it completes the slow backoff stage, until it reaches the fast backoff delay cap. Its value should be larger than the base value for the slow backoff stage.")
103101
workApplierRequeueRateLimiterMaxFastBackoffDelaySeconds = flag.Float64("work-applier-requeue-rate-limiter-max-fast-backoff-delay-seconds", 900, "If set, the work applier will not back off longer than this value in seconds when it is in the fast backoff stage.")
104102
workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs = flag.Bool("work-applier-requeue-rate-limiter-skip-to-fast-backoff-for-available-or-diff-reported-work-objs", true, "If set, the rate limiter will skip the slow backoff stage and start fast backoff immediately for work objects that are available or have diff reported.")
103+
104+
// Work applier priority queue settings.
105+
enableWorkApplierPriorityQueue = flag.Bool("enable-work-applier-priority-queue", false, "If set, the work applier will use a priority queue to process work objects.")
106+
workApplierPriorityLinearEquationCoeffA = flag.Int("work-applier-priority-linear-equation-coeff-a", -3, "The work applier sets the priority for a Work object processing attempt using the linear equation: priority = A * (work object age in minutes) + B. This flag sets the coefficient A in the equation.")
107+
workApplierPriorityLinearEquationCoeffB = flag.Int("work-applier-priority-linear-equation-coeff-b", 100, "The work applier sets the priority for a Work object processing attempt using the linear equation: priority = A * (work object age in minutes) + B. This flag sets the coefficient B in the equation.")
108+
105109
// Azure property provider feature gates.
106110
isAzProviderCostPropertiesEnabled = flag.Bool("use-cost-properties-in-azure-provider", true, "If set, the Azure property provider will expose cost properties in the member cluster.")
107111
isAzProviderAvailableResPropertiesEnabled = flag.Bool("use-available-res-properties-in-azure-provider", true, "If set, the Azure property provider will expose available resources properties in the member cluster.")
@@ -133,6 +137,13 @@ func main() {
133137
klog.ErrorS(errors.New("either enable-v1alpha1-apis or enable-v1beta1-apis is required"), "Invalid APIs flags")
134138
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
135139
}
140+
// TODO(chenyu1): refactor the validation logic; note that the checks below only log the error and do not exit.
141+
if workApplierPriorityLinearEquationCoeffA == nil || *workApplierPriorityLinearEquationCoeffA >= 0 {
142+
klog.ErrorS(errors.New("parameter workApplierPriorityLinearEquationCoeffA is set incorrectly; must use a value less than 0"), "InvalidFlag", "workApplierPriorityLinearEquationCoeffA")
143+
}
144+
if workApplierPriorityLinearEquationCoeffB == nil || *workApplierPriorityLinearEquationCoeffB <= 0 {
145+
klog.ErrorS(errors.New("parameter workApplierPriorityLinearEquationCoeffB is set incorrectly; must use a value greater than 0"), "InvalidFlag", "workApplierPriorityLinearEquationCoeffB")
146+
}
136147

137148
hubURL := os.Getenv("HUB_SERVER_URL")
138149

@@ -413,7 +424,6 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
413424
*workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs,
414425
)
415426

416-
workObjAgeForPrioritizedProcessing := time.Minute * time.Duration(*watchWorkReconcileAgeMinutes)
417427
workApplier := workapplier.NewReconciler(
418428
hubMgr.GetClient(),
419429
targetNS,
@@ -428,8 +438,9 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
428438
parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
429439
time.Minute*time.Duration(*deletionWaitTime),
430440
requeueRateLimiter,
431-
*watchWorkWithPriorityQueue,
432-
workObjAgeForPrioritizedProcessing,
441+
*enableWorkApplierPriorityQueue,
442+
workApplierPriorityLinearEquationCoeffA,
443+
workApplierPriorityLinearEquationCoeffB,
433444
)
434445

435446
if err = workApplier.SetupWithManager(hubMgr); err != nil {

pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ var _ = BeforeSuite(func() {
379379

380380
// This controller is created for testing purposes only; no reconciliation loop is actually
381381
// run.
382-
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, 0)
382+
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, nil, nil)
383383

384384
propertyProvider1 = &manuallyUpdatedProvider{}
385385
member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1)
@@ -402,7 +402,7 @@ var _ = BeforeSuite(func() {
402402

403403
// This controller is created for testing purposes only; no reconciliation loop is actually
404404
// run.
405-
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, 0)
405+
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, nil, nil)
406406

407407
member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil)
408408
Expect(err).NotTo(HaveOccurred())

pkg/controllers/workapplier/controller.go

Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ import (
5252

5353
const (
5454
patchDetailPerObjLimit = 100
55-
56-
minWorkObjAgeForPrioritizedQueueing = time.Minute * 30
5755
)
5856

5957
const (
@@ -97,24 +95,25 @@ var defaultRequeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimite
9795

9896
// Reconciler reconciles a Work object.
9997
type Reconciler struct {
100-
hubClient client.Client
101-
workNameSpace string
102-
spokeDynamicClient dynamic.Interface
103-
spokeClient client.Client
104-
restMapper meta.RESTMapper
105-
recorder record.EventRecorder
106-
concurrentReconciles int
107-
deletionWaitTime time.Duration
108-
joined *atomic.Bool
109-
parallelizer parallelizerutil.Parallelizer
110-
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter
111-
usePriorityQueue bool
112-
workObjAgeForPrioritizedProcessing time.Duration
98+
hubClient client.Client
99+
workNameSpace string
100+
spokeDynamicClient dynamic.Interface
101+
spokeClient client.Client
102+
restMapper meta.RESTMapper
103+
recorder record.EventRecorder
104+
concurrentReconciles int
105+
deletionWaitTime time.Duration
106+
joined *atomic.Bool
107+
parallelizer parallelizerutil.Parallelizer
108+
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter
109+
usePriorityQueue bool
113110
// The custom priority queue in use if the usePriorityQueue option is enabled.
114111
//
115112
// Note that this variable is set only after the controller starts.
116-
pq priorityqueue.PriorityQueue[reconcile.Request]
117-
pqSetupOnce sync.Once
113+
pq priorityqueue.PriorityQueue[reconcile.Request]
114+
priLinearEqCoeffA int
115+
priLinearEqCoeffB int
116+
pqSetupOnce sync.Once
118117
}
119118

120119
// NewReconciler returns a new Work object reconciler for the work applier.
@@ -127,7 +126,8 @@ func NewReconciler(
127126
deletionWaitTime time.Duration,
128127
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter,
129128
usePriorityQueue bool,
130-
workObjAgeForPrioritizedProcessing time.Duration,
129+
priorityLinearEquationCoeffA *int,
130+
priorityLinearEquationCoeffB *int,
131131
) *Reconciler {
132132
if requeueRateLimiter == nil {
133133
klog.V(2).InfoS("requeue rate limiter is not set; using the default rate limiter")
@@ -137,27 +137,28 @@ func NewReconciler(
137137
klog.V(2).InfoS("parallelizer is not set; using the default parallelizer with a worker count of 1")
138138
parallelizer = parallelizerutil.NewParallelizer(1)
139139
}
140-
141-
woAgeForPrioritizedProcessing := workObjAgeForPrioritizedProcessing
142-
if usePriorityQueue && woAgeForPrioritizedProcessing < minWorkObjAgeForPrioritizedQueueing {
143-
klog.V(2).InfoS("Work object age for prioritized processing is too short; set to the longer default", "workObjAgeForPrioritizedProcessing", woAgeForPrioritizedProcessing)
144-
woAgeForPrioritizedProcessing = minWorkObjAgeForPrioritizedQueueing
140+
if priorityLinearEquationCoeffA == nil || priorityLinearEquationCoeffB == nil {
141+
// If either coefficient is not set, fall back to the default settings for both, for correctness reasons.
142+
klog.V(2).InfoS("priority linear equation coefficients are not set; using the default settings")
143+
priorityLinearEquationCoeffA = ptr.To(-3)
144+
priorityLinearEquationCoeffB = ptr.To(int(highestPriorityLevel))
145145
}
146146

147147
return &Reconciler{
148-
hubClient: hubClient,
149-
spokeDynamicClient: spokeDynamicClient,
150-
spokeClient: spokeClient,
151-
restMapper: restMapper,
152-
recorder: recorder,
153-
concurrentReconciles: concurrentReconciles,
154-
parallelizer: parallelizer,
155-
workNameSpace: workNameSpace,
156-
joined: atomic.NewBool(false),
157-
deletionWaitTime: deletionWaitTime,
158-
requeueRateLimiter: requeueRateLimiter,
159-
usePriorityQueue: usePriorityQueue,
160-
workObjAgeForPrioritizedProcessing: woAgeForPrioritizedProcessing,
148+
hubClient: hubClient,
149+
spokeDynamicClient: spokeDynamicClient,
150+
spokeClient: spokeClient,
151+
restMapper: restMapper,
152+
recorder: recorder,
153+
concurrentReconciles: concurrentReconciles,
154+
parallelizer: parallelizer,
155+
workNameSpace: workNameSpace,
156+
joined: atomic.NewBool(false),
157+
deletionWaitTime: deletionWaitTime,
158+
requeueRateLimiter: requeueRateLimiter,
159+
usePriorityQueue: usePriorityQueue,
160+
priLinearEqCoeffA: *priorityLinearEquationCoeffA,
161+
priLinearEqCoeffB: *priorityLinearEquationCoeffB,
161162
}
162163
}
163164

@@ -630,8 +631,9 @@ func (r *Reconciler) Leave(ctx context.Context) error {
630631
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
631632
if r.usePriorityQueue {
632633
eventHandler := &priorityBasedWorkObjEventHandler{
633-
qm: r,
634-
workObjAgeForPrioritizedProcessing: r.workObjAgeForPrioritizedProcessing,
634+
qm: r,
635+
priLinearEqCoeffA: r.priLinearEqCoeffA,
636+
priLinearEqCoeffB: r.priLinearEqCoeffB,
635637
}
636638

637639
newPQ := func(controllerName string, rateLimiter workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {

0 commit comments

Comments
 (0)