Skip to content

Commit 30bc0fc

Browse files
committed
integration test for priority-level isolation
1 parent b0ed3cd commit 30bc0fc

File tree

6 files changed

+334
-0
lines changed

6 files changed

+334
-0
lines changed

test/integration/apiserver/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ filegroup(
9090
"//test/integration/apiserver/admissionwebhook:all-srcs",
9191
"//test/integration/apiserver/apply:all-srcs",
9292
"//test/integration/apiserver/certreload:all-srcs",
93+
"//test/integration/apiserver/flowcontrol:all-srcs",
9394
"//test/integration/apiserver/podlogs:all-srcs",
9495
],
9596
tags = ["automanaged"],
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_test")
2+
3+
go_test(
4+
name = "go_default_test",
5+
srcs = [
6+
"concurrency_test.go",
7+
"main_test.go",
8+
],
9+
tags = ["integration"],
10+
deps = [
11+
"//pkg/master:go_default_library",
12+
"//staging/src/k8s.io/api/flowcontrol/v1alpha1:go_default_library",
13+
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
14+
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
15+
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
16+
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
17+
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
18+
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
19+
"//staging/src/k8s.io/client-go/rest:go_default_library",
20+
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
21+
"//test/integration/framework:go_default_library",
22+
"//vendor/github.com/prometheus/common/expfmt:go_default_library",
23+
"//vendor/github.com/prometheus/common/model:go_default_library",
24+
"//vendor/github.com/stretchr/testify/assert:go_default_library",
25+
"//vendor/github.com/stretchr/testify/require:go_default_library",
26+
],
27+
)
28+
29+
filegroup(
30+
name = "package-srcs",
31+
srcs = glob(["**"]),
32+
tags = ["automanaged"],
33+
visibility = ["//visibility:private"],
34+
)
35+
36+
filegroup(
37+
name = "all-srcs",
38+
srcs = [":package-srcs"],
39+
tags = ["automanaged"],
40+
visibility = ["//visibility:public"],
41+
)
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package flowcontrol
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"io"
23+
"net/http/httptest"
24+
"strings"
25+
"sync"
26+
"testing"
27+
"time"
28+
29+
"github.com/prometheus/common/expfmt"
30+
"github.com/prometheus/common/model"
31+
"github.com/stretchr/testify/assert"
32+
"github.com/stretchr/testify/require"
33+
34+
flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
35+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36+
"k8s.io/apimachinery/pkg/runtime/schema"
37+
"k8s.io/apimachinery/pkg/util/wait"
38+
genericfeatures "k8s.io/apiserver/pkg/features"
39+
utilfeature "k8s.io/apiserver/pkg/util/feature"
40+
clientset "k8s.io/client-go/kubernetes"
41+
"k8s.io/client-go/rest"
42+
featuregatetesting "k8s.io/component-base/featuregate/testing"
43+
"k8s.io/kubernetes/pkg/master"
44+
"k8s.io/kubernetes/test/integration/framework"
45+
)
46+
47+
const (
48+
dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total"
49+
dispatchedRequestCountMetricsLabelPriorityLevel = "priorityLevel"
50+
timeout = time.Second * 10
51+
)
52+
53+
func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) {
54+
opts := framework.MasterConfigOptions{EtcdOptions: framework.DefaultEtcdOptions()}
55+
opts.EtcdOptions.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
56+
masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(&opts)
57+
resourceConfig := master.DefaultAPIResourceConfigSource()
58+
resourceConfig.EnableVersions(schema.GroupVersion{
59+
Group: "flowcontrol.apiserver.k8s.io",
60+
Version: "v1alpha1",
61+
})
62+
masterConfig.GenericConfig.MaxRequestsInFlight = 5
63+
masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 5
64+
masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
65+
masterConfig.ExtraConfig.APIResourceConfigSource = resourceConfig
66+
_, s, closeFn := framework.RunAMaster(masterConfig)
67+
68+
return s, masterConfig.GenericConfig.LoopbackClientConfig, closeFn
69+
}
70+
71+
func TestPriorityLevelIsolation(t *testing.T) {
72+
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
73+
// NOTE: disabling the feature should fail the test
74+
_, loopbackConfig, closeFn := setup(t)
75+
defer closeFn()
76+
77+
loopbackClient := clientset.NewForConfigOrDie(loopbackConfig)
78+
noxu1Client := getClientFor(loopbackConfig, "noxu1")
79+
noxu2Client := getClientFor(loopbackConfig, "noxu2")
80+
81+
priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(loopbackClient, "noxu1")
82+
require.NoError(t, err)
83+
priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(loopbackClient, "noxu2")
84+
require.NoError(t, err)
85+
86+
wg := &sync.WaitGroup{}
87+
// "elephant"
88+
streamRequests(wg, 10, 100, func() {
89+
_, err := noxu1Client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
90+
require.NoError(t, err)
91+
})
92+
93+
streamRequests(nil, 1, 100, func() {
94+
_, err := noxu2Client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
95+
require.NoError(t, err)
96+
})
97+
98+
wg.Wait()
99+
100+
dispatchedCountNoxu1, err := getRequestCountOfPriorityLevel(loopbackClient, priorityLevelNoxu1.Name)
101+
require.NoError(t, err)
102+
dispatchedCountNoxu2, err := getRequestCountOfPriorityLevel(loopbackClient, priorityLevelNoxu2.Name)
103+
require.NoError(t, err)
104+
105+
assert.Equal(t, 1000, dispatchedCountNoxu1)
106+
assert.Equal(t, 100, dispatchedCountNoxu2)
107+
}
108+
109+
func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface {
110+
config := &rest.Config{
111+
Host: loopbackConfig.Host,
112+
QPS: -1,
113+
BearerToken: loopbackConfig.BearerToken,
114+
Impersonate: rest.ImpersonationConfig{
115+
UserName: username,
116+
},
117+
}
118+
return clientset.NewForConfigOrDie(config)
119+
}
120+
121+
func getRequestCountOfPriorityLevel(c clientset.Interface, priorityLevelName string) (int, error) {
122+
resp, err := c.CoreV1().
123+
RESTClient().
124+
Get().
125+
RequestURI("/metrics").
126+
DoRaw(context.TODO())
127+
if err != nil {
128+
return 0, err
129+
}
130+
131+
dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
132+
decoder := expfmt.SampleDecoder{
133+
Dec: dec,
134+
Opts: &expfmt.DecodeOptions{},
135+
}
136+
137+
for {
138+
var v model.Vector
139+
if err := decoder.Decode(&v); err != nil {
140+
if err == io.EOF {
141+
// Expected loop termination condition.
142+
return 0, fmt.Errorf("no dispatched-count metrics found for priorityLevel %v", priorityLevelName)
143+
}
144+
return 0, fmt.Errorf("failed decoding metrics: %v", err)
145+
}
146+
for _, metric := range v {
147+
switch name := string(metric.Metric[model.MetricNameLabel]); name {
148+
case dispatchedRequestCountMetricsName:
149+
if priorityLevelName == string(metric.Metric[dispatchedRequestCountMetricsLabelPriorityLevel]) {
150+
return int(metric.Value), nil
151+
}
152+
}
153+
}
154+
}
155+
}
156+
157+
func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string) (*flowcontrolv1alpha1.PriorityLevelConfiguration, *flowcontrolv1alpha1.FlowSchema, error) {
158+
pl, err := c.FlowcontrolV1alpha1().PriorityLevelConfigurations().Create(context.TODO(), &flowcontrolv1alpha1.PriorityLevelConfiguration{
159+
ObjectMeta: metav1.ObjectMeta{
160+
Name: username,
161+
},
162+
Spec: flowcontrolv1alpha1.PriorityLevelConfigurationSpec{
163+
Type: flowcontrolv1alpha1.PriorityLevelEnablementLimited,
164+
Limited: &flowcontrolv1alpha1.LimitedPriorityLevelConfiguration{
165+
AssuredConcurrencyShares: 10,
166+
LimitResponse: flowcontrolv1alpha1.LimitResponse{
167+
Type: flowcontrolv1alpha1.LimitResponseTypeQueue,
168+
Queuing: &flowcontrolv1alpha1.QueuingConfiguration{
169+
Queues: 100,
170+
HandSize: 1,
171+
QueueLengthLimit: 10,
172+
},
173+
},
174+
},
175+
},
176+
}, metav1.CreateOptions{})
177+
if err != nil {
178+
return nil, nil, err
179+
}
180+
fs, err := c.FlowcontrolV1alpha1().FlowSchemas().Create(context.TODO(), &flowcontrolv1alpha1.FlowSchema{
181+
ObjectMeta: metav1.ObjectMeta{
182+
Name: username,
183+
},
184+
Spec: flowcontrolv1alpha1.FlowSchemaSpec{
185+
DistinguisherMethod: &flowcontrolv1alpha1.FlowDistinguisherMethod{
186+
Type: flowcontrolv1alpha1.FlowDistinguisherMethodByUserType,
187+
},
188+
MatchingPrecedence: 1000,
189+
PriorityLevelConfiguration: flowcontrolv1alpha1.PriorityLevelConfigurationReference{
190+
Name: username,
191+
},
192+
Rules: []flowcontrolv1alpha1.PolicyRulesWithSubjects{
193+
{
194+
ResourceRules: []flowcontrolv1alpha1.ResourcePolicyRule{
195+
{
196+
Verbs: []string{flowcontrolv1alpha1.VerbAll},
197+
APIGroups: []string{flowcontrolv1alpha1.APIGroupAll},
198+
Resources: []string{flowcontrolv1alpha1.ResourceAll},
199+
Namespaces: []string{flowcontrolv1alpha1.NamespaceEvery},
200+
ClusterScope: true,
201+
},
202+
},
203+
Subjects: []flowcontrolv1alpha1.Subject{
204+
{
205+
Kind: flowcontrolv1alpha1.SubjectKindUser,
206+
User: &flowcontrolv1alpha1.UserSubject{
207+
Name: username,
208+
},
209+
},
210+
},
211+
},
212+
},
213+
},
214+
}, metav1.CreateOptions{})
215+
if err != nil {
216+
return nil, nil, err
217+
}
218+
219+
return pl, fs, wait.Poll(time.Second, timeout, func() (bool, error) {
220+
fs, err := c.FlowcontrolV1alpha1().FlowSchemas().Get(context.TODO(), username, metav1.GetOptions{})
221+
if err != nil {
222+
return false, err
223+
}
224+
for _, condition := range fs.Status.Conditions {
225+
if condition.Type == flowcontrolv1alpha1.FlowSchemaConditionDangling {
226+
if condition.Status == flowcontrolv1alpha1.ConditionFalse {
227+
return true, nil
228+
}
229+
}
230+
}
231+
return false, nil
232+
})
233+
}
234+
235+
func streamRequests(wg *sync.WaitGroup, parallel, times int, request func()) {
236+
for i := 0; i < parallel; i++ {
237+
if wg != nil {
238+
wg.Add(1)
239+
}
240+
go func() {
241+
for j := 0; j < times; j++ {
242+
request()
243+
}
244+
if wg != nil {
245+
wg.Done()
246+
}
247+
}()
248+
}
249+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package flowcontrol
18+
19+
import (
20+
"testing"
21+
22+
"k8s.io/kubernetes/test/integration/framework"
23+
)
24+
25+
func TestMain(m *testing.M) {
26+
framework.EtcdMain(m.Run)
27+
}

test/integration/framework/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,13 @@ go_library(
6262
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library",
6363
"//staging/src/k8s.io/apiserver/pkg/authorization/union:go_default_library",
6464
"//staging/src/k8s.io/apiserver/pkg/endpoints/openapi:go_default_library",
65+
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
6566
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
6667
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
6768
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
6869
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
70+
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
71+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library",
6972
"//staging/src/k8s.io/client-go/informers:go_default_library",
7073
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
7174
"//staging/src/k8s.io/client-go/rest:go_default_library",

test/integration/framework/master_utils.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,13 @@ import (
3939
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
4040
authorizerunion "k8s.io/apiserver/pkg/authorization/union"
4141
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
42+
genericfeatures "k8s.io/apiserver/pkg/features"
4243
genericapiserver "k8s.io/apiserver/pkg/server"
4344
"k8s.io/apiserver/pkg/server/options"
4445
serverstorage "k8s.io/apiserver/pkg/server/storage"
4546
"k8s.io/apiserver/pkg/storage/storagebackend"
47+
utilfeature "k8s.io/apiserver/pkg/util/feature"
48+
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
4649
"k8s.io/client-go/informers"
4750
clientset "k8s.io/client-go/kubernetes"
4851
restclient "k8s.io/client-go/rest"
@@ -191,6 +194,16 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
191194
}
192195

193196
masterConfig.ExtraConfig.VersionedInformers = informers.NewSharedInformerFactory(clientset, masterConfig.GenericConfig.LoopbackClientConfig.Timeout)
197+
198+
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) {
199+
masterConfig.GenericConfig.FlowControl = utilflowcontrol.New(
200+
masterConfig.ExtraConfig.VersionedInformers,
201+
clientset.FlowcontrolV1alpha1(),
202+
masterConfig.GenericConfig.MaxRequestsInFlight+masterConfig.GenericConfig.MaxMutatingRequestsInFlight,
203+
masterConfig.GenericConfig.RequestTimeout/4,
204+
)
205+
}
206+
194207
m, err = masterConfig.Complete().New(genericapiserver.NewEmptyDelegate())
195208
if err != nil {
196209
// We log the error first so that even if closeFn crashes, the error is shown

0 commit comments

Comments
 (0)