Skip to content

Commit dce5b58

Browse files
indrekjfbogsany
andauthored
feat: Instrument Que poller (#1033)
Que polls the database every few seconds to find new jobs. This generates postgresql spans (if pg instrumentation library is enabled) without any parents and without much context. These can get quite noisy. This commit by default disables these traces. It is however possible to enable it with `trace_poller` option. There are also many other places where Que can be noisy, but for now just starting with the Poller. A lot of inspiration was taken from the `sidekiq` library. Co-authored-by: Francis Bogsanyi <[email protected]>
1 parent 9392dde commit dce5b58

File tree

5 files changed

+122
-25
lines changed

5 files changed

+122
-25
lines changed

instrumentation/que/lib/opentelemetry/instrumentation/que/instrumentation.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,22 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base
3939
# direct child of the span that enqueued the job.
4040
# - :none - the job's execution will not be explicitly linked to the
4141
# span that enqueued the job.
42+
# trace_poller: controls whether Que Poller is traced or not.
4243
#
4344
# Note that in all cases, we will store Que's Job ID as the
4445
# `messaging.message_id` attribute, so out-of-band correlation may
4546
# still be possible depending on your backend system.
4647
#
4748
option :propagation_style, default: :link, validate: ->(opt) { %i[link child none].include?(opt) }
49+
option :trace_poller, default: false, validate: :boolean
4850

4951
private
5052

5153
def require_dependencies
5254
require_relative 'tag_setter'
5355
require_relative 'middlewares/server_middleware'
5456
require_relative 'patches/que_job'
57+
require_relative 'patches/poller'
5558
end
5659

5760
def gem_version
@@ -60,6 +63,7 @@ def gem_version
6063

6164
def patch
6265
::Que::Job.prepend(Patches::QueJob)
66+
::Que::Poller.prepend(Patches::Poller)
6367
end
6468
end
6569
end
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module Instrumentation
9+
module Que
10+
module Patches
11+
# Instrumentation for the Que::Poller module
12+
module Poller
13+
def poll(*args, **kwargs)
14+
# Avoid tracing when should_poll? returns true. This is also used
15+
# in Poller#poll to decide if the actual poll should be executed or
16+
# not. Without this we would generate a lot of unnecessary spans.
17+
return unless should_poll?
18+
19+
if Que::Instrumentation.instance.config[:trace_poller]
20+
Que::Instrumentation.instance.tracer.in_span('Que::Poller#poll') { super(*args, **kwargs) }
21+
else
22+
OpenTelemetry::Common::Utilities.untraced { super(*args, **kwargs) }
23+
end
24+
end
25+
end
26+
end
27+
end
28+
end
29+
end

instrumentation/que/opentelemetry-instrumentation-que.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Gem::Specification.new do |spec|
3232
spec.add_development_dependency 'appraisal', '~> 2.2.0'
3333
spec.add_development_dependency 'bundler', '>= 1.17'
3434
spec.add_development_dependency 'minitest', '~> 5.0'
35+
spec.add_development_dependency 'opentelemetry-instrumentation-pg', '~> 0.19.1'
3536
spec.add_development_dependency 'opentelemetry-sdk', '~> 1.0'
3637
spec.add_development_dependency 'pg', '~> 1.1'
3738
spec.add_development_dependency 'que', '~> 1.0.0.beta4'

instrumentation/que/test/opentelemetry/instrumentation/que_test.rb

Lines changed: 80 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,19 @@
55
# SPDX-License-Identifier: Apache-2.0
66

77
require 'test_helper'
8+
require 'opentelemetry-instrumentation-pg'
89

910
require_relative '../../../lib/opentelemetry/instrumentation/que/instrumentation'
1011

1112
describe OpenTelemetry::Instrumentation::Que do
1213
let(:instrumentation) { OpenTelemetry::Instrumentation::Que::Instrumentation.instance }
14+
let(:pg_instrumentation) { OpenTelemetry::Instrumentation::PG::Instrumentation.instance }
1315
let(:exporter) { EXPORTER }
1416
let(:config) { { propagation_style: :link } }
1517

1618
before do
1719
prepare_que
20+
pg_instrumentation.install
1821
instrumentation.install
1922
instrumentation.instance_variable_set(:@config, config)
2023
exporter.reset
@@ -24,23 +27,23 @@
2427
it 'creates a span' do
2528
TestJobAsync.enqueue
2629

27-
_(exporter.finished_spans.size).must_equal(1)
30+
_(finished_spans.size).must_equal(1)
2831

29-
span = exporter.finished_spans.last
32+
span = finished_spans.last
3033
_(span.kind).must_equal(:producer)
3134
end
3235

3336
it 'names the created span' do
3437
TestJobAsync.enqueue
3538

36-
span = exporter.finished_spans.last
39+
span = finished_spans.last
3740
_(span.name).must_equal('TestJobAsync send')
3841
end
3942

4043
it 'records attributes' do
4144
TestJobAsync.enqueue
4245

43-
attributes = exporter.finished_spans.last.attributes
46+
attributes = finished_spans.last.attributes
4447
_(attributes['messaging.system']).must_equal('que')
4548
_(attributes['messaging.destination']).must_equal('default')
4649
_(attributes['messaging.destination_kind']).must_equal('queue')
@@ -59,19 +62,19 @@
5962
end
6063

6164
it 'creates a span' do
62-
_(exporter.finished_spans.size).must_equal(1)
65+
_(finished_spans.size).must_equal(1)
6366

64-
span = exporter.finished_spans.last
67+
span = finished_spans.last
6568
_(span.kind).must_equal(:consumer)
6669
end
6770

6871
it 'names the created span' do
69-
span = exporter.finished_spans.last
72+
span = finished_spans.last
7073
_(span.name).must_equal('TestJobAsync process')
7174
end
7275

7376
it 'records attributes' do
74-
attributes = exporter.finished_spans.last.attributes
77+
attributes = finished_spans.last.attributes
7578
_(attributes['messaging.system']).must_equal('que')
7679
_(attributes['messaging.destination']).must_equal('default')
7780
_(attributes['messaging.destination_kind']).must_equal('queue')
@@ -91,7 +94,7 @@
9194
end
9295

9396
it 'marks the span as failed' do
94-
span = exporter.finished_spans.last
97+
span = finished_spans.last
9598
_(span.status.ok?).must_equal(false)
9699
end
97100
end
@@ -103,29 +106,29 @@
103106
it 'creates two spans' do
104107
TestJobSync.enqueue
105108

106-
_(exporter.finished_spans.size).must_equal(2)
109+
_(finished_spans.size).must_equal(2)
107110

108-
span1 = exporter.finished_spans.last
111+
span1 = finished_spans.last
109112
_(span1.kind).must_equal(:producer)
110113

111-
span2 = exporter.finished_spans.first
114+
span2 = finished_spans.first
112115
_(span2.kind).must_equal(:consumer)
113116
end
114117

115118
it 'names the created span' do
116119
TestJobSync.enqueue
117120

118-
span1 = exporter.finished_spans.last
121+
span1 = finished_spans.last
119122
_(span1.name).must_equal('TestJobSync send')
120123

121-
span2 = exporter.finished_spans.first
124+
span2 = finished_spans.first
122125
_(span2.name).must_equal('TestJobSync process')
123126
end
124127

125128
it 'records attributes' do
126129
TestJobSync.enqueue
127130

128-
attributes = exporter.finished_spans.first.attributes
131+
attributes = finished_spans.first.attributes
129132
_(attributes['messaging.system']).must_equal('que')
130133
_(attributes['messaging.destination']).must_equal('default')
131134
_(attributes['messaging.destination_kind']).must_equal('queue')
@@ -168,10 +171,10 @@ def self.run(first, second); end
168171
job = TestJobAsync.enqueue
169172
Que.run_job_middleware(job) { job.tap(&:_run) }
170173

171-
_(exporter.finished_spans.size).must_equal(2)
174+
_(finished_spans.size).must_equal(2)
172175

173-
send_span = exporter.finished_spans.first
174-
process_span = exporter.finished_spans.last
176+
send_span = finished_spans.first
177+
process_span = finished_spans.last
175178

176179
_(send_span.trace_id).wont_equal(process_span.trace_id)
177180

@@ -188,10 +191,10 @@ def self.run(first, second); end
188191
job = TestJobAsync.enqueue
189192
Que.run_job_middleware(job) { job.tap(&:_run) }
190193

191-
_(exporter.finished_spans.size).must_equal(2)
194+
_(finished_spans.size).must_equal(2)
192195

193-
send_span = exporter.finished_spans.first
194-
process_span = exporter.finished_spans.last
196+
send_span = finished_spans.first
197+
process_span = finished_spans.last
195198

196199
_(send_span.trace_id).must_equal(process_span.trace_id)
197200
_(process_span.parent_span_id).must_equal(send_span.span_id)
@@ -218,10 +221,10 @@ def self.run(first, second); end
218221
job = TestJobAsync.enqueue
219222
Que.run_job_middleware(job) { job.tap(&:_run) }
220223

221-
_(exporter.finished_spans.size).must_equal(2)
224+
_(finished_spans.size).must_equal(2)
222225

223-
send_span = exporter.finished_spans.first
224-
process_span = exporter.finished_spans.last
226+
send_span = finished_spans.first
227+
process_span = finished_spans.last
225228

226229
_(send_span.trace_id).wont_equal(process_span.trace_id)
227230
_(send_span.total_recorded_links).must_equal(0)
@@ -230,8 +233,61 @@ def self.run(first, second); end
230233
end
231234
end
232235

236+
describe 'que poller' do
237+
describe 'when trace_poller is enabled' do
238+
let(:config) { { trace_poller: true } }
239+
240+
it 'traces calls to Que::Poller#poll' do
241+
spans = run_one_locker_cycle
242+
spans = spans.sort_by(&:name)
243+
244+
root_span = spans.detect { |span| span.name == 'Que::Poller#poll' }
245+
_(root_span.parent_span_id).must_equal(OpenTelemetry::Trace::INVALID_SPAN_ID)
246+
247+
other_spans = spans - [root_span]
248+
_(other_spans.count.positive?).must_equal(true)
249+
250+
other_spans.each do |other_span|
251+
_(other_span.parent_span_id).must_equal(root_span.span_id)
252+
end
253+
end
254+
end
255+
256+
describe 'when trace_poller is disabled' do
257+
let(:config) { { trace_poller: false } }
258+
259+
it 'does not create any spans' do
260+
spans = run_one_locker_cycle
261+
_(spans).must_equal([])
262+
end
263+
end
264+
end
265+
266+
def finished_spans(include_pg: false)
267+
if include_pg
268+
exporter.finished_spans
269+
else
270+
exporter.finished_spans.reject { |span| span.name =~ /#{database_name}/ }
271+
end
272+
end
273+
233274
def last_record_in_database
234275
require 'que/active_record/model'
235276
Que::ActiveRecord::Model.last
236277
end
278+
279+
def run_one_locker_cycle
280+
poll_interval = 0.1 # in seconds
281+
locker = Que::Locker.new(poll_interval: poll_interval, wait_period: 0.001)
282+
283+
# Clear all the startup spans
284+
sleep 0.05
285+
exporter.reset
286+
287+
sleep poll_interval
288+
finished_spans = finished_spans(include_pg: true)
289+
locker.stop!
290+
291+
finished_spans
292+
end
237293
end unless ENV['OMIT_SERVICES']

instrumentation/que/test/test_helper.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,17 @@ def prepare_que
3838
host: ENV.fetch('TEST_POSTGRES_HOST', '127.0.0.1'),
3939
port: ENV.fetch('TEST_POSTGRES_PORT', '5432'),
4040
user: ENV.fetch('TEST_POSTGRES_USER', 'postgres'),
41-
database: ENV.fetch('TEST_POSTGRES_DB', 'postgres'),
41+
database: database_name,
4242
password: ENV.fetch('TEST_POSTGRES_PASSWORD', 'postgres')
4343
)
4444

4545
Que.connection = ActiveRecord
4646
Que.migrate!(version: 4)
47+
48+
# Make sure the que_jobs table is empty before running tests.
49+
ActiveRecord::Base.connection.execute('TRUNCATE que_jobs')
50+
end
51+
52+
def database_name
53+
ENV.fetch('TEST_POSTGRES_DB', 'postgres')
4754
end

0 commit comments

Comments
 (0)