Skip to content

Commit a7ac8d4

Browse files
authored
Merge pull request kubernetes#82146 from deads2k/agg-discovery-timeout-2
add a timeout for proxying discovery requests
2 parents 96a1558 + c24a366 commit a7ac8d4

File tree

7 files changed

+122
-32
lines changed

7 files changed

+122
-32
lines changed

cmd/kube-apiserver/app/aggregator.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import (
4141
utilfeature "k8s.io/apiserver/pkg/util/feature"
4242
kubeexternalinformers "k8s.io/client-go/informers"
4343
"k8s.io/client-go/tools/cache"
44-
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
44+
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
4545
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
4646
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
4747
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
@@ -50,6 +50,7 @@ import (
5050
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
5151
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
5252
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
53+
kubefeatures "k8s.io/kubernetes/pkg/features"
5354
"k8s.io/kubernetes/pkg/master/controller/crdregistration"
5455
)
5556

@@ -109,10 +110,11 @@ func createAggregatorConfig(
109110
SharedInformerFactory: externalInformers,
110111
},
111112
ExtraConfig: aggregatorapiserver.ExtraConfig{
112-
ProxyClientCert: certBytes,
113-
ProxyClientKey: keyBytes,
114-
ServiceResolver: serviceResolver,
115-
ProxyTransport: proxyTransport,
113+
ProxyClientCert: certBytes,
114+
ProxyClientKey: keyBytes,
115+
ServiceResolver: serviceResolver,
116+
ProxyTransport: proxyTransport,
117+
EnableAggregatedDiscoveryTimeout: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.EnableAggregatedDiscoveryTimeout),
116118
},
117119
}
118120

pkg/features/kube_features.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,13 @@ const (
496496
//
497497
// Enables the startupProbe in kubelet worker.
498498
StartupProbe featuregate.Feature = "StartupProbe"
499+
500+
// owner @deads2k
501+
// deprecated: v1.16
502+
//
503+
// Enable the aggregated discovery timeout to ensure client responsiveness. Note this feature is present
504+
// only for backward compatibility, it will be removed in the 1.17 release.
505+
EnableAggregatedDiscoveryTimeout featuregate.Feature = "EnableAggregatedDiscoveryTimeout"
499506
)
500507

