Skip to content

Commit ae35048

Browse files
authored
adds watchListEndpointRestrictions for watchlist requests (kubernetes#126996)
* endpoints/handlers/get: intro watchListEndpointRestrictions * consistencydetector/list_data_consistency_detector: expose IsDataConsistencyDetectionForListEnabled * e2e/watchlist: extract common function for adding unstructured secrets * e2e/watchlist: new e2e scenarios for convering watchListEndpointRestrict
1 parent 99ff62e commit ae35048

File tree

3 files changed

+140
-21
lines changed

3 files changed

+140
-21
lines changed

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
utilfeature "k8s.io/apiserver/pkg/util/feature"
4646
"k8s.io/component-base/tracing"
4747
"k8s.io/klog/v2"
48+
"k8s.io/utils/ptr"
4849
)
4950

5051
// getterFunc performs a get request with the given context and object name. The request
@@ -185,15 +186,8 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
185186
if err != nil {
186187
hasName = false
187188
}
188-
189189
ctx = request.WithNamespace(ctx, namespace)
190190

191-
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
192-
if err != nil {
193-
scope.err(err, w, req)
194-
return
195-
}
196-
197191
opts := metainternalversion.ListOptions{}
198192
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, &opts); err != nil {
199193
err = errors.NewBadRequest(err.Error())
@@ -208,6 +202,17 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
208202
return
209203
}
210204

205+
var restrictions negotiation.EndpointRestrictions
206+
restrictions = scope
207+
if isListWatchRequest(opts) {
208+
restrictions = &watchListEndpointRestrictions{scope}
209+
}
210+
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, restrictions)
211+
if err != nil {
212+
scope.err(err, w, req)
213+
return
214+
}
215+
211216
// transform fields
212217
// TODO: DecodeParametersInto should do this.
213218
if opts.FieldSelector != nil {
@@ -307,3 +312,18 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
307312
transformResponseObject(ctx, scope, req, w, http.StatusOK, outputMediaType, result)
308313
}
309314
}
315+
316+
type watchListEndpointRestrictions struct {
317+
negotiation.EndpointRestrictions
318+
}
319+
320+
func (e *watchListEndpointRestrictions) AllowsMediaTypeTransform(mimeType, mimeSubType string, target *schema.GroupVersionKind) bool {
321+
if target != nil && target.Kind == "Table" {
322+
return false
323+
}
324+
return e.EndpointRestrictions.AllowsMediaTypeTransform(mimeType, mimeSubType, target)
325+
}
326+
327+
func isListWatchRequest(opts metainternalversion.ListOptions) bool {
328+
return utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && ptr.Deref(opts.SendInitialEvents, false) && opts.AllowWatchBookmarks
329+
}

staging/src/k8s.io/client-go/util/consistencydetector/list_data_consistency_detector.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ func init() {
3232
dataConsistencyDetectionForListFromCacheEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_LIST_FROM_CACHE_INCONSISTENCY_DETECTOR"))
3333
}
3434

35+
// IsDataConsistencyDetectionForListEnabled returns true when
36+
// the KUBE_LIST_FROM_CACHE_INCONSISTENCY_DETECTOR environment variable was set during a binary startup.
37+
func IsDataConsistencyDetectionForListEnabled() bool {
38+
return dataConsistencyDetectionForListFromCacheEnabled
39+
}
40+
3541
// CheckListFromCacheDataConsistencyIfRequested performs a data consistency check only when
3642
// the KUBE_LIST_FROM_CACHE_INCONSISTENCY_DETECTOR environment variable was set during a binary startup
3743
// for requests that have a high chance of being served from the watch-cache.
@@ -50,7 +56,7 @@ func init() {
5056
// the cache (even though this might not be true for some requests)
5157
// and issue the second call to get data from etcd for comparison.
5258
func CheckListFromCacheDataConsistencyIfRequested[T runtime.Object](ctx context.Context, identity string, listItemsFn ListFunc[T], optionsUsedToReceiveList metav1.ListOptions, receivedList runtime.Object) {
53-
if !dataConsistencyDetectionForListFromCacheEnabled {
59+
if !IsDataConsistencyDetectionForListEnabled() {
5460
return
5561
}
5662
checkListFromCacheDataConsistencyIfRequestedInternal(ctx, identity, listItemsFn, optionsUsedToReceiveList, receivedList)

test/e2e/apimachinery/watchlist.go

Lines changed: 106 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"net/http"
2323
"sort"
24+
"strings"
2425
"time"
2526

2627
"github.com/google/go-cmp/cmp"
@@ -117,14 +118,7 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
117118
featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true)
118119

119120
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
120-
var expectedSecrets []unstructured.Unstructured
121-
for i := 1; i <= 5; i++ {
122-
unstructuredSecret, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newSecret(fmt.Sprintf("secret-%d", i)))
123-
framework.ExpectNoError(err)
124-
secret, err := f.DynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).Create(ctx, &unstructured.Unstructured{Object: unstructuredSecret}, metav1.CreateOptions{})
125-
framework.ExpectNoError(err)
126-
expectedSecrets = append(expectedSecrets, *secret)
127-
}
121+
expectedSecrets := addWellKnownUnstructuredSecrets(ctx, f)
128122

