Skip to content

Commit 3e47858

Browse files
authored
Merge pull request kubernetes-sigs#10880 from sbueringer/pr-cct-qps-burst
✨ Add QPS & burst options & flags for ClusterCacheTracker
2 parents f4b6762 + e86d00e commit 3e47858

File tree

8 files changed

+157
-90
lines changed

8 files changed

+157
-90
lines changed

bootstrap/kubeadm/main.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -62,25 +62,27 @@ var (
6262
controllerName = "cluster-api-kubeadm-bootstrap-manager"
6363

6464
// flags.
65-
enableLeaderElection bool
66-
leaderElectionLeaseDuration time.Duration
67-
leaderElectionRenewDeadline time.Duration
68-
leaderElectionRetryPeriod time.Duration
69-
watchFilterValue string
70-
watchNamespace string
71-
profilerAddress string
72-
enableContentionProfiling bool
73-
syncPeriod time.Duration
74-
restConfigQPS float32
75-
restConfigBurst int
76-
webhookPort int
77-
webhookCertDir string
78-
webhookCertName string
79-
webhookKeyName string
80-
healthAddr string
81-
tlsOptions = flags.TLSOptions{}
82-
diagnosticsOptions = flags.DiagnosticsOptions{}
83-
logOptions = logs.NewOptions()
65+
enableLeaderElection bool
66+
leaderElectionLeaseDuration time.Duration
67+
leaderElectionRenewDeadline time.Duration
68+
leaderElectionRetryPeriod time.Duration
69+
watchFilterValue string
70+
watchNamespace string
71+
profilerAddress string
72+
enableContentionProfiling bool
73+
syncPeriod time.Duration
74+
restConfigQPS float32
75+
restConfigBurst int
76+
clusterCacheTrackerClientQPS float32
77+
clusterCacheTrackerClientBurst int
78+
webhookPort int
79+
webhookCertDir string
80+
webhookCertName string
81+
webhookKeyName string
82+
healthAddr string
83+
tlsOptions = flags.TLSOptions{}
84+
diagnosticsOptions = flags.DiagnosticsOptions{}
85+
logOptions = logs.NewOptions()
8486
// CABPK specific flags.
8587
clusterConcurrency int
8688
clusterCacheTrackerConcurrency int
@@ -139,10 +141,16 @@ func InitFlags(fs *pflag.FlagSet) {
139141
"The minimum interval at which watched resources are reconciled (e.g. 15m)")
140142

141143
fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
142-
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
144+
"Maximum queries per second from the controller client to the Kubernetes API server.")
143145

144146
fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
145-
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
147+
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")
148+
149+
fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
150+
"Maximum queries per second from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")
151+
152+
fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
153+
"Maximum number of queries that should be allowed in one burst from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")
146154

