Skip to content

Commit 8bb389b

Browse files
Merge branch 'main' into update-error-handling-api
2 parents 3174080 + 7acac04 commit 8bb389b

File tree

16 files changed

+559
-102
lines changed

16 files changed

+559
-102
lines changed

.toys/.data/releases.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,10 @@ gems:
229229
directory: processor/baggage
230230
version_constant: [OpenTelemetry, Processor, Baggage, VERSION]
231231

232+
- name: opentelemetry-propagator-google_cloud_trace_context
233+
directory: propagator/google_cloud_trace_context
234+
version_constant: [OpenTelemetry, Propagator, GoogleCloudTraceContext, VERSION]
235+
232236
- name: opentelemetry-propagator-ottrace
233237
directory: propagator/ottrace
234238
version_constant: [OpenTelemetry, Propagator, OTTrace, VERSION]

instrumentation/rdkafka/Appraisals

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#
55
# SPDX-License-Identifier: Apache-2.0
66

7-
%w[0.12.0 0.13.0 0.14.0 0.15.0 0.16.0 0.17.0 0.18.0 0.19.0].each do |version|
7+
%w[0.18.0 0.19.0 0.20.0 0.21.0].each do |version|
88
appraise "rdkafka-#{version}" do
99
gem 'rdkafka', "~> #{version}"
1010
end

instrumentation/rdkafka/lib/opentelemetry/instrumentation/rdkafka/instrumentation.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ module Rdkafka
1111
class Instrumentation < OpenTelemetry::Instrumentation::Base
1212
compatible do
1313
gem_version = Gem::Version.new(::Rdkafka::VERSION)
14-
Gem::Requirement.new('>= 0.10.0', '< 0.20.0').satisfied_by?(gem_version)
14+
Gem::Requirement.new('>= 0.18.0').satisfied_by?(gem_version)
1515
end
1616

1717
install do |_config|

instrumentation/rdkafka/lib/opentelemetry/instrumentation/rdkafka/patches/consumer.rb

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,7 @@ module Rdkafka
1010
module Patches
1111
# The Consumer module contains the instrumentation patch for the Consumer class
1212
module Consumer
13-
GETTER = if Gem::Version.new(::Rdkafka::VERSION) >= Gem::Version.new('0.13.0')
14-
Context::Propagation.text_map_getter
15-
else
16-
OpenTelemetry::Common::Propagation.symbol_key_getter
17-
end
13+
GETTER = Context::Propagation.text_map_getter
1814
private_constant :GETTER
1915

2016
def each
@@ -42,26 +38,30 @@ def each
4238
end
4339
end
4440

45-
def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block)
46-
super do |messages, error|
47-
if messages.empty?
48-
yield messages, error
49-
else
50-
attributes = {
51-
'messaging.system' => 'kafka',
52-
'messaging.destination_kind' => 'topic',
53-
'messaging.kafka.message_count' => messages.size
54-
}
41+
# each_batch method is deleted in rdkafka-ruby-0.20.0
42+
# But, rdkafka-ruby-0.19.x and 0.18.x are still maintained
43+
if Gem::Version.new(::Rdkafka::VERSION) < Gem::Version.new('0.20.0')
44+
def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250, yield_on_error: false, &block)
45+
super do |messages, error|
46+
if messages.empty?
47+
yield messages, error
48+
else
49+
attributes = {
50+
'messaging.system' => 'kafka',
51+
'messaging.destination_kind' => 'topic',
52+
'messaging.kafka.message_count' => messages.size
53+
}
5554

56-
links = messages.map do |message|
57-
trace_context = OpenTelemetry.propagation.extract(message.headers, getter: GETTER)
58-
span_context = OpenTelemetry::Trace.current_span(trace_context).context
59-
OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
60-
end
61-
links.compact!
55+
links = messages.map do |message|
56+
trace_context = OpenTelemetry.propagation.extract(message.headers, getter: GETTER)
57+
span_context = OpenTelemetry::Trace.current_span(trace_context).context
58+
OpenTelemetry::Trace::Link.new(span_context) if span_context.valid?
59+
end
60+
links.compact!
6261

63-
tracer.in_span('batch process', attributes: attributes, links: links, kind: :consumer) do
64-
yield messages, error
62+
tracer.in_span('batch process', attributes: attributes, links: links, kind: :consumer) do
63+
yield messages, error
64+
end
6565
end
6666
end
6767
end

instrumentation/rdkafka/test/opentelemetry/instrumentation/rdkafka/patches/consumer_test.rb

Lines changed: 63 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -166,68 +166,71 @@
166166
end
167167
end
168168

