Skip to content

Commit fe41ace

Browse files
committed
Reducing lock acquisition time in metadata requests
1 parent d43b1d5 commit fe41ace

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

src/dataflow/src/source/kafka.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ lazy_static! {
5353
/// Per-Kafka source metrics.
5454
pub struct SourceMetrics {
5555
operator_scheduled_counter: IntCounter,
56-
capability: UIntGauge
56+
capability: UIntGauge,
5757
}
5858

5959
impl SourceMetrics {
@@ -813,7 +813,6 @@ fn metadata_fetch(
813813
continue;
814814
}
815815
new_partition_count = metadata_topic.partitions().len();
816-
let mut refresh_data = partition_count.lock().expect("lock poisoned");
817816

818817
// Upgrade partition metrics
819818
for p in 0..new_partition_count {
@@ -846,7 +845,8 @@ fn metadata_fetch(
846845

847846
// Kafka partition are i32, and Kafka consequently cannot support more than i32
848847
// partitions
849-
*refresh_data = Some(new_partition_count.try_into().unwrap());
848+
*partition_count.lock().expect("lock poisoned") =
849+
Some(new_partition_count.try_into().unwrap());
850850
}
851851
Err(e) => {
852852
new_partition_count = 0;
@@ -1039,7 +1039,7 @@ where
10391039
&mut dp_info.partition_metrics.get_mut(&partition).unwrap();
10401040
partition_metrics.offset_ingested.set(offset.offset);
10411041
partition_metrics.messages_ingested.inc();
1042-
cp_info.record_count_since_downgrade+=1;
1042+
cp_info.record_count_since_downgrade += 1;
10431043
}
10441044
}
10451045

0 commit comments

Comments
 (0)