Skip to content

Commit f3b7593

Browse files
authored
Merge pull request #2478 from wallee94/remove-shard-validation-on-sts-create
fix(discovery): configure sharding every time MetricsHandler.Run runs
2 parents 77a99a5 + 166921b commit f3b7593

File tree

4 files changed

+248
-90
lines changed

4 files changed

+248
-90
lines changed

internal/discovery/discovery.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,6 @@ func (r *CRDiscoverer) PollForCacheUpdates(
203203
) {
204204
// The interval at which we will check the cache for updates.
205205
t := time.NewTicker(Interval)
206-
// Track previous context to allow refreshing cache.
207-
olderContext, olderCancel := context.WithCancel(ctx)
208-
// Prevent context leak (kill the last metric handler instance).
209-
defer olderCancel()
210206
generateMetrics := func() {
211207
// Get families for discovered factories.
212208
customFactories, err := factoryGenerator()
@@ -239,21 +235,8 @@ func (r *CRDiscoverer) PollForCacheUpdates(
239235
r.SafeWrite(func() {
240236
r.WasUpdated = false
241237
})
242-
// Run the metrics handler with updated configs.
243-
olderContext, olderCancel = context.WithCancel(ctx)
244-
go func() {
245-
// Blocks indefinitely until the unbuffered context is cancelled to serve metrics for that duration.
246-
err = m.Run(olderContext)
247-
if err != nil {
248-
// Check if context was cancelled.
249-
select {
250-
case <-olderContext.Done():
251-
// Context cancelled, don't really need to log this though.
252-
default:
253-
klog.ErrorS(err, "failed to run metrics handler")
254-
}
255-
}
256-
}()
238+
// Update metric handler with the new configs.
239+
m.BuildWriters(ctx)
257240
}
258241
go func() {
259242
for range t.C {
@@ -269,7 +252,6 @@ func (r *CRDiscoverer) PollForCacheUpdates(
269252
shouldGenerateMetrics = r.WasUpdated
270253
})
271254
if shouldGenerateMetrics {
272-
olderCancel()
273255
generateMetrics()
274256
klog.InfoS("discovery finished, cache updated")
275257
}

pkg/app/server.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -286,14 +286,12 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error {
286286
opts.EnableGZIPEncoding,
287287
)
288288
// Run MetricsHandler
289-
if config == nil {
290-
ctxMetricsHandler, cancel := context.WithCancel(ctx)
291-
g.Add(func() error {
292-
return m.Run(ctxMetricsHandler)
293-
}, func(error) {
294-
cancel()
295-
})
296-
}
289+
ctxMetricsHandler, cancel := context.WithCancel(ctx)
290+
g.Add(func() error {
291+
return m.Run(ctxMetricsHandler)
292+
}, func(error) {
293+
cancel()
294+
})
297295

298296
tlsConfig := opts.TLSConfig
299297

pkg/metricshandler/metrics_handler.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,24 +69,35 @@ func New(opts *options.Options, kubeClient kubernetes.Interface, storeBuilder ks
6969
}
7070
}
7171

72-
// ConfigureSharding (re-)configures sharding. Re-configuration can be done
73-
// concurrently.
74-
func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, totalShards int) {
72+
// BuildWriters builds the metrics writers, cancelling any previous context and passing a new one on every build.
73+
// BuildWriters can be called multiple times and concurrently.
74+
func (m *MetricsHandler) BuildWriters(ctx context.Context) {
7575
m.mtx.Lock()
7676
defer m.mtx.Unlock()
7777

7878
if m.cancel != nil {
7979
m.cancel()
8080
}
81-
if totalShards != 1 {
82-
klog.InfoS("Configuring sharding of this instance to be shard index (zero-indexed) out of total shards", "shard", shard, "totalShards", totalShards)
83-
}
8481
ctx, m.cancel = context.WithCancel(ctx)
85-
m.storeBuilder.WithSharding(shard, totalShards)
8682
m.storeBuilder.WithContext(ctx)
8783
m.metricsWriters = m.storeBuilder.Build()
84+
}
85+
86+
// ConfigureSharding configures sharding. Configuration can be used multiple times and
87+
// concurrently.
88+
func (m *MetricsHandler) ConfigureSharding(ctx context.Context, shard int32, totalShards int) {
89+
m.mtx.Lock()
90+
91+
if totalShards != 1 {
92+
klog.InfoS("Configuring sharding of this instance to be shard index (zero-indexed) out of total shards", "shard", shard, "totalShards", totalShards)
93+
}
8894
m.curShard = shard
8995
m.curTotalShards = totalShards
96+
m.storeBuilder.WithSharding(shard, totalShards)
97+
98+
// Unlock before calling BuildWriters, which acquires the same mutex itself.
99+
m.mtx.Unlock()
100+
m.BuildWriters(ctx)
90101
}
91102

92103
// Run configures the MetricsHandler's sharding and if autosharding is enabled

0 commit comments

Comments
 (0)