169-
describe '#each_batch' do
170-
it 'traces each_batch call' do
171-
skip "#{Rdkafka::VERSION} is not supported" unless instrumentation.compatible?
172-
173-
rand_hash = SecureRandom.hex(10)
174-
topic_name = "consumer-patch-batch-trace-#{rand_hash}"
175-
config = { 'bootstrap.servers': "#{host}:#{port}" }
176-
177-
producer = Rdkafka::Config.new(config).producer
178-
delivery_handles = []
179-
180-
delivery_handles << producer.produce(
181-
topic: topic_name,
182-
payload: 'wow',
183-
key: 'Key 1'
184-
)
185-
186-
delivery_handles << producer.produce(
187-
topic: topic_name,
188-
payload: 'super',
189-
key: 'Key 2'
190-
)
191-
192-
delivery_handles.each(&:wait)
193-
194-
consumer_config = config.merge(
195-
'group.id': 'me',
196-
'auto.offset.reset': 'smallest' # https://stackoverflow.com/a/51081649
197-
)
198-
consumer = Rdkafka::Config.new(config.merge(consumer_config)).consumer
199-
consumer.subscribe(topic_name)
200-
201-
begin
202-
consumer.each_batch(max_items: 2) do |messages|
203-
raise 'oops' unless messages.empty?
169+
# each_batch method is deleted in rdkafka 0.20.0
170+
if Gem::Version.new(Rdkafka::VERSION) < Gem::Version.new('0.20.0')
171+
describe '#each_batch' do
172+
it 'traces each_batch call' do
173+
skip "#{Rdkafka::VERSION} is not supported" unless instrumentation.compatible?
174+
175+
rand_hash = SecureRandom.hex(10)
176+
topic_name = "consumer-patch-batch-trace-#{rand_hash}"
177+
config = { 'bootstrap.servers': "#{host}:#{port}" }
178+
179+
producer = Rdkafka::Config.new(config).producer
180+
delivery_handles = []
181+
182+
delivery_handles << producer.produce(
183+
topic: topic_name,
184+
payload: 'wow',
185+
key: 'Key 1'
186+
)
187+
188+
delivery_handles << producer.produce(
189+
topic: topic_name,
190+
payload: 'super',
191+
key: 'Key 2'
192+
)
193+
194+
delivery_handles.each(&:wait)
195+
196+
consumer_config = config.merge(
197+
'group.id': 'me',
198+
'auto.offset.reset': 'smallest' # https://stackoverflow.com/a/51081649
199+
)
200+
consumer = Rdkafka::Config.new(config.merge(consumer_config)).consumer
201+
consumer.subscribe(topic_name)
202+
203+
begin
204+
consumer.each_batch(max_items: 2) do |messages|
205+
raise 'oops' unless messages.empty?
206+
end
207+
rescue StandardError
204208
end
205-
rescue StandardError
206-
end
207-
208-
span = spans.find { |s| s.name == 'batch process' }
209-
_(span.kind).must_equal(:consumer)
210-
_(span.attributes['messaging.kafka.message_count']).must_equal(2)
211209

212-
event = span.events.first
213-
_(event.name).must_equal('exception')
214-
_(event.attributes['exception.type']).must_equal('RuntimeError')
215-
_(event.attributes['exception.message']).must_equal('oops')
216-
217-
first_link = span.links[0]
218-
linked_span_context = first_link.span_context
219-
_(linked_span_context.trace_id).must_equal(spans[0].trace_id)
220-
_(linked_span_context.span_id).must_equal(spans[0].span_id)
221-
222-
second_link = span.links[1]
223-
linked_span_context = second_link.span_context
224-
_(linked_span_context.trace_id).must_equal(spans[1].trace_id)
225-
_(linked_span_context.span_id).must_equal(spans[1].span_id)
226-
227-
_(spans.size).must_equal(3)
228-
ensure
229-
begin; producer&.close; rescue StandardError; end
230-
begin; consumer&.close; rescue StandardError; end
210+
span = spans.find { |s| s.name == 'batch process' }
211+
_(span.kind).must_equal(:consumer)
212+
_(span.attributes['messaging.kafka.message_count']).must_equal(2)
213+
214+
event = span.events.first
215+
_(event.name).must_equal('exception')
216+
_(event.attributes['exception.type']).must_equal('RuntimeError')
217+
_(event.attributes['exception.message']).must_equal('oops')
218+
219+
first_link = span.links[0]
220+
linked_span_context = first_link.span_context
221+
_(linked_span_context.trace_id).must_equal(spans[0].trace_id)
222+
_(linked_span_context.span_id).must_equal(spans[0].span_id)
223+
224+
second_link = span.links[1]
225+
linked_span_context = second_link.span_context
226+
_(linked_span_context.trace_id).must_equal(spans[1].trace_id)
227+
_(linked_span_context.span_id).must_equal(spans[1].span_id)
228+
229+
_(spans.size).must_equal(3)
230+
ensure
231+
begin; producer&.close; rescue StandardError; end
232+
begin; consumer&.close; rescue StandardError; end
233+
end
231234
end
232235
end
233236
end
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
11
# Release History: opentelemetry-propagator-google_cloud_trace_context
2+
3+
### v0.1.0 / 2025-05-01
4+
5+
Initial release.

