Skip to content

Commit b92273a

Browse files
committed
apiserver + controllers: enhance context support
27a68ae introduced context support for events. Creating an event broadcaster with context makes tests more resilient against leaking goroutines when that context gets canceled at the end of a test and enables per-test output via ktesting. The context could get passed to the constructor. A cleaner solution is to enhance context support for the apiserver and then pass the context into the controller's run method. This ripples up the call stack to all places which start an apiserver.
1 parent 5918559 commit b92273a

File tree

25 files changed

+197
-122
lines changed

25 files changed

+197
-122
lines changed

cmd/kube-apiserver/app/server.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ limitations under the License.
2020
package app
2121

2222
import (
23+
"context"
2324
"crypto/tls"
2425
"fmt"
2526
"net/http"
@@ -114,7 +115,7 @@ cluster's shared state through which all other components interact.`,
114115
}
115116
// add feature enablement metrics
116117
utilfeature.DefaultMutableFeatureGate.AddMetrics()
117-
return Run(completedOptions, genericapiserver.SetupSignalHandler())
118+
return Run(cmd.Context(), completedOptions)
118119
},
119120
Args: func(cmd *cobra.Command, args []string) error {
120121
for _, arg := range args {
@@ -125,6 +126,7 @@ cluster's shared state through which all other components interact.`,
125126
return nil
126127
},
127128
}
129+
cmd.SetContext(genericapiserver.SetupSignalContext())
128130

129131
fs := cmd.Flags()
130132
namedFlagSets := s.Flags()
@@ -142,7 +144,7 @@ cluster's shared state through which all other components interact.`,
142144
}
143145

144146
// Run runs the specified APIServer. This should never exit.
145-
func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
147+
func Run(ctx context.Context, opts options.CompletedOptions) error {
146148
// To help debugging, immediately log version
147149
klog.Infof("Version: %+v", version.Get())
148150

@@ -166,7 +168,7 @@ func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
166168
return err
167169
}
168170

169-
return prepared.Run(stopCh)
171+
return prepared.Run(ctx)
170172
}
171173

172174
// CreateServerChain creates the apiservers connected via delegation.

cmd/kube-apiserver/app/testing/testserver.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"os"
3333
"path/filepath"
3434
"runtime"
35+
"testing"
3536
"time"
3637

3738
"github.com/spf13/pflag"
@@ -56,10 +57,11 @@ import (
5657
"k8s.io/klog/v2"
5758
"k8s.io/kube-aggregator/pkg/apiserver"
5859
"k8s.io/kubernetes/pkg/features"
60+
testutil "k8s.io/kubernetes/test/utils"
61+
"k8s.io/kubernetes/test/utils/ktesting"
5962

6063
"k8s.io/kubernetes/cmd/kube-apiserver/app"
6164
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
62-
testutil "k8s.io/kubernetes/test/utils"
6365
)
6466

