Skip to content

Commit a697fd2

Browse files
committed
Instrument fetch_batch for each_message
1 parent ef79678 commit a697fd2

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

lib/kafka/consumer.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,15 @@ def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatica
202202
)
203203

204204
batches.each do |batch|
205+
unless batch.empty?
206+
@instrumenter.instrument("fetch_batch.consumer", {
207+
topic: batch.topic,
208+
partition: batch.partition,
209+
offset_lag: batch.offset_lag,
210+
highwater_mark_offset: batch.highwater_mark_offset,
211+
message_count: batch.messages.count,
212+
})
213+
end
205214
batch.messages.each do |message|
206215
notification = {
207216
topic: message.topic,

spec/consumer_spec.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,18 @@
7575
]
7676
}
7777

78+
it "instruments" do
79+
expect(instrumenter).to receive(:instrument).once.with('fetch_batch.consumer', anything)
80+
expect(instrumenter).to receive(:instrument).once.with('start_process_message.consumer', anything)
81+
expect(instrumenter).to receive(:instrument).once.with('process_message.consumer', anything)
82+
83+
allow(instrumenter).to receive(:instrument).and_call_original
84+
85+
consumer.each_message do |message|
86+
consumer.stop
87+
end
88+
end
89+
7890
it "raises ProcessingError if the processing code fails" do
7991
expect {
8092
consumer.each_message do |message|

0 commit comments

Comments
 (0)