@@ -28,7 +28,9 @@ import (
28
28
"github.com/stretchr/testify/require"
29
29
)
30
30
31
- // TestGranterBasic is a datadriven test with the following commands:
31
+ // TestCPUGranterBasic is a datadriven test for the CPU GrantCoordinator and
32
+ // its constituents, without real requesters (WorkQueues). It has the
33
+ // following commands:
32
34
//
33
35
// init-grant-coordinator min-cpu=<int> max-cpu=<int> sql-kv-tokens=<int>
34
36
// sql-sql-tokens=<int>
@@ -39,22 +41,15 @@ import (
39
41
// took-without-permission work=<kind> [v=<int>]
40
42
// continue-grant-chain work=<kind>
41
43
// cpu-load runnable=<int> procs=<int> [infrequent=<bool>]
42
- // init-store-grant-coordinator
43
- // set-tokens io-tokens=<int> disk-write-tokens=<int>
44
- // adjust-disk-error actual-write-bytes=<int> actual-read-bytes=<int>
45
- func TestGranterBasic (t * testing.T ) {
44
+ func TestCPUGranterBasic (t * testing.T ) {
46
45
defer leaktest .AfterTest (t )()
47
46
defer log .Scope (t ).Close (t )
48
47
49
48
if ! goschedstats .Supported {
50
49
skip .IgnoreLint (t , "goschedstats not supported" )
51
50
}
52
51
var ambientCtx log.AmbientContext
53
- // requesters[numWorkKinds] is used for kv elastic work, when working with a
54
- // store grant coordinator.
55
- // requesters[numWorkKinds + 1] is used for snapshot ingest, when working with a
56
- // store grant coordinator.
57
- var requesters [numWorkKinds + 2 ]* testRequester
52
+ var requesters [numWorkKinds ]* testRequester
58
53
var coord * GrantCoordinator
59
54
clearRequesterAndCoord := func () {
60
55
coord = nil
@@ -103,10 +98,121 @@ func TestGranterBasic(t *testing.T) {
103
98
coord = coords .RegularCPU
104
99
return flushAndReset ()
105
100
101
+ case "set-has-waiting-requests" :
102
+ var v bool
103
+ d .ScanArgs (t , "v" , & v )
104
+ requesters [scanCPUWorkKind (t , d )].waitingRequests = v
105
+ return flushAndReset ()
106
+
107
+ case "set-return-value-from-granted" :
108
+ var v int
109
+ d .ScanArgs (t , "v" , & v )
110
+ requesters [scanCPUWorkKind (t , d )].returnValueFromGranted = int64 (v )
111
+ return flushAndReset ()
112
+
113
+ case "try-get" :
114
+ v := 1
115
+ if d .HasArg ("v" ) {
116
+ d .ScanArgs (t , "v" , & v )
117
+ }
118
+ requesters [scanCPUWorkKind (t , d )].tryGet (int64 (v ))
119
+ return flushAndReset ()
120
+
121
+ case "return-grant" :
122
+ v := 1
123
+ if d .HasArg ("v" ) {
124
+ d .ScanArgs (t , "v" , & v )
125
+ }
126
+ requesters [scanCPUWorkKind (t , d )].returnGrant (int64 (v ))
127
+ return flushAndReset ()
128
+
129
+ case "took-without-permission" :
130
+ v := 1
131
+ if d .HasArg ("v" ) {
132
+ d .ScanArgs (t , "v" , & v )
133
+ }
134
+ requesters [scanCPUWorkKind (t , d )].tookWithoutPermission (int64 (v ))
135
+ return flushAndReset ()
136
+
137
+ case "continue-grant-chain" :
138
+ requesters [scanCPUWorkKind (t , d )].continueGrantChain ()
139
+ return flushAndReset ()
140
+
141
+ case "cpu-load" :
142
+ var runnable , procs int
143
+ d .ScanArgs (t , "runnable" , & runnable )
144
+ d .ScanArgs (t , "procs" , & procs )
145
+ infrequent := false
146
+ if d .HasArg ("infrequent" ) {
147
+ d .ScanArgs (t , "infrequent" , & infrequent )
148
+ }
149
+
150
+ samplePeriod := time .Millisecond
151
+ if infrequent {
152
+ samplePeriod = 250 * time .Millisecond
153
+ }
154
+ coord .CPULoad (runnable , procs , samplePeriod )
155
+ str := flushAndReset ()
156
+ kvsa := coord .mu .cpuLoadListener .(* kvSlotAdjuster )
157
+ microsToMillis := func (micros int64 ) int64 {
158
+ return micros * int64 (time .Microsecond ) / int64 (time .Millisecond )
159
+ }
160
+ return fmt .Sprintf ("%sSlotAdjuster metrics: slots: %d, duration (short, long) millis: (%d, %d), inc: %d, dec: %d\n " ,
161
+ str , kvsa .totalSlotsMetric .Value (),
162
+ microsToMillis (kvsa .cpuLoadShortPeriodDurationMetric .Count ()),
163
+ microsToMillis (kvsa .cpuLoadLongPeriodDurationMetric .Count ()),
164
+ kvsa .slotAdjusterIncrementsMetric .Count (), kvsa .slotAdjusterDecrementsMetric .Count (),
165
+ )
166
+
167
+ default :
168
+ return fmt .Sprintf ("unknown command: %s" , d .Cmd )
169
+ }
170
+ })
171
+ }
172
+
173
+ // TestStoreGranterBasic is a datadriven test for the store GrantCoordinator
174
+ // and its constituents, without real requesters (WorkQueues). It has the
175
+ // following commands:
176
+ //
177
+ // init-store-grant-coordinator
178
+ // set-has-waiting-requests work=<kind> v=<true|false>
179
+ // set-return-value-from-granted work=<kind> v=<int>
180
+ // try-get work=<kind> [v=<int>]
181
+ // return-grant work=<kind> [v=<int>]
182
+ // took-without-permission work=<kind> [v=<int>]
183
+ // set-tokens-loop io-tokens=<int> disk-write-tokens=<int> loop=<int>
184
+ // set-tokens io-tokens=<int> disk-write-tokens=<int>
185
+ // store-write-done work=<kind> orig-tokens=<int> write-bytes=<int>
186
+ // adjust-disk-error actual-write-bytes=<int> actual-read-bytes=<int>
187
+ func TestStoreGranterBasic (t * testing.T ) {
188
+ defer leaktest .AfterTest (t )()
189
+ defer log .Scope (t ).Close (t )
190
+
191
+ var ambientCtx log.AmbientContext
192
+ var requesters [admissionpb .NumStoreWorkTypes ]* testRequester
193
+ var coord * GrantCoordinator
194
+ clearRequesterAndCoord := func () {
195
+ coord = nil
196
+ for i := range requesters {
197
+ requesters [i ] = nil
198
+ }
199
+ }
200
+ var buf strings.Builder
201
+ flushAndReset := func () string {
202
+ fmt .Fprintf (& buf , "GrantCoordinator:\n %s\n " , coord .String ())
203
+ str := buf .String ()
204
+ buf .Reset ()
205
+ return str
206
+ }
207
+ settings := cluster .MakeTestingClusterSettings ()
208
+ registry := metric .NewRegistry ()
209
+ datadriven .RunTest (t , datapathutils .TestDataPath (t , "store_granter" ), func (t * testing.T , d * datadriven.TestData ) string {
210
+ switch d .Cmd {
106
211
case "init-store-grant-coordinator" :
107
212
clearRequesterAndCoord ()
108
213
storeCoordinators := & StoreGrantCoordinators {
109
- settings : settings ,
214
+ ambientCtx : ambientCtx ,
215
+ settings : settings ,
110
216
makeStoreRequesterFunc : func (
111
217
ambientCtx log.AmbientContext , _ roachpb.StoreID , granters [admissionpb .NumWorkClasses ]granterWithStoreReplicatedWorkAdmitted ,
112
218
settings * cluster.Settings , metrics [admissionpb .NumWorkClasses ]* WorkQueueMetrics , opts workQueueOptions , knobs * TestingKnobs ,
@@ -131,8 +237,9 @@ func TestGranterBasic(t *testing.T) {
131
237
req := & storeTestRequester {}
132
238
req .requesters [admissionpb .RegularWorkClass ] = makeTestRequester (admissionpb .RegularWorkClass )
133
239
req .requesters [admissionpb .ElasticWorkClass ] = makeTestRequester (admissionpb .ElasticWorkClass )
134
- requesters [KVWork ] = req .requesters [admissionpb .RegularWorkClass ]
135
- requesters [numWorkKinds ] = req .requesters [admissionpb .ElasticWorkClass ]
240
+ requesters [admissionpb .RegularStoreWorkType ] = req .requesters [admissionpb .RegularWorkClass ]
241
+ requesters [admissionpb .ElasticStoreWorkType ] = req .requesters [admissionpb .ElasticWorkClass ]
242
+ // We will initialize requesters[SnapshotIngestStoreWorkType] below.
136
243
return req
137
244
},
138
245
disableTickerForTesting : true ,
@@ -150,6 +257,11 @@ func TestGranterBasic(t *testing.T) {
150
257
require .NotNil (t , kvStoreGranter .snapshotRequester )
151
258
snapshotGranter := kvStoreGranter .snapshotRequester .(* SnapshotQueue ).snapshotGranter
152
259
require .NotNil (t , snapshotGranter )
260
+ // Instead of injecting a testRequester at creation time, we have
261
+ // already created a SnapshotQueue, and are now replacing it with a
262
+ // testRequester.
263
+ //
264
+ // TODO(sumeer): inject instead of this replacement hack.
153
265
snapshotReq := & testRequester {
154
266
workKind : KVWork ,
155
267
granter : snapshotGranter ,
@@ -160,8 +272,8 @@ func TestGranterBasic(t *testing.T) {
160
272
}
161
273
kvStoreGranter .snapshotRequester = snapshotReq
162
274
snapshotQueue := storeCoordinators .TryGetSnapshotQueueForStore (1 )
163
- require .NotNil (t , snapshotQueue )
164
- requesters [numWorkKinds + 1 ] = snapshotReq
275
+ require .Equal (t , snapshotReq , snapshotQueue .( * testRequester ) )
276
+ requesters [admissionpb . SnapshotIngestStoreWorkType ] = snapshotReq
165
277
// Use the same model for the IO linear models.
166
278
tlm := tokensLinearModel {multiplier : 0.5 , constant : 50 }
167
279
// Use w-amp of 1 for the purpose of this test.
@@ -172,69 +284,39 @@ func TestGranterBasic(t *testing.T) {
172
284
case "set-has-waiting-requests" :
173
285
var v bool
174
286
d .ScanArgs (t , "v" , & v )
175
- requesters [scanWorkKind (t , d )].waitingRequests = v
287
+ requesters [scanStoreWorkType (t , d )].waitingRequests = v
176
288
return flushAndReset ()
177
289
178
290
case "set-return-value-from-granted" :
179
291
var v int
180
292
d .ScanArgs (t , "v" , & v )
181
- requesters [scanWorkKind (t , d )].returnValueFromGranted = int64 (v )
293
+ requesters [scanStoreWorkType (t , d )].returnValueFromGranted = int64 (v )
182
294
return flushAndReset ()
183
295
184
296
case "try-get" :
185
297
v := 1
186
298
if d .HasArg ("v" ) {
187
299
d .ScanArgs (t , "v" , & v )
188
300
}
189
- requesters [scanWorkKind (t , d )].tryGet (int64 (v ))
301
+ requesters [scanStoreWorkType (t , d )].tryGet (int64 (v ))
190
302
return flushAndReset ()
191
303
192
304
case "return-grant" :
193
305
v := 1
194
306
if d .HasArg ("v" ) {
195
307
d .ScanArgs (t , "v" , & v )
196
308
}
197
- requesters [scanWorkKind (t , d )].returnGrant (int64 (v ))
309
+ requesters [scanStoreWorkType (t , d )].returnGrant (int64 (v ))
198
310
return flushAndReset ()
199
311
200
312
case "took-without-permission" :
201
313
v := 1
202
314
if d .HasArg ("v" ) {
203
315
d .ScanArgs (t , "v" , & v )
204
316
}
205
- requesters [scanWorkKind (t , d )].tookWithoutPermission (int64 (v ))
206
- return flushAndReset ()
207
-
208
- case "continue-grant-chain" :
209
- requesters [scanWorkKind (t , d )].continueGrantChain ()
317
+ requesters [scanStoreWorkType (t , d )].tookWithoutPermission (int64 (v ))
210
318
return flushAndReset ()
211
319
212
- case "cpu-load" :
213
- var runnable , procs int
214
- d .ScanArgs (t , "runnable" , & runnable )
215
- d .ScanArgs (t , "procs" , & procs )
216
- infrequent := false
217
- if d .HasArg ("infrequent" ) {
218
- d .ScanArgs (t , "infrequent" , & infrequent )
219
- }
220
-
221
- samplePeriod := time .Millisecond
222
- if infrequent {
223
- samplePeriod = 250 * time .Millisecond
224
- }
225
- coord .CPULoad (runnable , procs , samplePeriod )
226
- str := flushAndReset ()
227
- kvsa := coord .mu .cpuLoadListener .(* kvSlotAdjuster )
228
- microsToMillis := func (micros int64 ) int64 {
229
- return micros * int64 (time .Microsecond ) / int64 (time .Millisecond )
230
- }
231
- return fmt .Sprintf ("%sSlotAdjuster metrics: slots: %d, duration (short, long) millis: (%d, %d), inc: %d, dec: %d\n " ,
232
- str , kvsa .totalSlotsMetric .Value (),
233
- microsToMillis (kvsa .cpuLoadShortPeriodDurationMetric .Count ()),
234
- microsToMillis (kvsa .cpuLoadLongPeriodDurationMetric .Count ()),
235
- kvsa .slotAdjusterIncrementsMetric .Count (), kvsa .slotAdjusterDecrementsMetric .Count (),
236
- )
237
-
238
320
case "set-tokens-loop" :
239
321
var ioTokens int
240
322
var elasticDiskWriteTokens int
@@ -309,7 +391,7 @@ func TestGranterBasic(t *testing.T) {
309
391
var origTokens , writeBytes int
310
392
d .ScanArgs (t , "orig-tokens" , & origTokens )
311
393
d .ScanArgs (t , "write-bytes" , & writeBytes )
312
- requesters [scanWorkKind (t , d )].granter .(granterWithStoreReplicatedWorkAdmitted ).storeWriteDone (
394
+ requesters [scanStoreWorkType (t , d )].granter .(granterWithStoreReplicatedWorkAdmitted ).storeWriteDone (
313
395
int64 (origTokens ), StoreWorkDoneInfo {WriteBytes : int64 (writeBytes )})
314
396
coord .testingTryGrant ()
315
397
return flushAndReset ()
@@ -490,20 +572,30 @@ func (str *storeTestRequester) setStoreRequestEstimates(estimates storeRequestEs
490
572
// Only used by ioLoadListener, so don't bother.
491
573
}
492
574
493
- func scanWorkKind (t * testing.T , d * datadriven.TestData ) int8 {
575
+ func scanCPUWorkKind (t * testing.T , d * datadriven.TestData ) WorkKind {
494
576
var kindStr string
495
577
d .ScanArgs (t , "work" , & kindStr )
496
578
switch kindStr {
497
579
case "kv" :
498
- return int8 ( KVWork )
580
+ return KVWork
499
581
case "sql-kv-response" :
500
- return int8 ( SQLKVResponseWork )
582
+ return SQLKVResponseWork
501
583
case "sql-sql-response" :
502
- return int8 (SQLSQLResponseWork )
503
- case "kv-elastic" :
504
- return int8 (numWorkKinds )
505
- case "kv-snapshot" :
506
- return int8 (numWorkKinds + 1 )
584
+ return SQLSQLResponseWork
585
+ }
586
+ panic ("unknown WorkKind" )
587
+ }
588
+
589
+ func scanStoreWorkType (t * testing.T , d * datadriven.TestData ) admissionpb.StoreWorkType {
590
+ var kindStr string
591
+ d .ScanArgs (t , "work" , & kindStr )
592
+ switch kindStr {
593
+ case "regular" :
594
+ return admissionpb .RegularStoreWorkType
595
+ case "elastic" :
596
+ return admissionpb .ElasticStoreWorkType
597
+ case "snapshot" :
598
+ return admissionpb .SnapshotIngestStoreWorkType
507
599
}
508
600
panic ("unknown WorkKind" )
509
601
}
0 commit comments