@@ -218,10 +218,10 @@ const (
218
218
219
219
// StartRequest begins the process of handling a request. We take the
220
220
// approach of updating the metrics about total requests queued and
221
- // executing on each path out of this method and Request::Wait. We do
222
- // not update those metrics in lower level functions because there can
223
- // be multiple lower level changes in one invocation here .
224
- func (qs * queueSet ) StartRequest (ctx context.Context , hashValue uint64 , descr1 , descr2 interface {}) (fq.Request , bool ) {
221
+ // executing at each point where there is a change in that quantity,
222
+ // because the metrics --- and only the metrics --- track that
223
+ // quantity per FlowSchema .
224
+ func (qs * queueSet ) StartRequest (ctx context.Context , hashValue uint64 , fsName string , descr1 , descr2 interface {}) (fq.Request , bool ) {
225
225
qs .lockAndSyncTime ()
226
226
defer qs .lock .Unlock ()
227
227
var req * request
@@ -231,11 +231,11 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1,
231
231
// Apply only concurrency limit, if zero queues desired
232
232
if qs .qCfg .DesiredNumQueues < 1 {
233
233
if qs .totRequestsExecuting >= qs .dCfg .ConcurrencyLimit {
234
- klog .V (5 ).Infof ("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d" , qs .qCfg .Name , descr1 , descr2 , qs .totRequestsExecuting , qs .dCfg .ConcurrencyLimit )
234
+ klog .V (5 ).Infof ("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d" , qs .qCfg .Name , fsName , descr1 , descr2 , qs .totRequestsExecuting , qs .dCfg .ConcurrencyLimit )
235
+ metrics .AddReject (qs .qCfg .Name , fsName , "concurrency-limit" )
235
236
return nil , qs .isIdleLocked ()
236
237
}
237
- req = qs .dispatchSansQueueLocked (ctx , descr1 , descr2 )
238
- metrics .UpdateFlowControlRequestsExecuting (qs .qCfg .Name , qs .totRequestsExecuting )
238
+ req = qs .dispatchSansQueueLocked (ctx , fsName , descr1 , descr2 )
239
239
return req , false
240
240
}
241
241
@@ -246,13 +246,12 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1,
246
246
// 3) Reject current request if there is not enough concurrency shares and
247
247
// we are at max queue length
248
248
// 4) If not rejected, create a request and enqueue
249
- req = qs .timeoutOldRequestsAndRejectOrEnqueueLocked (ctx , hashValue , descr1 , descr2 )
250
- defer metrics .UpdateFlowControlRequestsInQueue (qs .qCfg .Name , qs .totRequestsWaiting )
249
+ req = qs .timeoutOldRequestsAndRejectOrEnqueueLocked (ctx , hashValue , fsName , descr1 , descr2 )
251
250
// req == nil means that the request was rejected - no remaining
252
251
// concurrency shares and at max queue length already
253
252
if req == nil {
254
- klog .V (5 ).Infof ("QS(%s): rejecting request %#+v %#+v due to queue full" , qs .qCfg .Name , descr1 , descr2 )
255
- metrics .AddReject (qs .qCfg .Name , "queue-full" )
253
+ klog .V (5 ).Infof ("QS(%s): rejecting request %q % #+v %#+v due to queue full" , qs .qCfg .Name , fsName , descr1 , descr2 )
254
+ metrics .AddReject (qs .qCfg .Name , fsName , "queue-full" )
256
255
return nil , qs .isIdleLocked ()
257
256
}
258
257
@@ -266,7 +265,6 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1,
266
265
// fair queuing technique to pick a queue and dispatch a
267
266
// request from that queue.
268
267
qs .dispatchAsMuchAsPossibleLocked ()
269
- defer metrics .UpdateFlowControlRequestsExecuting (qs .qCfg .Name , qs .totRequestsExecuting )
270
268
271
269
// ========================================================================
272
270
// Step 3:
@@ -288,7 +286,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, descr1,
288
286
// known that the count does not need to be accurate.
289
287
// BTW, the count only needs to be accurate in a test that
290
288
// uses FakeEventClock::Run().
291
- klog .V (6 ).Infof ("QS(%s): Context of request %#+v %#+v is Done" , qs .qCfg .Name , descr1 , descr2 )
289
+ klog .V (6 ).Infof ("QS(%s): Context of request %q % #+v %#+v is Done" , qs .qCfg .Name , fsName , descr1 , descr2 )
292
290
qs .cancelWait (req )
293
291
qs .goroutineDoneOrBlocked ()
294
292
}()
@@ -329,7 +327,7 @@ func (req *request) wait() (bool, bool) {
329
327
switch decision {
330
328
case decisionReject :
331
329
klog .V (5 ).Infof ("QS(%s): request %#+v %#+v timed out after being enqueued\n " , qs .qCfg .Name , req .descr1 , req .descr2 )
332
- metrics .AddReject (qs .qCfg .Name , "time-out" )
330
+ metrics .AddReject (qs .qCfg .Name , req . fsName , "time-out" )
333
331
return false , qs .isIdleLocked ()
334
332
case decisionCancel :
335
333
// TODO(aaron-prindle) add metrics for this case
@@ -400,19 +398,20 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
400
398
// returns the enqueud request on a successful enqueue
401
399
// returns nil in the case that there is no available concurrency or
402
400
// the queuelengthlimit has been reached
403
- func (qs * queueSet ) timeoutOldRequestsAndRejectOrEnqueueLocked (ctx context.Context , hashValue uint64 , descr1 , descr2 interface {}) * request {
401
+ func (qs * queueSet ) timeoutOldRequestsAndRejectOrEnqueueLocked (ctx context.Context , hashValue uint64 , fsName string , descr1 , descr2 interface {}) * request {
404
402
// Start with the shuffle sharding, to pick a queue.
405
403
queueIdx := qs .chooseQueueIndexLocked (hashValue , descr1 , descr2 )
406
404
queue := qs .queues [queueIdx ]
407
405
// The next step is the logic to reject requests that have been waiting too long
408
- qs .removeTimedOutRequestsFromQueueLocked (queue )
406
+ qs .removeTimedOutRequestsFromQueueLocked (queue , fsName )
409
407
// NOTE: currently timeout is only checked for each new request. This means that there can be
410
408
// requests that are in the queue longer than the timeout if there are no new requests
411
409
// We prefer the simplicity over the promptness, at least for now.
412
410
413
411
// Create a request and enqueue
414
412
req := & request {
415
413
qs : qs ,
414
+ fsName : fsName ,
416
415
ctx : ctx ,
417
416
decision : lockingpromise .NewWriteOnce (& qs .lock , qs .counter ),
418
417
arrivalTime : qs .clock .Now (),
@@ -423,7 +422,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
423
422
if ok := qs .rejectOrEnqueueLocked (req ); ! ok {
424
423
return nil
425
424
}
426
- metrics .ObserveQueueLength (qs .qCfg .Name , len (queue .requests ))
425
+ metrics .ObserveQueueLength (qs .qCfg .Name , fsName , len (queue .requests ))
427
426
return req
428
427
}
429
428
@@ -446,7 +445,7 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
446
445
447
446
// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
448
447
// past the requestWaitLimit
449
- func (qs * queueSet ) removeTimedOutRequestsFromQueueLocked (queue * queue ) {
448
+ func (qs * queueSet ) removeTimedOutRequestsFromQueueLocked (queue * queue , fsName string ) {
450
449
timeoutIdx := - 1
451
450
now := qs .clock .Now ()
452
451
reqs := queue .requests
@@ -461,6 +460,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) {
461
460
req .decision .SetLocked (decisionReject )
462
461
// get index for timed out requests
463
462
timeoutIdx = i
463
+ metrics .AddRequestsInQueues (qs .qCfg .Name , req .fsName , - 1 )
464
464
} else {
465
465
break
466
466
}
@@ -505,6 +505,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
505
505
}
506
506
queue .Enqueue (request )
507
507
qs .totRequestsWaiting ++
508
+ metrics .AddRequestsInQueues (qs .qCfg .Name , request .fsName , 1 )
508
509
}
509
510
510
511
// dispatchAsMuchAsPossibleLocked runs a loop, as long as there
@@ -522,10 +523,11 @@ func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
522
523
}
523
524
}
524
525
525
- func (qs * queueSet ) dispatchSansQueueLocked (ctx context.Context , descr1 , descr2 interface {}) * request {
526
+ func (qs * queueSet ) dispatchSansQueueLocked (ctx context.Context , fsName string , descr1 , descr2 interface {}) * request {
526
527
now := qs .clock .Now ()
527
528
req := & request {
528
529
qs : qs ,
530
+ fsName : fsName ,
529
531
ctx : ctx ,
530
532
startTime : now ,
531
533
decision : lockingpromise .NewWriteOnce (& qs .lock , qs .counter ),
@@ -535,8 +537,9 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, descr1, descr2
535
537
}
536
538
req .decision .SetLocked (decisionExecute )
537
539
qs .totRequestsExecuting ++
540
+ metrics .AddRequestsExecuting (qs .qCfg .Name , fsName , 1 )
538
541
if klog .V (5 ) {
539
- klog .Infof ("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing" , qs .qCfg .Name , now .Format (nsTimeFmt ), qs .virtualTime , descr1 , descr2 , qs .totRequestsExecuting )
542
+ klog .Infof ("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q % #+v %#+v, qs will have %d executing" , qs .qCfg .Name , now .Format (nsTimeFmt ), qs .virtualTime , fsName , descr1 , descr2 , qs .totRequestsExecuting )
540
543
}
541
544
return req
542
545
}
@@ -563,6 +566,8 @@ func (qs *queueSet) dispatchLocked() bool {
563
566
qs .totRequestsWaiting --
564
567
qs .totRequestsExecuting ++
565
568
queue .requestsExecuting ++
569
+ metrics .AddRequestsInQueues (qs .qCfg .Name , request .fsName , - 1 )
570
+ metrics .AddRequestsExecuting (qs .qCfg .Name , request .fsName , 1 )
566
571
if klog .V (6 ) {
567
572
klog .Infof ("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing" , qs .qCfg .Name , request .startTime .Format (nsTimeFmt ), qs .virtualTime , request .descr1 , request .descr2 , queue .index , queue .virtualStart , len (queue .requests ), queue .requestsExecuting )
568
573
}
@@ -590,6 +595,7 @@ func (qs *queueSet) cancelWait(req *request) {
590
595
// remove the request
591
596
queue .requests = append (queue .requests [:i ], queue .requests [i + 1 :]... )
592
597
qs .totRequestsWaiting --
598
+ metrics .AddRequestsInQueues (qs .qCfg .Name , req .fsName , - 1 )
593
599
break
594
600
}
595
601
}
@@ -634,8 +640,6 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
634
640
635
641
qs .finishRequestLocked (req )
636
642
qs .dispatchAsMuchAsPossibleLocked ()
637
- metrics .UpdateFlowControlRequestsInQueue (qs .qCfg .Name , qs .totRequestsWaiting )
638
- metrics .UpdateFlowControlRequestsExecuting (qs .qCfg .Name , qs .totRequestsExecuting )
639
643
return qs .isIdleLocked ()
640
644
}
641
645
@@ -644,6 +648,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
644
648
// callback updates important state in the queueSet
645
649
func (qs * queueSet ) finishRequestLocked (r * request ) {
646
650
qs .totRequestsExecuting --
651
+ metrics .AddRequestsExecuting (qs .qCfg .Name , r .fsName , - 1 )
647
652
648
653
if r .queue == nil {
649
654
if klog .V (6 ) {
0 commit comments