Skip to content

Commit de95123

Browse files
committed
fix: delete each_batch method from targets of instrumentation
1 parent 2d68893 commit de95123

File tree

2 files changed

+0
-96
lines changed
  • instrumentation/rdkafka

2 files changed

+0
-96
lines changed

instrumentation/rdkafka/lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -42,34 +42,6 @@ def each
4242
end
4343
end
4444

45-
# each_batch method is deleted in rdkafka 0.20.0
46-
if Gem::Version.new(::Rdkafka::VERSION) < Gem::Version.new('0.20.0')
47-
def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block)
48-
super do |messages, error|
49-
if messages.empty?
50-
yield messages, error
51-
else
52-
attributes = {
53-
'messaging.system' => 'kafka',
54-
'messaging.destination_kind' => 'topic',
55-
'messaging.kafka.message_count' => messages.size
56-
}
57-
58-
links = messages.map do |message|
59-
trace_context = OpenTelemetry.propagation.extract(message.headers, getter: GETTER)
60-
span_context = OpenTelemetry::Trace.current_span(trace_context).context
61-
OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
62-
end
63-
links.compact!
64-
65-
tracer.in_span('batch process', attributes: attributes, links: links, kind: :consumer) do
66-
yield messages, error
67-
end
68-
end
69-
end
70-
end
71-
end
72-
7345
private
7446

7547
def tracer

instrumentation/rdkafka/test/opentelemetry/instrumentation/rdkafka/patches/consumer_test.rb

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -165,74 +165,6 @@
165165
begin; consumer&.close; rescue StandardError; end
166166
end
167167
end
168-
169-
# each_batch method is deleted in rdkafka 0.20.0
170-
if Gem::Version.new(Rdkafka::VERSION) < Gem::Version.new('0.20.0')
171-
describe '#each_batch' do
172-
it 'traces each_batch call' do
173-
skip "#{Rdkafka::VERSION} is not supported" unless instrumentation.compatible?
174-
175-
rand_hash = SecureRandom.hex(10)
176-
topic_name = "consumer-patch-batch-trace-#{rand_hash}"
177-
config = { 'bootstrap.servers': "#{host}:#{port}" }
178-
179-
producer = Rdkafka::Config.new(config).producer
180-
delivery_handles = []
181-
182-
delivery_handles << producer.produce(
183-
topic: topic_name,
184-
payload: 'wow',
185-
key: 'Key 1'
186-
)
187-
188-
delivery_handles << producer.produce(
189-
topic: topic_name,
190-
payload: 'super',
191-
key: 'Key 2'
192-
)
193-
194-
delivery_handles.each(&:wait)
195-
196-
consumer_config = config.merge(
197-
'group.id': 'me',
198-
'auto.offset.reset': 'smallest' # https://stackoverflow.com/a/51081649
199-
)
200-
consumer = Rdkafka::Config.new(config.merge(consumer_config)).consumer
201-
consumer.subscribe(topic_name)
202-
203-
begin
204-
consumer.each_batch(max_items: 2) do |messages|
205-
raise 'oops' unless messages.empty?
206-
end
207-
rescue StandardError
208-
end
209-
210-
span = spans.find { |s| s.name == 'batch process' }
211-
_(span.kind).must_equal(:consumer)
212-
_(span.attributes['messaging.kafka.message_count']).must_equal(2)
213-
214-
event = span.events.first
215-
_(event.name).must_equal('exception')
216-
_(event.attributes['exception.type']).must_equal('RuntimeError')
217-
_(event.attributes['exception.message']).must_equal('oops')
218-
219-
first_link = span.links[0]
220-
linked_span_context = first_link.span_context
221-
_(linked_span_context.trace_id).must_equal(spans[0].trace_id)
222-
_(linked_span_context.span_id).must_equal(spans[0].span_id)
223-
224-
second_link = span.links[1]
225-
linked_span_context = second_link.span_context
226-
_(linked_span_context.trace_id).must_equal(spans[1].trace_id)
227-
_(linked_span_context.span_id).must_equal(spans[1].span_id)
228-
229-
_(spans.size).must_equal(3)
230-
ensure
231-
begin; producer&.close; rescue StandardError; end
232-
begin; consumer&.close; rescue StandardError; end
233-
end
234-
end
235-
end
236168
end
237169
end
238170

0 commit comments

Comments
 (0)