Skip to content

Commit 4393038

Browse files
authored
Merge pull request #3491 from ntnn/kcp3350-clusterwithcontext
Prevent workspace deletion leaks
2 parents 384a53c + 032f189 commit 4393038

File tree

12 files changed: 282 additions and 165 deletions.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ require (
2323
github.com/spf13/cobra v1.9.1
2424
github.com/spf13/pflag v1.0.6
2525
github.com/stretchr/testify v1.10.0
26-
go.uber.org/goleak v1.3.0
26+
go.uber.org/goleak v1.3.1-0.20241121203838-4ff5fa6529ee
2727
go.uber.org/multierr v1.11.0
2828
golang.org/x/sys v0.32.0
2929
gopkg.in/square/go-jose.v2 v2.6.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -346,8 +346,8 @@ go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37Cb
346346
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
347347
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
348348
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
349-
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
350-
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
349+
go.uber.org/goleak v1.3.1-0.20241121203838-4ff5fa6529ee h1:uOMbcH1Dmxv45VkkpZQYoerZFeDncWpjbN7ATiQOO7c=
350+
go.uber.org/goleak v1.3.1-0.20241121203838-4ff5fa6529ee/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
351351
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
352352
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
353353
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=

pkg/admission/mutatingwebhook/plugin.go

Lines changed: 49 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -27,11 +27,13 @@ import (
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/labels"
2929
"k8s.io/apimachinery/pkg/runtime/schema"
30+
"k8s.io/apimachinery/pkg/util/wait"
3031
"k8s.io/apiserver/pkg/admission"
3132
"k8s.io/apiserver/pkg/admission/configuration"
3233
"k8s.io/apiserver/pkg/admission/plugin/webhook/generic"
3334
"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
3435
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
36+
"k8s.io/client-go/tools/cache"
3537

3638
kcpkubernetesinformers "github.com/kcp-dev/client-go/informers"
3739
kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes"
@@ -40,6 +42,7 @@ import (
4042
kcpinitializers "github.com/kcp-dev/kcp/pkg/admission/initializers"
4143
"github.com/kcp-dev/kcp/pkg/admission/validatingwebhook"
4244
apisv1alpha2 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha2"
45+
corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
4346
kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions"
4447
)
4548

@@ -55,11 +58,13 @@ type Plugin struct {
5558
kubeClusterClient kcpkubernetesclientset.ClusterInterface
5659
localKubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory
5760
globalKubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory
61+
serverStopChannel <-chan struct{}
5862

5963
getAPIBindings func(clusterName logicalcluster.Name) ([]*apisv1alpha2.APIBinding, error)
6064

61-
managerLock sync.Mutex
62-
managersCache map[logicalcluster.Name]generic.Source
65+
managerLock sync.Mutex
66+
managersCache map[logicalcluster.Name]generic.Source
67+
managersCancel map[logicalcluster.Name]context.CancelFunc
6368
}
6469

6570
var (
@@ -72,9 +77,10 @@ var (
7277

7378
func NewMutatingAdmissionWebhook(configFile io.Reader) (*Plugin, error) {
7479
p := &Plugin{
75-
managerLock: sync.Mutex{},
76-
managersCache: make(map[logicalcluster.Name]generic.Source),
77-
Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update),
80+
managerLock: sync.Mutex{},
81+
managersCache: make(map[logicalcluster.Name]generic.Source),
82+
managersCancel: make(map[logicalcluster.Name]context.CancelFunc),
83+
Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update),
7884
}
7985
if configFile != nil {
8086
config, err := io.ReadAll(configFile)
@@ -146,10 +152,13 @@ func (p *Plugin) getHookSource(clusterName logicalcluster.Name, groupResource sc
146152

147153
p.managerLock.Lock()
148154
defer p.managerLock.Unlock()
155+
149156
if _, ok := p.managersCache[clusterNameForGroupResource]; !ok {
157+
ctx, cancel := context.WithCancel(wait.ContextForChannel(p.serverStopChannel))
150158
p.managersCache[clusterNameForGroupResource] = configuration.NewMutatingWebhookConfigurationManagerForInformer(
151-
p.globalKubeSharedInformerFactory.Admissionregistration().V1().MutatingWebhookConfigurations().Cluster(clusterNameForGroupResource),
159+
p.globalKubeSharedInformerFactory.Admissionregistration().V1().MutatingWebhookConfigurations().ClusterWithContext(ctx, clusterNameForGroupResource),
152160
)
161+
p.managersCancel[clusterNameForGroupResource] = cancel
153162
}
154163

155164
return p.managersCache[clusterNameForGroupResource], nil
@@ -184,6 +193,9 @@ func (p *Plugin) ValidateInitialization() error {
184193
if p.globalKubeSharedInformerFactory == nil {
185194
return errors.New("missing globalKubeSharedInformerFactory")
186195
}
196+
if p.serverStopChannel == nil {
197+
return errors.New("missing serverStopChannel")
198+
}
187199
return nil
188200
}
189201

@@ -200,4 +212,35 @@ func (p *Plugin) SetKcpInformers(local, global kcpinformers.SharedInformerFactor
200212
p.getAPIBindings = func(clusterName logicalcluster.Name) ([]*apisv1alpha2.APIBinding, error) {
201213
return local.Apis().V1alpha2().APIBindings().Lister().Cluster(clusterName).List(labels.Everything())
202214
}
215+
216+
// handler doesn't need to be deregistered - the webhook is valid
217+
// for as long as kcp runs and when kcp stops the informer is
218+
// stopped and the handler gets cleaned up.
219+
_, _ = local.Core().V1alpha1().LogicalClusters().Informer().AddEventHandler(
220+
cache.ResourceEventHandlerFuncs{
221+
DeleteFunc: func(obj interface{}) {
222+
cl, ok := obj.(*corev1alpha1.LogicalCluster)
223+
if !ok {
224+
return
225+
}
226+
227+
clName := logicalcluster.Name(cl.Annotations[logicalcluster.AnnotationKey])
228+
229+
p.managerLock.Lock()
230+
defer p.managerLock.Unlock()
231+
232+
cancel, ok := p.managersCancel[clName]
233+
if !ok {
234+
return
235+
}
236+
delete(p.managersCancel, clName)
237+
delete(p.managersCache, clName)
238+
cancel()
239+
},
240+
},
241+
)
242+
}
243+
244+
func (p *Plugin) SetServerShutdownChannel(ch <-chan struct{}) {
245+
p.serverStopChannel = ch
203246
}

pkg/admission/validatingwebhook/plugin.go

Lines changed: 49 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -27,18 +27,21 @@ import (
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/labels"
2929
"k8s.io/apimachinery/pkg/runtime/schema"
30+
"k8s.io/apimachinery/pkg/util/wait"
3031
"k8s.io/apiserver/pkg/admission"
3132
"k8s.io/apiserver/pkg/admission/configuration"
3233
"k8s.io/apiserver/pkg/admission/plugin/webhook/generic"
3334
"k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
3435
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
36+
"k8s.io/client-go/tools/cache"
3537

3638
kcpkubernetesinformers "github.com/kcp-dev/client-go/informers"
3739
kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes"
3840
"github.com/kcp-dev/logicalcluster/v3"
3941

4042
kcpinitializers "github.com/kcp-dev/kcp/pkg/admission/initializers"
4143
apisv1alpha2 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha2"
44+
corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
4245
kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions"
4346
)
4447

@@ -54,11 +57,13 @@ type Plugin struct {
5457
kubeClusterClient kcpkubernetesclientset.ClusterInterface
5558
localKubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory
5659
globalKubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory
60+
serverStopChannel <-chan struct{}
5761

5862
getAPIBindings func(clusterName logicalcluster.Name) ([]*apisv1alpha2.APIBinding, error)
5963

60-
managerLock sync.Mutex
61-
managersCache map[logicalcluster.Name]generic.Source
64+
managerLock sync.Mutex
65+
managersCache map[logicalcluster.Name]generic.Source
66+
managersCancel map[logicalcluster.Name]context.CancelFunc
6267
}
6368

6469
var (
@@ -71,9 +76,10 @@ var (
7176

7277
func NewValidatingAdmissionWebhook(configFile io.Reader) (*Plugin, error) {
7378
p := &Plugin{
74-
managerLock: sync.Mutex{},
75-
managersCache: make(map[logicalcluster.Name]generic.Source),
76-
Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update),
79+
managerLock: sync.Mutex{},
80+
managersCache: make(map[logicalcluster.Name]generic.Source),
81+
managersCancel: make(map[logicalcluster.Name]context.CancelFunc),
82+
Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update),
7783
}
7884
if configFile != nil {
7985
config, err := io.ReadAll(configFile)
@@ -145,10 +151,13 @@ func (p *Plugin) getHookSource(clusterName logicalcluster.Name, groupResource sc
145151

146152
p.managerLock.Lock()
147153
defer p.managerLock.Unlock()
154+
148155
if _, ok := p.managersCache[clusterNameForGroupResource]; !ok {
156+
ctx, cancel := context.WithCancel(wait.ContextForChannel(p.serverStopChannel))
149157
p.managersCache[clusterNameForGroupResource] = configuration.NewValidatingWebhookConfigurationManagerForInformer(
150-
p.globalKubeSharedInformerFactory.Admissionregistration().V1().ValidatingWebhookConfigurations().Cluster(clusterNameForGroupResource),
158+
p.globalKubeSharedInformerFactory.Admissionregistration().V1().ValidatingWebhookConfigurations().ClusterWithContext(ctx, clusterNameForGroupResource),
151159
)
160+
p.managersCancel[clusterNameForGroupResource] = cancel
152161
}
153162

154163
return p.managersCache[clusterNameForGroupResource], nil
@@ -183,6 +192,9 @@ func (p *Plugin) ValidateInitialization() error {
183192
if p.globalKubeSharedInformerFactory == nil {
184193
return errors.New("missing globalKubeSharedInformerFactory")
185194
}
195+
if p.serverStopChannel == nil {
196+
return errors.New("missing serverStopChannel")
197+
}
186198
return nil
187199
}
188200

@@ -199,6 +211,33 @@ func (p *Plugin) SetKcpInformers(local, global kcpinformers.SharedInformerFactor
199211
p.getAPIBindings = func(clusterName logicalcluster.Name) ([]*apisv1alpha2.APIBinding, error) {
200212
return local.Apis().V1alpha2().APIBindings().Lister().Cluster(clusterName).List(labels.Everything())
201213
}
214+
215+
// handler doesn't need to be deregistered - the webhook is valid
216+
// for as long as kcp runs and when kcp stops the informer is
217+
// stopped and the handler gets cleaned up.
218+
_, _ = local.Core().V1alpha1().LogicalClusters().Informer().AddEventHandler(
219+
cache.ResourceEventHandlerFuncs{
220+
DeleteFunc: func(obj interface{}) {
221+
cl, ok := obj.(*corev1alpha1.LogicalCluster)
222+
if !ok {
223+
return
224+
}
225+
226+
clName := logicalcluster.Name(cl.Annotations[logicalcluster.AnnotationKey])
227+
228+
p.managerLock.Lock()
229+
defer p.managerLock.Unlock()
230+
231+
cancel, ok := p.managersCancel[clName]
232+
if !ok {
233+
return
234+
}
235+
delete(p.managersCancel, clName)
236+
delete(p.managersCache, clName)
237+
cancel()
238+
},
239+
},
240+
)
202241
}
203242

204243
// SetClusterAnnotation sets the cluster annotation on the given object to the given clusterName,
@@ -225,3 +264,7 @@ func SetClusterAnnotation(obj metav1.Object, clusterName logicalcluster.Name) fu
225264
obj.SetAnnotations(anns)
226265
return undoFn
227266
}
267+
268+
func (p *Plugin) SetServerShutdownChannel(ch <-chan struct{}) {
269+
p.serverStopChannel = ch
270+
}

pkg/informer/informer.go

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -267,9 +267,20 @@ func (d *DiscoveringDynamicSharedInformerFactory) Cluster(cluster logicalcluster
267267
}
268268
}
269269

270+
func (d *DiscoveringDynamicSharedInformerFactory) ClusterWithContext(ctx context.Context, cluster logicalcluster.Name) kcpinformers.ScopedDynamicSharedInformerFactory {
271+
informer := &scopedDiscoveringDynamicSharedInformerFactory{
272+
DiscoveringDynamicSharedInformerFactory: d,
273+
cluster: cluster,
274+
contextFn: func() context.Context { return ctx },
275+
}
276+
277+
return informer
278+
}
279+
270280
type scopedDiscoveringDynamicSharedInformerFactory struct {
271281
*DiscoveringDynamicSharedInformerFactory
272-
cluster logicalcluster.Name
282+
cluster logicalcluster.Name
283+
contextFn func() context.Context
273284
}
274285

275286
// ForResource returns the GenericInformer for gvr, creating it if needed. The GenericInformer must be started
@@ -279,6 +290,9 @@ func (d *scopedDiscoveringDynamicSharedInformerFactory) ForResource(gvr schema.G
279290
if err != nil {
280291
return nil, err
281292
}
293+
if d.contextFn != nil {
294+
return clusterInformer.ClusterWithContext(d.contextFn(), d.cluster), nil
295+
}
282296
return clusterInformer.Cluster(d.cluster), nil
283297
}
284298

pkg/reconciler/cache/cachedresources/cachedresources_reconcile_replication.go

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -79,8 +79,7 @@ func (r *replication) reconcile(ctx context.Context, cachedResource *cachev1alph
7979
global := r.cacheKcpInformers.Cache().V1alpha1().CachedObjects()
8080

8181
// Local informer is based on the specific types we want to replicate.
82-
// TODO: use ClusterWithContext
83-
local, err := r.discoveringDynamicKcpInformers.Cluster(cluster).ForResource(gvr)
82+
local, err := r.discoveringDynamicKcpInformers.ClusterWithContext(ctx, cluster).ForResource(gvr)
8483
if err != nil {
8584
logger.Error(err, "Failed to get local informer for resource", "resource", gvr)
8685
cancel()

pkg/reconciler/garbagecollector/garbagecollector_controller.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -253,7 +253,7 @@ func (c *Controller) startGarbageCollectorForLogicalCluster(ctx context.Context,
253253
c.metadataClient.Cluster(clusterName.Path()),
254254
c.dynamicDiscoverySharedInformerFactory.RESTMapper(),
255255
c.ignoredResources,
256-
c.dynamicDiscoverySharedInformerFactory.Cluster(clusterName),
256+
c.dynamicDiscoverySharedInformerFactory.ClusterWithContext(ctx, clusterName),
257257
c.informersStarted,
258258
)
259259
if err != nil {

pkg/reconciler/kubequota/kubequota_controller.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ const (
5151

5252
type scopeableInformerFactory interface {
5353
Cluster(logicalcluster.Name) kcpkubernetesinformers.ScopedDynamicSharedInformerFactory
54+
ClusterWithContext(context.Context, logicalcluster.Name) kcpkubernetesinformers.ScopedDynamicSharedInformerFactory
5455
}
5556

5657
// Controller manages per-workspace resource quota controllers.
@@ -263,9 +264,9 @@ func (c *Controller) startQuotaForLogicalCluster(ctx context.Context, clusterNam
263264

264265
resourceQuotaControllerOptions := &resourcequota.ControllerOptions{
265266
QuotaClient: resourceQuotaControllerClient.CoreV1(),
266-
ResourceQuotaInformer: c.resourceQuotaClusterInformer.Cluster(clusterName),
267+
ResourceQuotaInformer: c.resourceQuotaClusterInformer.ClusterWithContext(ctx, clusterName),
267268
ResyncPeriod: controller.StaticResyncPeriodFunc(c.quotaRecalculationPeriod),
268-
InformerFactory: c.scopingGenericSharedInformerFactory.Cluster(clusterName),
269+
InformerFactory: c.scopingGenericSharedInformerFactory.ClusterWithContext(ctx, clusterName),
269270
ReplenishmentResyncPeriod: func() time.Duration {
270271
return c.fullResyncPeriod
271272
},

0 commit comments

Comments
 (0)