Skip to content

Commit a7a3541

Browse files
committed
speculative API for metrics in instrumentation
1 parent 27b97f4 commit a7a3541

File tree

4 files changed

+176
-54
lines changed

4 files changed

+176
-54
lines changed

instrumentation/base/lib/opentelemetry/instrumentation/base.rb

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -166,13 +166,44 @@ def option(name, default:, validate:)
166166
def instance
167167
@instance || SINGLETON_MUTEX.synchronize do
168168
@instance ||= new(instrumentation_name, instrumentation_version, install_blk,
169-
present_blk, compatible_blk, options)
169+
present_blk, compatible_blk, options, instrument_configs)
170170
end
171171
end
172172

173+
if defined?(OpenTelemetry::Metrics)
174+
%i[
175+
counter asynchronous_counter
176+
histogram gauge asynchronous_gauge
177+
updown_counter asynchronous_updown_counter
178+
].each do |instrument_kind|
179+
define_method(instrument_kind) do |name, **opts|
180+
register_instrument(instrument_kind, name, **opts)
181+
end
182+
end
183+
184+
def register_instrument(kind, name, **opts)
185+
@instrument_configs ||= {}
186+
187+
key = [kind, name]
188+
if @instrument_configs.key?(key)
189+
warn("Duplicate instrument configured for #{self}: #{key.inspect}")
190+
else
191+
@instrument_configs[key] = opts
192+
end
193+
end
194+
else
195+
def counter(*, **); end
196+
def asynchronous_counter(*, **); end
197+
def histogram(*, **); end
198+
def gauge(*, **); end
199+
def asynchronous_gauge(*, **); end
200+
def updown_counter(*, **); end
201+
def asynchronous_updown_counter(*, **); end
202+
end
203+
173204
private
174205

175-
attr_reader :install_blk, :present_blk, :compatible_blk, :options
206+
attr_reader :install_blk, :present_blk, :compatible_blk, :options, :instrument_configs
176207

177208
def infer_name
178209
@inferred_name ||= if (md = name.match(NAME_REGEX)) # rubocop:disable Naming/MemoizedInstanceVariableName
@@ -192,13 +223,13 @@ def infer_version
192223
end
193224
end
194225

195-
attr_reader :name, :version, :config, :installed, :tracer, :meter
226+
attr_reader :name, :version, :config, :installed, :tracer, :meter, :instrument_configs
196227

197228
alias installed? installed
198229