129123
rt, clientConfig := clientConfigWithRoundTripper(f)
130124
wrappedDynamicClient, err := dynamic.NewForConfig(clientConfig)
@@ -171,16 +165,86 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
171165
expectedRequestMadeByMetaClient := getExpectedRequestMadeByClientFor(secretMetaList.GetResourceVersion())
172166
gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByMetaClient))
173167
})
168+
169+
// Validates unsupported Accept headers in WatchList.
170+
// Sets AcceptContentType to "application/json;as=Table", which the API doesn't support, returning a 406 error.
171+
// After the 406, the client falls back to a regular list request.
172+
ginkgo.It("doesn't support receiving resources as Tables", func(ctx context.Context) {
173+
featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true)
174+
175+
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
176+
_ = addWellKnownUnstructuredSecrets(ctx, f)
177+
178+
rt, clientConfig := clientConfigWithRoundTripper(f)
179+
modifiedClientConfig := dynamic.ConfigFor(clientConfig)
180+
modifiedClientConfig.AcceptContentTypes = strings.Join([]string{
181+
fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1.SchemeGroupVersion.Version, metav1.GroupName),
182+
}, ",")
183+
modifiedClientConfig.GroupVersion = &v1.SchemeGroupVersion
184+
restClient, err := rest.RESTClientFor(modifiedClientConfig)
185+
framework.ExpectNoError(err)
186+
wrappedDynamicClient := dynamic.New(restClient)
187+
188+
// note that the client in case of an error (406) will fall back
189+
// to a standard list request thus the overall call passes
190+
ginkgo.By("Streaming secrets as Table from the server")
191+
secretTable, err := wrappedDynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).List(ctx, metav1.ListOptions{})
192+
framework.ExpectNoError(err)
193+
gomega.Expect(secretTable.GetObjectKind().GroupVersionKind()).To(gomega.Equal(metav1.SchemeGroupVersion.WithKind("Table")))
194+
195+
ginkgo.By("Verifying if expected response was sent by the server")
196+
gomega.Expect(rt.actualResponseStatuses[0]).To(gomega.Equal("406 Not Acceptable"))
197+
expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientWhenFallbackToListFor(secretTable.GetResourceVersion())
198+
gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByDynamicClient))
199+
200+
})
201+
202+
// Sets AcceptContentType to both "application/json;as=Table" and "application/json".
203+
// Unlike the previous test, no 406 error occurs, as the API falls back to "application/json" and returns a valid response.
204+
ginkgo.It("falls backs to supported content type when when receiving resources as Tables was requested", func(ctx context.Context) {
205+
featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true)
206+
207+
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
208+
expectedSecrets := addWellKnownUnstructuredSecrets(ctx, f)
209+
210+
rt, clientConfig := clientConfigWithRoundTripper(f)
211+
modifiedClientConfig := dynamic.ConfigFor(clientConfig)
212+
modifiedClientConfig.AcceptContentTypes = strings.Join([]string{
213+
fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1.SchemeGroupVersion.Version, metav1.GroupName),
214+
"application/json",
215+
}, ",")
216+
modifiedClientConfig.GroupVersion = &v1.SchemeGroupVersion
217+
restClient, err := rest.RESTClientFor(modifiedClientConfig)
218+
framework.ExpectNoError(err)
219+
wrappedDynamicClient := dynamic.New(restClient)
220+
221+
ginkgo.By("Streaming secrets from the server")
222+
secretList, err := wrappedDynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).List(ctx, metav1.ListOptions{})
223+
framework.ExpectNoError(err)
224+
225+
ginkgo.By("Verifying if the secret list was properly streamed")
226+
streamedSecrets := secretList.Items
227+
gomega.Expect(cmp.Equal(expectedSecrets, streamedSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data"))
228+
229+
ginkgo.By("Verifying if expected requests were sent to the server")
230+
expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientFor(secretList.GetResourceVersion())
231+
gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByDynamicClient))
232+
})
174233
})
175234

