@@ -53,7 +53,7 @@ lazy_static! {
53
53
/// Per-Kafka source metrics.
54
54
pub struct SourceMetrics {
55
55
operator_scheduled_counter : IntCounter ,
56
- capability : UIntGauge ,
56
+ capability : UIntGauge
57
57
}
58
58
59
59
impl SourceMetrics {
@@ -369,13 +369,17 @@ impl DataPlaneInfo {
369
369
let refresh = self . refresh_metadata_info . clone ( ) ;
370
370
let id = self . source_id . clone ( ) ;
371
371
let topic = self . topic_name . clone ( ) ;
372
+ let worker_id = self . worker_id ;
373
+ let worker_count = self . worker_count ;
372
374
thread:: spawn ( move || {
373
375
metadata_fetch (
374
376
timestamping_stopped,
375
377
consumer,
376
378
refresh,
377
379
& id,
378
380
& topic,
381
+ worker_id,
382
+ worker_count,
379
383
metadata_refresh_frequency,
380
384
)
381
385
} ) ;
@@ -421,8 +425,6 @@ impl DataPlaneInfo {
421
425
}
422
426
423
427
/// Returns true if this worker is responsible for this partition
424
- /// If multi-worker reading is not enabled, this worker is *always* responsible for the
425
- /// partition
426
428
/// Ex: if pid=0 and worker_id = 0, then true
427
429
/// if pid=1 and worker_id = 0, then false
428
430
fn has_partition ( & self , partition_id : i32 ) -> bool {
@@ -665,6 +667,8 @@ struct ControlPlaneInfo {
665
667
start_offset : MzOffset ,
666
668
/// Source Type (Real-time or BYO)
667
669
source_type : Consistency ,
670
+ /// Number of records processed since capability was last downgraded
671
+ record_count_since_downgrade : u64 ,
668
672
}
669
673
670
674
impl ControlPlaneInfo {
@@ -681,6 +685,7 @@ impl ControlPlaneInfo {
681
685
start_offset,
682
686
source_type : consistency,
683
687
time_since_downgrade : Instant :: now ( ) ,
688
+ record_count_since_downgrade : 0 ,
684
689
}
685
690
}
686
691
@@ -766,6 +771,8 @@ fn metadata_fetch(
766
771
partition_count : Arc < Mutex < Option < i32 > > > ,
767
772
id : & str ,
768
773
topic : & str ,
774
+ worker_id : i32 ,
775
+ worker_count : i32 ,
769
776
wait : Duration ,
770
777
) {
771
778
debug ! (
@@ -811,26 +818,29 @@ fn metadata_fetch(
811
818
// Upgrade partition metrics
812
819
for p in 0 ..new_partition_count {
813
820
let pid = p. try_into ( ) . unwrap ( ) ;
814
- match consumer. fetch_watermarks ( & topic, pid, Duration :: from_secs ( 1 ) ) {
815
- Ok ( ( _, high) ) => {
816
- if let Some ( max_available_offset) =
817
- partition_kafka_metadata. get_mut ( & pid)
818
- {
819
- max_available_offset. set ( high)
820
- } else {
821
- let max_offset = MAX_AVAILABLE_OFFSET . with_label_values ( & [
822
- topic,
823
- & id,
824
- & pid. to_string ( ) ,
825
- ] ) ;
826
- max_offset. set ( high) ;
827
- partition_kafka_metadata. insert ( pid, max_offset) ;
821
+ // Only check metadata updates for partitions that the worker owns
822
+ if ( pid % worker_count) == worker_id {
823
+ match consumer. fetch_watermarks ( & topic, pid, Duration :: from_secs ( 1 ) ) {
824
+ Ok ( ( _, high) ) => {
825
+ if let Some ( max_available_offset) =
826
+ partition_kafka_metadata. get_mut ( & pid)
827
+ {
828
+ max_available_offset. set ( high)
829
+ } else {
830
+ let max_offset = MAX_AVAILABLE_OFFSET . with_label_values ( & [
831
+ topic,
832
+ & id,
833
+ & pid. to_string ( ) ,
834
+ ] ) ;
835
+ max_offset. set ( high) ;
836
+ partition_kafka_metadata. insert ( pid, max_offset) ;
837
+ }
828
838
}
839
+ Err ( e) => warn ! (
840
+ "error loading watermarks topic={} partition={} error={}" ,
841
+ topic, p, e
842
+ ) ,
829
843
}
830
- Err ( e) => warn ! (
831
- "error loading watermarks topic={} partition={} error={}" ,
832
- topic, p, e
833
- ) ,
834
844
}
835
845
}
836
846
@@ -1029,6 +1039,7 @@ where
1029
1039
& mut dp_info. partition_metrics . get_mut ( & partition) . unwrap ( ) ;
1030
1040
partition_metrics. offset_ingested . set ( offset. offset ) ;
1031
1041
partition_metrics. messages_ingested . inc ( ) ;
1042
+ cp_info. record_count_since_downgrade +=1 ;
1032
1043
}
1033
1044
}
1034
1045
@@ -1260,6 +1271,7 @@ fn downgrade_capability(
1260
1271
if changed && min > 0 {
1261
1272
dp_info. source_metrics . capability . set ( min) ;
1262
1273
cap. downgrade ( & ( & min + 1 ) ) ;
1274
+ cp_info. record_count_since_downgrade = 0 ;
1263
1275
cp_info. last_closed_ts = min;
1264
1276
}
1265
1277
} else {
@@ -1274,6 +1286,7 @@ fn downgrade_capability(
1274
1286
cap. downgrade ( & ( & ts + 1 ) ) ;
1275
1287
}
1276
1288
cp_info. time_since_downgrade = Instant :: now ( ) ;
1289
+ cp_info. record_count_since_downgrade = 0 ;
1277
1290
}
1278
1291
}
1279
1292
}
0 commit comments