Skip to content

Commit f9e1620

Browse files
committed
aggregation availability should ensure that discovery responds non-failing
1 parent 3ec638d commit f9e1620

File tree

4 files changed

+141
-73
lines changed

4 files changed

+141
-73
lines changed

staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,19 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
182182
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
183183

184184
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), s)
185-
availableController := statuscontrollers.NewAvailableConditionController(
185+
availableController, err := statuscontrollers.NewAvailableConditionController(
186186
informerFactory.Apiregistration().InternalVersion().APIServices(),
187187
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
188188
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
189189
apiregistrationClient.Apiregistration(),
190190
c.ExtraConfig.ProxyTransport,
191+
c.ExtraConfig.ProxyClientCert,
192+
c.ExtraConfig.ProxyClientKey,
191193
s.serviceResolver,
192194
)
195+
if err != nil {
196+
return nil, err
197+
}
193198

194199
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
195200
informerFactory.Start(context.StopCh)

staging/src/k8s.io/kube-aggregator/pkg/controllers/status/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ go_library(
2626
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
2727
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
2828
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
29+
"//staging/src/k8s.io/client-go/rest:go_default_library",
2930
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
31+
"//staging/src/k8s.io/client-go/transport:go_default_library",
3032
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
3133
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
3234
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion:go_default_library",

staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,12 @@ limitations under the License.
1717
package apiserver
1818

1919
import (
20-
"crypto/tls"
2120
"fmt"
2221
"net/http"
2322
"net/url"
2423
"time"
2524

26-
"k8s.io/klog"
27-
28-
"k8s.io/api/core/v1"
25+
v1 "k8s.io/api/core/v1"
2926
"k8s.io/apimachinery/pkg/api/equality"
3027
apierrors "k8s.io/apimachinery/pkg/api/errors"
3128
"k8s.io/apimachinery/pkg/api/meta"
@@ -36,9 +33,11 @@ import (
3633
"k8s.io/apimachinery/pkg/util/wait"
3734
v1informers "k8s.io/client-go/informers/core/v1"
3835
v1listers "k8s.io/client-go/listers/core/v1"
36+
"k8s.io/client-go/rest"
3937
"k8s.io/client-go/tools/cache"
38+
"k8s.io/client-go/transport"
4039
"k8s.io/client-go/util/workqueue"
41-
40+
"k8s.io/klog"
4241
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
4342
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
4443
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion"
@@ -81,8 +80,10 @@ func NewAvailableConditionController(
8180
endpointsInformer v1informers.EndpointsInformer,
8281
apiServiceClient apiregistrationclient.APIServicesGetter,
8382
proxyTransport *http.Transport,
83+
proxyClientCert []byte,
84+
proxyClientKey []byte,
8485
serviceResolver ServiceResolver,
85-
) *AvailableConditionController {
86+
) (*AvailableConditionController, error) {
8687
c := &AvailableConditionController{
8788
apiServiceClient: apiServiceClient,
8889
apiServiceLister: apiServiceInformer.Lister(),
@@ -100,19 +101,28 @@ func NewAvailableConditionController(
100101
"AvailableConditionController"),
101102
}
102103

104+
// if a particular transport was specified, use that otherwise build one
103105
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
104-
// that's not so bad) and sets a very short timeout.
105-
discoveryClient := &http.Client{
106-
Transport: &http.Transport{
107-
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
106+
// that's not so bad) and sets a very short timeout. This is a best effort GET that provides no additional information
107+
restConfig := &rest.Config{
108+
TLSClientConfig: rest.TLSClientConfig{
109+
Insecure: true,
110+
CertData: proxyClientCert,
111+
KeyData: proxyClientKey,
108112
},
113+
}
114+
if proxyTransport != nil && proxyTransport.DialContext != nil {
115+
restConfig.Dial = proxyTransport.DialContext
116+
}
117+
transport, err := rest.TransportFor(restConfig)
118+
if err != nil {
119+
return nil, err
120+
}
121+
c.discoveryClient = &http.Client{
122+
Transport: transport,
109123
// the request should happen quickly.
110124
Timeout: 5 * time.Second,
111125
}
112-
if proxyTransport != nil {
113-
discoveryClient.Transport = proxyTransport
114-
}
115-
c.discoveryClient = discoveryClient
116126

117127
// resync on this one because it is low cardinality and rechecking the actual discovery
118128
// allows us to detect health in a more timely fashion when network connectivity to
@@ -140,7 +150,7 @@ func NewAvailableConditionController(
140150

141151
c.syncFn = c.sync
142152

143-
return c
153+
return c, nil
144154
}
145155

146156
func (c *AvailableConditionController) sync(key string) error {
@@ -254,17 +264,31 @@ func (c *AvailableConditionController) sync(key string) error {
254264

255265
errCh := make(chan error)
256266
go func() {
257-
resp, err := c.discoveryClient.Get(discoveryURL.String())
267+
newReq, err := http.NewRequest("GET", discoveryURL.String(), nil)
268+
if err != nil {
269+
errCh <- err
270+
return
271+
}
272+
273+
// setting the system-masters identity ensures that we will always have access rights
274+
transport.SetAuthProxyHeaders(newReq, "system:kube-aggregator", []string{"system:masters"}, nil)
275+
resp, err := c.discoveryClient.Do(newReq)
258276
if resp != nil {
259277
resp.Body.Close()
278+
// we should always been in the 200s or 300s
279+
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
280+
errCh <- fmt.Errorf("bad status from %v: %v", discoveryURL, resp.StatusCode)
281+
return
282+
}
260283
}
284+
261285
errCh <- err
262286
}()
263287

264288
select {
265289
case err = <-errCh:
266290
if err != nil {
267-
results <- fmt.Errorf("no response from %v: %v", discoveryURL, err)
291+
results <- fmt.Errorf("failing or missing response from %v: %v", discoveryURL, err)
268292
return
269293
}
270294

staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go

Lines changed: 92 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@ package apiserver
1818

1919
import (
2020
"fmt"
21+
"net/http"
22+
"net/http/httptest"
23+
"net/url"
24+
"strings"
2125
"testing"
2226

2327
"github.com/davecgh/go-spew/spew"
2428

25-
"k8s.io/api/core/v1"
29+
v1 "k8s.io/api/core/v1"
2630
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2731
v1listers "k8s.io/client-go/listers/core/v1"
2832
clienttesting "k8s.io/client-go/testing"
@@ -99,10 +103,12 @@ func TestSync(t *testing.T) {
99103
tests := []struct {
100104
name string
101105

102-
apiServiceName string
103-
apiServices []*apiregistration.APIService
104-
services []*v1.Service
105-
endpoints []*v1.Endpoints
106+
apiServiceName string
107+
apiServices []*apiregistration.APIService
108+
services []*v1.Service
109+
endpoints []*v1.Endpoints
110+
forceDiscoveryFail bool
111+
106112
expectedAvailability apiregistration.APIServiceCondition
107113
}{
108114
{
@@ -200,66 +206,97 @@ func TestSync(t *testing.T) {
200206
Message: `all checks passed`,
201207
},
202208
},
209+
{
210+
name: "remote-bad-return",
211+
apiServiceName: "remote.group",
212+
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
213+
services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)},
214+
endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, testServicePortName)},
215+
forceDiscoveryFail: true,
216+
expectedAvailability: apiregistration.APIServiceCondition{
217+
Type: apiregistration.Available,
218+
Status: apiregistration.ConditionFalse,
219+
Reason: "FailedDiscoveryCheck",
220+
Message: `failing or missing response from`,
221+
},
222+
},
203223
}
204224

205225
for _, tc := range tests {
206-
fakeClient := fake.NewSimpleClientset()
207-
apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
208-
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
209-
endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
210-
for _, obj := range tc.apiServices {
211-
apiServiceIndexer.Add(obj)
212-
}
213-
for _, obj := range tc.services {
214-
serviceIndexer.Add(obj)
215-
}
216-
for _, obj := range tc.endpoints {
217-
endpointsIndexer.Add(obj)
218-
}
226+
t.Run(tc.name, func(t *testing.T) {
227+
fakeClient := fake.NewSimpleClientset()
228+
apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
229+
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
230+
endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
231+
for _, obj := range tc.apiServices {
232+
apiServiceIndexer.Add(obj)
233+
}
234+
for _, obj := range tc.services {
235+
serviceIndexer.Add(obj)
236+
}
237+
for _, obj := range tc.endpoints {
238+
endpointsIndexer.Add(obj)
239+
}
240+
241+
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
242+
if !tc.forceDiscoveryFail {
243+
w.WriteHeader(http.StatusOK)
244+
}
245+
w.WriteHeader(http.StatusForbidden)
246+
}))
247+
defer testServer.Close()
219248

220-
c := AvailableConditionController{
221-
apiServiceClient: fakeClient.Apiregistration(),
222-
apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer),
223-
serviceLister: v1listers.NewServiceLister(serviceIndexer),
224-
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
225-
}
226-
c.sync(tc.apiServiceName)
249+
c := AvailableConditionController{
250+
apiServiceClient: fakeClient.Apiregistration(),
251+
apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer),
252+
serviceLister: v1listers.NewServiceLister(serviceIndexer),
253+
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
254+
discoveryClient: testServer.Client(),
255+
serviceResolver: &fakeServiceResolver{url: testServer.URL},
256+
}
257+
c.sync(tc.apiServiceName)
227258

228-
// ought to have one action writing status
229-
if e, a := 1, len(fakeClient.Actions()); e != a {
230-
t.Errorf("%v expected %v, got %v", tc.name, e, fakeClient.Actions())
231-
continue
232-
}
259+
// ought to have one action writing status
260+
if e, a := 1, len(fakeClient.Actions()); e != a {
261+
t.Fatalf("%v expected %v, got %v", tc.name, e, fakeClient.Actions())
262+
}
233263

234-
action, ok := fakeClient.Actions()[0].(clienttesting.UpdateAction)
235-
if !ok {
236-
t.Errorf("%v got %v", tc.name, ok)
237-
continue
238-
}
264+
action, ok := fakeClient.Actions()[0].(clienttesting.UpdateAction)
265+
if !ok {
266+
t.Fatalf("%v got %v", tc.name, ok)
267+
}
239268

240-
if e, a := 1, len(action.GetObject().(*apiregistration.APIService).Status.Conditions); e != a {
241-
t.Errorf("%v expected %v, got %v", tc.name, e, action.GetObject())
242-
continue
243-
}
244-
condition := action.GetObject().(*apiregistration.APIService).Status.Conditions[0]
245-
if e, a := tc.expectedAvailability.Type, condition.Type; e != a {
246-
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
247-
}
248-
if e, a := tc.expectedAvailability.Status, condition.Status; e != a {
249-
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
250-
}
251-
if e, a := tc.expectedAvailability.Reason, condition.Reason; e != a {
252-
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
253-
}
254-
if e, a := tc.expectedAvailability.Message, condition.Message; e != a {
255-
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
256-
}
257-
if condition.LastTransitionTime.IsZero() {
258-
t.Error("expected lastTransitionTime to be non-zero")
259-
}
269+
if e, a := 1, len(action.GetObject().(*apiregistration.APIService).Status.Conditions); e != a {
270+
t.Fatalf("%v expected %v, got %v", tc.name, e, action.GetObject())
271+
}
272+
condition := action.GetObject().(*apiregistration.APIService).Status.Conditions[0]
273+
if e, a := tc.expectedAvailability.Type, condition.Type; e != a {
274+
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
275+
}
276+
if e, a := tc.expectedAvailability.Status, condition.Status; e != a {
277+
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
278+
}
279+
if e, a := tc.expectedAvailability.Reason, condition.Reason; e != a {
280+
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
281+
}
282+
if e, a := tc.expectedAvailability.Message, condition.Message; !strings.HasPrefix(a, e) {
283+
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
284+
}
285+
if condition.LastTransitionTime.IsZero() {
286+
t.Error("expected lastTransitionTime to be non-zero")
287+
}
288+
})
260289
}
261290
}
262291

292+
type fakeServiceResolver struct {
293+
url string
294+
}
295+
296+
func (f *fakeServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
297+
return url.Parse(f.url)
298+
}
299+
263300
func TestUpdateAPIServiceStatus(t *testing.T) {
264301
foo := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "foo"}}}}
265302
bar := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "bar"}}}}

0 commit comments

Comments
 (0)