176235
type roundTripper struct {
177-
actualRequests []string
178-
delegate http.RoundTripper
236+
actualRequests []string
237+
actualResponseStatuses []string
238+
delegate http.RoundTripper
179239
}
180240

181241
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
182242
r.actualRequests = append(r.actualRequests, req.URL.RawQuery)
183-
return r.delegate.RoundTrip(req)
243+
rsp, err := r.delegate.RoundTrip(req)
244+
if rsp != nil {
245+
r.actualResponseStatuses = append(r.actualResponseStatuses, rsp.Status)
246+
}
247+
return rsp, err
184248
}
185249

186250
func (r *roundTripper) Wrap(delegate http.RoundTripper) http.RoundTripper {
@@ -211,10 +275,12 @@ func verifyStore(ctx context.Context, expectedSecrets []v1.Secret, store cache.S
211275
framework.ExpectNoError(err)
212276
}
213277

278+
// corresponds to a streaming request made by the client to stream the secrets
279+
const expectedStreamingRequestMadeByClient string = "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true"
280+
214281
func getExpectedRequestMadeByClientFor(rv string) []string {
215282
expectedRequestMadeByClient := []string{
216-
// corresponds to a streaming request made by the client to stream the secrets
217-
"allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true",
283+
expectedStreamingRequestMadeByClient,
218284
}
219285
if consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() {
220286
// corresponds to a standard list request made by the consistency detector build in into the client
@@ -223,6 +289,19 @@ func getExpectedRequestMadeByClientFor(rv string) []string {
223289
return expectedRequestMadeByClient
224290
}
225291

292+
func getExpectedRequestMadeByClientWhenFallbackToListFor(rv string) []string {
293+
expectedRequestMadeByClient := []string{
294+
expectedStreamingRequestMadeByClient,
295+
// corresponds to a list request made by the client
296+
"",
297+
}
298+
if consistencydetector.IsDataConsistencyDetectionForListEnabled() {
299+
// corresponds to a standard list request made by the consistency detector build in into the client
300+
expectedRequestMadeByClient = append(expectedRequestMadeByClient, fmt.Sprintf("resourceVersion=%s&resourceVersionMatch=Exact", rv))
301+
}
302+
return expectedRequestMadeByClient
303+
}
304+
226305
func addWellKnownSecrets(ctx context.Context, f *framework.Framework) []v1.Secret {
227306
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
228307
var secrets []v1.Secret
@@ -234,6 +313,20 @@ func addWellKnownSecrets(ctx context.Context, f *framework.Framework) []v1.Secre
234313
return secrets
235314
}
236315

316+
// addWellKnownUnstructuredSecrets exists because secrets from addWellKnownSecrets
317+
// don't have type info and cannot be converted.
318+
func addWellKnownUnstructuredSecrets(ctx context.Context, f *framework.Framework) []unstructured.Unstructured {
319+
var secrets []unstructured.Unstructured
320+
for i := 1; i <= 5; i++ {
321+
unstructuredSecret, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newSecret(fmt.Sprintf("secret-%d", i)))
322+
framework.ExpectNoError(err)
323+
secret, err := f.DynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).Create(ctx, &unstructured.Unstructured{Object: unstructuredSecret}, metav1.CreateOptions{})
324+
framework.ExpectNoError(err)
325+
secrets = append(secrets, *secret)
326+
}
327+
return secrets
328+
}
329+
237330
type byName []v1.Secret
238331

239332
func (a byName) Len() int { return len(a) }

0 commit comments

Comments
 (0)