Skip to content

Commit d0fddf1

Browse files
authored
Merge pull request kubernetes#122148 from pohly/controllers-context-support
controllers + apiserver: enhance context support
2 parents 031e6c3 + b92273a commit d0fddf1

File tree

26 files changed

+198
-123
lines changed

26 files changed

+198
-123
lines changed

cmd/kube-apiserver/app/server.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ limitations under the License.
2020
package app
2121

2222
import (
23+
"context"
2324
"crypto/tls"
2425
"fmt"
2526
"net/http"
@@ -114,7 +115,7 @@ cluster's shared state through which all other components interact.`,
114115
}
115116
// add feature enablement metrics
116117
utilfeature.DefaultMutableFeatureGate.AddMetrics()
117-
return Run(completedOptions, genericapiserver.SetupSignalHandler())
118+
return Run(cmd.Context(), completedOptions)
118119
},
119120
Args: func(cmd *cobra.Command, args []string) error {
120121
for _, arg := range args {
@@ -125,6 +126,7 @@ cluster's shared state through which all other components interact.`,
125126
return nil
126127
},
127128
}
129+
cmd.SetContext(genericapiserver.SetupSignalContext())
128130

129131
fs := cmd.Flags()
130132
namedFlagSets := s.Flags()
@@ -142,7 +144,7 @@ cluster's shared state through which all other components interact.`,
142144
}
143145

144146
// Run runs the specified APIServer. This should never exit.
145-
func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
147+
func Run(ctx context.Context, opts options.CompletedOptions) error {
146148
// To help debugging, immediately log version
147149
klog.Infof("Version: %+v", version.Get())
148150

@@ -166,7 +168,7 @@ func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
166168
return err
167169
}
168170

169-
return prepared.Run(stopCh)
171+
return prepared.Run(ctx)
170172
}
171173

172174
// CreateServerChain creates the apiservers connected via delegation.

cmd/kube-apiserver/app/testing/testserver.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"os"
3333
"path/filepath"
3434
"runtime"
35+
"testing"
3536
"time"
3637

3738
"github.com/spf13/pflag"
@@ -56,10 +57,11 @@ import (
5657
"k8s.io/klog/v2"
5758
"k8s.io/kube-aggregator/pkg/apiserver"
5859
"k8s.io/kubernetes/pkg/features"
60+
testutil "k8s.io/kubernetes/test/utils"
61+
"k8s.io/kubernetes/test/utils/ktesting"
5962

6063
"k8s.io/kubernetes/cmd/kube-apiserver/app"
6164
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
62-
testutil "k8s.io/kubernetes/test/utils"
6365
)
6466