propagator/google_cloud_trace_context/lib/opentelemetry/propagator/google_cloud_trace_context/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ module Propagator
1616
# Namespace for OpenTelemetry GoogleCloudTraceContext propagation
1717
module GoogleCloudTraceContext
1818
# Current gem version
19-
VERSION = '0.0.0'
19+
VERSION = '0.1.0'
2020
end
2121
end
2222
end

propagator/xray/lib/opentelemetry/propagator/xray.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the documentation for the `opentelemetry-api` gem for details.
1313

1414
require_relative 'xray/text_map_propagator'
15+
require_relative 'xray/lambda_text_map_propagator'
1516

1617
module OpenTelemetry
1718
# Namespace for OpenTelemetry propagator extension libraries
@@ -22,7 +23,8 @@ module XRay
2223

2324
DEBUG_CONTEXT_KEY = Context.create_key('xray-debug-key')
2425
TEXT_MAP_PROPAGATOR = TextMapPropagator.new
25-
private_constant :DEBUG_CONTEXT_KEY, :TEXT_MAP_PROPAGATOR
26+
LAMBDA_TEXT_MAP_PROPAGATOR = LambdaTextMapPropagator.new
27+
private_constant :DEBUG_CONTEXT_KEY, :TEXT_MAP_PROPAGATOR, :LAMBDA_TEXT_MAP_PROPAGATOR
2628

2729
# @api private
2830
# Returns a new context with the xray debug flag enabled
@@ -41,6 +43,12 @@ def debug?(context)
4143
def text_map_propagator
4244
TEXT_MAP_PROPAGATOR
4345
end
46+
47+
# Returns a text map propagator that propagates context in the XRay
48+
# format with special handling for Lambda environment.
49+
def lambda_text_map_propagator
50+
LAMBDA_TEXT_MAP_PROPAGATOR
51+
end
4452
end
4553
end
4654
end
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
# OpenTelemetry is an open source observability framework, providing a
8+
# general-purpose API, SDK, and related tools required for the instrumentation
9+
# of cloud-native software, frameworks, and libraries.
10+
#
11+
# The OpenTelemetry module provides global accessors for telemetry objects.
12+
# See the documentation for the `opentelemetry-api` gem for details.
13+
module OpenTelemetry
14+
# Namespace for OpenTelemetry propagator extension libraries
15+
module Propagator
16+
# Namespace for OpenTelemetry XRay propagation
17+
module XRay
18+
# Implementation of AWS X-Ray Trace Header propagation with special handling for
19+
# Lambda's _X_AMZN_TRACE_ID environment variable
20+
class LambdaTextMapPropagator < TextMapPropagator
21+
AWS_TRACE_HEADER_ENV_KEY = '_X_AMZN_TRACE_ID'
22+
23+
# Extract trace context from the supplied carrier or from Lambda environment variable
24+
# If extraction fails, the original context will be returned
25+
#
26+
# @param [Carrier] carrier The carrier to get the header from
27+
# @param [optional Context] context Context to be updated with the trace context
28+
# extracted from the carrier. Defaults to +Context.current+.
29+
# @param [optional Getter] getter If the optional getter is provided, it
30+
# will be used to read the header from the carrier, otherwise the default
31+
# text map getter will be used.
32+
#
33+
# @return [Context] context updated with extracted baggage, or the original context
34+
# if extraction fails
35+
def extract(carrier, context: Context.current, getter: Context::Propagation.text_map_getter)
36+
# Check if the original input context already has a valid span
37+
span_context = Trace.current_span(context).context
38+
# If original context is valid, just return it - do not extract from carrier
39+
return context if span_context.valid?
40+
41+
# First try to extract from the carrier using the standard X-Ray propagator
42+
xray_context = super
43+
44+
# Check if we successfully extracted a context from the carrier
45+
span_context = Trace.current_span(xray_context).context
46+
return xray_context if span_context.valid?
47+
48+
# If not, check for the Lambda environment variable
49+
trace_header = ENV.fetch(AWS_TRACE_HEADER_ENV_KEY, nil)
50+
return xray_context unless trace_header
51+
52+
# Create a carrier with the trace header and extract from it
53+
env_carrier = { XRAY_CONTEXT_KEY => trace_header }
54+
super(env_carrier, context: xray_context, getter: getter)
55+
rescue OpenTelemetry::Error
56+
context
57+
end
58+
end
59+
end
60+
end
61+
end

0 commit comments

Comments
 (0)