147155
fs.DurationVar(&tokenTTL, "bootstrap-token-ttl", kubeadmbootstrapcontrollers.DefaultTokenTTL,
148156
"The amount of time the bootstrap token will be valid")
@@ -314,6 +322,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
314322
SecretCachingClient: secretCachingClient,
315323
ControllerName: controllerName,
316324
Log: &ctrl.Log,
325+
ClientQPS: clusterCacheTrackerClientQPS,
326+
ClientBurst: clusterCacheTrackerClientBurst,
317327
},
318328
)
319329
if err != nil {

controllers/remote/cluster_cache_tracker.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,11 @@ var ErrClusterLocked = errors.New("cluster is locked already")
6767

6868
// ClusterCacheTracker manages client caches for workload clusters.
6969
type ClusterCacheTracker struct {
70-
log logr.Logger
70+
log logr.Logger
71+
7172
clientUncachedObjects []client.Object
73+
clientQPS float32
74+
clientBurst int
7275

7376
client client.Client
7477

@@ -116,7 +119,18 @@ type ClusterCacheTrackerOptions struct {
116119
// it'll instead query the API server directly.
117120
// Defaults to never caching ConfigMap and Secret if not set.
118121
ClientUncachedObjects []client.Object
119-
Indexes []Index
122+
123+
// ClientQPS is the maximum queries per second from the controller client
124+
// to the Kubernetes API server of workload clusters.
125+
// Defaults to 20.
126+
ClientQPS float32
127+
128+
// ClientBurst is the maximum number of queries that should be allowed in
129+
// one burst from the controller client to the Kubernetes API server of workload clusters.
130+
// Default 30.
131+
ClientBurst int
132+
133+
Indexes []Index
120134

121135
// ControllerName is the name of the controller.
122136
// This is used to calculate the user agent string.
@@ -139,6 +153,13 @@ func setDefaultOptions(opts *ClusterCacheTrackerOptions) {
139153
&corev1.Secret{},
140154
}
141155
}
156+
157+
if opts.ClientQPS == 0 {
158+
opts.ClientQPS = 20
159+
}
160+
if opts.ClientBurst == 0 {
161+
opts.ClientBurst = 30
162+
}
142163
}
143164

144165
// NewClusterCacheTracker creates a new ClusterCacheTracker.
@@ -170,6 +191,8 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
170191
controllerPodMetadata: controllerPodMetadata,
171192
log: *options.Log,
172193
clientUncachedObjects: options.ClientUncachedObjects,
194+
clientQPS: options.ClientQPS,
195+
clientBurst: options.ClientBurst,
173196
client: manager.GetClient(),
174197
secretCachingClient: options.SecretCachingClient,
175198
scheme: manager.GetScheme(),
@@ -303,6 +326,8 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
303326
if err != nil {
304327
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
305328
}
329+
config.QPS = t.clientQPS
330+
config.Burst = t.clientBurst
306331

307332
// Create a http client and a mapper for the cluster.
308333
httpClient, mapper, restClient, err := t.createHTTPClientAndMapper(ctx, config, cluster)

controlplane/kubeadm/main.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -66,25 +66,27 @@ var (
6666
controllerName = "cluster-api-kubeadm-control-plane-manager"
6767

6868
// flags.
69-
enableLeaderElection bool
70-
leaderElectionLeaseDuration time.Duration
71-
leaderElectionRenewDeadline time.Duration
72-
leaderElectionRetryPeriod time.Duration
73-
watchFilterValue string
74-
watchNamespace string
75-
profilerAddress string
76-
enableContentionProfiling bool
77-
syncPeriod time.Duration
78-
restConfigQPS float32
79-
restConfigBurst int
80-
webhookPort int
81-
webhookCertDir string
82-
webhookCertName string
83-
webhookKeyName string
84-
healthAddr string
85-
tlsOptions = flags.TLSOptions{}
86-
diagnosticsOptions = flags.DiagnosticsOptions{}
87-
logOptions = logs.NewOptions()
69+
enableLeaderElection bool
70+
leaderElectionLeaseDuration time.Duration
71+
leaderElectionRenewDeadline time.Duration
72+
leaderElectionRetryPeriod time.Duration
73+
watchFilterValue string
74+
watchNamespace string
75+
profilerAddress string
76+
enableContentionProfiling bool
77+
syncPeriod time.Duration
78+
restConfigQPS float32
79+
restConfigBurst int
80+
clusterCacheTrackerClientQPS float32
81+
clusterCacheTrackerClientBurst int
82+
webhookPort int
83+
webhookCertDir string
84+
webhookCertName string
85+
webhookKeyName string
86+
healthAddr string
87+
tlsOptions = flags.TLSOptions{}
88+
diagnosticsOptions = flags.DiagnosticsOptions{}
89+
logOptions = logs.NewOptions()
8890
// KCP specific flags.
8991
kubeadmControlPlaneConcurrency int
9092
clusterCacheTrackerConcurrency int
@@ -142,10 +144,16 @@ func InitFlags(fs *pflag.FlagSet) {
142144
"The minimum interval at which watched resources are reconciled (e.g. 15m)")
143145

144146
fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
145-
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
147+
"Maximum queries per second from the controller client to the Kubernetes API server.")
146148

147149
fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
148-
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
150+
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")
151+
152+
fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
153+
"Maximum queries per second from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")
154+
155+
fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
156+
"Maximum number of queries that should be allowed in one burst from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")
149157

