Skip to content

Commit dfd4bff

Browse files
authored
feat(consumer): Add metrics for partitions assigned and revoked (#497)
* feat(consumer): Add metrics for partitions assigned and revoked
1 parent 38bb567 commit dfd4bff

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

src/kafka/consumer.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ pub async fn start_consumer(
6464

6565
handle_shutdown_signals(event_sender.clone());
6666
poll_consumer_client(consumer.clone(), client_shutdown_receiver);
67+
metrics::gauge!("arroyo.consumer.current_partitions").set(0);
6768
handle_events(
6869
consumer,
6970
event_receiver,
@@ -148,6 +149,8 @@ impl ConsumerContext for KafkaContext {
148149
info!("Partition assignment event sent, waiting for rendezvous...");
149150
let _ = rendezvous_receiver.recv();
150151
info!("Rendezvous complete");
152+
metrics::counter!("arroyo.consumer.partitions_assigned.count")
153+
.increment(tpl.count() as u64);
151154
}
152155
Rebalance::Revoke(tpl) => {
153156
debug!("Got pre-rebalance callback, kind: Revoke");
@@ -165,6 +168,8 @@ impl ConsumerContext for KafkaContext {
165168
info!("Partition revocation event sent, waiting for rendezvous...");
166169
let _ = rendezvous_receiver.recv();
167170
info!("Rendezvous complete");
171+
metrics::counter!("arroyo.consumer.partitions_revoked.count")
172+
.increment(tpl.count() as u64);
168173
}
169174
Rebalance::Error(err) => {
170175
debug!("Got pre-rebalance callback, kind: Error");
@@ -366,6 +371,7 @@ pub async fn handle_events(
366371
info!("Received event: {:?}", event);
367372
state = match (state, event) {
368373
(ConsumerState::Ready, Event::Assign(tpl)) => {
374+
metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64);
369375
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
370376
}
371377
(ConsumerState::Ready, Event::Revoke(_)) => {
@@ -381,12 +387,14 @@ pub async fn handle_events(
381387
"Revoked TPL should be equal to the subset of TPL we're consuming from"
382388
);
383389
handles.shutdown(CALLBACK_DURATION).await;
390+
metrics::gauge!("arroyo.consumer.current_partitions").set(0);
384391
ConsumerState::Ready
385392
}
386393
(ConsumerState::Consuming(handles, _), Event::Shutdown) => {
387394
handles.shutdown(CALLBACK_DURATION).await;
388395
debug!("Signaling shutdown to client...");
389396
shutdown_client.take();
397+
metrics::gauge!("arroyo.consumer.current_partitions").set(0);
390398
ConsumerState::Stopped
391399
}
392400
(ConsumerState::Stopped, _) => {

0 commit comments

Comments
 (0)