Skip to content

Commit 834871d

Browse files
authored
[release-1.27] multicluster collection cleanup (#57692)
* Cleanup resources when remote clusters are removed (#57361)
* Use cluster stop instead of global stop for multicluster ambient index
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
* Fix build and add release note
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
* Fixup
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
* Stash
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
* reduce diff
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
* Add tests and comments for the goroutine leaks
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
* Fix typo
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
* Fixup
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
* Address PR comments
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
---------
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
* Fix flaky ambient multicluster leak test (#57670)
* Add some more flexibility in checks but disable the test while it's flaky
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
* Uncomment and use t.Skip
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
---------
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
---------
Signed-off-by: Keith Mattix II <keithmattix@microsoft.com>
1 parent 7e1d1eb commit 834871d

File tree

14 files changed

+290
-116
lines changed

14 files changed

+290
-116
lines changed

pilot/pkg/serviceregistry/kube/controller/ambient/ambientindex.go

Lines changed: 24 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -179,11 +179,10 @@ func New(options Options) Index {
179179

180180
// TODO: Should this go ahead and transform the full ns into some intermediary with just the details we care about?
181181
Namespaces := krt.NewInformer[*corev1.Namespace](options.Client, opts.With(
182-
append(opts.WithName("informer/Namespaces"),
183-
krt.WithMetadata(krt.Metadata{
184-
multicluster.ClusterKRTMetadataKey: options.ClusterID,
185-
}),
186-
)...,
182+
krt.WithName("informer/Namespaces"),
183+
krt.WithMetadata(krt.Metadata{
184+
multicluster.ClusterKRTMetadataKey: options.ClusterID,
185+
}),
187186
)...)
188187
authzPolicies := kclient.NewDelayedInformer[*securityclient.AuthorizationPolicy](options.Client,
189188
gvr.AuthorizationPolicy, kubetypes.StandardInformer, configFilter)
@@ -195,11 +194,10 @@ func New(options Options) Index {
195194

196195
gatewayClient := kclient.NewDelayedInformer[*v1beta1.Gateway](options.Client, gvr.KubernetesGateway, kubetypes.StandardInformer, filter)
197196
Gateways := krt.WrapClient[*v1beta1.Gateway](gatewayClient, opts.With(
198-
append(opts.WithName("informer/Gateways"),
199-
krt.WithMetadata(krt.Metadata{
200-
multicluster.ClusterKRTMetadataKey: options.ClusterID,
201-
}),
202-
)...,
197+
krt.WithName("informer/Gateways"),
198+
krt.WithMetadata(krt.Metadata{
199+
multicluster.ClusterKRTMetadataKey: options.ClusterID,
200+
}),
203201
)...)
204202

205203
gatewayClassClient := kclient.NewDelayedInformer[*v1beta1.GatewayClass](options.Client, gvr.GatewayClass, kubetypes.StandardInformer, filter)
@@ -208,11 +206,10 @@ func New(options Options) Index {
208206
ObjectFilter: options.Client.ObjectFilter(),
209207
ObjectTransform: kubeclient.StripPodUnusedFields,
210208
}, opts.With(
211-
append(opts.WithName("informer/Pods"),
212-
krt.WithMetadata(krt.Metadata{
213-
multicluster.ClusterKRTMetadataKey: options.ClusterID,
214-
}),
215-
)...,
209+
krt.WithName("informer/Pods"),
210+
krt.WithMetadata(krt.Metadata{
211+
multicluster.ClusterKRTMetadataKey: options.ClusterID,
212+
}),
216213
)...)
217214

218215
serviceEntries := kclient.NewDelayedInformer[*networkingclient.ServiceEntry](options.Client,
@@ -225,31 +222,28 @@ func New(options Options) Index {
225222

226223
servicesClient := kclient.NewFiltered[*corev1.Service](options.Client, filter)
227224
Services := krt.WrapClient[*corev1.Service](servicesClient, opts.With(
228-
append(opts.WithName("informer/Services"),
229-
krt.WithMetadata(krt.Metadata{
230-
multicluster.ClusterKRTMetadataKey: options.ClusterID,
231-
}),
232-
)...,
225+
krt.WithName("informer/Services"),
226+
krt.WithMetadata(krt.Metadata{
227+
multicluster.ClusterKRTMetadataKey: options.ClusterID,
228+
}),
233229
)...)
234230
Nodes := krt.NewInformerFiltered[*corev1.Node](options.Client, kclient.Filter{
235231
ObjectFilter: options.Client.ObjectFilter(),
236232
ObjectTransform: kubeclient.StripNodeUnusedFields,
237233
}, opts.With(
238-
append(opts.WithName("informer/Nodes"),
239-
krt.WithMetadata(krt.Metadata{
240-
multicluster.ClusterKRTMetadataKey: options.ClusterID,
241-
}),
242-
)...,
234+
krt.WithName("informer/Nodes"),
235+
krt.WithMetadata(krt.Metadata{
236+
multicluster.ClusterKRTMetadataKey: options.ClusterID,
237+
}),
243238
)...)
244239

245240
EndpointSlices := krt.NewInformerFiltered[*discovery.EndpointSlice](options.Client, kclient.Filter{
246241
ObjectFilter: options.Client.ObjectFilter(),
247242
}, opts.With(
248-
append(opts.WithName("informer/EndpointSlices"),
249-
krt.WithMetadata(krt.Metadata{
250-
multicluster.ClusterKRTMetadataKey: options.ClusterID,
251-
}),
252-
)...,
243+
krt.WithName("informer/EndpointSlices"),
244+
krt.WithMetadata(krt.Metadata{
245+
multicluster.ClusterKRTMetadataKey: options.ClusterID,
246+
}),
253247
)...)
254248

255249
// In the multicluster use-case, we populate the collections with global, dynamically changing data.

pilot/pkg/serviceregistry/kube/controller/ambient/multicluster.go

Lines changed: 20 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -107,12 +107,21 @@ func (a *index) buildGlobalCollections(
107107
log.Warnf("Failed to sync gateways informer for cluster %s", c.ID)
108108
return nil
109109
}
110+
111+
// N.B we're not using the opts.WithXXX pattern here since we want to be very obvious about which
112+
// stop is being used to shutdown the collection (it should always be the cluster stop, NEVER
113+
// the top-level stop associated with the ambient controller)
114+
opts := []krt.CollectionOption{
115+
krt.WithName(fmt.Sprintf("ambient/GatewaysWithCluster[%s]", c.ID)),
116+
krt.WithDebugging(opts.Debugger()),
117+
krt.WithStop(c.GetStop()),
118+
}
110119
return ptr.Of(krt.MapCollection(c.Gateways(), func(obj *v1beta1.Gateway) krt.ObjectWithCluster[*v1beta1.Gateway] {
111120
return krt.ObjectWithCluster[*v1beta1.Gateway]{
112121
ClusterID: c.ID,
113122
Object: &obj,
114123
}
115-
}, opts.WithName(fmt.Sprintf("GatewaysWithCluster[%s]", c.ID))...))
124+
}, opts...))
116125
}, "GatewaysWithCluster", opts)
117126

118127
globalGatewaysByCluster := nestedCollectionIndexByCluster(GlobalGatewaysWithCluster)
@@ -142,12 +151,17 @@ func (a *index) buildGlobalCollections(
142151
log.Warnf("Failed to sync nodes informer for cluster %s", c.ID)
143152
return nil
144153
}
154+
opts := []krt.CollectionOption{
155+
krt.WithName(fmt.Sprintf("ambient/NodesWithCluster[%s]", c.ID)),
156+
krt.WithDebugging(opts.Debugger()),
157+
krt.WithStop(c.GetStop()),
158+
}
145159
return ptr.Of(krt.MapCollection(c.Nodes(), func(obj *v1.Node) krt.ObjectWithCluster[*v1.Node] {
146160
return krt.ObjectWithCluster[*v1.Node]{
147161
ClusterID: c.ID,
148162
Object: &obj,
149163
}
150-
}, opts.WithName(fmt.Sprintf("NodesWithCluster[%s]", c.ID))...))
164+
}, opts...))
151165
}, "NodesWithCluster", opts)
152166
// Set up collections for remote clusters
153167
GlobalNetworks := buildGlobalNetworkCollections(
@@ -699,7 +713,10 @@ func mergeServiceInfosWithCluster(
699713
return nil
700714
}
701715
// otherwise, skip merging
702-
return &base
716+
return &krt.ObjectWithCluster[model.ServiceInfo]{
717+
ClusterID: base.ClusterID,
718+
Object: precomputeServicePtr(base.Object),
719+
}
703720
}
704721

