Skip to content

Commit 8ab18aa

Browse files
authored
chore(producer): Add produce time by topic (#423)
1 parent e20865a commit 8ab18aa

File tree

6 files changed

+414
-6
lines changed

6 files changed

+414
-6
lines changed

arroyo/backends/kafka/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1-
from .configuration import build_kafka_configuration, build_kafka_consumer_configuration
1+
from .configuration import (
2+
build_kafka_configuration,
3+
build_kafka_consumer_configuration,
4+
build_kafka_producer_configuration,
5+
)
26
from .consumer import KafkaConsumer, KafkaPayload, KafkaProducer
37

48
__all__ = [
59
"build_kafka_configuration",
610
"build_kafka_consumer_configuration",
11+
"build_kafka_producer_configuration",
712
"KafkaConsumer",
813
"KafkaPayload",
914
"KafkaProducer",

arroyo/backends/kafka/configuration.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,77 @@ def stats_callback(stats_json: str) -> None:
4949
)
5050

5151

52+
def producer_stats_callback(stats_json: str) -> None:
    """Emit producer latency metrics from a librdkafka statistics payload.

    Registered as the Kafka client's ``stats_cb``; it receives a
    JSON-encoded statistics blob on each statistics interval. For every
    broker we report the p99 and average of three latency windows
    (``int_latency``, ``outbuf_latency`` and ``rtt``), converting
    librdkafka's microsecond values to milliseconds. Every metric is
    tagged with the reporting ``broker_id``.
    """
    stats = json.loads(stats_json)
    metrics = get_metrics()

    for broker_id, broker_stats in stats.get("brokers", {}).items():
        tags = {"broker_id": str(broker_id)}
        for window_name in ("int_latency", "outbuf_latency", "rtt"):
            window = broker_stats.get(window_name, {})
            if not window:
                # This broker has not reported the window yet; skip silently.
                continue
            for aggregate in ("p99", "avg"):
                metrics.timing(
                    f"arroyo.producer.librdkafka.{aggregate}_{window_name}",
                    # librdkafka reports microseconds; metrics expect ms.
                    window.get(aggregate, 0) / 1000.0,
                    tags=tags,
                )
103+
104+
105+
def build_kafka_producer_configuration(
    default_config: Mapping[str, Any],
    bootstrap_servers: Optional[Sequence[str]] = None,
    override_params: Optional[Mapping[str, Any]] = None,
) -> KafkaBrokerConfig:
    """Build a producer configuration with statistics collection enabled.

    Delegates to ``build_kafka_configuration`` and then layers on the
    settings that make librdkafka periodically report statistics to
    ``producer_stats_callback``.
    """
    stats_settings = {
        "statistics.interval.ms": STATS_COLLECTION_FREQ_MS,
        "stats_cb": producer_stats_callback,
    }

    broker_config = build_kafka_configuration(
        default_config, bootstrap_servers, override_params
    )
    broker_config.update(stats_settings)
    return broker_config
121+
122+
52123
def build_kafka_consumer_configuration(
53124
default_config: Mapping[str, Any],
54125
group_id: str,

arroyo/utils/metric_defs.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,22 @@
106106
"arroyo.consumer.dlq_buffer.exceeded",
107107
# Gauge: Number of partitions being tracked in the DLQ buffer
108108
"arroyo.consumer.dlq_buffer.assigned_partitions",
109+
# Time: Internal producer queue latency from librdkafka statistics.
110+
# Tagged by broker_id.
111+
"arroyo.producer.librdkafka.p99_int_latency",
112+
# Time: Output buffer latency from librdkafka statistics.
113+
# Tagged by broker_id.
114+
"arroyo.producer.librdkafka.p99_outbuf_latency",
115+
# Time: Round-trip time to brokers from librdkafka statistics.
116+
# Tagged by broker_id.
117+
"arroyo.producer.librdkafka.p99_rtt",
118+
# Time: Average internal producer queue latency from librdkafka statistics.
119+
# Tagged by broker_id.
120+
"arroyo.producer.librdkafka.avg_int_latency",
121+
# Time: Average output buffer latency from librdkafka statistics.
122+
# Tagged by broker_id.
123+
"arroyo.producer.librdkafka.avg_outbuf_latency",
124+
# Time: Average round-trip time to brokers from librdkafka statistics.
125+
# Tagged by broker_id.
126+
"arroyo.producer.librdkafka.avg_rtt",
109127
]

rust-arroyo/src/backends/kafka/config.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ impl KafkaConfig {
7777
bootstrap_servers: Vec<String>,
7878
override_params: Option<HashMap<String, String>>,
7979
) -> Self {
80-
let config = KafkaConfig::new_config(bootstrap_servers, None);
80+
let mut config = KafkaConfig::new_config(bootstrap_servers, None);
81+
config.config_map.insert(
82+
"statistics.interval.ms".to_string(),
83+
STATS_COLLECTION_FREQ_MS.to_string(),
84+
);
8185

8286
apply_override_params(config, override_params)
8387
}

rust-arroyo/src/backends/kafka/producer.rs

Lines changed: 189 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,89 @@ use crate::backends::kafka::config::KafkaConfig;
22
use crate::backends::kafka::types::KafkaPayload;
33
use crate::backends::Producer as ArroyoProducer;
44
use crate::backends::ProducerError;
5+
use crate::timer;
56
use crate::types::TopicOrPartition;
7+
use rdkafka::client::ClientContext;
68
use rdkafka::config::ClientConfig;
7-
use rdkafka::producer::{DefaultProducerContext, ThreadedProducer};
9+
use rdkafka::producer::{
10+
DeliveryResult, ProducerContext as RdkafkaProducerContext, ThreadedProducer,
11+
};
12+
use rdkafka::Statistics;
13+
use std::time::Duration;
14+
15+
/// rdkafka client context for the producer that translates librdkafka
/// statistics callbacks into arroyo `timer!` metrics.
pub struct ProducerContext;
16+
17+
impl ClientContext for ProducerContext {
18+
fn stats(&self, stats: Statistics) {
19+
for (broker_id, broker_stats) in &stats.brokers {
20+
if let Some(int_latency) = &broker_stats.int_latency {
21+
let p99_latency_ms = int_latency.p99 as f64 / 1000.0;
22+
timer!(
23+
"arroyo.producer.librdkafka.p99_int_latency",
24+
Duration::from_millis(p99_latency_ms as u64),
25+
"broker_id" => broker_id.to_string()
26+
);
27+
// Also record average latency
28+
let avg_latency_ms = int_latency.avg as f64 / 1000.0;
29+
timer!(
30+
"arroyo.producer.librdkafka.avg_int_latency",
31+
Duration::from_millis(avg_latency_ms as u64),
32+
"broker_id" => broker_id.to_string()
33+
);
34+
}
35+
if let Some(outbuf_latency) = &broker_stats.outbuf_latency {
36+
let p99_latency_ms = outbuf_latency.p99 as f64 / 1000.0;
37+
timer!(
38+
"arroyo.producer.librdkafka.p99_outbuf_latency",
39+
Duration::from_millis(p99_latency_ms as u64),
40+
"broker_id" => broker_id.to_string()
41+
);
42+
let avg_latency_ms = outbuf_latency.avg as f64 / 1000.0;
43+
timer!(
44+
"arroyo.producer.librdkafka.avg_outbuf_latency",
45+
Duration::from_millis(avg_latency_ms as u64),
46+
"broker_id" => broker_id.to_string()
47+
);
48+
}
49+
if let Some(rtt) = &broker_stats.rtt {
50+
let p99_rtt_ms = rtt.p99 as f64 / 1000.0;
51+
timer!(
52+
"arroyo.producer.librdkafka.p99_rtt",
53+
Duration::from_millis(p99_rtt_ms as u64),
54+
"broker_id" => broker_id.to_string()
55+
);
56+
let avg_rtt_ms = rtt.avg as f64 / 1000.0;
57+
timer!(
58+
"arroyo.producer.librdkafka.avg_rtt",
59+
Duration::from_millis(avg_rtt_ms as u64),
60+
"broker_id" => broker_id.to_string()
61+
);
62+
}
63+
}
64+
}
65+
}
66+
67+
impl RdkafkaProducerContext for ProducerContext {
    // No per-message state is carried through the producer, so the
    // delivery opaque is the unit type.
    type DeliveryOpaque = ();

    /// No-op delivery callback: this context exists only to receive
    /// statistics; delivery outcomes are not tracked here.
    fn delivery(
        &self,
        _delivery_result: &DeliveryResult<'_>,
        _delivery_opaque: Self::DeliveryOpaque,
    ) {
    }
}
877

978
/// Kafka producer backed by rdkafka's `ThreadedProducer`, using
/// `ProducerContext` so librdkafka statistics are emitted as metrics.
pub struct KafkaProducer {
    producer: ThreadedProducer<ProducerContext>,
}
1281

1382
impl KafkaProducer {
1483
pub fn new(config: KafkaConfig) -> Self {
84+
let context = ProducerContext;
1585
let config_obj: ClientConfig = config.into();
16-
let threaded_producer: ThreadedProducer<_> = config_obj.create().unwrap();
86+
let threaded_producer: ThreadedProducer<_> =
87+
config_obj.create_with_context(context).unwrap();
1788

1889
Self {
1990
producer: threaded_producer,
@@ -39,11 +110,125 @@ impl ArroyoProducer<KafkaPayload> for KafkaProducer {
39110

40111
#[cfg(test)]
41112
mod tests {
42-
use super::KafkaProducer;
113+
use super::{KafkaProducer, ProducerContext};
43114
use crate::backends::kafka::config::KafkaConfig;
44115
use crate::backends::kafka::types::KafkaPayload;
45116
use crate::backends::Producer;
46117
use crate::types::{Topic, TopicOrPartition};
118+
use rdkafka::client::ClientContext;
119+
use rdkafka::statistics::{Broker, Statistics, Window};
120+
use std::collections::HashMap;
121+
122+
fn create_test_statistics_with_all_metrics() -> Statistics {
123+
let mut brokers = HashMap::new();
124+
brokers.insert(
125+
"1".to_string(),
126+
Broker {
127+
int_latency: Some(Window {
128+
p99: 2000, // microseconds -> 2.0 ms
129+
avg: 1000, // microseconds -> 1.0 ms
130+
..Default::default()
131+
}),
132+
outbuf_latency: Some(Window {
133+
p99: 4000, // microseconds -> 4.0 ms
134+
avg: 2000, // microseconds -> 2.0 ms
135+
..Default::default()
136+
}),
137+
rtt: Some(Window {
138+
p99: 1500, // microseconds -> 1.5 ms
139+
avg: 750, // microseconds -> 0.75 ms
140+
..Default::default()
141+
}),
142+
..Default::default()
143+
},
144+
);
145+
146+
Statistics {
147+
brokers,
148+
..Default::default()
149+
}
150+
}
151+
152+
fn create_test_statistics_with_partial_metrics() -> Statistics {
153+
let mut brokers = HashMap::new();
154+
brokers.insert(
155+
"1".to_string(),
156+
Broker {
157+
int_latency: Some(Window {
158+
p99: 2000,
159+
avg: 1000,
160+
..Default::default()
161+
}),
162+
outbuf_latency: Some(Window {
163+
p99: 4000,
164+
avg: 2000,
165+
..Default::default()
166+
}),
167+
// No RTT data
168+
..Default::default()
169+
},
170+
);
171+
172+
Statistics {
173+
brokers,
174+
..Default::default()
175+
}
176+
}
177+
178+
fn create_test_statistics_empty_brokers() -> Statistics {
179+
Statistics {
180+
brokers: HashMap::new(),
181+
..Default::default()
182+
}
183+
}
184+
185+
fn create_test_statistics_empty_broker_stats() -> Statistics {
186+
let mut brokers = HashMap::new();
187+
brokers.insert("1".to_string(), Broker::default());
188+
189+
Statistics {
190+
brokers,
191+
..Default::default()
192+
}
193+
}
194+
195+
#[test]
fn test_producer_context_stats_with_all_metrics() {
    let context = ProducerContext;
    let stats = create_test_statistics_with_all_metrics();

    // The timer! macro cannot easily be mocked here, so this only
    // verifies that processing a fully-populated stats payload does
    // not panic.
    context.stats(stats);
}
204+
205+
#[test]
fn test_producer_context_stats_with_partial_metrics() {
    let context = ProducerContext;
    let stats = create_test_statistics_with_partial_metrics();

    // Verifies the callback handles a broker with a missing RTT window
    // gracefully (no panic, RTT metrics simply skipped).
    context.stats(stats);
}
213+
214+
#[test]
fn test_producer_context_stats_no_brokers() {
    let context = ProducerContext;
    let stats = create_test_statistics_empty_brokers();

    // Verifies the callback handles an empty broker map gracefully.
    context.stats(stats);
}
222+
223+
#[test]
fn test_producer_context_stats_empty_broker_stats() {
    let context = ProducerContext;
    let stats = create_test_statistics_empty_broker_stats();

    // Verifies the callback handles a broker with no latency windows
    // gracefully (all three Options are None).
    context.stats(stats);
}
231+
47232
#[test]
48233
fn test_producer() {
49234
let topic = Topic::new("test");

0 commit comments

Comments
 (0)