Skip to content

Commit c7f06b5

Browse files
authored
Merge pull request rails#47844 from fatkodima/enqueue_all-logging
[ActiveJob] Add logging for `enqueue_all`
2 parents a1a026f + 5ab2034 commit c7f06b5

File tree

6 files changed

+108
-10
lines changed

6 files changed

+108
-10
lines changed

activejob/lib/active_job/enqueuing.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def perform_all_later(*jobs)
3131
rescue EnqueueError => e
3232
job.enqueue_error = e
3333
end
34+
adapter_jobs.count(&:successfully_enqueued?)
3435
end
3536
end
3637
end

activejob/lib/active_job/log_subscriber.rb

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,32 @@ def enqueue_at(event)
4747
end
4848
subscribe_log_level :enqueue_at, :info
4949

50+
def enqueue_all(event)
51+
info do
52+
jobs = event.payload[:jobs]
53+
adapter = event.payload[:adapter]
54+
enqueued_count = event.payload[:enqueued_count]
55+
56+
if enqueued_count == jobs.size
57+
enqueued_jobs_message(adapter, jobs)
58+
elsif jobs.any?(&:successfully_enqueued?)
59+
enqueued_jobs = jobs.select(&:successfully_enqueued?)
60+
61+
failed_enqueue_count = jobs.size - enqueued_count
62+
if failed_enqueue_count == 0
63+
enqueued_jobs_message(adapter, enqueued_jobs)
64+
else
65+
"#{enqueued_jobs_message(adapter, enqueued_jobs)}. "\
66+
"Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)}"
67+
end
68+
else
69+
failed_enqueue_count = jobs.size - enqueued_count
70+
"Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)} to #{adapter_name(adapter)}"
71+
end
72+
end
73+
end
74+
subscribe_log_level :enqueue_all, :info
75+
5076
def perform_start(event)
5177
info do
5278
job = event.payload[:job]
@@ -111,7 +137,11 @@ def discard(event)
111137

112138
private
113139
def queue_name(event)
114-
event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})"
140+
adapter_name(event.payload[:adapter]) + "(#{event.payload[:job].queue_name})"
141+
end
142+
143+
def adapter_name(adapter)
144+
adapter.class.name.demodulize.delete_suffix("Adapter")
115145
end
116146

117147
def args_info(job)
@@ -171,6 +201,13 @@ def log_enqueue_source
171201
def extract_enqueue_source_location(locations)
172202
backtrace_cleaner.clean(locations.lazy).first
173203
end
204+
205+
def enqueued_jobs_message(adapter, enqueued_jobs)
206+
enqueued_count = enqueued_jobs.size
207+
job_classes_counts = enqueued_jobs.map(&:class).tally.sort_by { |_k, v| -v }
208+
"Enqueued #{enqueued_count} #{'job'.pluralize(enqueued_count)} to #{adapter_name(adapter)}"\
209+
" (#{job_classes_counts.map { |klass, count| "#{count} #{klass}" }.join(', ')})"
210+
end
174211
end
175212
end
176213

activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,34 @@ def enqueue_at(job, timestamp) # :nodoc:
3333
end
3434

3535
def enqueue_all(jobs) # :nodoc:
36+
enqueued_count = 0
3637
jobs.group_by(&:class).each do |job_class, same_class_jobs|
3738
same_class_jobs.group_by(&:queue_name).each do |queue, same_class_and_queue_jobs|
3839
immediate_jobs, scheduled_jobs = same_class_and_queue_jobs.partition { |job| job.scheduled_at.nil? }
3940

