Skip to content

Commit 6699a24

Browse files
authored
Handle multicluster cleanup (#58093)
* Handle multicluster cleanup Signed-off-by: Keith Mattix II <[email protected]> * More cleanup to stop leak Signed-off-by: Keith Mattix II <[email protected]> * Address PR comments Signed-off-by: Keith Mattix II <[email protected]> --------- Signed-off-by: Keith Mattix II <[email protected]>
1 parent 6efb75e commit 6699a24

File tree

6 files changed

+31
-3
lines changed

6 files changed

+31
-3
lines changed

pilot/pkg/config/kube/crdclient/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,12 @@ func (cl *Client) Run(stop <-chan struct{}) {
176176
}
177177
<-stop
178178
close(cl.stop)
179+
// Cleanup handlers
180+
for _, h := range cl.allKinds() {
181+
for _, reg := range h.handlers {
182+
reg.UnregisterHandler()
183+
}
184+
}
179185
cl.logger.Infof("controller terminated")
180186
}
181187

pilot/pkg/serviceregistry/kube/controller/controller.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,9 @@ func (c *Controller) Cleanup() error {
408408
c.opts.MeshNetworksWatcher.DeleteNetworksHandler(c.networksHandlerRegistration)
409409
}
410410

411+
// Shutdown all the informer handlers
412+
c.shutdownInformerHandlers()
413+
411414
return nil
412415
}
413416

@@ -649,6 +652,14 @@ func (c *Controller) HasSynced() bool {
649652
return c.queue.HasSynced()
650653
}
651654

655+
func (c *Controller) shutdownInformerHandlers() {
656+
c.namespaces.ShutdownHandlers()
657+
c.services.ShutdownHandlers()
658+
c.endpoints.slices.ShutdownHandlers()
659+
c.pods.pods.ShutdownHandlers()
660+
c.nodes.ShutdownHandlers()
661+
}
662+
652663
func (c *Controller) informersSynced() bool {
653664
return c.namespaces.HasSynced() &&
654665
c.services.HasSynced() &&

pkg/kube/kclient/crdwatcher.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ func (c *crdWatcher) Run(stop <-chan struct{}) {
123123
c.mutex.Unlock()
124124
kube.WaitForCacheSync("crd watcher", stop, c.crds.HasSynced)
125125
c.queue.Run(stop)
126+
log.Info("Stopping CRD watcher")
126127
c.crds.ShutdownHandlers()
127128
}
128129

pkg/kube/multicluster/cluster.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ func (c *Cluster) Run(mesh mesh.Watcher, handlers []handler, action ACTION) {
7979
// Build a namespace watcher. This must have no filter, since this is our input to the filter itself.
8080
// This must be done before we build components, so they can access the filter.
8181
namespaces := kclient.New[*corev1.Namespace](c.Client)
82+
// When this cluster stops, clean up the namespace watcher
83+
go func() {
84+
<-c.stop
85+
namespaces.ShutdownHandlers()
86+
}()
8287
// This will start a namespace informer and wait for it to be ready. So we must start it in a go routine to avoid blocking.
8388
filter := filter.NewDiscoveryNamespacesFilter(namespaces, mesh, c.stop)
8489
kube.SetObjectFilter(c.Client, filter)

pkg/kube/multicluster/secretcontroller.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,7 @@ func (c *Controller) addSecret(name types.NamespacedName, s *corev1.Secret) erro
290290
}
291291
// stop previous remote cluster
292292
prev.Stop()
293-
// TODO(keithmattix): Is it safe to shutdown the kubeclient here as well? If we don't
294-
// the goroutine will continue to run and the client will leak.
293+
prev.Client.Shutdown() // Shutdown all of the informers so that the goroutines won't leak
295294
} else if c.cs.Contains(cluster.ID(clusterID)) {
296295
// if the cluster has been registered before by another secret, ignore the new one.
297296
logger.Warnf("cluster has already been registered")

pkg/kube/namespace/filter.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,16 @@ func NewDiscoveryNamespacesFilter(
5454
namespaces: namespaces,
5555
discoveryNamespaces: sets.New[string](),
5656
}
57-
mesh.AddMeshHandler(func() {
57+
reg := mesh.AddMeshHandler(func() {
5858
f.selectorsChanged(mesh.Mesh().GetDiscoverySelectors(), true)
5959
})
6060

61+
// Clean up mesh handler on stop
62+
go func() {
63+
<-stop
64+
mesh.DeleteMeshHandler(reg)
65+
}()
66+
6167
namespaces.AddEventHandler(controllers.EventHandler[*corev1.Namespace]{
6268
AddFunc: func(ns *corev1.Namespace) {
6369
f.lock.Lock()

0 commit comments

Comments
 (0)