Skip to content

Commit 082aba2

Browse files
committed
update discovery to reconfigure sharding
Signed-off-by: Walther Lee <[email protected]>
1 parent dec7a1b commit 082aba2

File tree

3 files changed

+26
-29
lines changed

3 files changed

+26
-29
lines changed

internal/discovery/discovery.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,6 @@ func (r *CRDiscoverer) PollForCacheUpdates(
203203
) {
204204
// The interval at which we will check the cache for updates.
205205
t := time.NewTicker(Interval)
206-
// Track previous context to allow refreshing cache.
207-
olderContext, olderCancel := context.WithCancel(ctx)
208-
// Prevent context leak (kill the last metric handler instance).
209-
defer olderCancel()
210206
generateMetrics := func() {
211207
// Get families for discovered factories.
212208
customFactories, err := factoryGenerator()
@@ -239,21 +235,8 @@ func (r *CRDiscoverer) PollForCacheUpdates(
239235
r.SafeWrite(func() {
240236
r.WasUpdated = false
241237
})
242-
// Run the metrics handler with updated configs.
243-
olderContext, olderCancel = context.WithCancel(ctx)
244-
go func() {
245-
// Blocks indefinitely until the unbuffered context is cancelled to serve metrics for that duration.
246-
err = m.Run(olderContext)
247-
if err != nil {
248-
// Check if context was cancelled.
249-
select {
250-
case <-olderContext.Done():
251-
// Context cancelled, don't really need to log this though.
252-
default:
253-
klog.ErrorS(err, "failed to run metrics handler")
254-
}
255-
}
256-
}()
238+
// Update metric handler with the new configs.
239+
m.ReconfigureSharding(ctx)
257240
}
258241
go func() {
259242
for range t.C {
@@ -269,7 +252,6 @@ func (r *CRDiscoverer) PollForCacheUpdates(
269252
shouldGenerateMetrics = r.WasUpdated
270253
})
271254
if shouldGenerateMetrics {
272-
olderCancel()
273255
generateMetrics()
274256
klog.InfoS("discovery finished, cache updated")
275257
}

pkg/app/server.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -297,14 +297,12 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error {
297297
opts.EnableGZIPEncoding,
298298
)
299299
// Run MetricsHandler
300-
if config == nil {
301-
ctxMetricsHandler, cancel := context.WithCancel(ctx)
302-
g.Add(func() error {
303-
return m.Run(ctxMetricsHandler)
304-
}, func(error) {
305-
cancel()
306-
})
307-
}
300+
ctxMetricsHandler, cancel := context.WithCancel(ctx)
301+
g.Add(func() error {
302+
return m.Run(ctxMetricsHandler)
303+
}, func(error) {
304+
cancel()
305+
})
308306

309307
tlsConfig := opts.TLSConfig
310308

pkg/metricshandler/metrics_handler.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,16 @@ func New(opts *options.Options, kubeClient kubernetes.Interface, storeBuilder ks
6969
}
7070
}
7171

72-
// ConfigureSharding (re-)configures sharding. Re-configuration can be done
72+
// ReconfigureSharding reconfigures sharding with the current shard and totalShards, and
73+
// it's a no-op if both values are 0.
74+
func (m *MetricsHandler) ReconfigureSharding(ctx context.Context) {
75+
if m.curShard == 0 && m.curTotalShards == 0 {
76+
return
77+
}
78+
m.ConfigureSharding(ctx, m.curShard, m.curTotalShards)
79+
}
80+
81+
// ConfigureSharding configures sharding. Configuration can be used mutlitple times and
7382
// concurrently.
7483
func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, totalShards int) {
7584
m.mtx.Lock()
@@ -132,6 +141,14 @@ func (m *MetricsHandler) Run(ctx context.Context) error {
132141
return
133142
}
134143

144+
m.mtx.RLock()
145+
shardingUnchanged := m.curShard == shard && m.curTotalShards == totalShards
146+
m.mtx.RUnlock()
147+
148+
if shardingUnchanged {
149+
return
150+
}
151+
135152
m.ConfigureSharding(ctx, shard, totalShards)
136153
},
137154
UpdateFunc: func(oldo, curo interface{}) {

0 commit comments

Comments
 (0)