Skip to content

Commit 72cdc8c

Browse files
authored
Merge pull request kubernetes#89256 from yue9944882/integration-test-flowcontrol
Priority-level isolation integration test
2 parents 07179d0 + df5dfb4 commit 72cdc8c

File tree

8 files changed

+402
-0
lines changed

8 files changed

+402
-0
lines changed

build/visible_to/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,7 @@ package_group(
435435
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics",
436436
"//staging/src/k8s.io/component-base/metrics/...",
437437
"//test/e2e_node",
438+
"//test/integration/apiserver/flowcontrol",
438439
"//vendor/...",
439440
],
440441
)

staging/src/k8s.io/component-base/metrics/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ package_group(
9393
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics",
9494
"//staging/src/k8s.io/component-base/metrics/...",
9595
"//test/e2e_node",
96+
"//test/integration/apiserver/flowcontrol",
9697
"//vendor/...",
9798
],
9899
)

test/integration/apiserver/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ filegroup(
9090
"//test/integration/apiserver/admissionwebhook:all-srcs",
9191
"//test/integration/apiserver/apply:all-srcs",
9292
"//test/integration/apiserver/certreload:all-srcs",
93+
"//test/integration/apiserver/flowcontrol:all-srcs",
9394
"//test/integration/apiserver/podlogs:all-srcs",
9495
],
9596
tags = ["automanaged"],
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_test")
2+
3+
go_test(
4+
name = "go_default_test",
5+
srcs = [
6+
"concurrency_test.go",
7+
"main_test.go",
8+
],
9+
tags = ["integration"],
10+
deps = [
11+
"//pkg/master:go_default_library",
12+
"//staging/src/k8s.io/api/flowcontrol/v1alpha1:go_default_library",
13+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
14+
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
15+
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
16+
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
17+
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
18+
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
19+
"//staging/src/k8s.io/client-go/rest:go_default_library",
20+
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
21+
"//test/integration/framework:go_default_library",
22+
"//vendor/github.com/prometheus/common/expfmt:go_default_library",
23+
"//vendor/github.com/prometheus/common/model:go_default_library",
24+
],
25+
)
26+
27+
filegroup(
28+
name = "package-srcs",
29+
srcs = glob(["**"]),
30+
tags = ["automanaged"],
31+
visibility = ["//visibility:private"],
32+
)
33+
34+
filegroup(
35+
name = "all-srcs",
36+
srcs = [":package-srcs"],
37+
tags = ["automanaged"],
38+
visibility = ["//visibility:public"],
39+
)
Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package flowcontrol
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"io"
23+
"net/http/httptest"
24+
"strings"
25+
"testing"
26+
"time"
27+
28+
"github.com/prometheus/common/expfmt"
29+
"github.com/prometheus/common/model"
30+
31+
flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
32+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/apimachinery/pkg/runtime/schema"
34+
"k8s.io/apimachinery/pkg/util/wait"
35+
genericfeatures "k8s.io/apiserver/pkg/features"
36+
utilfeature "k8s.io/apiserver/pkg/util/feature"
37+
clientset "k8s.io/client-go/kubernetes"
38+
"k8s.io/client-go/rest"
39+
featuregatetesting "k8s.io/component-base/featuregate/testing"
40+
"k8s.io/kubernetes/pkg/master"
41+
"k8s.io/kubernetes/test/integration/framework"
42+
)
43+
44+
const (
45+
sharedConcurrencyMetricsName = "apiserver_flowcontrol_request_concurrency_limit"
46+
dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total"
47+
labelPriorityLevel = "priorityLevel"
48+
timeout = time.Second * 10
49+
)
50+
51+
func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) {
52+
opts := framework.MasterConfigOptions{EtcdOptions: framework.DefaultEtcdOptions()}
53+
opts.EtcdOptions.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
54+
masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(&opts)
55+
resourceConfig := master.DefaultAPIResourceConfigSource()
56+
resourceConfig.EnableVersions(schema.GroupVersion{
57+
Group: "flowcontrol.apiserver.k8s.io",
58+
Version: "v1alpha1",
59+
})
60+
masterConfig.GenericConfig.MaxRequestsInFlight = 1
61+
masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 1
62+
masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
63+
masterConfig.ExtraConfig.APIResourceConfigSource = resourceConfig
64+
_, s, closeFn := framework.RunAMaster(masterConfig)
65+
66+
return s, masterConfig.GenericConfig.LoopbackClientConfig, closeFn
67+
}
68+
69+
func TestPriorityLevelIsolation(t *testing.T) {
70+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
71+
// NOTE: disabling the feature should fail the test
72+
_, loopbackConfig, closeFn := setup(t)
73+
defer closeFn()
74+
75+
loopbackClient := clientset.NewForConfigOrDie(loopbackConfig)
76+
noxu1Client := getClientFor(loopbackConfig, "noxu1")
77+
noxu2Client := getClientFor(loopbackConfig, "noxu2")
78+
79+
queueLength := 50
80+
concurrencyShares := 1
81+
82+
priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
83+
loopbackClient, "noxu1", concurrencyShares, queueLength)
84+
if err != nil {
85+
t.Error(err)
86+
}
87+
priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
88+
loopbackClient, "noxu2", concurrencyShares, queueLength)
89+
if err != nil {
90+
t.Error(err)
91+
}
92+
93+
sharedConcurrency, err := getSharedConcurrencyOfPriorityLevel(loopbackClient)
94+
if err != nil {
95+
t.Error(err)
96+
}
97+
98+
if 1 != sharedConcurrency[priorityLevelNoxu1.Name] {
99+
t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu1.Name], 1)
100+
}
101+
if 1 != sharedConcurrency[priorityLevelNoxu2.Name] {
102+
t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu2.Name], 1)
103+
}
104+
105+
stopCh := make(chan struct{})
106+
defer close(stopCh)
107+
108+
// "elephant"
109+
streamRequests(concurrencyShares+queueLength, func() {
110+
_, err := noxu1Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
111+
if err != nil {
112+
t.Error(err)
113+
}
114+
}, stopCh)
115+
// "mouse"
116+
streamRequests(3, func() {
117+
_, err := noxu2Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
118+
if err != nil {
119+
t.Error(err)
120+
}
121+
}, stopCh)
122+
123+
time.Sleep(time.Second * 10) // running in background for a while
124+
125+
reqCounts, err := getRequestCountOfPriorityLevel(loopbackClient)
126+
if err != nil {
127+
t.Error(err)
128+
}
129+
130+
noxu1RequestCount := reqCounts[priorityLevelNoxu1.Name]
131+
noxu2RequestCount := reqCounts[priorityLevelNoxu2.Name]
132+
133+
// Theoretically, the actual expected value of request counts upon the two priority-level should be
134+
// the equal. We're deliberately lax to make flakes super rare.
135+
if (noxu1RequestCount/2) > noxu2RequestCount || (noxu2RequestCount/2) > noxu1RequestCount {
136+
t.Errorf("imbalanced requests made by noxu1/2: (%d:%d)", noxu1RequestCount, noxu2RequestCount)
137+
}
138+
}
139+
140+
func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface {
141+
config := &rest.Config{
142+
Host: loopbackConfig.Host,
143+
QPS: -1,
144+
BearerToken: loopbackConfig.BearerToken,
145+
Impersonate: rest.ImpersonationConfig{
146+
UserName: username,
147+
},
148+
}
149+
return clientset.NewForConfigOrDie(config)
150+
}
151+
152+
func getMetrics(c clientset.Interface) (string, error) {
153+
resp, err := c.CoreV1().
154+
RESTClient().
155+
Get().
156+
RequestURI("/metrics").
157+
DoRaw(context.Background())
158+
if err != nil {
159+
return "", err
160+
}
161+
return string(resp), err
162+
}
163+
164+
func getSharedConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
165+
resp, err := getMetrics(c)
166+
if err != nil {
167+
return nil, err
168+
}
169+
170+
dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
171+
decoder := expfmt.SampleDecoder{
172+
Dec: dec,
173+
Opts: &expfmt.DecodeOptions{},
174+
}
175+
176+
concurrency := make(map[string]int)
177+
for {
178+
var v model.Vector
179+
if err := decoder.Decode(&v); err != nil {
180+
if err == io.EOF {
181+
// Expected loop termination condition.
182+
return concurrency, nil
183+
}
184+
return nil, fmt.Errorf("failed decoding metrics: %v", err)
185+
}
186+
for _, metric := range v {
187+
switch name := string(metric.Metric[model.MetricNameLabel]); name {
188+
case sharedConcurrencyMetricsName:
189+
concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
190+
}
191+
}
192+
}
193+
}
194+
195+
func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
196+
resp, err := getMetrics(c)
197+
if err != nil {
198+
return nil, err
199+
}
200+
201+
dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
202+
decoder := expfmt.SampleDecoder{
203+
Dec: dec,
204+
Opts: &expfmt.DecodeOptions{},
205+
}
206+
207+
reqCounts := make(map[string]int)
208+
for {
209+
var v model.Vector
210+
if err := decoder.Decode(&v); err != nil {
211+
if err == io.EOF {
212+
// Expected loop termination condition.
213+
return reqCounts, nil
214+
}
215+
return nil, fmt.Errorf("failed decoding metrics: %v", err)
216+
}
217+
for _, metric := range v {
218+
switch name := string(metric.Metric[model.MetricNameLabel]); name {
219+
case dispatchedRequestCountMetricsName:
220+
reqCounts[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
221+
}
222+
}
223+
}
224+
}
225+
226+
func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string, concurrencyShares, queuelength int) (*flowcontrolv1alpha1.PriorityLevelConfiguration, *flowcontrolv1alpha1.FlowSchema, error) {
227+
pl, err := c.FlowcontrolV1alpha1().PriorityLevelConfigurations().Create(context.Background(), &flowcontrolv1alpha1.PriorityLevelConfiguration{
228+
ObjectMeta: metav1.ObjectMeta{
229+
Name: username,
230+
},
231+
Spec: flowcontrolv1alpha1.PriorityLevelConfigurationSpec{
232+
Type: flowcontrolv1alpha1.PriorityLevelEnablementLimited,
233+
Limited: &flowcontrolv1alpha1.LimitedPriorityLevelConfiguration{
234+
AssuredConcurrencyShares: int32(concurrencyShares),
235+
LimitResponse: flowcontrolv1alpha1.LimitResponse{
236+
Type: flowcontrolv1alpha1.LimitResponseTypeQueue,
237+
Queuing: &flowcontrolv1alpha1.QueuingConfiguration{
238+
Queues: 100,
239+
HandSize: 1,
240+
QueueLengthLimit: int32(queuelength),
241+
},
242+
},
243+
},
244+
},
245+
}, metav1.CreateOptions{})
246+
if err != nil {
247+
return nil, nil, err
248+
}
249+
fs, err := c.FlowcontrolV1alpha1().FlowSchemas().Create(context.TODO(), &flowcontrolv1alpha1.FlowSchema{
250+
ObjectMeta: metav1.ObjectMeta{
251+
Name: username,
252+
},
253+
Spec: flowcontrolv1alpha1.FlowSchemaSpec{
254+
DistinguisherMethod: &flowcontrolv1alpha1.FlowDistinguisherMethod{
255+
Type: flowcontrolv1alpha1.FlowDistinguisherMethodByUserType,
256+
},
257+
MatchingPrecedence: 1000,
258+
PriorityLevelConfiguration: flowcontrolv1alpha1.PriorityLevelConfigurationReference{
259+
Name: username,
260+
},
261+
Rules: []flowcontrolv1alpha1.PolicyRulesWithSubjects{
262+
{
263+
ResourceRules: []flowcontrolv1alpha1.ResourcePolicyRule{
264+
{
265+
Verbs: []string{flowcontrolv1alpha1.VerbAll},
266+
APIGroups: []string{flowcontrolv1alpha1.APIGroupAll},
267+
Resources: []string{flowcontrolv1alpha1.ResourceAll},
268+
Namespaces: []string{flowcontrolv1alpha1.NamespaceEvery},
269+
ClusterScope: true,
270+
},
271+
},
272+
Subjects: []flowcontrolv1alpha1.Subject{
273+
{
274+
Kind: flowcontrolv1alpha1.SubjectKindUser,
275+
User: &flowcontrolv1alpha1.UserSubject{
276+
Name: username,
277+
},
278+
},
279+
},
280+
},
281+
},
282+
},
283+
}, metav1.CreateOptions{})
284+
if err != nil {
285+
return nil, nil, err
286+
}
287+
288+
return pl, fs, wait.Poll(time.Second, timeout, func() (bool, error) {
289+
fs, err := c.FlowcontrolV1alpha1().FlowSchemas().Get(context.TODO(), username, metav1.GetOptions{})
290+
if err != nil {
291+
return false, err
292+
}
293+
for _, condition := range fs.Status.Conditions {
294+
if condition.Type == flowcontrolv1alpha1.FlowSchemaConditionDangling {
295+
if condition.Status == flowcontrolv1alpha1.ConditionFalse {
296+
return true, nil
297+
}
298+
}
299+
}
300+
return false, nil
301+
})
302+
}
303+
304+
func streamRequests(parallel int, request func(), stopCh <-chan struct{}) {
305+
for i := 0; i < parallel; i++ {
306+
go func() {
307+
for {
308+
select {
309+
case <-stopCh:
310+
return
311+
default:
312+
request()
313+
}
314+
}
315+
}()
316+
}
317+
}

0 commit comments

Comments
 (0)