4041
if immediate_jobs.any?
41-
Sidekiq::Client.push_bulk(
42+
jids = Sidekiq::Client.push_bulk(
4243
"class" => JobWrapper,
4344
"wrapped" => job_class,
4445
"queue" => queue,
4546
"args" => immediate_jobs.map { |job| [job.serialize] },
4647
)
48+
enqueued_count += jids.compact.size
4749
end
4850

4951
if scheduled_jobs.any?
50-
Sidekiq::Client.push_bulk(
52+
jids = Sidekiq::Client.push_bulk(
5153
"class" => JobWrapper,
5254
"wrapped" => job_class,
5355
"queue" => queue,
5456
"args" => scheduled_jobs.map { |job| [job.serialize] },
5557
"at" => scheduled_jobs.map { |job| job.scheduled_at }
5658
)
59+
enqueued_count += jids.compact.size
5760
end
5861
end
5962
end
63+
enqueued_count
6064
end
6165

6266
class JobWrapper # :nodoc:

activejob/test/cases/logging_test.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
require "jobs/retry_job"
1212
require "jobs/disable_log_job"
1313
require "jobs/abort_before_enqueue_job"
14+
require "jobs/enqueue_error_job"
1415
require "models/person"
1516

1617
class LoggingTest < ActiveSupport::TestCase
@@ -298,6 +299,28 @@ def test_discard_logging
298299
end
299300
end
300301

302+
def test_enqueue_all_job_logging_some_jobs_failed_enqueuing
303+
EnqueueErrorJob.disable_test_adapter
304+
305+
EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = [false, true]
306+
307+
ActiveJob.perform_all_later(EnqueueErrorJob.new, EnqueueErrorJob.new)
308+
assert_match(/Enqueued 1 job to .+ \(1 EnqueueErrorJob\)\. Failed enqueuing 1 job/, @logger.messages)
309+
ensure
310+
EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = []
311+
end
312+
313+
def test_enqueue_all_job_logging_all_jobs_failed_enqueuing
314+
EnqueueErrorJob.disable_test_adapter
315+
316+
EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = [true, true]
317+
318+
ActiveJob.perform_all_later(EnqueueErrorJob.new, EnqueueErrorJob.new)
319+
assert_match(/Failed enqueuing 2 jobs to .+/, @logger.messages)
320+
ensure
321+
EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = []
322+
end
323+
301324
def test_verbose_enqueue_logs
302325
ActiveJob.verbose_enqueue_logs = true
303326

@@ -311,4 +334,9 @@ def test_verbose_enqueue_logs_disabled_by_default
311334
LoggingJob.perform_later "Dummy"
312335
assert_no_match("↳", @logger.messages)
313336
end
337+
338+
def test_enqueue_all_job_logging
339+
ActiveJob.perform_all_later(LoggingJob.new("Dummy"), HelloJob.new("Jamie"), HelloJob.new("John"))
340+
assert_match(/Enqueued 3 jobs to .+ \(2 HelloJob, 1 LoggingJob\)/, @logger.messages)
341+
end
314342
end

activejob/test/cases/queuing_test.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,24 @@ class QueuingTest < ActiveSupport::TestCase
7070
ActiveJob.perform_all_later([HelloJob.new("Jamie"), MultipleKwargsJob.new(argument1: "John", argument2: 42)])
7171
assert_equal ["Jamie says hello", "Job with argument1: John, argument2: 42"], JobBuffer.values.sort
7272
end
73+
74+
test "perform_all_later instrumentation" do
75+
jobs = HelloJob.new("Jamie"), HelloJob.new("John")
76+
called = false
77+
78+
subscriber = lambda do |*args|
79+
called = true
80+
event = ActiveSupport::Notifications::Event.new(*args)
81+
payload = event.payload
82+
assert payload[:adapter]
83+
assert_equal jobs, payload[:jobs]
84+
assert_equal 2, payload[:enqueued_count]
85+
end
86+
87+
ActiveSupport::Notifications.subscribed(subscriber, "enqueue_all.active_job") do
88+
ActiveJob.perform_all_later(jobs)
89+
end
90+
91+
assert called
92+
end
7393
end

activejob/test/jobs/enqueue_error_job.rb

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,25 @@
33
class EnqueueErrorJob < ActiveJob::Base
44
class EnqueueErrorAdapter
55
class << self
6-
def enqueue(*)
7-
raise ActiveJob::EnqueueError, "There was an error enqueuing the job"
8-
end
6+
attr_accessor :should_raise_sequence
7+
end
8+
self.should_raise_sequence = []
99

10-
def enqueue_at(*)
11-
raise ActiveJob::EnqueueError, "There was an error enqueuing the job"
12-
end
10+
def enqueue(*)
11+
raise ActiveJob::EnqueueError, "There was an error enqueuing the job" if should_raise?
1312
end
13+
14+
def enqueue_at(*)
15+
raise ActiveJob::EnqueueError, "There was an error enqueuing the job" if should_raise?
16+
end
17+
18+
private
19+
def should_raise?
20+
self.class.should_raise_sequence.empty? || self.class.should_raise_sequence.shift
21+
end
1422
end
1523

16-
self.queue_adapter = EnqueueErrorAdapter
24+
self.queue_adapter = EnqueueErrorAdapter.new
1725

1826
def perform
1927
raise "This should never be called"

0 commit comments

Comments
 (0)