Skip to content

Commit df5dfb4

Browse files
committed
assert shared concurrency
1 parent 875407a commit df5dfb4

File tree

3 files changed

+81
-16
lines changed

3 files changed

+81
-16
lines changed

build/visible_to/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,7 @@ package_group(
435435
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics",
436436
"//staging/src/k8s.io/component-base/metrics/...",
437437
"//test/e2e_node",
438+
"//test/integration/apiserver/flowcontrol",
438439
"//vendor/...",
439440
],
440441
)

test/integration/apiserver/flowcontrol/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ go_test(
2121
"//test/integration/framework:go_default_library",
2222
"//vendor/github.com/prometheus/common/expfmt:go_default_library",
2323
"//vendor/github.com/prometheus/common/model:go_default_library",
24-
"//vendor/github.com/stretchr/testify/require:go_default_library",
2524
],
2625
)
2726

test/integration/apiserver/flowcontrol/concurrency_test.go

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727

2828
"github.com/prometheus/common/expfmt"
2929
"github.com/prometheus/common/model"
30-
"github.com/stretchr/testify/require"
3130

3231
flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
3332
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -43,9 +42,10 @@ import (
4342
)
4443

4544
const (
46-
dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total"
47-
dispatchedRequestCountMetricsLabelPriorityLevel = "priorityLevel"
48-
timeout = time.Second * 10
45+
sharedConcurrencyMetricsName = "apiserver_flowcontrol_request_concurrency_limit"
46+
dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total"
47+
labelPriorityLevel = "priorityLevel"
48+
timeout = time.Second * 10
4949
)
5050

5151
func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) {
@@ -57,8 +57,8 @@ func setup(t testing.TB) (*httptest.Server, *rest.Config, framework.CloseFunc) {
5757
Group: "flowcontrol.apiserver.k8s.io",
5858
Version: "v1alpha1",
5959
})
60-
masterConfig.GenericConfig.MaxRequestsInFlight = 5
61-
masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 5
60+
masterConfig.GenericConfig.MaxRequestsInFlight = 1
61+
masterConfig.GenericConfig.MaxMutatingRequestsInFlight = 1
6262
masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
6363
masterConfig.ExtraConfig.APIResourceConfigSource = resourceConfig
6464
_, s, closeFn := framework.RunAMaster(masterConfig)
@@ -81,33 +81,59 @@ func TestPriorityLevelIsolation(t *testing.T) {
8181

8282
priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
8383
loopbackClient, "noxu1", concurrencyShares, queueLength)
84-
require.NoError(t, err)
84+
if err != nil {
85+
t.Error(err)
86+
}
8587
priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
8688
loopbackClient, "noxu2", concurrencyShares, queueLength)
87-
require.NoError(t, err)
89+
if err != nil {
90+
t.Error(err)
91+
}
92+
93+
sharedConcurrency, err := getSharedConcurrencyOfPriorityLevel(loopbackClient)
94+
if err != nil {
95+
t.Error(err)
96+
}
97+
98+
if 1 != sharedConcurrency[priorityLevelNoxu1.Name] {
99+
t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu1.Name], 1)
100+
}
101+
if 1 != sharedConcurrency[priorityLevelNoxu2.Name] {
102+
t.Errorf("unexpected shared concurrency %v instead of %v", sharedConcurrency[priorityLevelNoxu2.Name], 1)
103+
}
88104

89105
stopCh := make(chan struct{})
90106
defer close(stopCh)
107+
91108
// "elephant"
92109
streamRequests(concurrencyShares+queueLength, func() {
93110
_, err := noxu1Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
94-
require.NoError(t, err)
111+
if err != nil {
112+
t.Error(err)
113+
}
95114
}, stopCh)
96115
// "mouse"
97-
streamRequests(1, func() {
116+
streamRequests(3, func() {
98117
_, err := noxu2Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
99-
require.NoError(t, err)
118+
if err != nil {
119+
t.Error(err)
120+
}
100121
}, stopCh)
101122

102123
time.Sleep(time.Second * 10) // running in background for a while
103124

104125
reqCounts, err := getRequestCountOfPriorityLevel(loopbackClient)
126+
if err != nil {
127+
t.Error(err)
128+
}
105129

106130
noxu1RequestCount := reqCounts[priorityLevelNoxu1.Name]
107131
noxu2RequestCount := reqCounts[priorityLevelNoxu2.Name]
108132

109-
if (noxu1RequestCount / 2) > noxu2RequestCount {
110-
t.Errorf("total requests made by noxu2 should at least half of noxu1: (%d:%d)", noxu1RequestCount, noxu2RequestCount)
133+
// Theoretically, the actual expected value of request counts upon the two priority-level should be
134+
// the equal. We're deliberately lax to make flakes super rare.
135+
if (noxu1RequestCount/2) > noxu2RequestCount || (noxu2RequestCount/2) > noxu1RequestCount {
136+
t.Errorf("imbalanced requests made by noxu1/2: (%d:%d)", noxu1RequestCount, noxu2RequestCount)
111137
}
112138
}
113139

@@ -123,12 +149,51 @@ func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interf
123149
return clientset.NewForConfigOrDie(config)
124150
}
125151

126-
func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
152+
func getMetrics(c clientset.Interface) (string, error) {
127153
resp, err := c.CoreV1().
128154
RESTClient().
129155
Get().
130156
RequestURI("/metrics").
131157
DoRaw(context.Background())
158+
if err != nil {
159+
return "", err
160+
}
161+
return string(resp), err
162+
}
163+
164+
func getSharedConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
165+
resp, err := getMetrics(c)
166+
if err != nil {
167+
return nil, err
168+
}
169+
170+
dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
171+
decoder := expfmt.SampleDecoder{
172+
Dec: dec,
173+
Opts: &expfmt.DecodeOptions{},
174+
}
175+
176+
concurrency := make(map[string]int)
177+
for {
178+
var v model.Vector
179+
if err := decoder.Decode(&v); err != nil {
180+
if err == io.EOF {
181+
// Expected loop termination condition.
182+
return concurrency, nil
183+
}
184+
return nil, fmt.Errorf("failed decoding metrics: %v", err)
185+
}
186+
for _, metric := range v {
187+
switch name := string(metric.Metric[model.MetricNameLabel]); name {
188+
case sharedConcurrencyMetricsName:
189+
concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
190+
}
191+
}
192+
}
193+
}
194+
195+
func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
196+
resp, err := getMetrics(c)
132197
if err != nil {
133198
return nil, err
134199
}
@@ -152,7 +217,7 @@ func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, erro
152217
for _, metric := range v {
153218
switch name := string(metric.Metric[model.MetricNameLabel]); name {
154219
case dispatchedRequestCountMetricsName:
155-
reqCounts[string(metric.Metric[dispatchedRequestCountMetricsLabelPriorityLevel])] = int(metric.Value)
220+
reqCounts[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
156221
}
157222
}
158223
}

0 commit comments

Comments
 (0)