199230
# rubocop:disable Metrics/ParameterLists
200231
def initialize(name, version, install_blk, present_blk,
201-
compatible_blk, options)
232+
compatible_blk, options, instrument_configs)
202233
@name = name
203234
@version = version
204235
@install_blk = install_blk
@@ -209,7 +240,8 @@ def initialize(name, version, install_blk, present_blk,
209240
@options = options
210241
@tracer = OpenTelemetry::Trace::Tracer.new # default no-op tracer
211242

212-
@meter = OpenTelemetry::Metrics::Meter.new if defined?(OpenTelemetry::Meter) # default no-op meter
243+
@meter = OpenTelemetry::Metrics::Meter.new if defined?(OpenTelemetry::Metrics::Meter) # default no-op meter
244+
@instrument_configs = instrument_configs || {}
213245
end
214246
# rubocop:enable Metrics/ParameterLists
215247

@@ -277,6 +309,7 @@ def metrics_enabled?
277309
end
278310

279311
# @api private
312+
# ONLY yields if the meter is enabled.
280313
def with_meter
281314
yield @meter if metrics_enabled?
282315
end

instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/instrumentation.rb

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,26 +109,61 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base
109109
option :peer_service, default: nil, validate: :string
110110
option :metrics, default: false, validate: :boolean
111111

112+
# FIXME: descriptions?
113+
114+
if defined?(OpenTelemetry::Metrics)
115+
counter 'messaging.client.sent.messages'
116+
histogram 'messaging.client.operation.duration', unit: 's' # FIXME: UCUM::S
117+
counter 'messaging.client.consumed.messages'
118+
histogram 'messaging.process.duration', unit: 's'
119+
120+
# FIXME: not semconv
121+
gauge 'messaging.queue.latency', unit: 's'
122+
end
123+
112124
# FIXME: upstream
113-
def get_counter(name, description: nil)
114-
return unless metrics_enabled?
125+
def counter(name)
126+
get_instrument(:counter, name)
127+
end
115128

116-
binding.pry
117-
# FIXME: structural keys
118-
# FIXME: mutex counter creation (& reads?)
119-
INSTRUMENTS[[name, description]] ||= meter.create_counter(name, description: description)
129+
# FIXME: upstream
130+
def histogram(name)
131+
get_instrument(:histogram, name)
132+
end
133+
134+
# FIXME: upstream
135+
def gauge(name)
136+
get_instrument(:gauge, name)
120137
end
121138

122139
private
123140

124-
# FIXME: upstream
125-
INSTRUMENTS = {}
141+
def get_instrument(kind, name)
142+
return unless metrics_enabled?
143+
144+
@instruments ||= {}
145+
@instruments[[kind, name]] ||= create_configured_instrument(kind, name)
146+
end
147+
148+
def create_configured_instrument(kind, name)
149+
config = @instrument_configs[[kind, name]]
150+
151+
if config.nil?
152+
Kernel.warn("unconfigured instrument requested: #{kind} of '#{name}'")
153+
return
154+
end
155+
156+
# FIXME: some of these have different opts;
157+
# should verify that they work before this point.
158+
meter.public_send(:"create_#{kind}", name, **config)
159+
end
126160

127161
def gem_version
128162
Gem::Version.new(::Sidekiq::VERSION)
129163
end
130164

131165
def require_dependencies
166+
require_relative 'middlewares/common'
132167
require_relative 'middlewares/client/tracer_middleware'
133168
require_relative 'middlewares/server/tracer_middleware'
134169

instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/client/tracer_middleware.rb

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,27 +34,41 @@ def call(_worker_class, job, _queue, _redis_pool)
3434
span.add_event('created_at', timestamp: job['created_at'])
3535
yield
3636
end.tap do
37-
# FIXME: is it possible to detect failures here? Does sidekiq bubble them up the middlewares?
38-
with_meter do |meter|
39-
counter_attributes = metrics_attributes(job).merge(
40-
{
41-
'messaging.operation.name' => 'enqueue' # FIXME: metrics semconv
42-
# server.address => # FIXME: required if available
43-
# messaging.destination.partition.id => FIXME: recommended
44-
# server.port => # FIXME: recommended
45-
}
46-
)
47-
48-
# FIXME: avoid create_counter repetition?
49-
binding.pry
50-
counter = instrumentation.get_counter('messaging.client.sent.messages')
51-
counter.add(1, attributes: counter_attributes)
52-
end
37+
# FIXME: is it possible/necessary to detect failures here? Does sidekiq bubble them up the middlewares?
38+
count_sent_message(job)
5339
end
5440
end
5541

5642
private
5743

44+
def count_sent_message(job)
45+
with_meter do |_meter|
46+
counter_attributes = metrics_attributes(job).merge(
47+
{
48+
'messaging.operation.name' => 'create'
49+
# server.address => # FIXME: required if available
50+
# messaging.destination.partition.id => FIXME: recommended
51+
# server.port => # FIXME: recommended
52+
}
53+
)
54+
55+
counter = messaging_client_sent_messages_counter
56+
counter.add(1, attributes: counter_attributes)
57+
end
58+
end
59+
60+
def messaging_client_sent_messages_counter
61+
instrumentation.counter('messaging.client.sent.messages')
62+
end
63+
64+
def messaging_client_operation_duration_histogram
65+
instrumentation.histogram('messaging.client.operation.duration')
66+
end
67+
68+
def messaging_client_consumed_messages_counter
69+
instrumentation.counter('messaging.client.consumed.messages')
70+
end
71+
5872
def instrumentation
5973
Sidekiq::Instrumentation.instance
6074
end

instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb

Lines changed: 65 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ module Server
1212
# TracerMiddleware propagates context and instruments Sidekiq requests
1313
# by way of its middleware system
1414
class TracerMiddleware
15+
include OpenTelemetry::Instrumentation::Sidekiq::Middlewares::Common
1516
include ::Sidekiq::ServerMiddleware if defined?(::Sidekiq::ServerMiddleware)
1617

1718
def call(_worker, msg, _queue)
@@ -32,40 +33,79 @@ def call(_worker, msg, _queue)
3233

3334
extracted_context = OpenTelemetry.propagation.extract(msg)
3435
OpenTelemetry::Context.with_current(extracted_context) do
35-
if instrumentation_config[:propagation_style] == :child
36-
tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
37-
span.add_event('created_at', timestamp: msg['created_at'])
38-
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
39-
yield
40-
end
41-
else
42-
links = []
43-
span_context = OpenTelemetry::Trace.current_span(extracted_context).context
44-
links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid?
45-
span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer)
46-
OpenTelemetry::Trace.with_span(span) do
47-
span.add_event('created_at', timestamp: msg['created_at'])
48-
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
49-
yield
50-
rescue Exception => e # rubocop:disable Lint/RescueException
51-
span.record_exception(e)
52-
span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
53-
raise e
54-
ensure
55-
span.finish
36+
track_queue_latency(msg)
37+
38+
timed(track_process_time_callback(msg)) do
39+
if instrumentation_config[:propagation_style] == :child
40+
tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
41+
span.add_event('created_at', timestamp: msg['created_at'])
42+
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
43+
yield
44+
end
45+
else
46+
links = []
47+
span_context = OpenTelemetry::Trace.current_span(extracted_context).context
48+
links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid?
49+
span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer)
50+
OpenTelemetry::Trace.with_span(span) do
51+
span.add_event('created_at', timestamp: msg['created_at'])
52+
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
53+
yield
54+
rescue Exception => e # rubocop:disable Lint/RescueException
55+
span.record_exception(e)
56+
span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
57+
raise e
58+
ensure
59+
span.finish
60+
end
5661
end
5762
end
5863
end
5964
end
6065

