Skip to content

Commit 5578607

Browse files
authored
Merge pull request #496 from zendesk/dasch/instrument-at-start
Instrument the start of message/batch processing
2 parents fbc4bbf + 08aebf7 commit 5578607

File tree

1 file changed

+28
-20
lines changed

1 file changed

+28
-20
lines changed

lib/kafka/consumer.rb

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -203,17 +203,21 @@ def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatica
203203

204204
batches.each do |batch|
205205
batch.messages.each do |message|
206-
@instrumenter.instrument("process_message.consumer") do |notification|
207-
notification.update(
208-
topic: message.topic,
209-
partition: message.partition,
210-
offset: message.offset,
211-
offset_lag: batch.highwater_mark_offset - message.offset - 1,
212-
create_time: message.create_time,
213-
key: message.key,
214-
value: message.value,
215-
)
216-
206+
notification = {
207+
topic: message.topic,
208+
partition: message.partition,
209+
offset: message.offset,
210+
offset_lag: batch.highwater_mark_offset - message.offset - 1,
211+
create_time: message.create_time,
212+
key: message.key,
213+
value: message.value,
214+
}
215+
216+
# Instrument an event immediately so that subscribers don't have to wait until
217+
# the block is completed.
218+
@instrumenter.instrument("start_process_message.consumer", notification)
219+
220+
@instrumenter.instrument("process_message.consumer", notification) do
217221
begin
218222
yield message
219223
@current_offsets[message.topic][message.partition] = message.offset
@@ -278,15 +282,19 @@ def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automaticall
278282

279283
batches.each do |batch|
280284
unless batch.empty?
281-
@instrumenter.instrument("process_batch.consumer") do |notification|
282-
notification.update(
283-
topic: batch.topic,
284-
partition: batch.partition,
285-
offset_lag: batch.offset_lag,
286-
highwater_mark_offset: batch.highwater_mark_offset,
287-
message_count: batch.messages.count,
288-
)
289-
285+
notification = {
286+
topic: batch.topic,
287+
partition: batch.partition,
288+
offset_lag: batch.offset_lag,
289+
highwater_mark_offset: batch.highwater_mark_offset,
290+
message_count: batch.messages.count,
291+
}
292+
293+
# Instrument an event immediately so that subscribers don't have to wait until
294+
# the block is completed.
295+
@instrumenter.instrument("start_process_batch.consumer", notification)
296+
297+
@instrumenter.instrument("process_batch.consumer", notification) do
290298
begin
291299
yield batch
292300
@current_offsets[batch.topic][batch.partition] = batch.last_offset

0 commit comments

Comments
 (0)