150158
fs.IntVar(&webhookPort, "webhook-port", 9443,
151159
"Webhook Server port")
@@ -332,6 +340,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
332340
&appsv1.Deployment{},
333341
&appsv1.DaemonSet{},
334342
},
343+
ClientQPS: clusterCacheTrackerClientQPS,
344+
ClientBurst: clusterCacheTrackerClientBurst,
335345
})
336346
if err != nil {
337347
setupLog.Error(err, "unable to create cluster cache tracker")

docs/book/src/developer/providers/migrations/v1.7-to-v1.8.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,5 @@ maintainers of providers and consumers of our Go API.
2727
- It's highly recommended to move to a new setup-envtest version that uses envtest binaries from controller-tools releases
2828
instead of the deprecated GCS bucket. More details can be found in [#10569](https://github.com/kubernetes-sigs/cluster-api/pull/10569)
2929
and [kubernetes-sigs/controller-runtime#2811](https://github.com/kubernetes-sigs/controller-runtime/pull/2811).
30+
- `remote.NewClusterCacheTracker` now has options to configure QPS & Burst. It's highly recommended to implement corresponding flags
31+
the same way as core Cluster API (see PR: https://github.com/kubernetes-sigs/cluster-api/pull/10880).

main.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -84,25 +84,27 @@ var (
8484
controllerName = "cluster-api-controller-manager"
8585

8686
// flags.
87-
enableLeaderElection bool
88-
leaderElectionLeaseDuration time.Duration
89-
leaderElectionRenewDeadline time.Duration
90-
leaderElectionRetryPeriod time.Duration
91-
watchFilterValue string
92-
watchNamespace string
93-
profilerAddress string
94-
enableContentionProfiling bool
95-
syncPeriod time.Duration
96-
restConfigQPS float32
97-
restConfigBurst int
98-
webhookPort int
99-
webhookCertDir string
100-
webhookCertName string
101-
webhookKeyName string
102-
healthAddr string
103-
tlsOptions = flags.TLSOptions{}
104-
diagnosticsOptions = flags.DiagnosticsOptions{}
105-
logOptions = logs.NewOptions()
87+
enableLeaderElection bool
88+
leaderElectionLeaseDuration time.Duration
89+
leaderElectionRenewDeadline time.Duration
90+
leaderElectionRetryPeriod time.Duration
91+
watchFilterValue string
92+
watchNamespace string
93+
profilerAddress string
94+
enableContentionProfiling bool
95+
syncPeriod time.Duration
96+
restConfigQPS float32
97+
restConfigBurst int
98+
clusterCacheTrackerClientQPS float32
99+
clusterCacheTrackerClientBurst int
100+
webhookPort int
101+
webhookCertDir string
102+
webhookCertName string
103+
webhookKeyName string
104+
healthAddr string
105+
tlsOptions = flags.TLSOptions{}
106+
diagnosticsOptions = flags.DiagnosticsOptions{}
107+
logOptions = logs.NewOptions()
106108
// core Cluster API specific flags.
107109
clusterTopologyConcurrency int
108110
clusterCacheTrackerConcurrency int
@@ -208,10 +210,16 @@ func InitFlags(fs *pflag.FlagSet) {
208210
"The minimum interval at which watched resources are reconciled (e.g. 15m)")
209211

210212
fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
211-
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
213+
"Maximum queries per second from the controller client to the Kubernetes API server.")
212214

213215
fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
214-
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
216+
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")
217+
218+
fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
219+
"Maximum queries per second from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")
220+
221+
fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
222+
"Maximum number of queries that should be allowed in one burst from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")
215223

216224
fs.DurationVar(&nodeDrainClientTimeout, "node-drain-client-timeout-duration", time.Second*10,
217225
"The timeout of the client used for draining nodes. Defaults to 10s")
@@ -411,6 +419,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager, watchNamespaces map
411419
ControllerName: controllerName,
412420
Log: &ctrl.Log,
413421
Indexes: []remote.Index{remote.NodeProviderIDIndex},
422+
ClientQPS: clusterCacheTrackerClientQPS,
423+
ClientBurst: clusterCacheTrackerClientBurst,
414424
},
415425
)
416426
if err != nil {

test/extension/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,10 @@ func InitFlags(fs *pflag.FlagSet) {
133133
"The minimum interval at which watched resources are reconciled (e.g. 15m)")
134134

135135
fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
136-
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
136+
"Maximum queries per second from the controller client to the Kubernetes API server.")
137137