705722
vips := sets.NewWithLength[simpleNetworkAddress](svcInfosLen)

pilot/pkg/serviceregistry/kube/controller/ambient/multicluster/cluster.go

Lines changed: 28 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -162,6 +162,10 @@ func (c *Cluster) ResourceName() string {
162162
return c.ID.String()
163163
}
164164

165+
func (c *Cluster) GetStop() <-chan struct{} {
166+
return c.stop
167+
}
168+
165169
func (c *Cluster) Run(localMeshConfig meshwatcher.WatcherCollection, debugger *krt.DebugHandler) {
166170
// Check and see if this is a local cluster or not
167171
if c.RemoteClusterCollections != nil {
@@ -207,59 +211,53 @@ func (c *Cluster) Run(localMeshConfig meshwatcher.WatcherCollection, debugger *k
207211
}
208212

209213
Namespaces := krt.WrapClient(namespaces, opts.With(
210-
append(opts.WithName("informer/Namespaces"),
211-
krt.WithMetadata(krt.Metadata{
212-
ClusterKRTMetadataKey: c.ID,
213-
}),
214-
)...,
214+
krt.WithName("informer/Namespaces"),
215+
krt.WithMetadata(krt.Metadata{
216+
ClusterKRTMetadataKey: c.ID,
217+
}),
215218
)...)
216219
Pods := krt.NewInformerFiltered[*corev1.Pod](c.Client, kclient.Filter{
217220
ObjectFilter: c.Client.ObjectFilter(),
218221
ObjectTransform: kube.StripPodUnusedFields,
219222
}, opts.With(
220-
append(opts.WithName("informer/Pods"),
221-
krt.WithMetadata(krt.Metadata{
222-
ClusterKRTMetadataKey: c.ID,
223-
}),
224-
)...,
223+
krt.WithName("informer/Pods"),
224+
krt.WithMetadata(krt.Metadata{
225+
ClusterKRTMetadataKey: c.ID,
226+
}),
225227
)...)
226228

227229
gatewayClient := kclient.NewDelayedInformer[*v1beta1.Gateway](c.Client, gvr.KubernetesGateway, kubetypes.StandardInformer, defaultFilter)
228230
Gateways := krt.WrapClient[*v1beta1.Gateway](gatewayClient, opts.With(
229-
append(opts.WithName("informer/Gateways"),
230-
krt.WithMetadata(krt.Metadata{
231-
ClusterKRTMetadataKey: c.ID,
232-
}),
233-
)...,
231+
krt.WithName("informer/Gateways"),
232+
krt.WithMetadata(krt.Metadata{
233+
ClusterKRTMetadataKey: c.ID,
234+
}),
234235
)...)
235236
servicesClient := kclient.NewFiltered[*corev1.Service](c.Client, defaultFilter)
236237
Services := krt.WrapClient[*corev1.Service](servicesClient, opts.With(
237-
append(opts.WithName("informer/Services"),
238-
krt.WithMetadata(krt.Metadata{
239-
ClusterKRTMetadataKey: c.ID,
240-
}),
241-
)...,
238+
krt.WithName("informer/Services"),
239+
krt.WithMetadata(krt.Metadata{
240+
ClusterKRTMetadataKey: c.ID,
241+
}),
242242
)...)
243243

244244
Nodes := krt.NewInformerFiltered[*corev1.Node](c.Client, kclient.Filter{
245245
ObjectFilter: c.Client.ObjectFilter(),
246246
ObjectTransform: kube.StripNodeUnusedFields,
247247
}, opts.With(
248-
append(opts.WithName("informer/Nodes"),
249-
krt.WithMetadata(krt.Metadata{
250-
ClusterKRTMetadataKey: c.ID,
251-
}),
252-
)...,
248+
krt.WithName("informer/Nodes"),
249+
krt.WithMetadata(krt.Metadata{
250+
ClusterKRTMetadataKey: c.ID,
251+
}),
253252
)...)
254253

255254
EndpointSlices := krt.NewInformerFiltered[*discovery.EndpointSlice](c.Client, kclient.Filter{
256255
ObjectFilter: c.Client.ObjectFilter(),
257256
}, opts.With(
258-
append(opts.WithName("informer/EndpointSlices"),
259-
krt.WithMetadata(krt.Metadata{
260-
ClusterKRTMetadataKey: c.ID,
261-
}),
262-
)...,
257+
krt.WithName("informer/EndpointSlices"),
258+
krt.WithMetadata(krt.Metadata{
259+
ClusterKRTMetadataKey: c.ID,
260+
}),
263261
)...)
264262

265263
c.RemoteClusterCollections = &RemoteClusterCollections{

pilot/pkg/serviceregistry/kube/controller/ambient/networks.go

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -129,6 +129,14 @@ func buildGlobalNetworkCollections(
129129
localNetworkGateways,
130130
clusters,
131131
func(ctx krt.HandlerContext, c *multicluster.Cluster) *krt.Collection[krt.ObjectWithCluster[NetworkGateway]] {
132+
opts := []krt.CollectionOption{
133+
krt.WithName(fmt.Sprintf("NetworkGateways[%s]", c.ID)),
134+
krt.WithDebugging(opts.Debugger()),
135+
krt.WithStop(c.GetStop()),
136+
krt.WithMetadata(krt.Metadata{
137+
multicluster.ClusterKRTMetadataKey: c.ID,
138+
}),
139+
}
132140
gatewaysPtr := krt.FetchOne(ctx, gateways, krt.FilterIndex(gatewaysByCluster, c.ID))
133141
if gatewaysPtr == nil {
134142
log.Warnf("No gateways found for cluster %s", c.ID)
@@ -146,12 +154,7 @@ func buildGlobalNetworkCollections(
146154
}
147155
return k8sGatewayToNetworkGatewaysWithCluster(c.ID, innerGw, options.ClusterID)
148156
},
149-
append(
150-
opts.WithName(fmt.Sprintf("NetworkGateways[%s]", c.ID)),
151-
krt.WithMetadata(krt.Metadata{
152-
multicluster.ClusterKRTMetadataKey: c.ID,
153-
}),
154-
)...,
157+
opts...,
155158
)
156159

157160
return ptr.Of(nwGateways)

pilot/pkg/serviceregistry/kube/controller/ambient/remotesecrets.go

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -164,8 +164,7 @@ func (a *index) deleteCluster(secretKey string, cluster *multicluster.Cluster) {
164164
cluster.Stop()
165165
// The delete event will be processed within the ClusterStore
166166
a.cs.Delete(secretKey, cluster.ID)
167-
168-
log.Infof("Number of remote clusters: %d", a.cs.Len())
167+
cluster.Client.Shutdown() // Shutdown all of the informers so that the goroutines won't leak
169168
}
170169

171170
func (a *index) processSecretEvent(key types.NamespacedName) error {

pilot/pkg/serviceregistry/kube/controller/ambient/remotesecrets_test.go

Lines changed: 56 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@ import (
4848
"istio.io/istio/pkg/test/util/assert"
4949
"istio.io/istio/pkg/test/util/retry"
5050
"istio.io/istio/pkg/util/sets"
51+
"istio.io/istio/tests/util/leak"
5152
)
5253

5354
const secretNamespace string = "istio-system"
@@ -743,6 +744,61 @@ func TestSecretController(t *testing.T) {
743744
}
744745
}
745746

747+
func TestRemoteClusterCollectionCleanup(t *testing.T) {
748+
t.Skip("https://github.com/istio/istio/issues/57269")
749+
test.SetForTest(t, &features.EnableAmbientMultiNetwork, true)
750+
fakeRestConfig := &rest.Config{}
751+
client := kube.NewFakeClient()
752+
opts := krt.NewOptionsBuilder(test.NewStop(t), "test", krt.GlobalDebugHandler)
753+
builder := func(kubeConfig []byte, clusterId cluster.ID, configOverrides ...func(*rest.Config)) (kube.Client, error) {
754+
for _, override := range configOverrides {
755+
override(fakeRestConfig)
756+
}
757+
return kube.NewFakeClient(), nil
758+
}
759+
options := Options{
760+
Client: client,
761+
ClusterID: "local-cluster",
762+
ClientBuilder: builder,
763+
}
764+
765+
a := newAmbientTestServerFromOptions(t, testNW, options, true)
766+
a.clientBuilder = builder
767+
clusters := a.remoteClusters
768+
769+
assert.Equal(t, clusters.WaitUntilSynced(opts.Stop()), true)
770+
secret0 := makeSecret(secretNamespace, "s0", clusterCredential{"c0", []byte("kubeconfig0-0")})
771+
secrets := a.sec
772+
assert.Equal(t, a.workloads.WaitUntilSynced(opts.Stop()), true)
773+
assert.Equal(t, a.services.WaitUntilSynced(opts.Stop()), true)
774+
assert.Equal(t, a.waypoints.WaitUntilSynced(opts.Stop()), true)
775+
t.Run("test remote cluster cleanup", func(t *testing.T) {
776+
// Verify that we only leak the expected number of goroutines.
777+
// Currently for ambient, we leak several goroutines per cluster
778+
// (see https://github.com/istio/istio/issues/57269#issuecomment-3203115068
779+
// for more details). Most of the leaks we can't control at this point (e.g. Fetches)
780+
// come from the processorListener so exclude those. We also allow 6 additional leaks
781+
// for buffer since we see it occurring often.
782+
// TODO: Figure out if we can minimize this.
783+
784+
// Filter all goroutines that existed before the test started
785+
leak.Check(t, leak.WithExcludedLeaks([]string{
786+
"krt.(*processorListener[...]).run",
787+
"krt.(*processorListener[...]).pop",
788+
}), leak.WithAllowedLeaks(6))
789+
secrets.Create(secret0)
790+
assert.EventuallyEqual(t, func() int {
791+
return len(a.remoteClusters.List())
792+
}, 1)
793+
// Now remove the secret
794+
secrets.Delete(secret0.Name, secret0.Namespace)
795+
assert.EventuallyEqual(t, func() int {
796+
return len(a.remoteClusters.List())
797+
}, 0)
798+
// There shouldn't be any goroutines left from that cluster
799+
})
800+
}
801+
746802
type testHandler struct {
747803
ID cluster.ID
748804
Iter int

pilot/pkg/serviceregistry/kube/controller/ambient/services.go

Lines changed: 12 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -111,6 +111,10 @@ func GlobalMergedWorkloadServicesCollection(
111111
LocalServiceInfosWithCluster,
112112
clusters,
113113
func(ctx krt.HandlerContext, cluster *multicluster.Cluster) *krt.Collection[krt.ObjectWithCluster[model.ServiceInfo]] {
114+
opts := []krt.CollectionOption{
115+
krt.WithDebugging(opts.Debugger()),
116+
krt.WithStop(cluster.GetStop()),
117+
}
114118
services := cluster.Services()
115119
waypointsPtr := krt.FetchOne(ctx, globalWaypoints, krt.FilterIndex(waypointsByCluster, cluster.ID))
116120
if waypointsPtr == nil {
@@ -136,21 +140,24 @@ func GlobalMergedWorkloadServicesCollection(
136140
return ""
137141
}
138142
return nw.Network
139-
}), opts.With(
143+
}),
140144
append(
141-
opts.WithName(fmt.Sprintf("ServiceServiceInfos[%s]", cluster.ID)),
145+
opts,
146+
krt.WithName(fmt.Sprintf("ambient/ServiceServiceInfos[%s]", cluster.ID)),
142147
krt.WithMetadata(krt.Metadata{
143148
multicluster.ClusterKRTMetadataKey: cluster.ID,
144149
}),
145-
)...,
146-
)...)
150+
)...)
147151

148152
servicesInfoWithCluster := krt.MapCollection(
149153
servicesInfo,
150154
func(o model.ServiceInfo) krt.ObjectWithCluster[model.ServiceInfo] {
151155
return krt.ObjectWithCluster[model.ServiceInfo]{ClusterID: cluster.ID, Object: &o}
152156
},
153-
opts.WithName(fmt.Sprintf("ServiceServiceInfosWithCluster[%s]", cluster.ID))...,
157+
append(
158+
opts,
159+
krt.WithName(fmt.Sprintf("ServiceServiceInfosWithCluster[%s]", cluster.ID)),
160+
)...,
154161
)
155162
return ptr.Of(servicesInfoWithCluster)
156163
},

0 commit comments

Comments
 (0)