@@ -40,7 +40,6 @@ import (
4040 "encoding/json"
4141 "fmt"
4242 "math"
43- "runtime"
4443 "runtime/trace"
4544 "strings"
4645 "sync"
@@ -264,159 +263,6 @@ type batchConnMetrics struct {
264263 bestBatchSize prometheus.Observer
265264}
266265
267- type batchConn struct {
268- // An atomic flag indicates whether the batch is idle or not.
269- // 0 for busy, others for idle.
270- idle uint32
271-
272- // batchCommandsCh used for batch commands.
273- batchCommandsCh chan * batchCommandsEntry
274- batchCommandsClients []* batchCommandsClient
275- tikvTransportLayerLoad uint64
276- closed chan struct {}
277-
278- reqBuilder * batchCommandsBuilder
279-
280- // Notify rpcClient to check the idle flag
281- idleNotify * uint32
282- idleDetect * time.Timer
283-
284- fetchMoreTimer * time.Timer
285-
286- index uint32
287-
288- metrics batchConnMetrics
289- }
290-
291- func newBatchConn (connCount , maxBatchSize uint , idleNotify * uint32 ) * batchConn {
292- return & batchConn {
293- batchCommandsCh : make (chan * batchCommandsEntry , maxBatchSize ),
294- batchCommandsClients : make ([]* batchCommandsClient , 0 , connCount ),
295- tikvTransportLayerLoad : 0 ,
296- closed : make (chan struct {}),
297- reqBuilder : newBatchCommandsBuilder (maxBatchSize ),
298- idleNotify : idleNotify ,
299- idleDetect : time .NewTimer (idleTimeout ),
300- }
301- }
302-
303- func (a * batchConn ) initMetrics (target string ) {
304- a .metrics .pendingRequests = metrics .TiKVBatchPendingRequests .WithLabelValues (target )
305- a .metrics .batchSize = metrics .TiKVBatchRequests .WithLabelValues (target )
306- a .metrics .sendLoopWaitHeadDur = metrics .TiKVBatchSendLoopDuration .WithLabelValues (target , "wait-head" )
307- a .metrics .sendLoopWaitMoreDur = metrics .TiKVBatchSendLoopDuration .WithLabelValues (target , "wait-more" )
308- a .metrics .sendLoopSendDur = metrics .TiKVBatchSendLoopDuration .WithLabelValues (target , "send" )
309- a .metrics .recvLoopRecvDur = metrics .TiKVBatchRecvLoopDuration .WithLabelValues (target , "recv" )
310- a .metrics .recvLoopProcessDur = metrics .TiKVBatchRecvLoopDuration .WithLabelValues (target , "process" )
311- a .metrics .batchSendTailLat = metrics .TiKVBatchSendTailLatency .WithLabelValues (target )
312- a .metrics .batchRecvTailLat = metrics .TiKVBatchRecvTailLatency .WithLabelValues (target )
313- a .metrics .headArrivalInterval = metrics .TiKVBatchHeadArrivalInterval .WithLabelValues (target )
314- a .metrics .batchMoreRequests = metrics .TiKVBatchMoreRequests .WithLabelValues (target )
315- a .metrics .bestBatchSize = metrics .TiKVBatchBestSize .WithLabelValues (target )
316- }
317-
318- func (a * batchConn ) isIdle () bool {
319- return atomic .LoadUint32 (& a .idle ) != 0
320- }
321-
322- // fetchAllPendingRequests fetches all pending requests from the channel.
323- func (a * batchConn ) fetchAllPendingRequests (maxBatchSize int ) (headRecvTime time.Time , headArrivalInterval time.Duration ) {
324- // Block on the first element.
325- latestReqStartTime := a .reqBuilder .latestReqStartTime
326- var headEntry * batchCommandsEntry
327- select {
328- case headEntry = <- a .batchCommandsCh :
329- if ! a .idleDetect .Stop () {
330- <- a .idleDetect .C
331- }
332- a .idleDetect .Reset (idleTimeout )
333- case <- a .idleDetect .C :
334- a .idleDetect .Reset (idleTimeout )
335- atomic .AddUint32 (& a .idle , 1 )
336- atomic .CompareAndSwapUint32 (a .idleNotify , 0 , 1 )
337- // This batchConn to be recycled
338- return time .Now (), 0
339- case <- a .closed :
340- return time .Now (), 0
341- }
342- if headEntry == nil {
343- return time .Now (), 0
344- }
345- headRecvTime = time .Now ()
346- if headEntry .start .After (latestReqStartTime ) && ! latestReqStartTime .IsZero () {
347- headArrivalInterval = headEntry .start .Sub (latestReqStartTime )
348- }
349- a .reqBuilder .push (headEntry )
350-
351- // This loop is for trying best to collect more requests.
352- for a .reqBuilder .len () < maxBatchSize {
353- select {
354- case entry := <- a .batchCommandsCh :
355- if entry == nil {
356- return
357- }
358- a .reqBuilder .push (entry )
359- default :
360- return
361- }
362- }
363- return
364- }
365-
366- // fetchMorePendingRequests fetches more pending requests from the channel.
367- func (a * batchConn ) fetchMorePendingRequests (
368- maxBatchSize int ,
369- batchWaitSize int ,
370- maxWaitTime time.Duration ,
371- ) {
372- // Try to collect `batchWaitSize` requests, or wait `maxWaitTime`.
373- if a .fetchMoreTimer == nil {
374- a .fetchMoreTimer = time .NewTimer (maxWaitTime )
375- } else {
376- a .fetchMoreTimer .Reset (maxWaitTime )
377- }
378- for a .reqBuilder .len () < batchWaitSize {
379- select {
380- case entry := <- a .batchCommandsCh :
381- if entry == nil {
382- if ! a .fetchMoreTimer .Stop () {
383- <- a .fetchMoreTimer .C
384- }
385- return
386- }
387- a .reqBuilder .push (entry )
388- case <- a .fetchMoreTimer .C :
389- return
390- }
391- }
392- if ! a .fetchMoreTimer .Stop () {
393- <- a .fetchMoreTimer .C
394- }
395-
396- // Do an additional non-block try. Here we test the length with `maxBatchSize` instead
397- // of `batchWaitSize` because trying best to fetch more requests is necessary so that
398- // we can adjust the `batchWaitSize` dynamically.
399- yielded := false
400- for a .reqBuilder .len () < maxBatchSize {
401- select {
402- case entry := <- a .batchCommandsCh :
403- if entry == nil {
404- return
405- }
406- a .reqBuilder .push (entry )
407- default :
408- if yielded {
409- return
410- }
411- // yield once to batch more requests.
412- runtime .Gosched ()
413- yielded = true
414- }
415- }
416- }
417-
// idleTimeout is the duration the idleDetect timer of a batchConn is armed
// with. When it elapses without a head request being received,
// fetchAllPendingRequests marks the batchConn as idle.
const idleTimeout = 3 * time.Minute
419-
420266var (
421267 // presetBatchPolicies defines a set of [turboBatchOptions] as batch policies.
422268 presetBatchPolicies = map [string ]turboBatchOptions {
@@ -534,150 +380,6 @@ func (t *turboBatchTrigger) preferredBatchWaitSize(avgBatchWaitSize float64, def
534380 return batchWaitSize
535381}
536382
// BatchSendLoopPanicCounter is only used for testing.
// It is incremented atomically each time batchSendLoop recovers from a panic.
var BatchSendLoopPanicCounter int64 = 0

// initBatchPolicyWarn makes sure the "invalid batch policy" fallback warning
// emitted by batchSendLoop is logged at most once.
var initBatchPolicyWarn sync.Once
541-
542- func (a * batchConn ) batchSendLoop (cfg config.TiKVClient ) {
543- defer func () {
544- if r := recover (); r != nil {
545- metrics .TiKVPanicCounter .WithLabelValues (metrics .LabelBatchSendLoop ).Inc ()
546- logutil .BgLogger ().Error ("batchSendLoop" ,
547- zap .Any ("r" , r ),
548- zap .Stack ("stack" ))
549- atomic .AddInt64 (& BatchSendLoopPanicCounter , 1 )
550- logutil .BgLogger ().Info ("restart batchSendLoop" , zap .Int64 ("count" , atomic .LoadInt64 (& BatchSendLoopPanicCounter )))
551- go a .batchSendLoop (cfg )
552- }
553- }()
554-
555- trigger , ok := newTurboBatchTriggerFromPolicy (cfg .BatchPolicy )
556- if ! ok {
557- initBatchPolicyWarn .Do (func () {
558- logutil .BgLogger ().Warn ("fallback to default batch policy due to invalid value" , zap .String ("value" , cfg .BatchPolicy ))
559- })
560- }
561- turboBatchWaitTime := trigger .turboWaitTime ()
562-
563- avgBatchWaitSize := float64 (cfg .BatchWaitSize )
564- for {
565- sendLoopStartTime := time .Now ()
566- a .reqBuilder .reset ()
567-
568- headRecvTime , headArrivalInterval := a .fetchAllPendingRequests (int (cfg .MaxBatchSize ))
569- if a .reqBuilder .len () == 0 {
570- // the conn is closed or recycled.
571- return
572- }
573-
574- // curl -X PUT -d 'return(true)' http://0.0.0.0:10080/fail/tikvclient/mockBlockOnBatchClient
575- if val , err := util .EvalFailpoint ("mockBlockOnBatchClient" ); err == nil {
576- if val .(bool ) {
577- time .Sleep (1 * time .Hour )
578- }
579- }
580-
581- if batchSize := a .reqBuilder .len (); batchSize < int (cfg .MaxBatchSize ) {
582- if cfg .MaxBatchWaitTime > 0 && atomic .LoadUint64 (& a .tikvTransportLayerLoad ) > uint64 (cfg .OverloadThreshold ) {
583- // If the target TiKV is overload, wait a while to collect more requests.
584- metrics .TiKVBatchWaitOverLoad .Inc ()
585- a .fetchMorePendingRequests (int (cfg .MaxBatchSize ), int (cfg .BatchWaitSize ), cfg .MaxBatchWaitTime )
586- } else if turboBatchWaitTime > 0 && headArrivalInterval > 0 && trigger .needFetchMore (headArrivalInterval ) {
587- batchWaitSize := trigger .preferredBatchWaitSize (avgBatchWaitSize , int (cfg .BatchWaitSize ))
588- a .fetchMorePendingRequests (int (cfg .MaxBatchSize ), batchWaitSize , turboBatchWaitTime )
589- a .metrics .batchMoreRequests .Observe (float64 (a .reqBuilder .len () - batchSize ))
590- }
591- }
592- length := a .reqBuilder .len ()
593- avgBatchWaitSize = 0.2 * float64 (length ) + 0.8 * avgBatchWaitSize
594- a .metrics .pendingRequests .Observe (float64 (len (a .batchCommandsCh ) + length ))
595- a .metrics .bestBatchSize .Observe (avgBatchWaitSize )
596- a .metrics .headArrivalInterval .Observe (headArrivalInterval .Seconds ())
597- a .metrics .sendLoopWaitHeadDur .Observe (headRecvTime .Sub (sendLoopStartTime ).Seconds ())
598- a .metrics .sendLoopWaitMoreDur .Observe (time .Since (sendLoopStartTime ).Seconds ())
599-
600- a .getClientAndSend ()
601-
602- sendLoopEndTime := time .Now ()
603- a .metrics .sendLoopSendDur .Observe (sendLoopEndTime .Sub (sendLoopStartTime ).Seconds ())
604- if dur := sendLoopEndTime .Sub (headRecvTime ); dur > batchSendTailLatThreshold {
605- a .metrics .batchSendTailLat .Observe (dur .Seconds ())
606- }
607- }
608- }
609-
// Reasons recorded by getClientAndSend for each batch client that could not be
// picked for sending; they are logged when no client is available at all.
const (
	// SendFailedReasonNoAvailableLimit is recorded when the batch has no high
	// priority task and the client reports no available capacity.
	SendFailedReasonNoAvailableLimit = "concurrency limit exceeded"
	// SendFailedReasonTryLockForSendFail is recorded when the client's
	// tryLockForSend returns false.
	SendFailedReasonTryLockForSendFail = "tryLockForSend fail"
)
614-
615- func (a * batchConn ) getClientAndSend () {
616- if val , err := util .EvalFailpoint ("mockBatchClientSendDelay" ); err == nil {
617- if timeout , ok := val .(int ); ok && timeout > 0 {
618- time .Sleep (time .Duration (timeout * int (time .Millisecond )))
619- }
620- }
621-
622- // Choose a connection by round-robbin.
623- var (
624- cli * batchCommandsClient
625- target string
626- )
627- reasons := make ([]string , 0 )
628- hasHighPriorityTask := a .reqBuilder .hasHighPriorityTask ()
629- for i := 0 ; i < len (a .batchCommandsClients ); i ++ {
630- a .index = (a .index + 1 ) % uint32 (len (a .batchCommandsClients ))
631- target = a .batchCommandsClients [a .index ].target
632- // The lock protects the batchCommandsClient from been closed while it's in use.
633- c := a .batchCommandsClients [a .index ]
634- if hasHighPriorityTask || c .available () > 0 {
635- if c .tryLockForSend () {
636- cli = c
637- break
638- } else {
639- reasons = append (reasons , SendFailedReasonTryLockForSendFail )
640- }
641- } else {
642- reasons = append (reasons , SendFailedReasonNoAvailableLimit )
643- }
644- }
645- if cli == nil {
646- logutil .BgLogger ().Info ("no available connections" , zap .String ("target" , target ), zap .Any ("reasons" , reasons ))
647- metrics .TiKVNoAvailableConnectionCounter .Inc ()
648- if config .GetGlobalConfig ().TiKVClient .MaxConcurrencyRequestLimit == config .DefMaxConcurrencyRequestLimit {
649- // Only cancel requests when MaxConcurrencyRequestLimit feature is not enabled, to be compatible with the behavior of older versions.
650- // TODO: But when MaxConcurrencyRequestLimit feature is enabled, the requests won't be canceled and will wait until timeout.
651- // This behavior may not be reasonable, as the timeout is usually 40s or 60s, which is too long to retry in time.
652- a .reqBuilder .cancel (errors .New ("no available connections" ))
653- }
654- return
655- }
656- defer cli .unlockForSend ()
657- available := cli .available ()
658- reqSendTime := time .Now ()
659- batch := 0
660- req , forwardingReqs := a .reqBuilder .buildWithLimit (available , func (id uint64 , e * batchCommandsEntry ) {
661- cli .batched .Store (id , e )
662- cli .sent .Add (1 )
663- atomic .StoreInt64 (& e .sendLat , int64 (reqSendTime .Sub (e .start )))
664- if trace .IsEnabled () {
665- trace .Log (e .ctx , "rpc" , "send" )
666- }
667- })
668- if req != nil {
669- batch += len (req .RequestIds )
670- cli .send ("" , req )
671- }
672- for forwardedHost , req := range forwardingReqs {
673- batch += len (req .RequestIds )
674- cli .send (forwardedHost , req )
675- }
676- if batch > 0 {
677- a .metrics .batchSize .Observe (float64 (batch ))
678- }
679- }
680-
681383type tryLock struct {
682384 * sync.Cond
683385 reCreating bool
@@ -1127,18 +829,6 @@ func (c *batchCommandsClient) initBatchClient(forwardedHost string) error {
1127829 return nil
1128830}
1129831
1130- func (a * batchConn ) Close () {
1131- // Close all batchRecvLoop.
1132- for _ , c := range a .batchCommandsClients {
1133- // After connections are closed, `batchRecvLoop`s will check the flag.
1134- atomic .StoreInt32 (& c .closed , 1 )
1135- }
1136- // Don't close(batchCommandsCh) because when Close() is called, someone maybe
1137- // calling SendRequest and writing batchCommandsCh, if we close it here the
1138- // writing goroutine will panic.
1139- close (a .closed )
1140- }
1141-
1142832func sendBatchRequest (
1143833 ctx context.Context ,
1144834 addr string ,
0 commit comments