6166
private
6267

63-
def instrumentation_config
64-
Sidekiq::Instrumentation.instance.config
68+
def track_queue_latency(msg)
69+
with_meter do
70+
return unless (enqueued_at = msg['enqueued_at'])
71+
return unless enqueued_at.is_a?(Numeric)
72+
73+
latency = (realtime_now - enqueued_at).abs
74+
75+
queue_latency_gauge&.record(latency, attributes: metrics_attributes(msg))
76+
end
77+
end
78+
79+
def track_process_time_callback(msg)
80+
->(duration) { track_process_time(msg, duration) }
6581
end
6682

67-
def tracer
68-
Sidekiq::Instrumentation.instance.tracer
83+
def track_process_time(msg, duration)
84+
with_meter do
85+
attributes = metrics_attributes(msg).merge(
86+
{ 'messaging.operation.name' => 'process' }
87+
)
88+
messaging_process_duration_histogram&.record(duration, attributes: attributes)
89+
end
90+
end
91+
92+
def messaging_process_duration_histogram
93+
instrumentation.histogram('messaging.process.duration')
94+
end
95+
96+
def queue_latency_gauge
97+
instrumentation.gauge('messaging.queue.latency')
98+
end
99+
100+
# FIXME: dedupe
101+
def metrics_attributes(msg)
102+
{
103+
'messaging.system' => 'sidekiq', # FIXME: metrics semconv
104+
'messaging.destination.name' => msg['queue'] # FIXME: metrics semconv
105+
# server.address => # FIXME: required if available
106+
# messaging.destination.partition.id => FIXME: recommended
107+
# server.port => # FIXME: recommended
108+
}
69109
end
70110
end
71111
end

0 commit comments

Comments
 (0)