Skip to content

Commit 3c9b982

Browse files
committed
Add integration test to avoid manager.Start deadlocks
1 parent b1d6919 commit 3c9b982

File tree

7 files changed

+590
-0
lines changed

7 files changed

+590
-0
lines changed

pkg/envtest/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,9 @@ func (te *Environment) Start() (*rest.Config, error) {
289289
}
290290

291291
log.V(1).Info("installing CRDs")
292+
if te.CRDInstallOptions.Scheme == nil {
293+
te.CRDInstallOptions.Scheme = te.Scheme
294+
}
292295
te.CRDInstallOptions.CRDs = mergeCRDs(te.CRDInstallOptions.CRDs, te.CRDs)
293296
te.CRDInstallOptions.Paths = mergePaths(te.CRDInstallOptions.Paths, te.CRDDirectoryPaths)
294297
te.CRDInstallOptions.ErrorIfPathMissing = te.ErrorIfCRDPathMissing
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package integration
18+
19+
import (
20+
"testing"
21+
22+
. "github.com/onsi/ginkgo/v2"
23+
. "github.com/onsi/gomega"
24+
)
25+
26+
func TestSource(t *testing.T) {
27+
RegisterFailHandler(Fail)
28+
RunSpecs(t, "Manager Integration Suite")
29+
}
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package integration
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"net"
23+
"net/http"
24+
"reflect"
25+
"sync/atomic"
26+
"time"
27+
"unsafe"
28+
29+
. "github.com/onsi/ginkgo/v2"
30+
. "github.com/onsi/gomega"
31+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
32+
apierrors "k8s.io/apimachinery/pkg/api/errors"
33+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
35+
"k8s.io/apimachinery/pkg/runtime"
36+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
37+
38+
ctrl "sigs.k8s.io/controller-runtime"
39+
"sigs.k8s.io/controller-runtime/pkg/client"
40+
"sigs.k8s.io/controller-runtime/pkg/envtest"
41+
logf "sigs.k8s.io/controller-runtime/pkg/log"
42+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
43+
"sigs.k8s.io/controller-runtime/pkg/manager"
44+
crewv1 "sigs.k8s.io/controller-runtime/pkg/manager/integration/v1"
45+
crewv2 "sigs.k8s.io/controller-runtime/pkg/manager/integration/v2"
46+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
47+
"sigs.k8s.io/controller-runtime/pkg/webhook"
48+
"sigs.k8s.io/controller-runtime/pkg/webhook/conversion"
49+
)
50+
51+
var (
52+
scheme = runtime.NewScheme()
53+
54+
driverCRD = &apiextensionsv1.CustomResourceDefinition{
55+
ObjectMeta: metav1.ObjectMeta{
56+
Name: "drivers.crew.example.com",
57+
},
58+
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
59+
Group: crewv1.GroupVersion.Group,
60+
Names: apiextensionsv1.CustomResourceDefinitionNames{
61+
Plural: "drivers",
62+
Singular: "driver",
63+
Kind: "Driver",
64+
},
65+
Scope: apiextensionsv1.NamespaceScoped,
66+
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
67+
{
68+
Name: crewv1.GroupVersion.Version,
69+
Served: true,
70+
// At creation v1 will be the storage version.
71+
// During the test v2 will become the storage version.
72+
Storage: true,
73+
Schema: &apiextensionsv1.CustomResourceValidation{
74+
OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
75+
Type: "object",
76+
},
77+
},
78+
},
79+
{
80+
Name: crewv2.GroupVersion.Version,
81+
Served: true,
82+
Storage: false,
83+
Schema: &apiextensionsv1.CustomResourceValidation{
84+
OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
85+
Type: "object",
86+
},
87+
},
88+
},
89+
},
90+
},
91+
}
92+
)
93+
94+
var _ = Describe("manger.Manager", func() {
95+
// This test ensure the Manager starts without running into any deadlocks as it can be very tricky
96+
// to start health probes, webhooks, caches (including informers) and reconcilers in the right order.
97+
//
98+
// To verify this we set up a test environment in the following state:
99+
// * Ensure Informer sync requires a functioning conversion webhook (and thus readiness probe)
100+
// * Driver CRD is deployed with v1 as storage version
101+
// * A Driver CR is created and stored in the v1 version
102+
// * The CRD is updated to make v2 the storage version
103+
// * This ensures every Driver list call goes through conversion.
104+
// * Setup manager:
105+
// * Set up health probes
106+
// * Set up a Driver reconciler to verify reconciliation works
107+
// * Set up a conversion webhook which only works if readiness probe succeeds (just like a Kubernetes service)
108+
// * Add an index on v2 Driver to ensure we start and wait for an informer during cache.Start (as part of manager.Start)
109+
// * Note: cache.Start would fail if the conversion webhook doesn't work (which in turn depends on the readiness probe)
110+
Describe("Start should start all components without deadlock", func() {
111+
ctx := ctrl.SetupSignalHandler()
112+
113+
// Set up schema.
114+
Expect(clientgoscheme.AddToScheme(scheme)).To(Succeed())
115+
Expect(apiextensionsv1.AddToScheme(scheme)).To(Succeed())
116+
Expect(crewv1.AddToScheme(scheme)).To(Succeed())
117+
Expect(crewv2.AddToScheme(scheme)).To(Succeed())
118+
119+
// Set up test environment.
120+
env := &envtest.Environment{
121+
Scheme: scheme,
122+
CRDInstallOptions: envtest.CRDInstallOptions{
123+
CRDs: []*apiextensionsv1.CustomResourceDefinition{driverCRD},
124+
},
125+
}
126+
cfg, err := env.Start()
127+
Expect(err).NotTo(HaveOccurred())
128+
Expect(cfg).NotTo(BeNil())
129+
defer func() {
130+
Expect(env.Stop()).To(Succeed())
131+
}()
132+
c, err := client.New(cfg, client.Options{})
133+
Expect(err).NotTo(HaveOccurred())
134+
135+
// Create driver CR (which is stored as v1).
136+
driverV1 := &unstructured.Unstructured{}
137+
driverV1.SetGroupVersionKind(crewv1.GroupVersion.WithKind("Driver"))
138+
driverV1.SetName("driver1")
139+
driverV1.SetNamespace(metav1.NamespaceDefault)
140+
Expect(c.Create(ctx, driverV1)).To(Succeed())
141+
142+
// Update driver CRD to make v2 the storage version.
143+
driverCRDV2Storage := driverCRD.DeepCopy()
144+
driverCRDV2Storage.Spec.Versions[0].Storage = false
145+
driverCRDV2Storage.Spec.Versions[0].Storage = true
146+
Expect(c.Patch(ctx, driverCRDV2Storage, client.MergeFrom(driverCRD))).To(Succeed())
147+
148+
// Set up Manager.
149+
ctrl.SetLogger(zap.New())
150+
mgr, err := manager.New(env.Config, manager.Options{
151+
Scheme: scheme,
152+
HealthProbeBindAddress: ":0",
153+
// Disable metrics to avoid port conflicts.
154+
MetricsBindAddress: "0",
155+
WebhookServer: webhook.NewServer(webhook.Options{
156+
Port: env.WebhookInstallOptions.LocalServingPort,
157+
Host: env.WebhookInstallOptions.LocalServingHost,
158+
CertDir: env.WebhookInstallOptions.LocalServingCertDir,
159+
}),
160+
})
161+
Expect(err).NotTo(HaveOccurred())
162+
163+
// Configure health probes.
164+
Expect(mgr.AddReadyzCheck("webhook", mgr.GetWebhookServer().StartedChecker())).To(Succeed())
165+
Expect(mgr.AddHealthzCheck("webhook", mgr.GetWebhookServer().StartedChecker())).To(Succeed())
166+
167+
// Set up Driver reconciler.
168+
driverReconciler := &DriverReconciler{
169+
Client: mgr.GetClient(),
170+
}
171+
Expect(ctrl.NewControllerManagedBy(mgr).For(&crewv1.Driver{}).Complete(driverReconciler)).To(Succeed())
172+
173+
// Set up a conversion webhook.
174+
conversionWebhook := createConversionWebhook(mgr)
175+
mgr.GetWebhookServer().Register("/convert", conversionWebhook)
176+
177+
// Add an index on v2 Driver.
178+
Expect(mgr.GetCache().IndexField(ctx, &crewv2.Driver{}, "name", func(object client.Object) []string {
179+
return []string{object.GetName()}
180+
})).To(Succeed())
181+
182+
// Start the Manager.
183+
ctx, cancel := context.WithCancel(ctx)
184+
go func() {
185+
defer GinkgoRecover()
186+
Expect(mgr.Start(ctx)).NotTo(HaveOccurred())
187+
}()
188+
189+
// Verify manager.Start successfully started health probes, webhooks, caches (including informers) and reconcilers.
190+
<-mgr.Elected()
191+
192+
// Verify the reconciler reconciles.
193+
Eventually(func(g Gomega) {
194+
g.Expect(atomic.LoadUint64(&driverReconciler.ReconcileCount)).Should(BeNumerically(">", 0))
195+
}, 10*time.Second).Should(Succeed())
196+
197+
// Verify conversion webhook was called.
198+
Expect(atomic.LoadUint64(&conversionWebhook.ConversionCount)).Should(BeNumerically(">", 0))
199+
200+
// Verify the conversion webhook works.
201+
driverV2 := &unstructured.Unstructured{}
202+
driverV2.SetGroupVersionKind(crewv2.GroupVersion.WithKind("Driver"))
203+
driverV2.SetName("driver1")
204+
driverV2.SetNamespace(metav1.NamespaceDefault)
205+
Expect(c.Get(ctx, client.ObjectKeyFromObject(driverV2), driverV2)).To(Succeed())
206+
207+
// Shutdown the server
208+
cancel()
209+
})
210+
})
211+
212+
type DriverReconciler struct {
213+
Client client.Client
214+
ReconcileCount uint64
215+
}
216+
217+
func (r *DriverReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
218+
log := ctrl.LoggerFrom(ctx)
219+
log.Info("Reconciling")
220+
221+
// Fetch the Driver instance.
222+
cluster := &crewv2.Driver{}
223+
if err := r.Client.Get(ctx, req.NamespacedName, cluster); err != nil {
224+
if apierrors.IsNotFound(err) {
225+
return ctrl.Result{}, nil
226+
}
227+
228+
// Error reading the object - requeue the request.
229+
return ctrl.Result{}, err
230+
}
231+
232+
atomic.AddUint64(&r.ReconcileCount, 1)
233+
234+
return reconcile.Result{}, nil
235+
}
236+
237+
type ConversionWebhook struct {
238+
httpClient http.Client
239+
conversionHandler http.Handler
240+
readinessEndpoint string
241+
ConversionCount uint64
242+
}
243+
244+
func createConversionWebhook(mgr manager.Manager) *ConversionWebhook {
245+
conversionHandler := conversion.NewWebhookHandler(mgr.GetScheme())
246+
httpClient := http.Client{
247+
// Setting a timeout to not get stuck when calling the readiness probe.
248+
Timeout: 5 * time.Second,
249+
}
250+
251+
// Read the unexported healthProbeListener field of the manager to get the listener address.
252+
// This is a hack but it's better than using a hard-coded port.
253+
v := reflect.ValueOf(mgr).Elem()
254+
field := v.FieldByName("healthProbeListener")
255+
healthProbeListener := *(*net.Listener)(unsafe.Pointer(field.UnsafeAddr())) //nolint:gosec
256+
readinessEndpoint := fmt.Sprint("http://", healthProbeListener.Addr().String(), "/readyz")
257+
258+
return &ConversionWebhook{
259+
httpClient: httpClient,
260+
conversionHandler: conversionHandler,
261+
readinessEndpoint: readinessEndpoint,
262+
}
263+
}
264+
265+
func (c *ConversionWebhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
266+
resp, err := c.httpClient.Get(c.readinessEndpoint)
267+
if err != nil {
268+
logf.Log.WithName("conversion-webhook").Error(err, "failed to serve conversion: readiness endpoint is not up")
269+
w.WriteHeader(http.StatusInternalServerError)
270+
return
271+
}
272+
273+
Expect(err).NotTo(HaveOccurred())
274+
defer resp.Body.Close()
275+
276+
if resp.StatusCode != http.StatusOK {
277+
// This simulates the behavior in Kubernetes that conversion webhooks are only served after
278+
// the controller is ready (and thus the Kubernetes service sends requests to the controller).
279+
logf.Log.WithName("conversion-webhook").Info("failed to serve conversion: controller is not ready yet")
280+
w.WriteHeader(http.StatusInternalServerError)
281+
return
282+
}
283+
284+
atomic.AddUint64(&c.ConversionCount, 1)
285+
c.conversionHandler.ServeHTTP(w, r)
286+
}

0 commit comments

Comments
 (0)