501508
func init() {
@@ -598,6 +605,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
598605
apiextensionsfeatures.CustomResourcePublishOpenAPI: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
599606
apiextensionsfeatures.CustomResourceDefaulting: {Default: true, PreRelease: featuregate.Beta},
600607

608+
EnableAggregatedDiscoveryTimeout: {Default: true, PreRelease: featuregate.Deprecated},
609+
601610
// features that enable backwards compatibility but are scheduled to be removed
602611
// ...
603612
HPAScaleToZero: {Default: false, PreRelease: featuregate.Alpha},

staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package proxy
1919
import (
2020
"bufio"
2121
"bytes"
22-
"context"
2322
"fmt"
2423
"io"
2524
"io/ioutil"
@@ -222,8 +221,8 @@ func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
222221
h.Transport = h.defaultProxyTransport(req.URL, h.Transport)
223222
}
224223

225-
// WithContext creates a shallow clone of the request with the new context.
226-
newReq := req.WithContext(context.Background())
224+
// WithContext creates a shallow clone of the request with the same context.
225+
newReq := req.WithContext(req.Context())
227226
newReq.Header = utilnet.CloneHeader(req.Header)
228227
if !h.UseRequestLocation {
229228
newReq.URL = &loc

staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ go_test(
1818
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
1919
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
2020
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
21+
"//staging/src/k8s.io/apimachinery/pkg/util/proxy:go_default_library",
2122
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
2223
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
2324
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",

staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ type ExtraConfig struct {
7171

7272
// Mechanism by which the Aggregator will resolve services. Required.
7373
ServiceResolver ServiceResolver
74+
75+
EnableAggregatedDiscoveryTimeout bool
7476
}
7577

7678
// Config represents the configuration needed to create an APIAggregator.
@@ -132,6 +134,8 @@ type APIAggregator struct {
132134

133135
// openAPIAggregationController downloads and merges OpenAPI specs.
134136
openAPIAggregationController *openapicontroller.AggregationController
137+
138+
enableAggregatedDiscoveryTimeout bool
135139
}
136140

137141
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
@@ -172,17 +176,18 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
172176
)
173177

174178
s := &APIAggregator{
175-
GenericAPIServer: genericServer,
176-
delegateHandler: delegationTarget.UnprotectedHandler(),
177-
proxyClientCert: c.ExtraConfig.ProxyClientCert,
178-
proxyClientKey: c.ExtraConfig.ProxyClientKey,
179-
proxyTransport: c.ExtraConfig.ProxyTransport,
180-
proxyHandlers: map[string]*proxyHandler{},
181-
handledGroups: sets.String{},
182-
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
183-
APIRegistrationInformers: informerFactory,
184-
serviceResolver: c.ExtraConfig.ServiceResolver,
185-
openAPIConfig: openAPIConfig,
179+
GenericAPIServer: genericServer,
180+
delegateHandler: delegationTarget.UnprotectedHandler(),
181+
proxyClientCert: c.ExtraConfig.ProxyClientCert,
182+
proxyClientKey: c.ExtraConfig.ProxyClientKey,
183+
proxyTransport: c.ExtraConfig.ProxyTransport,
184+
proxyHandlers: map[string]*proxyHandler{},
185+
handledGroups: sets.String{},
186+
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
187+
APIRegistrationInformers: informerFactory,
188+
serviceResolver: c.ExtraConfig.ServiceResolver,
189+
openAPIConfig: openAPIConfig,
190+
enableAggregatedDiscoveryTimeout: c.ExtraConfig.EnableAggregatedDiscoveryTimeout,
186191
}
187192

188193
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
@@ -286,11 +291,12 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
286291

287292
// register the proxy handler
288293
proxyHandler := &proxyHandler{
289-
localDelegate: s.delegateHandler,
290-
proxyClientCert: s.proxyClientCert,
291-
proxyClientKey: s.proxyClientKey,
292-
proxyTransport: s.proxyTransport,
293-
serviceResolver: s.serviceResolver,
294+
localDelegate: s.delegateHandler,
295+
proxyClientCert: s.proxyClientCert,
296+
proxyClientKey: s.proxyClientKey,
297+
proxyTransport: s.proxyTransport,
298+
serviceResolver: s.serviceResolver,
299+
enableAggregatedDiscoveryTimeout: s.enableAggregatedDiscoveryTimeout,
294300
}
295301
proxyHandler.updateAPIService(apiService)
296302
if s.openAPIAggregationController != nil {

staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ package apiserver
1818

1919
import (
2020
"context"
21-
"k8s.io/klog"
2221
"net/http"
2322
"net/url"
23+
"strings"
2424
"sync/atomic"
25+
"time"
2526

2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/apimachinery/pkg/util/httpstream"
@@ -35,11 +36,16 @@ import (
3536
utilfeature "k8s.io/apiserver/pkg/util/feature"
3637
restclient "k8s.io/client-go/rest"
3738
"k8s.io/client-go/transport"
39+
"k8s.io/klog"
3840
apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
3941
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
4042
)
4143

42-
const aggregatorComponent string = "aggregator"
44+
const (
45+
aggregatorComponent string = "aggregator"
46+
47+
aggregatedDiscoveryTimeout = 5 * time.Second
48+
)
4349

4450
// proxyHandler provides a http.Handler which will proxy traffic to locations
4551
// specified by items implementing Redirector.
@@ -57,6 +63,8 @@ type proxyHandler struct {
5763
serviceResolver ServiceResolver
5864

5965
handlingInfo atomic.Value
66+
67+
enableAggregatedDiscoveryTimeout bool
6068
}
6169

6270
type proxyHandlingInfo struct {
@@ -140,11 +148,8 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
140148
location.Path = req.URL.Path
141149
location.RawQuery = req.URL.Query().Encode()
142150

143-
// WithContext creates a shallow clone of the request with the new context.
144-
newReq := req.WithContext(context.Background())
145-
newReq.Header = utilnet.CloneHeader(req.Header)
146-
newReq.URL = location
147-
newReq.Host = location.Host
151+
newReq, cancelFn := newRequestForProxy(location, req, r.enableAggregatedDiscoveryTimeout)
152+
defer cancelFn()
148153

149154
if handlingInfo.proxyRoundTripper == nil {
150155
proxyError(w, req, "", http.StatusNotFound)
@@ -171,6 +176,31 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
171176
handler.ServeHTTP(w, newReq)
172177
}
173178

179+
// newRequestForProxy returns a shallow copy of the original request with a context that may include a timeout for discovery requests
180+
func newRequestForProxy(location *url.URL, req *http.Request, enableAggregatedDiscoveryTimeout bool) (*http.Request, context.CancelFunc) {
181+
newCtx := req.Context()
182+
cancelFn := func() {}
183+
184+
if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok {
185+
// trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three
186+
// segments that we are going to proxy, we have a discovery request.
187+
if enableAggregatedDiscoveryTimeout && !requestInfo.IsResourceRequest && len(strings.Split(strings.Trim(requestInfo.Path, "/"), "/")) == 3 {
188+
// discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that
189+
// should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions)
190+
// so forcing a short timeout here helps responsiveness of all clients.
191+
newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout)
192+
}
193+
}
194+
195+
// WithContext creates a shallow clone of the request with the same context.
196+
newReq := req.WithContext(newCtx)
197+
newReq.Header = utilnet.CloneHeader(req.Header)
198+
newReq.URL = location
199+
newReq.Host = location.Host
200+
201+
return newReq, cancelFn
202+
}
203+
174204
// maybeWrapForConnectionUpgrades wraps the roundtripper for upgrades. The bool indicates if it was wrapped
175205
func maybeWrapForConnectionUpgrades(restConfig *restclient.Config, rt http.RoundTripper, req *http.Request) (http.RoundTripper, bool, error) {
176206
if !httpstream.IsUpgradeRequest(req) {

staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"crypto/tls"
2121
"fmt"
2222
"io/ioutil"
23-
"k8s.io/utils/pointer"
2423
"net/http"
2524
"net/http/httptest"
2625
"net/http/httputil"
@@ -33,10 +32,12 @@ import (
3332
"golang.org/x/net/websocket"
3433

3534
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35+
"k8s.io/apimachinery/pkg/util/proxy"
3636
"k8s.io/apimachinery/pkg/util/sets"
3737
"k8s.io/apiserver/pkg/authentication/user"
3838
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
3939
apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
40+
"k8s.io/utils/pointer"
4041
)
4142

4243
type targetHTTPHandler struct {
@@ -523,3 +524,45 @@ gR0TAoGACFOvhl8txfbkwLeuNeunyOPL7J4nIccthgd2ioFOr3HTou6wzN++vYTa
523524
a3OF9jH5Z7m6X1rrwn6J1+Gw9sBme38/GeGXHigsBI/8WaTvyuppyVIXOVPoTvVf
524525
VYsTwo5YgV1HzDkV+BNmBCw1GYcGXAElhJI+dCsgQuuU6TKzgl8=
525526
-----END RSA PRIVATE KEY-----`)
527+
528+
func TestGetContextForNewRequest(t *testing.T) {
529+
done := make(chan struct{})
530+
server := httptest.NewTLSServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
531+
<-done // never return so that we're certain to return base on timeout
532+
}))
533+
defer server.Close()
534+
defer close(done)
535+
536+
proxyServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
537+
location, err := url.Parse(server.URL)
538+
if err != nil {
539+
t.Fatal(err)
540+
}
541+
location.Path = req.URL.Path
542+
543+
nestedReq := req.WithContext(genericapirequest.WithRequestInfo(req.Context(), &genericapirequest.RequestInfo{Path: req.URL.Path}))
544+
newReq, cancelFn := newRequestForProxy(location, nestedReq, true)
545+
defer cancelFn()
546+
547+
theproxy := proxy.NewUpgradeAwareHandler(location, server.Client().Transport, true, false, &responder{w: w})
548+
theproxy.ServeHTTP(w, newReq)
549+
}))
550+
defer proxyServer.Close()
551+
552+
// normal clients will not be setting a timeout, don't set one here. Our proxy logic should construct this for us
553+
resp, err := proxyServer.Client().Get(proxyServer.URL + "/apis/group/version")
554+
if err != nil {
555+
t.Fatal(err)
556+
}
557+
if resp.StatusCode != http.StatusServiceUnavailable {
558+
t.Error(err)
559+
}
560+
body, err := ioutil.ReadAll(resp.Body)
561+
if err != nil {
562+
t.Fatal(err)
563+
}
564+
if !strings.Contains(string(body), "Error trying to reach service: 'context deadline exceeded'") {
565+
t.Error(string(body))
566+
}
567+
568+
}

0 commit comments

Comments
 (0)