Skip to content

Commit db4380b

Browse files
authored
fix: add back drop metrics after sink refactor (#3074)
Signed-off-by: adarsh0728 <[email protected]>
1 parent ab53acf commit db4380b

File tree

2 files changed

+27
-3
lines changed

2 files changed

+27
-3
lines changed

rust/numaflow-core/src/pipeline/isb/writer.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,12 @@ impl ISBWriter {
123123
async fn write_to_isb(&self, message: Message, cln_token: CancellationToken) -> Result<()> {
124124
// Handle dropped messages
125125
if message.dropped() {
126-
self.publish_stream_drop_metric("n/a", "to_drop", message.value.len());
126+
// Increment metric for user-initiated drops via DROP tag
127+
pipeline_metrics()
128+
.forwarder
129+
.udf_drop_total
130+
.get_or_create(pipeline_metric_labels(self.vertex_type.as_str()))
131+
.inc();
127132
return Ok(());
128133
}
129134

rust/numaflow-core/src/sinker/sink.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::error::Error;
55
use crate::message::Message;
66
use crate::metrics::{
77
PIPELINE_PARTITION_NAME_LABEL, monovertex_metrics, mvtx_forward_metric_labels,
8-
pipeline_metric_labels, pipeline_metrics,
8+
pipeline_drop_metric_labels, pipeline_metric_labels, pipeline_metrics,
99
};
1010
use crate::sinker::actor::{SinkActorMessage, SinkActorResponse};
1111
use numaflow_kafka::sink::KafkaSink;
@@ -453,7 +453,6 @@ impl SinkWriter {
453453
dropped_messages_size: usize,
454454
write_start_time: time::Instant,
455455
) {
456-
// TODO: add metric for dropped messages because of retry strategy
457456
if is_mono_vertex() {
458457
monovertex_metrics()
459458
.sink
@@ -465,6 +464,14 @@ impl SinkWriter {
465464
.write_total
466465
.get_or_create(mvtx_forward_metric_labels())
467466
.inc_by((messages_count - fallback_messages_count - dropped_messages_count) as u64);
467+
468+
if dropped_messages_count > 0 {
469+
monovertex_metrics()
470+
.sink
471+
.dropped_total
472+
.get_or_create(mvtx_forward_metric_labels())
473+
.inc_by(dropped_messages_count as u64);
474+
}
468475
} else {
469476
let mut labels = pipeline_metric_labels(VERTEX_TYPE_SINK).clone();
470477
labels.push((
@@ -487,6 +494,18 @@ impl SinkWriter {
487494
.write_processing_time
488495
.get_or_create(&labels)
489496
.observe(write_start_time.elapsed().as_micros() as f64);
497+
498+
if dropped_messages_count > 0 {
499+
pipeline_metrics()
500+
.forwarder
501+
.drop_total
502+
.get_or_create(&pipeline_drop_metric_labels(
503+
VERTEX_TYPE_SINK,
504+
get_vertex_name(),
505+
"Retries exhausted in the Sink",
506+
))
507+
.inc_by(dropped_messages_count as u64);
508+
}
490509
}
491510
}
492511

0 commit comments

Comments
 (0)