@@ -22,13 +22,11 @@ import (
22
22
"io"
23
23
"net/http/httptest"
24
24
"strings"
25
- "sync"
26
25
"testing"
27
26
"time"
28
27
29
28
"github.com/prometheus/common/expfmt"
30
29
"github.com/prometheus/common/model"
31
- "github.com/stretchr/testify/assert"
32
30
"github.com/stretchr/testify/require"
33
31
34
32
flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1"
@@ -78,32 +76,39 @@ func TestPriorityLevelIsolation(t *testing.T) {
78
76
noxu1Client := getClientFor (loopbackConfig , "noxu1" )
79
77
noxu2Client := getClientFor (loopbackConfig , "noxu2" )
80
78
81
- priorityLevelNoxu1 , _ , err := createPriorityLevelAndBindingFlowSchemaForUser (loopbackClient , "noxu1" )
79
+ queueLength := 50
80
+ concurrencyShares := 1
81
+
82
+ priorityLevelNoxu1 , _ , err := createPriorityLevelAndBindingFlowSchemaForUser (
83
+ loopbackClient , "noxu1" , concurrencyShares , queueLength )
82
84
require .NoError (t , err )
83
- priorityLevelNoxu2 , _ , err := createPriorityLevelAndBindingFlowSchemaForUser (loopbackClient , "noxu2" )
85
+ priorityLevelNoxu2 , _ , err := createPriorityLevelAndBindingFlowSchemaForUser (
86
+ loopbackClient , "noxu2" , concurrencyShares , queueLength )
84
87
require .NoError (t , err )
85
88
86
- wg := & sync.WaitGroup {}
89
+ stopCh := make (chan struct {})
90
+ defer close (stopCh )
87
91
// "elephant"
88
- streamRequests (wg , 10 , 100 , func () {
89
- _ , err := noxu1Client .CoreV1 ().Namespaces ().List (context .TODO (), metav1.ListOptions {})
92
+ streamRequests (concurrencyShares + queueLength , func () {
93
+ _ , err := noxu1Client .CoreV1 ().Namespaces ().List (context .Background (), metav1.ListOptions {})
90
94
require .NoError (t , err )
91
- })
92
-
93
- streamRequests (nil , 1 , 100 , func () {
94
- _ , err := noxu2Client .CoreV1 ().Namespaces ().List (context .TODO (), metav1.ListOptions {})
95
+ }, stopCh )
96
+ // "mouse"
97
+ streamRequests (1 , func () {
98
+ _ , err := noxu2Client .CoreV1 ().Namespaces ().List (context .Background (), metav1.ListOptions {})
95
99
require .NoError (t , err )
96
- })
100
+ }, stopCh )
97
101
98
- wg . Wait ()
102
+ time . Sleep ( time . Second * 10 ) // running in background for a while
99
103
100
- dispatchedCountNoxu1 , err := getRequestCountOfPriorityLevel (loopbackClient , priorityLevelNoxu1 .Name )
101
- require .NoError (t , err )
102
- dispatchedCountNoxu2 , err := getRequestCountOfPriorityLevel (loopbackClient , priorityLevelNoxu2 .Name )
103
- require .NoError (t , err )
104
+ reqCounts , err := getRequestCountOfPriorityLevel (loopbackClient )
104
105
105
- assert .Equal (t , 1000 , dispatchedCountNoxu1 )
106
- assert .Equal (t , 100 , dispatchedCountNoxu2 )
106
+ noxu1RequestCount := reqCounts [priorityLevelNoxu1 .Name ]
107
+ noxu2RequestCount := reqCounts [priorityLevelNoxu2 .Name ]
108
+
109
+ if (noxu1RequestCount / 2 ) > noxu2RequestCount {
110
+ t .Errorf ("total requests made by noxu2 should at least half of noxu1: (%d:%d)" , noxu1RequestCount , noxu2RequestCount )
111
+ }
107
112
}
108
113
109
114
func getClientFor (loopbackConfig * rest.Config , username string ) clientset.Interface {
@@ -118,14 +123,14 @@ func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interf
118
123
return clientset .NewForConfigOrDie (config )
119
124
}
120
125
121
- func getRequestCountOfPriorityLevel (c clientset.Interface , priorityLevelName string ) (int , error ) {
126
+ func getRequestCountOfPriorityLevel (c clientset.Interface ) (map [ string ] int , error ) {
122
127
resp , err := c .CoreV1 ().
123
128
RESTClient ().
124
129
Get ().
125
130
RequestURI ("/metrics" ).
126
- DoRaw (context .TODO ())
131
+ DoRaw (context .Background ())
127
132
if err != nil {
128
- return 0 , err
133
+ return nil , err
129
134
}
130
135
131
136
dec := expfmt .NewDecoder (strings .NewReader (string (resp )), expfmt .FmtText )
@@ -134,41 +139,40 @@ func getRequestCountOfPriorityLevel(c clientset.Interface, priorityLevelName str
134
139
Opts : & expfmt.DecodeOptions {},
135
140
}
136
141
142
+ reqCounts := make (map [string ]int )
137
143
for {
138
144
var v model.Vector
139
145
if err := decoder .Decode (& v ); err != nil {
140
146
if err == io .EOF {
141
147
// Expected loop termination condition.
142
- return 0 , fmt . Errorf ( "no dispatched-count metrics found for priorityLevel %v" , priorityLevelName )
148
+ return reqCounts , nil
143
149
}
144
- return 0 , fmt .Errorf ("failed decoding metrics: %v" , err )
150
+ return nil , fmt .Errorf ("failed decoding metrics: %v" , err )
145
151
}
146
152
for _ , metric := range v {
147
153
switch name := string (metric .Metric [model .MetricNameLabel ]); name {
148
154
case dispatchedRequestCountMetricsName :
149
- if priorityLevelName == string (metric .Metric [dispatchedRequestCountMetricsLabelPriorityLevel ]) {
150
- return int (metric .Value ), nil
151
- }
155
+ reqCounts [string (metric .Metric [dispatchedRequestCountMetricsLabelPriorityLevel ])] = int (metric .Value )
152
156
}
153
157
}
154
158
}
155
159
}
156
160
157
- func createPriorityLevelAndBindingFlowSchemaForUser (c clientset.Interface , username string ) (* flowcontrolv1alpha1.PriorityLevelConfiguration , * flowcontrolv1alpha1.FlowSchema , error ) {
158
- pl , err := c .FlowcontrolV1alpha1 ().PriorityLevelConfigurations ().Create (context .TODO (), & flowcontrolv1alpha1.PriorityLevelConfiguration {
161
+ func createPriorityLevelAndBindingFlowSchemaForUser (c clientset.Interface , username string , concurrencyShares , queuelength int ) (* flowcontrolv1alpha1.PriorityLevelConfiguration , * flowcontrolv1alpha1.FlowSchema , error ) {
162
+ pl , err := c .FlowcontrolV1alpha1 ().PriorityLevelConfigurations ().Create (context .Background (), & flowcontrolv1alpha1.PriorityLevelConfiguration {
159
163
ObjectMeta : metav1.ObjectMeta {
160
164
Name : username ,
161
165
},
162
166
Spec : flowcontrolv1alpha1.PriorityLevelConfigurationSpec {
163
167
Type : flowcontrolv1alpha1 .PriorityLevelEnablementLimited ,
164
168
Limited : & flowcontrolv1alpha1.LimitedPriorityLevelConfiguration {
165
- AssuredConcurrencyShares : 10 ,
169
+ AssuredConcurrencyShares : int32 ( concurrencyShares ) ,
166
170
LimitResponse : flowcontrolv1alpha1.LimitResponse {
167
171
Type : flowcontrolv1alpha1 .LimitResponseTypeQueue ,
168
172
Queuing : & flowcontrolv1alpha1.QueuingConfiguration {
169
173
Queues : 100 ,
170
174
HandSize : 1 ,
171
- QueueLengthLimit : 10 ,
175
+ QueueLengthLimit : int32 ( queuelength ) ,
172
176
},
173
177
},
174
178
},
@@ -232,17 +236,16 @@ func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, usern
232
236
})
233
237
}
234
238
235
- func streamRequests (wg * sync. WaitGroup , parallel , times int , request func ()) {
239
+ func streamRequests (parallel int , request func (), stopCh <- chan struct {} ) {
236
240
for i := 0 ; i < parallel ; i ++ {
237
- if wg != nil {
238
- wg .Add (1 )
239
- }
240
241
go func () {
241
- for j := 0 ; j < times ; j ++ {
242
- request ()
243
- }
244
- if wg != nil {
245
- wg .Done ()
242
+ for {
243
+ select {
244
+ case <- stopCh :
245
+ return
246
+ default :
247
+ request ()
248
+ }
246
249
}
247
250
}()
248
251
}
0 commit comments