@@ -4,7 +4,7 @@ use std::{
4
4
cmp:: Ordering ,
5
5
sync:: {
6
6
atomic:: { AtomicU64 , Ordering as AtomicOrdering } ,
7
- mpsc, Arc ,
7
+ mpsc, Arc , Condvar , Mutex ,
8
8
} ,
9
9
thread:: { self , Builder as ThreadBuilder , JoinHandle } ,
10
10
time:: Duration ,
@@ -20,10 +20,10 @@ use txn_types::{Key, TimeStamp};
20
20
use super :: {
21
21
compaction_filter:: is_compaction_filter_allowed,
22
22
config:: GcWorkerConfigManager ,
23
- gc_worker:: { sync_gc , GcSafePointProvider , GcTask } ,
23
+ gc_worker:: { schedule_gc , GcSafePointProvider , GcTask } ,
24
24
Result ,
25
25
} ;
26
- use crate :: { server:: metrics:: * , tikv_util:: sys:: thread:: StdThreadBuildWrapper } ;
26
+ use crate :: { server:: metrics:: * , storage :: Callback , tikv_util:: sys:: thread:: StdThreadBuildWrapper } ;
27
27
28
28
const POLL_SAFE_POINT_INTERVAL_SECS : u64 = 10 ;
29
29
@@ -245,6 +245,8 @@ pub(super) struct GcManager<S: GcSafePointProvider, R: RegionInfoProvider, E: Kv
245
245
246
246
cfg_tracker : GcWorkerConfigManager ,
247
247
feature_gate : FeatureGate ,
248
+
249
+ max_concurrent_tasks : usize ,
248
250
}
249
251
250
252
impl < S : GcSafePointProvider , R : RegionInfoProvider + ' static , E : KvEngine > GcManager < S , R , E > {
@@ -254,6 +256,7 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
254
256
worker_scheduler : Scheduler < GcTask < E > > ,
255
257
cfg_tracker : GcWorkerConfigManager ,
256
258
feature_gate : FeatureGate ,
259
+ concurrent_tasks : usize ,
257
260
) -> GcManager < S , R , E > {
258
261
GcManager {
259
262
cfg,
@@ -263,6 +266,7 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
263
266
gc_manager_ctx : GcManagerContext :: new ( ) ,
264
267
cfg_tracker,
265
268
feature_gate,
269
+ max_concurrent_tasks : concurrent_tasks,
266
270
}
267
271
}
268
272
@@ -442,13 +446,27 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
442
446
let mut progress = Some ( Key :: from_encoded ( BEGIN_KEY . to_vec ( ) ) ) ;
443
447
444
448
// Records how many regions we have GC-ed.
445
- let mut processed_regions = 0 ;
449
+ let mut scheduled_regions = 0 ;
450
+ let task_controller = Arc :: new ( ( Mutex :: new ( 0 ) , Condvar :: new ( ) ) ) ;
451
+ // The task_controller is a (Mutex, Condvar) pair used to bound the number
452
+ // of in-flight tasks. The mutex protects the count of currently running
453
+ // tasks, while the condvar is used to notify (and wait on) changes to
454
+ // that count.
455
+ let ( lock, cvar) = & * task_controller;
456
+ let maybe_wait = |max_tasks| {
457
+ let mut current_tasks: std:: sync:: MutexGuard < ' _ , usize > = lock. lock ( ) . unwrap ( ) ;
458
+ while * current_tasks > max_tasks {
459
+ // Wait until the number of current tasks is at or below the limit
460
+ current_tasks = cvar. wait ( current_tasks) . unwrap ( ) ;
461
+ }
462
+ } ;
446
463
447
464
info ! ( "gc_worker: auto gc starts" ; "safe_point" => self . curr_safe_point( ) ) ;
448
465
449
466
// The following loop iterates all regions whose leader is on this TiKV and does
450
467
// GC on them. At the same time, check whether safe_point is updated
451
468
// periodically. If it's updated, rewinding will happen.
469
+
452
470
loop {
453
471
self . gc_manager_ctx . check_stopped ( ) ?;
454
472
if is_compaction_filter_allowed ( & self . cfg_tracker . value ( ) , & self . feature_gate ) {
@@ -462,9 +480,9 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
462
480
// We have worked to the end and we need to rewind. Restart from beginning.
463
481
progress = Some ( Key :: from_encoded ( BEGIN_KEY . to_vec ( ) ) ) ;
464
482
need_rewind = false ;
465
- info ! ( "gc_worker: auto gc rewinds" ; "processed_regions " => processed_regions ) ;
483
+ info ! ( "gc_worker: auto gc rewinds" ; "scheduled_regions " => scheduled_regions ) ;
466
484
467
- processed_regions = 0 ;
485
+ scheduled_regions = 0 ;
468
486
// Set the metric to zero to show that rewinding has happened.
469
487
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
470
488
. with_label_values ( & [ PROCESS_TYPE_GC ] )
@@ -483,19 +501,40 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
483
501
if finished {
484
502
// We have worked to the end of the TiKV or our progress has reached `end`, and
485
503
// we don't need to rewind. In this case, the round of GC has finished.
486
- info ! ( "gc_worker: auto gc finishes" ; "processed_regions" => processed_regions) ;
487
- return Ok ( ( ) ) ;
504
+ info ! ( "gc_worker: all regions task are scheduled" ;
505
+ "processed_regions" => scheduled_regions,
506
+ ) ;
507
+ break ;
488
508
}
489
509
}
490
-
491
510
assert ! ( progress. is_some( ) ) ;
492
511
493
512
// Before doing GC, check whether safe_point is updated periodically to
494
513
// determine if rewinding is needed.
495
514
self . check_if_need_rewind ( & progress, & mut need_rewind, & mut end) ;
496
515
497
- progress = self . gc_next_region ( progress. unwrap ( ) , & mut processed_regions) ?;
516
+ let controller: Arc < ( Mutex < usize > , Condvar ) > = Arc :: clone ( & task_controller) ;
517
+ let cb = Box :: new ( move |_res| {
518
+ let ( lock, cvar) = & * controller;
519
+ let mut current_tasks = lock. lock ( ) . unwrap ( ) ;
520
+ * current_tasks -= 1 ;
521
+ cvar. notify_one ( ) ;
522
+ AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
523
+ . with_label_values ( & [ PROCESS_TYPE_GC ] )
524
+ . inc ( ) ;
525
+ } ) ;
526
+ maybe_wait ( self . max_concurrent_tasks - 1 ) ;
527
+ let mut current_tasks = lock. lock ( ) . unwrap ( ) ;
528
+ progress = self . async_gc_next_region ( progress. unwrap ( ) , cb, & mut current_tasks) ?;
529
+ scheduled_regions += 1 ;
498
530
}
531
+
532
+ // Wait for all scheduled tasks to finish
533
+ self . gc_manager_ctx . check_stopped ( ) ?;
534
+ maybe_wait ( 0 ) ;
535
+ info ! ( "gc_worker: auto gc finishes" ; "processed_regions" => scheduled_regions) ;
536
+
537
+ Ok ( ( ) )
499
538
}
500
539
501
540
/// Checks whether we need to rewind in this round of GC. Only used in
@@ -536,13 +575,14 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
536
575
}
537
576
}
538
577
539
- /// Does GC on the next region after `from_key`. Returns the end key of the
540
- /// region it processed. If we have processed to the end of all regions,
541
- /// returns `None`.
542
- fn gc_next_region (
578
+ /// Does GC on the next region after `from_key` asynchronously . Returns the
579
+ /// end key of the region it processed. If we have processed to the end
580
+ /// of all regions, returns `None`.
581
+ fn async_gc_next_region (
543
582
& mut self ,
544
583
from_key : Key ,
545
- processed_regions : & mut usize ,
584
+ callback : Callback < ( ) > ,
585
+ running_tasks : & mut usize ,
546
586
) -> GcManagerResult < Option < Key > > {
547
587
// Get the information of the next region to do GC.
548
588
let ( region, next_key) = self . get_next_gc_context ( from_key) ;
@@ -552,16 +592,16 @@ impl<S: GcSafePointProvider, R: RegionInfoProvider + 'static, E: KvEngine> GcMan
552
592
let hex_end = format ! ( "{:?}" , log_wrappers:: Value :: key( region. get_end_key( ) ) ) ;
553
593
debug ! ( "trying gc" ; "region_id" => region. id, "start_key" => & hex_start, "end_key" => & hex_end) ;
554
594
555
- if let Err ( e ) = sync_gc ( & self . worker_scheduler , region , self . curr_safe_point ( ) ) {
556
- // Ignore the error and continue, since it's useless to retry this.
557
- // TODO: Find a better way to handle errors. Maybe we should retry.
558
- warn ! ( "failed gc" ; "start_key" => & hex_start , "end_key" => & hex_end , "err" => ?e ) ;
559
- }
560
-
561
- * processed_regions += 1 ;
562
- AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
563
- . with_label_values ( & [ PROCESS_TYPE_GC ] )
564
- . inc ( ) ;
595
+ let _ = schedule_gc (
596
+ & self . worker_scheduler ,
597
+ region ,
598
+ self . curr_safe_point ( ) ,
599
+ callback ,
600
+ )
601
+ . map ( |_| {
602
+ * running_tasks += 1 ;
603
+ Ok :: < ( ) , GcManagerError > ( ( ) )
604
+ } ) ;
565
605
566
606
Ok ( next_key)
567
607
}
@@ -710,8 +750,16 @@ mod tests {
710
750
impl GcManagerTestUtil {
711
751
pub fn new ( regions : BTreeMap < Vec < u8 > , RegionInfo > ) -> Self {
712
752
let ( gc_task_sender, gc_task_receiver) = channel ( ) ;
713
- let worker = WorkerBuilder :: new ( "test-gc-manager" ) . create ( ) ;
714
- let scheduler = worker. start ( "gc-manager" , MockGcRunner { tx : gc_task_sender } ) ;
753
+ let worker = WorkerBuilder :: new ( "test-gc-manager" )
754
+ . thread_count ( 2 )
755
+ . create ( ) ;
756
+ let scheduler = worker. start (
757
+ "gc-manager" ,
758
+ MockGcRunner {
759
+ tx : gc_task_sender. clone ( ) ,
760
+ } ,
761
+ ) ;
762
+ worker. start ( "gc-manager" , MockGcRunner { tx : gc_task_sender } ) ;
715
763
716
764
let ( safe_point_sender, safe_point_receiver) = channel ( ) ;
717
765
@@ -731,6 +779,7 @@ mod tests {
731
779
scheduler,
732
780
GcWorkerConfigManager :: default ( ) ,
733
781
Default :: default ( ) ,
782
+ 2 ,
734
783
) ;
735
784
Self {
736
785
gc_manager : Some ( gc_manager) ,
0 commit comments