138138
fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
139-
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
139+
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")
140140

141141
fs.IntVar(&webhookPort, "webhook-port", 9443,
142142
"Webhook Server port")

test/infrastructure/docker/main.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -69,25 +69,27 @@ var (
6969
controllerName = "cluster-api-docker-controller-manager"
7070

7171
// flags.
72-
enableLeaderElection bool
73-
leaderElectionLeaseDuration time.Duration
74-
leaderElectionRenewDeadline time.Duration
75-
leaderElectionRetryPeriod time.Duration
76-
watchFilterValue string
77-
watchNamespace string
78-
profilerAddress string
79-
enableContentionProfiling bool
80-
syncPeriod time.Duration
81-
restConfigQPS float32
82-
restConfigBurst int
83-
webhookPort int
84-
webhookCertDir string
85-
webhookCertName string
86-
webhookKeyName string
87-
healthAddr string
88-
tlsOptions = flags.TLSOptions{}
89-
diagnosticsOptions = flags.DiagnosticsOptions{}
90-
logOptions = logs.NewOptions()
72+
enableLeaderElection bool
73+
leaderElectionLeaseDuration time.Duration
74+
leaderElectionRenewDeadline time.Duration
75+
leaderElectionRetryPeriod time.Duration
76+
watchFilterValue string
77+
watchNamespace string
78+
profilerAddress string
79+
enableContentionProfiling bool
80+
syncPeriod time.Duration
81+
restConfigQPS float32
82+
restConfigBurst int
83+
clusterCacheTrackerClientQPS float32
84+
clusterCacheTrackerClientBurst int
85+
webhookPort int
86+
webhookCertDir string
87+
webhookCertName string
88+
webhookKeyName string
89+
healthAddr string
90+
tlsOptions = flags.TLSOptions{}
91+
diagnosticsOptions = flags.DiagnosticsOptions{}
92+
logOptions = logs.NewOptions()
9193
// CAPD specific flags.
9294
concurrency int
9395
clusterCacheTrackerConcurrency int
@@ -143,10 +145,16 @@ func InitFlags(fs *pflag.FlagSet) {
143145
"The minimum interval at which watched resources are reconciled (e.g. 15m)")
144146

145147
fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
146-
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
148+
"Maximum queries per second from the controller client to the Kubernetes API server.")
147149

148150
fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
149-
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
151+
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")
152+
153+
fs.Float32Var(&clusterCacheTrackerClientQPS, "clustercachetracker-client-qps", 20,
154+
"Maximum queries per second from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")
155+
156+
fs.IntVar(&clusterCacheTrackerClientBurst, "clustercachetracker-client-burst", 30,
157+
"Maximum number of queries that should be allowed in one burst from the cluster cache tracker clients to the Kubernetes API server of workload clusters.")
150158

151159
fs.IntVar(&webhookPort, "webhook-port", 9443,
152160
"Webhook Server port")
@@ -325,6 +333,8 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
325333
SecretCachingClient: secretCachingClient,
326334
ControllerName: controllerName,
327335
Log: &ctrl.Log,
336+
ClientQPS: clusterCacheTrackerClientQPS,
337+
ClientBurst: clusterCacheTrackerClientBurst,
328338
},
329339
)
330340
if err != nil {

test/infrastructure/inmemory/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,10 @@ func InitFlags(fs *pflag.FlagSet) {
138138
"The minimum interval at which watched resources are reconciled (e.g. 15m)")
139139

140140
fs.Float32Var(&restConfigQPS, "kube-api-qps", 20,
141-
"Maximum queries per second from the controller client to the Kubernetes API server. Defaults to 20")
141+
"Maximum queries per second from the controller client to the Kubernetes API server.")
142142

143143
fs.IntVar(&restConfigBurst, "kube-api-burst", 30,
144-
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server. Default 30")
144+
"Maximum number of queries that should be allowed in one burst from the controller client to the Kubernetes API server.")
145145

146146
fs.IntVar(&webhookPort, "webhook-port", 9443,
147147
"Webhook Server port")

0 commit comments

Comments
 (0)