6567
func init() {
@@ -139,7 +141,9 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
139141
// Note: we return a tear-down func instead of a stop channel because the later will leak temporary
140142
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
141143
// enough time to remove temporary files.
142-
func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
144+
func StartTestServer(t ktesting.TB, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
145+
tCtx := ktesting.Init(t)
146+
143147
if instanceOptions == nil {
144148
instanceOptions = NewDefaultTestServerOptions()
145149
}
@@ -149,12 +153,11 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
149153
return result, fmt.Errorf("failed to create temp dir: %v", err)
150154
}
151155

152-
stopCh := make(chan struct{})
153156
var errCh chan error
154157
tearDown := func() {
155-
// Closing stopCh is stopping apiserver and cleaning up
158+
// Cancel is stopping apiserver and cleaning up
156159
// after itself, including shutting down its storage layer.
157-
close(stopCh)
160+
tCtx.Cancel("tearing down")
158161

159162
// If the apiserver was started, let's wait for it to
160163
// shutdown clearly.
@@ -359,15 +362,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
359362
}
360363

361364
errCh = make(chan error)
362-
go func(stopCh <-chan struct{}) {
365+
go func() {
363366
defer close(errCh)
364367
prepared, err := server.PrepareRun()
365368
if err != nil {
366369
errCh <- err
367-
} else if err := prepared.Run(stopCh); err != nil {
370+
} else if err := prepared.Run(tCtx); err != nil {
368371
errCh <- err
369372
}
370-
}(stopCh)
373+
}()
371374

372375
client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
373376
if err != nil {
@@ -465,7 +468,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
465468
}
466469

467470
// StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
468-
func StartTestServerOrDie(t Logger, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
471+
func StartTestServerOrDie(t testing.TB, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
469472
result, err := StartTestServer(t, instanceOptions, flags, storageConfig)
470473
if err == nil {
471474
return &result

pkg/controlplane/controller/defaultservicecidr/default_servicecidr_controller.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,6 @@ func NewController(
5555
secondaryRange net.IPNet,
5656
client clientset.Interface,
5757
) *Controller {
58-
broadcaster := record.NewBroadcaster()
59-
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
60-
6158
c := &Controller{
6259
client: client,
6360
interval: 10 * time.Second, // same as DefaultEndpointReconcilerInterval
@@ -79,9 +76,6 @@ func NewController(
7976
c.serviceCIDRLister = networkingv1alpha1listers.NewServiceCIDRLister(c.serviceCIDRInformer.GetIndexer())
8077
c.serviceCIDRsSynced = c.serviceCIDRInformer.HasSynced
8178

82-
c.eventBroadcaster = broadcaster
83-
c.eventRecorder = recorder
84-
8579
return c
8680
}
8781

@@ -101,9 +95,12 @@ type Controller struct {
10195
}
10296

10397
// Start will not return until the default ServiceCIDR exists or stopCh is closed.
104-
func (c *Controller) Start(stopCh <-chan struct{}) {
98+
func (c *Controller) Start(ctx context.Context) {
10599
defer utilruntime.HandleCrash()
100+
stopCh := ctx.Done()
106101

102+
c.eventBroadcaster = record.NewBroadcaster(record.WithContext(ctx))
103+
c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
107104
c.eventBroadcaster.StartStructuredLogging(0)
108105
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
109106
defer c.eventBroadcaster.Shutdown()
@@ -116,8 +113,6 @@ func (c *Controller) Start(stopCh <-chan struct{}) {
116113
return
117114
}
118115

119-
// derive a context from the stopCh so we can cancel the poll loop
120-
ctx := wait.ContextForChannel(stopCh)
121116
// wait until first successfully sync
122117
// this blocks apiserver startup so poll with a short interval
123118
err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {

pkg/controlplane/instance.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
420420
)
421421
// The default serviceCIDR must exist before the apiserver is healthy
422422
// otherwise the allocators for Services will not work.
423-
controller.Start(hookContext.StopCh)
423+
controller.Start(hookContext)
424424
return nil
425425
})
426426
}

staging/src/k8s.io/apiextensions-apiserver/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ import (
2525
)
2626

2727
func main() {
28-
stopCh := genericapiserver.SetupSignalHandler()
29-
cmd := server.NewServerCommand(os.Stdout, os.Stderr, stopCh)
28+
ctx := genericapiserver.SetupSignalContext()
29+
cmd := server.NewServerCommand(ctx, os.Stdout, os.Stderr)
3030
code := cli.Run(cmd)
3131
os.Exit(code)
3232
}

staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/server.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package server
1818

1919
import (
20+
"context"
2021
"io"
2122

2223
"github.com/spf13/cobra"
@@ -25,7 +26,7 @@ import (
2526
genericapiserver "k8s.io/apiserver/pkg/server"
2627
)
2728

28-
func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Command {
29+
func NewServerCommand(ctx context.Context, out, errOut io.Writer) *cobra.Command {
2930
o := options.NewCustomResourceDefinitionsServerOptions(out, errOut)
3031

3132
cmd := &cobra.Command{
@@ -38,19 +39,20 @@ func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Comm
3839
if err := o.Validate(); err != nil {
3940
return err
4041
}
41-
if err := Run(o, stopCh); err != nil {
42+
if err := Run(c.Context(), o); err != nil {
4243
return err
4344
}
4445
return nil
4546
},
4647
}
48+
cmd.SetContext(ctx)
4749

4850
fs := cmd.Flags()
4951
o.AddFlags(fs)
5052
return cmd
5153
}
5254

53-
func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct{}) error {
55+
func Run(ctx context.Context, o *options.CustomResourceDefinitionsServerOptions) error {
5456
config, err := o.Config()
5557
if err != nil {
5658
return err
@@ -60,5 +62,5 @@ func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct
6062
if err != nil {
6163
return err
6264
}
63-
return server.GenericAPIServer.PrepareRun().Run(stopCh)
65+
return server.GenericAPIServer.PrepareRun().RunWithContext(ctx)
6466
}

staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package testing
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"net"
2324
"os"
@@ -83,13 +84,15 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
8384
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
8485
// enough time to remove temporary files.
8586
func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
86-
stopCh := make(chan struct{})
87+
// TODO: this is a candidate for using what is now test/utils/ktesting,
88+
// should that become a staging repo.
89+
ctx, cancel := context.WithCancelCause(context.Background())
8790
var errCh chan error
8891
tearDown := func() {
89-
// Closing stopCh is stopping apiextensions apiserver and its
92+
// Cancel is stopping apiextensions apiserver and its
9093
// delegates, which itself is cleaning up after itself,
9194
// including shutting down its storage layer.
92-
close(stopCh)
95+
cancel(errors.New("tearing down"))
9396

9497
// If the apiextensions apiserver was started, let's wait for
9598
// it to shutdown clearly.
@@ -166,13 +169,13 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin
166169
}
167170

168171
errCh = make(chan error)
169-
go func(stopCh <-chan struct{}) {
172+
go func() {
170173
defer close(errCh)
171174

172-
if err := server.GenericAPIServer.PrepareRun().Run(stopCh); err != nil {
175+
if err := server.GenericAPIServer.PrepareRun().RunWithContext(ctx); err != nil {
173176
errCh <- err
174177
}
175-
}(stopCh)
178+
}()
176179

177180
t.Logf("Waiting for /healthz to be ok...")
178181

staging/src/k8s.io/apiserver/pkg/server/config_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package server
1818

1919
import (
20+
"context"
21+
"errors"
2022
"fmt"
2123
"io"
2224
"net/http"
@@ -42,6 +44,7 @@ import (
4244
"k8s.io/client-go/kubernetes/fake"
4345
"k8s.io/client-go/rest"
4446
"k8s.io/component-base/tracing"
47+
"k8s.io/klog/v2/ktesting"
4548
netutils "k8s.io/utils/net"
4649
)
4750

@@ -79,6 +82,9 @@ func TestAuthorizeClientBearerTokenNoops(t *testing.T) {
7982
}
8083

8184
func TestNewWithDelegate(t *testing.T) {
85+
_, ctx := ktesting.NewTestContext(t)
86+
ctx, cancel := context.WithCancelCause(ctx)
87+
defer cancel(errors.New("test is done"))
8288
delegateConfig := NewConfig(codecs)
8389
delegateConfig.ExternalAddress = "192.168.10.4:443"
8490
delegateConfig.PublicAddress = netutils.ParseIPSloppy("192.168.10.4")
@@ -136,10 +142,8 @@ func TestNewWithDelegate(t *testing.T) {
136142
return nil
137143
})
138144

139-
stopCh := make(chan struct{})
140-
defer close(stopCh)
141145
wrappingServer.PrepareRun()
142-
wrappingServer.RunPostStartHooks(stopCh)
146+
wrappingServer.RunPostStartHooks(ctx)
143147

144148
server := httptest.NewServer(wrappingServer.Handler)
145149
defer server.Close()

0 commit comments

Comments
 (0)