Skip to content

Commit 1874039

Browse files
authored
Merge pull request kubernetes#127388 from p0lyn0mial/upstream-watchlist-meta-client
metadata client: add support for API streaming
2 parents f153edf + 0f933a0 commit 1874039

File tree

2 files changed

+83
-14
lines changed

2 files changed

+83
-14
lines changed

staging/src/k8s.io/client-go/metadata/metadata.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import (
3333
"k8s.io/apimachinery/pkg/types"
3434
"k8s.io/apimachinery/pkg/watch"
3535
"k8s.io/client-go/rest"
36+
"k8s.io/client-go/util/consistencydetector"
37+
"k8s.io/client-go/util/watchlist"
3638
)
3739

3840
var deleteScheme = runtime.NewScheme()
@@ -218,6 +220,24 @@ func (c *client) Get(ctx context.Context, name string, opts metav1.GetOptions, s
218220

219221
// List returns all resources within the specified scope (namespace or cluster).
220222
func (c *client) List(ctx context.Context, opts metav1.ListOptions) (*metav1.PartialObjectMetadataList, error) {
223+
if watchListOptions, hasWatchListOptionsPrepared, watchListOptionsErr := watchlist.PrepareWatchListOptionsFromListOptions(opts); watchListOptionsErr != nil {
224+
klog.FromContext(ctx).Error(watchListOptionsErr, "Failed preparing watchlist options, falling back to the standard LIST semantics", "resource", c.resource)
225+
} else if hasWatchListOptionsPrepared {
226+
result, err := c.watchList(ctx, watchListOptions)
227+
if err == nil {
228+
consistencydetector.CheckWatchListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("watchlist request for %v", c.resource), c.list, opts, result)
229+
return result, nil
230+
}
231+
klog.FromContext(ctx).Error(err, "The watchlist request ended with an error, falling back to the standard LIST semantics", "resource", c.resource)
232+
}
233+
result, err := c.list(ctx, opts)
234+
if err == nil {
235+
consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("list request for %v", c.resource), c.list, opts, result)
236+
}
237+
return result, err
238+
}
239+
240+
func (c *client) list(ctx context.Context, opts metav1.ListOptions) (*metav1.PartialObjectMetadataList, error) {
221241
result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).
222242
SetHeader("Accept", "application/vnd.kubernetes.protobuf;as=PartialObjectMetadataList;g=meta.k8s.io;v=v1,application/json;as=PartialObjectMetadataList;g=meta.k8s.io;v=v1,application/json").
223243
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
@@ -249,6 +269,25 @@ func (c *client) List(ctx context.Context, opts metav1.ListOptions) (*metav1.Par
249269
return partial, nil
250270
}
251271

272+
// watchList establishes a watch stream with the server and returns PartialObjectMetadataList.
273+
func (c *client) watchList(ctx context.Context, opts metav1.ListOptions) (*metav1.PartialObjectMetadataList, error) {
274+
var timeout time.Duration
275+
if opts.TimeoutSeconds != nil {
276+
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
277+
}
278+
279+
result := &metav1.PartialObjectMetadataList{}
280+
err := c.client.client.Get().
281+
AbsPath(c.makeURLSegments("")...).
282+
SetHeader("Accept", "application/vnd.kubernetes.protobuf;as=PartialObjectMetadata;g=meta.k8s.io;v=v1,application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1,application/json").
283+
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
284+
Timeout(timeout).
285+
WatchList(ctx).
286+
Into(result)
287+
288+
return result, err
289+
}
290+
252291
// Watch finds all changes to the resources in the specified scope (namespace or cluster).
253292
func (c *client) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
254293
var timeout time.Duration

test/e2e/apimachinery/watchlist.go

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@ import (
3131
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3232
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3333
"k8s.io/apimachinery/pkg/runtime"
34+
"k8s.io/apimachinery/pkg/runtime/schema"
3435
"k8s.io/apimachinery/pkg/util/wait"
3536
"k8s.io/apimachinery/pkg/watch"
3637
utilfeature "k8s.io/apiserver/pkg/util/feature"
3738
"k8s.io/client-go/dynamic"
3839
clientfeatures "k8s.io/client-go/features"
3940
"k8s.io/client-go/kubernetes"
41+
"k8s.io/client-go/metadata"
4042
"k8s.io/client-go/rest"
4143
"k8s.io/client-go/tools/cache"
4244
"k8s.io/client-go/util/consistencydetector"
@@ -66,13 +68,7 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
6668
nil,
6769
)
6870

