Skip to content

Commit a726757

Browse files
authored
fix(service-mirror): don't restart cluster watch upon Link status updates (#13579)
* fix(service-mirror): don't restart cluster watch upon Link status updates

Every time a Link resource is updated, the service mirror cleans up any existing worker and restarts the cluster watch. We recently introduced a status stanza in Link that gets updated every time a service is mirrored, which was unnecessarily triggering a cluster watch restart. When a sufficiently high number of services were mirrored at once, this caused severe contention on the controller, slowing mirroring down to a halt.

This change fixes the situation by only considering changes in the Link Spec when deciding whether to restart the cluster watch.

* Lower log level

* Extract the resource event handler functions into a separate file, and add a unit test making sure the add/update/delete functions are called, and that in particular the update function is _not_ called when updating a Link status.
1 parent 70cf784 commit a726757

File tree

7 files changed

+227
-53
lines changed

7 files changed

+227
-53
lines changed

controller/k8s/api.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
1313
l5dcrdinformer "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions"
1414
ewinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/externalworkload/v1beta1"
15+
linkinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/link/v1alpha2"
1516
srvinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/server/v1beta3"
1617
spinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/serviceprofile/v1alpha2"
1718
"github.com/linkerd/linkerd2/pkg/k8s"
@@ -52,6 +53,7 @@ type API struct {
5253
es discoveryinformers.EndpointSliceInformer
5354
ew ewinformers.ExternalWorkloadInformer
5455
job batchv1informers.JobInformer
56+
link linkinformers.LinkInformer
5557
mwc arinformers.MutatingWebhookConfigurationInformer
5658
ns coreinformers.NamespaceInformer
5759
pod coreinformers.PodInformer
@@ -248,6 +250,13 @@ func newAPI(
248250
api.job = sharedInformers.Batch().V1().Jobs()
249251
api.syncChecks = append(api.syncChecks, api.job.Informer().HasSynced)
250252
api.promGauges.addInformerSize(k8s.Job, informerLabels, api.job.Informer())
253+
case Link:
254+
if l5dCrdSharedInformers == nil {
255+
panic("Linkerd CRD shared informer not configured")
256+
}
257+
api.link = l5dCrdSharedInformers.Link().V1alpha2().Links()
258+
api.syncChecks = append(api.syncChecks, api.link.Informer().HasSynced)
259+
api.promGauges.addInformerSize(k8s.Link, informerLabels, api.link.Informer())
251260
case MWC:
252261
api.mwc = sharedInformers.Admissionregistration().V1().MutatingWebhookConfigurations()
253262
api.syncChecks = append(api.syncChecks, api.mwc.Informer().HasSynced)

controller/k8s/api_resource.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const (
3232
ES // EndpointSlice resource
3333
ExtWorkload
3434
Job
35+
Link
3536
MWC
3637
NS
3738
Pod

controller/k8s/test_helper.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func NewFakeClusterScopedAPI(clientSet kubernetes.Interface, l5dClientSet l5dcrd
5757
DS,
5858
Endpoint,
5959
Job,
60+
Link,
6061
MWC,
6162
NS,
6263
Pod,

multicluster/cmd/service-mirror/main.go

Lines changed: 1 addition & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
log "github.com/sirupsen/logrus"
2222
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2323
"k8s.io/client-go/kubernetes"
24-
"k8s.io/client-go/tools/cache"
2524
"k8s.io/client-go/tools/clientcmd"
2625
"k8s.io/client-go/tools/leaderelection"
2726
"k8s.io/client-go/tools/leaderelection/resourcelock"
@@ -149,58 +148,7 @@ func Main(args []string) {
149148
log.Infof("Starting Link informer")
150149
informerFactory.Start(ctx.Done())
151150

152-
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
153-
AddFunc: func(obj interface{}) {
154-
link, ok := obj.(*v1alpha2.Link)
155-
if !ok {
156-
log.Errorf("object is not a Link: %+v", obj)
157-
return
158-
}
159-
if link.GetName() == linkName {
160-
select {
161-
case results <- link:
162-
default:
163-
log.Errorf("Link update dropped (queue full): %s", link.GetName())
164-
}
165-
}
166-
},
167-
UpdateFunc: func(_, obj interface{}) {
168-
link, ok := obj.(*v1alpha2.Link)
169-
if !ok {
170-
log.Errorf("object is not a Link: %+v", obj)
171-
return
172-
}
173-
if link.GetName() == linkName {
174-
select {
175-
case results <- link:
176-
default:
177-
log.Errorf("Link update dropped (queue full): %s", link.GetName())
178-
}
179-
}
180-
},
181-
DeleteFunc: func(obj interface{}) {
182-
link, ok := obj.(*v1alpha2.Link)
183-
if !ok {
184-
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
185-
if !ok {
186-
log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
187-
return
188-
}
189-
link, ok = tombstone.Obj.(*v1alpha2.Link)
190-
if !ok {
191-
log.Errorf("DeletedFinalStateUnknown contained object that is not a Link %#v", obj)
192-
return
193-
}
194-
}
195-
if link.GetName() == linkName {
196-
select {
197-
case results <- nil: // nil indicates the link was deleted
198-
default:
199-
log.Errorf("Link delete dropped (queue full): %s", link.GetName())
200-
}
201-
}
202-
},
203-
})
151+
_, err := informer.AddEventHandler(servicemirror.GetLinkHandlers(results, linkName))
204152
if err != nil {
205153
log.Fatalf("Failed to add event handler to Link informer: %s", err)
206154
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package servicemirror
2+
3+
import (
4+
"reflect"
5+
6+
"github.com/linkerd/linkerd2/controller/gen/apis/link/v1alpha2"
7+
log "github.com/sirupsen/logrus"
8+
"k8s.io/client-go/tools/cache"
9+
)
10+
11+
func GetLinkHandlers(results chan<- *v1alpha2.Link, linkName string) cache.ResourceEventHandlerFuncs {
12+
return cache.ResourceEventHandlerFuncs{
13+
AddFunc: func(obj interface{}) {
14+
link, ok := obj.(*v1alpha2.Link)
15+
if !ok {
16+
log.Errorf("object is not a Link: %+v", obj)
17+
return
18+
}
19+
if link.GetName() == linkName {
20+
select {
21+
case results <- link:
22+
default:
23+
log.Errorf("Link update dropped (queue full): %s", link.GetName())
24+
}
25+
}
26+
},
27+
UpdateFunc: func(oldObj, currentObj interface{}) {
28+
oldLink, ok := oldObj.(*v1alpha2.Link)
29+
if !ok {
30+
log.Errorf("object is not a Link: %+v", oldObj)
31+
return
32+
}
33+
currentLink, ok := currentObj.(*v1alpha2.Link)
34+
if !ok {
35+
log.Errorf("object is not a Link: %+v", currentObj)
36+
return
37+
}
38+
if reflect.DeepEqual(oldLink.Spec, currentLink.Spec) {
39+
log.Debugf("Link update ignored (only status changed): %s", currentLink.GetName())
40+
return
41+
}
42+
if currentLink.GetName() == linkName {
43+
select {
44+
case results <- currentLink:
45+
default:
46+
log.Errorf("Link update dropped (queue full): %s", currentLink.GetName())
47+
}
48+
}
49+
},
50+
DeleteFunc: func(obj interface{}) {
51+
link, ok := obj.(*v1alpha2.Link)
52+
if !ok {
53+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
54+
if !ok {
55+
log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
56+
return
57+
}
58+
link, ok = tombstone.Obj.(*v1alpha2.Link)
59+
if !ok {
60+
log.Errorf("DeletedFinalStateUnknown contained object that is not a Link %#v", obj)
61+
return
62+
}
63+
}
64+
if link.GetName() == linkName {
65+
select {
66+
case results <- nil: // nil indicates the link was deleted
67+
default:
68+
log.Errorf("Link delete dropped (queue full): %s", link.GetName())
69+
}
70+
}
71+
},
72+
}
73+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package servicemirror
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"log"
7+
"testing"
8+
"time"
9+
10+
"github.com/linkerd/linkerd2/controller/gen/apis/link/v1alpha2"
11+
l5dcrdinformer "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions"
12+
"github.com/linkerd/linkerd2/controller/k8s"
13+
corev1 "k8s.io/api/core/v1"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/types"
16+
)
17+
18+
const nsName = "ns1"
19+
const linkName = "linkName"
20+
21+
func TestLinkHandlers(t *testing.T) {
22+
k8sAPI, l5dAPI, err := k8s.NewFakeAPIWithL5dClient()
23+
if err != nil {
24+
t.Fatal(err)
25+
}
26+
k8sAPI.Sync(nil)
27+
28+
informerFactory := l5dcrdinformer.NewSharedInformerFactoryWithOptions(
29+
l5dAPI,
30+
k8s.ResyncTime,
31+
l5dcrdinformer.WithNamespace(nsName),
32+
)
33+
informer := informerFactory.Link().V1alpha2().Links().Informer()
34+
informerFactory.Start(context.Background().Done())
35+
36+
results := make(chan *v1alpha2.Link, 100)
37+
_, err = informer.AddEventHandler(GetLinkHandlers(results, linkName))
38+
if err != nil {
39+
t.Fatal(err)
40+
}
41+
42+
// test that a message is received when a link is created
43+
_, err = k8sAPI.Client.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: nsName}}, metav1.CreateOptions{})
44+
if err != nil {
45+
t.Fatal(err)
46+
}
47+
48+
link := &v1alpha2.Link{
49+
ObjectMeta: metav1.ObjectMeta{
50+
Name: linkName,
51+
Namespace: nsName,
52+
},
53+
Spec: v1alpha2.LinkSpec{ProbeSpec: v1alpha2.ProbeSpec{Timeout: "30s"}},
54+
}
55+
_, err = l5dAPI.LinkV1alpha2().Links(nsName).Create(context.Background(), link, metav1.CreateOptions{})
56+
if err != nil {
57+
t.Fatal(err)
58+
}
59+
60+
select {
61+
case link := <-results:
62+
if link.GetName() != linkName {
63+
t.Fatalf("Expected LinkName, got %s", link.GetName())
64+
}
65+
case <-time.After(time.Second):
66+
t.Fatal("Timed out waiting for message")
67+
}
68+
69+
// test that a message is received when a link spec is updated
70+
patch := map[string]any{
71+
"spec": map[string]any{
72+
"probeSpec": map[string]any{
73+
"timeout": "60s",
74+
},
75+
},
76+
}
77+
patchBytes, err := json.Marshal(patch)
78+
if err != nil {
79+
log.Fatalf("Failed to marshal patch: %v", err)
80+
}
81+
_, err = l5dAPI.LinkV1alpha2().Links(nsName).Patch(
82+
context.Background(),
83+
linkName,
84+
types.MergePatchType,
85+
patchBytes,
86+
metav1.PatchOptions{},
87+
)
88+
if err != nil {
89+
t.Fatalf("Failed to patch link: %s", err)
90+
}
91+
92+
select {
93+
case link := <-results:
94+
if link.GetName() != linkName {
95+
t.Fatalf("Expected LinkName, got %s", link.GetName())
96+
}
97+
case <-time.After(time.Second):
98+
t.Fatal("Timed out waiting for message")
99+
}
100+
101+
// test that a message is _not_ received when a link status is updated
102+
patch = map[string]any{
103+
"status": map[string]any{
104+
"foo": "bar",
105+
},
106+
}
107+
patchBytes, err = json.Marshal(patch)
108+
if err != nil {
109+
log.Fatalf("Failed to marshal patch: %v", err)
110+
}
111+
_, err = l5dAPI.LinkV1alpha2().Links(nsName).Patch(
112+
context.Background(),
113+
linkName,
114+
types.MergePatchType,
115+
patchBytes,
116+
metav1.PatchOptions{},
117+
"status",
118+
)
119+
if err != nil {
120+
t.Fatalf("Failed to patch link: %s", err)
121+
}
122+
123+
select {
124+
case link := <-results:
125+
t.Fatalf("Received unexpected message: %v", link)
126+
case <-time.After(time.Second):
127+
}
128+
129+
// test that a nil message is received when a link is deleted
130+
if err := l5dAPI.LinkV1alpha2().Links(nsName).Delete(context.Background(), linkName, metav1.DeleteOptions{}); err != nil {
131+
t.Fatalf("Failed to delete link: %s", err)
132+
}
133+
select {
134+
case link := <-results:
135+
if link != nil {
136+
t.Fatalf("Expected nil, got %v", link)
137+
}
138+
case <-time.After(time.Second):
139+
t.Fatal("Timed out waiting for message")
140+
}
141+
}

pkg/k8s/k8s.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const (
2121
EndpointSlices = "endpointslices"
2222
ExtWorkload = "externalworkload"
2323
Job = "job"
24+
Link = "link"
2425
MeshTLSAuthentication = "meshtlsauthentication"
2526
MutatingWebhookConfig = "mutatingwebhookconfig"
2627
Namespace = "namespace"

0 commit comments

Comments
 (0)