6567
func init() {
@@ -139,7 +141,9 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
139141
// Note: we return a tear-down func instead of a stop channel because the later will leak temporary
140142
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
141143
// enough time to remove temporary files.
142-
func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
144+
func StartTestServer(t ktesting.TB, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
145+
tCtx := ktesting.Init(t)
146+
143147
if instanceOptions == nil {
144148
instanceOptions = NewDefaultTestServerOptions()
145149
}
@@ -149,12 +153,11 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
149153
return result, fmt.Errorf("failed to create temp dir: %v", err)
150154
}
151155

152-
stopCh := make(chan struct{})
153156
var errCh chan error
154157
tearDown := func() {
155-
// Closing stopCh is stopping apiserver and cleaning up
158+
// Cancel is stopping apiserver and cleaning up
156159
// after itself, including shutting down its storage layer.
157-
close(stopCh)
160+
tCtx.Cancel("tearing down")
158161

159162
// If the apiserver was started, let's wait for it to
160163
// shutdown clearly.
@@ -359,15 +362,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
359362
}
360363

361364
errCh = make(chan error)
362-
go func(stopCh <-chan struct{}) {
365+
go func() {
363366
defer close(errCh)
364367
prepared, err := server.PrepareRun()
365368
if err != nil {
366369
errCh <- err
367-
} else if err := prepared.Run(stopCh); err != nil {
370+
} else if err := prepared.Run(tCtx); err != nil {
368371
errCh <- err
369372
}
370-
}(stopCh)
373+
}()
371374

372375
client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
373376
if err != nil {
@@ -465,7 +468,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
465468
}
466469

467470
// StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
468-
func StartTestServerOrDie(t Logger, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
471+
func StartTestServerOrDie(t testing.TB, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
469472
result, err := StartTestServer(t, instanceOptions, flags, storageConfig)
470473
if err == nil {
471474
return &result

pkg/controlplane/controller/defaultservicecidr/default_servicecidr_controller.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,6 @@ func NewController(
5555
secondaryRange net.IPNet,
5656
client clientset.Interface,
5757
) *Controller {
58-
broadcaster := record.NewBroadcaster()
59-
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
60-
6158
c := &Controller{
6259
client: client,
6360
interval: 10 * time.Second, // same as DefaultEndpointReconcilerInterval
@@ -79,9 +76,6 @@ func NewController(
7976
c.serviceCIDRLister = networkingv1alpha1listers.NewServiceCIDRLister(c.serviceCIDRInformer.GetIndexer())
8077
c.serviceCIDRsSynced = c.serviceCIDRInformer.HasSynced
8178

82-
c.eventBroadcaster = broadcaster
83-
c.eventRecorder = recorder
84-
8579
return c
8680
}
8781

@@ -101,9 +95,12 @@ type Controller struct {
10195
}
10296

10397
// Start will not return until the default ServiceCIDR exists or stopCh is closed.
104-
func (c *Controller) Start(stopCh <-chan struct{}) {
98+
func (c *Controller) Start(ctx context.Context) {
10599
defer utilruntime.HandleCrash()
100+
stopCh := ctx.Done()
106101

102+
c.eventBroadcaster = record.NewBroadcaster(record.WithContext(ctx))
103+
c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
107104
c.eventBroadcaster.StartStructuredLogging(0)
108105
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
109106
defer c.eventBroadcaster.Shutdown()
@@ -116,8 +113,6 @@ func (c *Controller) Start(stopCh <-chan struct{}) {
116113
return
117114
}
118115

119-
// derive a context from the stopCh so we can cancel the poll loop
120-
ctx := wait.ContextForChannel(stopCh)
121116
// wait until first successfully sync
122117
// this blocks apiserver startup so poll with a short interval
123118
err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {

pkg/controlplane/instance.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
420420
)
421421
// The default serviceCIDR must exist before the apiserver is healthy
422422
// otherwise the allocators for Services will not work.
423-
controller.Start(hookContext.StopCh)
423+
controller.Start(hookContext)
424424
return nil
425425
})
426426
}

staging/src/k8s.io/apiextensions-apiserver/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ import (
2525
)
2626

2727
func main() {
28-
stopCh := genericapiserver.SetupSignalHandler()
29-
cmd := server.NewServerCommand(os.Stdout, os.Stderr, stopCh)
28+
ctx := genericapiserver.SetupSignalContext()
29+
cmd := server.NewServerCommand(ctx, os.Stdout, os.Stderr)
3030
code := cli.Run(cmd)
3131
os.Exit(code)
3232
}

staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/server.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package server
1818

1919
import (
20+
"context"
2021
"io"
2122

2223
"github.com/spf13/cobra"
@@ -25,7 +26,7 @@ import (
2526
genericapiserver "k8s.io/apiserver/pkg/server"
2627
)
2728

28-
func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Command {
29+
func NewServerCommand(ctx context.Context, out, errOut io.Writer) *cobra.Command {
2930
o := options.NewCustomResourceDefinitionsServerOptions(out, errOut)
3031

3132
cmd := &cobra.Command{
@@ -38,19 +39,20 @@ func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Comm
3839
if err := o.Validate(); err != nil {
3940
return err
4041
}
41-
if err := Run(o, stopCh); err != nil {
42+
if err := Run(c.Context(), o); err != nil {
4243
return err
4344
}
4445
return nil
4546
},
4647
}
48+
cmd.SetContext(ctx)
4749

4850
fs := cmd.Flags()
4951
o.AddFlags(fs)
5052
return cmd
5153
}
5254

53-
func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct{}) error {
55+
func Run(ctx context.Context, o *options.CustomResourceDefinitionsServerOptions) error {
5456
config, err := o.Config()
5557
if err != nil {
5658
return err
@@ -60,5 +62,5 @@ func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct
6062
if err != nil {
6163
return err
6264
}
63-
return server.GenericAPIServer.PrepareRun().Run(stopCh)
65+
return server.GenericAPIServer.PrepareRun().RunWithContext(ctx)
6466
}

staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package testing
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"net"
2324
"os"
@@ -83,13 +84,15 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
8384
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
8485
// enough time to remove temporary files.
8586
func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
86-
stopCh := make(chan struct{})
87+
// TODO: this is a candidate for using what is now test/utils/ktesting,
88+
// should that become a staging repo.
89+
ctx, cancel := context.WithCancelCause(context.Background())
8790
var errCh chan error
8891
tearDown := func() {
89-
// Closing stopCh is stopping apiextensions apiserver and its
92+
// Cancel is stopping apiextensions apiserver and its
9093
// delegates, which itself is cleaning up after itself,
9194
// including shutting down its storage layer.
92-
close(stopCh)
95+
cancel(errors.New("tearing down"))
9396

9497
// If the apiextensions apiserver was started, let's wait for
9598
// it to shutdown clearly.
@@ -166,13 +169,13 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin
166169
}
167170

168171
errCh = make(chan error)
169-
go func(stopCh <-chan struct{}) {
172+
go func() {
170173
defer close(errCh)
171174

172-
if err := server.GenericAPIServer.PrepareRun().Run(stopCh); err != nil {
175+
if err := server.GenericAPIServer.PrepareRun().RunWithContext(ctx); err != nil {
173176
errCh <- err
174177
}
175-
}(stopCh)
178+
}()
176179

177180
t.Logf("Waiting for /healthz to be ok...")
178181

staging/src/k8s.io/apiserver/pkg/server/config_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package server
1818

1919
import (
20+
"context"
21+
"errors"
2022
"fmt"
2123
"io"
2224
"net/http"
@@ -42,6 +44,7 @@ import (
4244
"k8s.io/client-go/kubernetes/fake"
4345
"k8s.io/client-go/rest"
4446
"k8s.io/component-base/tracing"
47+
"k8s.io/klog/v2/ktesting"
4548
netutils "k8s.io/utils/net"
4649
)
4750

@@ -79,6 +82,9 @@ func TestAuthorizeClientBearerTokenNoops(t *testing.T) {
7982
}
8083

8184
func TestNewWithDelegate(t *testing.T) {
85+
_, ctx := ktesting.NewTestContext(t)
86+
ctx, cancel := context.WithCancelCause(ctx)
87+
defer cancel(errors.New("test is done"))
8288
delegateConfig := NewConfig(codecs)
8389
delegateConfig.ExternalAddress = "192.168.10.4:443"
8490
delegateConfig.PublicAddress = netutils.ParseIPSloppy("192.168.10.4")
@@ -136,10 +142,8 @@ func TestNewWithDelegate(t *testing.T) {
136142
return nil
137143
})
138144

139-
stopCh := make(chan struct{})
140-
defer close(stopCh)
141145
wrappingServer.PrepareRun()
142-
wrappingServer.RunPostStartHooks(stopCh)
146+
wrappingServer.RunPostStartHooks(ctx)
143147

144148
server := httptest.NewServer(wrappingServer.Handler)
145149
defer server.Close()

0 commit comments

Comments
 (0)