69-
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
70-
var expectedSecrets []v1.Secret
71-
for i := 1; i <= 5; i++ {
72-
secret, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{})
73-
framework.ExpectNoError(err)
74-
expectedSecrets = append(expectedSecrets, *secret)
75-
}
71+
expectedSecrets := addWellKnownSecrets(ctx, f)
7672

7773
ginkgo.By("Starting the secret informer")
7874
go secretInformer.Run(stopCh)
@@ -99,13 +95,7 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
9995
ginkgo.It("should be requested by client-go's List method when WatchListClient is enabled", func(ctx context.Context) {
10096
featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true)
10197

102-
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
103-
var expectedSecrets []v1.Secret
104-
for i := 1; i <= 5; i++ {
105-
secret, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{})
106-
framework.ExpectNoError(err)
107-
expectedSecrets = append(expectedSecrets, *secret)
108-
}
98+
expectedSecrets := addWellKnownSecrets(ctx, f)
10999

110100
rt, clientConfig := clientConfigWithRoundTripper(f)
111101
wrappedKubeClient, err := kubernetes.NewForConfig(clientConfig)
@@ -152,6 +142,35 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
152142
expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientFor(secretList.GetResourceVersion())
153143
gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByDynamicClient))
154144
})
145+
ginkgo.It("should be requested by metadata client's List method when WatchListClient is enabled", func(ctx context.Context) {
146+
featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true)
147+
148+
metaClient, err := metadata.NewForConfig(f.ClientConfig())
149+
framework.ExpectNoError(err)
150+
expectedMetaSecrets := []metav1.PartialObjectMetadata{}
151+
for _, addedSecret := range addWellKnownSecrets(ctx, f) {
152+
addedSecretMeta, err := metaClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).Get(ctx, addedSecret.Name, metav1.GetOptions{})
153+
framework.ExpectNoError(err)
154+
expectedMetaSecrets = append(expectedMetaSecrets, *addedSecretMeta)
155+
}
156+
157+
rt, clientConfig := clientConfigWithRoundTripper(f)
158+
wrappedMetaClient, err := metadata.NewForConfig(clientConfig)
159+
framework.ExpectNoError(err)
160+
161+
ginkgo.By("Streaming secrets metadata from the server")
162+
secretMetaList, err := wrappedMetaClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).List(ctx, metav1.ListOptions{})
163+
framework.ExpectNoError(err)
164+
165+
ginkgo.By("Verifying if the secret meta list was properly streamed")
166+
streamedMetaSecrets := secretMetaList.Items
167+
gomega.Expect(cmp.Equal(expectedMetaSecrets, streamedMetaSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data"))
168+
gomega.Expect(secretMetaList.GetObjectKind().GroupVersionKind()).To(gomega.Equal(schema.GroupVersion{}.WithKind("PartialObjectMetadataList")))
169+
170+
ginkgo.By("Verifying if expected requests were sent to the server")
171+
expectedRequestMadeByMetaClient := getExpectedRequestMadeByClientFor(secretMetaList.GetResourceVersion())
172+
gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByMetaClient))
173+
})
155174
})
156175

157176
type roundTripper struct {
@@ -204,6 +223,17 @@ func getExpectedRequestMadeByClientFor(rv string) []string {
204223
return expectedRequestMadeByClient
205224
}
206225

226+
func addWellKnownSecrets(ctx context.Context, f *framework.Framework) []v1.Secret {
227+
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
228+
var secrets []v1.Secret
229+
for i := 1; i <= 5; i++ {
230+
secret, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{})
231+
framework.ExpectNoError(err)
232+
secrets = append(secrets, *secret)
233+
}
234+
return secrets
235+
}
236+
207237
type byName []v1.Secret
208238

209239
func (a byName) Len() int { return len(a) }

0 commit comments

Comments
 (0)