Skip to content

Commit 5d70a45

Browse files
authored
Cause certain Kafka metrics to be deleted when their resources are dropped (MaterializeInc#3371)
This uses MaterializeInc/rust-prometheus#2 to guarantee that at least some metrics stop being reported when `DROP SOURCE` is performed. This is enough to resolve the false metrics appearing in #3352, but it does not yet delete all Kafka metrics when their sources are deleted. In particular, metrics in the timestamper are not deleted, because we don't yet have structs there that live along with their partitions.
1 parent 215f486 commit 5d70a45

File tree

2 files changed

+35
-18
lines changed

2 files changed

+35
-18
lines changed

Cargo.lock

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/dataflow/src/source/kafka.rs

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ use dataflow_types::{
2020
use expr::{PartitionId, SourceInstanceId};
2121
use lazy_static::lazy_static;
2222
use log::{debug, error, info, log_enabled, warn};
23+
use prometheus::core::{AtomicI64, AtomicU64};
2324
use prometheus::{
2425
register_int_counter, register_int_counter_vec, register_int_gauge_vec,
25-
register_uint_gauge_vec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge,
26-
UIntGaugeVec,
26+
register_uint_gauge_vec, DeleteOnDropCounter, DeleteOnDropGauge, IntCounter, IntCounterVec,
27+
IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
2728
};
2829
use rdkafka::consumer::base_consumer::PartitionQueue;
2930
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
@@ -81,10 +82,10 @@ impl SourceMetrics {
8182

8283
/// Per-Kafka source partition metrics.
8384
pub struct PartitionMetrics {
84-
offset_ingested: IntGauge,
85-
offset_received: IntGauge,
86-
closed_ts: UIntGauge,
87-
messages_ingested: IntCounter,
85+
offset_ingested: DeleteOnDropGauge<'static, AtomicI64>,
86+
offset_received: DeleteOnDropGauge<'static, AtomicI64>,
87+
closed_ts: DeleteOnDropGauge<'static, AtomicU64>,
88+
messages_ingested: DeleteOnDropCounter<'static, AtomicI64>,
8889
}
8990

9091
impl PartitionMetrics {
@@ -119,10 +120,26 @@ impl PartitionMetrics {
119120
}
120121
let labels = &[topic_name, source_id, partition_id];
121122
PartitionMetrics {
122-
offset_ingested: OFFSET_INGESTED.with_label_values(labels),
123-
offset_received: OFFSET_RECEIVED.with_label_values(labels),
124-
closed_ts: CLOSED_TS.with_label_values(labels),
125-
messages_ingested: MESSAGES_INGESTED.with_label_values(labels),
123+
offset_ingested: DeleteOnDropGauge::new_with_error_handler(
124+
OFFSET_INGESTED.with_label_values(labels),
125+
&OFFSET_INGESTED,
126+
|e, v| log::warn!("unable to delete metric {}: {}", v.fq_name(), e),
127+
),
128+
offset_received: DeleteOnDropGauge::new_with_error_handler(
129+
OFFSET_RECEIVED.with_label_values(labels),
130+
&OFFSET_RECEIVED,
131+
|e, v| log::warn!("unable to delete metric {}: {}", v.fq_name(), e),
132+
),
133+
closed_ts: DeleteOnDropGauge::new_with_error_handler(
134+
CLOSED_TS.with_label_values(labels),
135+
&CLOSED_TS,
136+
|e, v| log::warn!("unable to delete metric {}: {}", v.fq_name(), e),
137+
),
138+
messages_ingested: DeleteOnDropCounter::new_with_error_handler(
139+
MESSAGES_INGESTED.with_label_values(labels),
140+
&MESSAGES_INGESTED,
141+
|e, v| log::warn!("unable to delete metric {}: {}", v.fq_name(), e),
142+
),
126143
}
127144
}
128145
}

0 commit comments

Comments
 (0)