7
7
// the Business Source License, use of this software will be governed
8
8
// by the Apache License, Version 2.0.
9
9
10
+ use rand:: Rng ;
10
11
use std:: collections:: { HashMap , HashSet , VecDeque } ;
11
12
use std:: convert:: TryInto ;
12
13
use std:: sync:: atomic:: { AtomicBool , Ordering } ;
@@ -667,8 +668,6 @@ struct ControlPlaneInfo {
667
668
start_offset : MzOffset ,
668
669
/// Source Type (Real-time or BYO)
669
670
source_type : Consistency ,
670
- /// Number of records processed since capability was last downgraded
671
- record_count_since_downgrade : u64 ,
672
671
}
673
672
674
673
impl ControlPlaneInfo {
@@ -685,7 +684,6 @@ impl ControlPlaneInfo {
685
684
start_offset,
686
685
source_type : consistency,
687
686
time_since_downgrade : Instant :: now ( ) ,
688
- record_count_since_downgrade : 0 ,
689
687
}
690
688
}
691
689
@@ -765,6 +763,7 @@ fn activate_source_timestamping<G>(
765
763
766
764
/// This function is responsible for refreshing the number of known partitions. It marks the source
767
765
/// has needing to be refreshed if new partitions are detected.
766
+ #[ allow( clippy:: too_many_arguments) ]
768
767
fn metadata_fetch (
769
768
timestamping_stopped : Arc < AtomicBool > ,
770
769
consumer : Arc < BaseConsumer < GlueConsumerContext > > ,
@@ -790,6 +789,7 @@ fn metadata_fetch(
790
789
}
791
790
792
791
let mut partition_kafka_metadata: HashMap < i32 , IntGauge > = HashMap :: new ( ) ;
792
+ let mut rng = rand:: thread_rng ( ) ;
793
793
794
794
while !timestamping_stopped. load ( Ordering :: SeqCst ) {
795
795
let metadata = consumer. fetch_metadata ( Some ( & topic) , Duration :: from_secs ( 30 ) ) ;
@@ -855,7 +855,10 @@ fn metadata_fetch(
855
855
}
856
856
857
857
if new_partition_count > 0 {
858
- thread:: sleep ( wait) ;
858
+ // Add jitter to spread-out metadata requests from workers. Brokers can get overloaded
859
+ // if all workers make simultaneous metadata request calls.
860
+ let sleep_jitter = rng. gen_range ( Duration :: from_secs ( 0 ) , Duration :: from_secs ( 15 ) ) ;
861
+ thread:: sleep ( wait + sleep_jitter) ;
859
862
} else {
860
863
// If no partitions have been detected yet, sleep for a second rather than
861
864
// the specified "wait" period of time, as we know that there should at least be one
@@ -1039,7 +1042,6 @@ where
1039
1042
& mut dp_info. partition_metrics . get_mut ( & partition) . unwrap ( ) ;
1040
1043
partition_metrics. offset_ingested . set ( offset. offset ) ;
1041
1044
partition_metrics. messages_ingested . inc ( ) ;
1042
- cp_info. record_count_since_downgrade += 1 ;
1043
1045
}
1044
1046
}
1045
1047
@@ -1271,7 +1273,6 @@ fn downgrade_capability(
1271
1273
if changed && min > 0 {
1272
1274
dp_info. source_metrics . capability . set ( min) ;
1273
1275
cap. downgrade ( & ( & min + 1 ) ) ;
1274
- cp_info. record_count_since_downgrade = 0 ;
1275
1276
cp_info. last_closed_ts = min;
1276
1277
}
1277
1278
} else {
@@ -1286,7 +1287,6 @@ fn downgrade_capability(
1286
1287
cap. downgrade ( & ( & ts + 1 ) ) ;
1287
1288
}
1288
1289
cp_info. time_since_downgrade = Instant :: now ( ) ;
1289
- cp_info. record_count_since_downgrade = 0 ;
1290
1290
}
1291
1291
}
1292
1292
}
0 commit comments