Skip to content

Commit 0c5e5bf

Browse files
committed
fix: consider each_batch removed in rdkafka-0.20.0
1 parent 572b3f8 commit 0c5e5bf

File tree

2 files changed

+84
-78
lines changed
  • instrumentation/rdkafka

2 files changed

+84
-78
lines changed

instrumentation/rdkafka/lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,26 +42,29 @@ def each
4242
end
4343
end
4444

45-
def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block)
46-
super do |messages, error|
47-
if messages.empty?
48-
yield messages, error
49-
else
50-
attributes = {
51-
'messaging.system' => 'kafka',
52-
'messaging.destination_kind' => 'topic',
53-
'messaging.kafka.message_count' => messages.size
54-
}
45+
# each_batch method is deleted in rdkafka 0.20.0
46+
if Gem::Version.new(::Rdkafka::VERSION) < Gem::Version.new('0.20.0')
47+
def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block)
48+
super do |messages, error|
49+
if messages.empty?
50+
yield messages, error
51+
else
52+
attributes = {
53+
'messaging.system' => 'kafka',
54+
'messaging.destination_kind' => 'topic',
55+
'messaging.kafka.message_count' => messages.size
56+
}
5557

56-
links = messages.map do |message|
57-
trace_context = OpenTelemetry.propagation.extract(message.headers, getter: GETTER)
58-
span_context = OpenTelemetry::Trace.current_span(trace_context).context
59-
OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
60-
end
61-
links.compact!
58+
links = messages.map do |message|
59+
trace_context = OpenTelemetry.propagation.extract(message.headers, getter: GETTER)
60+
span_context = OpenTelemetry::Trace.current_span(trace_context).context
61+
OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
62+
end
63+
links.compact!
6264

63-
tracer.in_span('batch process', attributes: attributes, links: links, kind: :consumer) do
64-
yield messages, error
65+
tracer.in_span('batch process', attributes: attributes, links: links, kind: :consumer) do
66+
yield messages, error
67+
end
6568
end
6669
end
6770
end

instrumentation/rdkafka/test/opentelemetry/instrumentation/rdkafka/patches/consumer_test.rb

Lines changed: 63 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -166,68 +166,71 @@
166166
end
167167
end
168168

169-
describe '#each_batch' do
170-
it 'traces each_batch call' do
171-
skip "#{Rdkafka::VERSION} is not supported" unless instrumentation.compatible?
172-
173-
rand_hash = SecureRandom.hex(10)
174-
topic_name = "consumer-patch-batch-trace-#{rand_hash}"
175-
config = { 'bootstrap.servers': "#{host}:#{port}" }
176-
177-
producer = Rdkafka::Config.new(config).producer
178-
delivery_handles = []
179-
180-
delivery_handles << producer.produce(
181-
topic: topic_name,
182-
payload: 'wow',
183-
key: 'Key 1'
184-
)
185-
186-
delivery_handles << producer.produce(
187-
topic: topic_name,
188-
payload: 'super',
189-
key: 'Key 2'
190-
)
191-
192-
delivery_handles.each(&:wait)
193-
194-
consumer_config = config.merge(
195-
'group.id': 'me',
196-
'auto.offset.reset': 'smallest' # https://stackoverflow.com/a/51081649
197-
)
198-
consumer = Rdkafka::Config.new(config.merge(consumer_config)).consumer
199-
consumer.subscribe(topic_name)
200-
201-
begin
202-
consumer.each_batch(max_items: 2) do |messages|
203-
raise 'oops' unless messages.empty?
169+
# each_batch method is deleted in rdkafka 0.20.0
170+
if Gem::Version.new(::Rdkafka::VERSION) < Gem::Version.new('0.20.0')
171+
describe '#each_batch' do
172+
it 'traces each_batch call' do
173+
skip "#{Rdkafka::VERSION} is not supported" unless instrumentation.compatible?
174+
175+
rand_hash = SecureRandom.hex(10)
176+
topic_name = "consumer-patch-batch-trace-#{rand_hash}"
177+
config = { 'bootstrap.servers': "#{host}:#{port}" }
178+
179+
producer = Rdkafka::Config.new(config).producer
180+
delivery_handles = []
181+
182+
delivery_handles << producer.produce(
183+
topic: topic_name,
184+
payload: 'wow',
185+
key: 'Key 1'
186+
)
187+
188+
delivery_handles << producer.produce(
189+
topic: topic_name,
190+
payload: 'super',
191+
key: 'Key 2'
192+
)
193+
194+
delivery_handles.each(&:wait)
195+
196+
consumer_config = config.merge(
197+
'group.id': 'me',
198+
'auto.offset.reset': 'smallest' # https://stackoverflow.com/a/51081649
199+
)
200+
consumer = Rdkafka::Config.new(config.merge(consumer_config)).consumer
201+
consumer.subscribe(topic_name)
202+
203+
begin
204+
consumer.each_batch(max_items: 2) do |messages|
205+
raise 'oops' unless messages.empty?
206+
end
207+
rescue StandardError
204208
end
205-
rescue StandardError
206-
end
207-
208-
span = spans.find { |s| s.name == 'batch process' }
209-
_(span.kind).must_equal(:consumer)
210-
_(span.attributes['messaging.kafka.message_count']).must_equal(2)
211209

212-
event = span.events.first
213-
_(event.name).must_equal('exception')
214-
_(event.attributes['exception.type']).must_equal('RuntimeError')
215-
_(event.attributes['exception.message']).must_equal('oops')
216-
217-
first_link = span.links[0]
218-
linked_span_context = first_link.span_context
219-
_(linked_span_context.trace_id).must_equal(spans[0].trace_id)
220-
_(linked_span_context.span_id).must_equal(spans[0].span_id)
221-
222-
second_link = span.links[1]
223-
linked_span_context = second_link.span_context
224-
_(linked_span_context.trace_id).must_equal(spans[1].trace_id)
225-
_(linked_span_context.span_id).must_equal(spans[1].span_id)
226-
227-
_(spans.size).must_equal(3)
228-
ensure
229-
begin; producer&.close; rescue StandardError; end
230-
begin; consumer&.close; rescue StandardError; end
210+
span = spans.find { |s| s.name == 'batch process' }
211+
_(span.kind).must_equal(:consumer)
212+
_(span.attributes['messaging.kafka.message_count']).must_equal(2)
213+
214+
event = span.events.first
215+
_(event.name).must_equal('exception')
216+
_(event.attributes['exception.type']).must_equal('RuntimeError')
217+
_(event.attributes['exception.message']).must_equal('oops')
218+
219+
first_link = span.links[0]
220+
linked_span_context = first_link.span_context
221+
_(linked_span_context.trace_id).must_equal(spans[0].trace_id)
222+
_(linked_span_context.span_id).must_equal(spans[0].span_id)
223+
224+
second_link = span.links[1]
225+
linked_span_context = second_link.span_context
226+
_(linked_span_context.trace_id).must_equal(spans[1].trace_id)
227+
_(linked_span_context.span_id).must_equal(spans[1].span_id)
228+
229+
_(spans.size).must_equal(3)
230+
ensure
231+
begin; producer&.close; rescue StandardError; end
232+
begin; consumer&.close; rescue StandardError; end
233+
end
231234
end
232235
end
233236
end

0 commit comments

Comments
 (0)