Skip to content

Commit 94ae87a

Browse files
committed
Instrument recurring task enqueuing
1 parent af48e6e commit 94ae87a

File tree

4 files changed

+70
-10
lines changed

4 files changed

+70
-10
lines changed

app/models/solid_queue/recurring_execution.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ class RecurringExecution < Execution
77
class << self
88
def record(task_key, run_at, &block)
99
transaction do
10-
if job_id = block.call
11-
create!(job_id: job_id, task_key: task_key, run_at: run_at)
10+
block.call.tap do |active_job|
11+
create!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at)
1212
end
1313
end
1414
rescue ActiveRecord::RecordNotUnique
15-
SolidQueue.logger.info("[SolidQueue] Skipped recurring task #{task_key} at #{run_at}already dispatched")
15+
# Task already dispatched
1616
end
1717

1818
def clear_in_batches(batch_size: 500)

lib/solid_queue/dispatcher/recurring_task.rb

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,16 @@ def next_time
3030
end
3131

3232
def enqueue(at:)
33-
if using_solid_queue_adapter?
34-
perform_later_and_record(run_at: at)
35-
else
36-
perform_later
33+
SolidQueue.instrument(:enqueue_recurring_task, task: key, at: at) do |payload|
34+
if using_solid_queue_adapter?
35+
perform_later_and_record(run_at: at)
36+
else
37+
payload[:other_adapter] = true
38+
39+
perform_later
40+
end.tap do |active_job|
41+
payload[:active_job_id] = active_job&.job_id
42+
end
3743
end
3844
end
3945

@@ -59,7 +65,7 @@ def using_solid_queue_adapter?
5965
end
6066

6167
def perform_later_and_record(run_at:)
62-
RecurringExecution.record(key, run_at) { perform_later.provider_job_id }
68+
RecurringExecution.record(key, run_at) { perform_later }
6369
end
6470

6571
def perform_later

lib/solid_queue/log_subscriber.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@ def release_blocked(event)
3939
debug formatted_event(event, action: "Release blocked job", **event.payload.slice(:job_id, :concurrency_key, :released))
4040
end
4141

42+
def enqueue_recurring_task(event)
43+
attributes = event.payload.slice(:task, :at, :active_job_id)
44+
45+
if event.payload[:other_adapter]
46+
debug formatted_event(event, action: "Enqueued recurring task outside Solid Queue", **attributes)
47+
else
48+
action = attributes[:active_job_id].present? ? "Enqueued recurring task" : "Skipped recurring task – already dispatched"
49+
info formatted_event(event, action: action, **attributes)
50+
end
51+
end
52+
4253
def register_process(event)
4354
attributes = event.payload.slice(:kind, :pid, :hostname)
4455

test/integration/instrumentation_test.rb

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@ class InstrumentationTest < ActiveSupport::TestCase
2525
events = subscribed(/release.*_claimed\.solid_queue/) do
2626
worker = SolidQueue::Worker.new.tap(&:start)
2727

28-
wait_while_with_timeout(0.5.seconds) { SolidQueue::ReadyExecution.any? }
28+
wait_while_with_timeout(1.seconds) { SolidQueue::ReadyExecution.any? }
2929
process = SolidQueue::Process.last
3030

3131
worker.stop
32+
wait_for_registered_processes(0, timeout: 1.second)
3233
end
3334

3435
assert_equal 2, events.size
@@ -43,7 +44,7 @@ class InstrumentationTest < ActiveSupport::TestCase
4344

4445
events = subscribed(/(register|deregister)_process\.solid_queue/) do
4546
worker = SolidQueue::Worker.new.tap(&:start)
46-
wait_for_registered_processes(1, timeout: 1.second)
47+
wait_while_with_timeout(1.seconds) { SolidQueue::ReadyExecution.any? }
4748

4849
process = SolidQueue::Process.last
4950

@@ -197,6 +198,48 @@ class InstrumentationTest < ActiveSupport::TestCase
197198
assert_event events.second, "release_many_blocked", limit: 5, size: 0
198199
end
199200

201+
test "enqueuing recurring task emits enqueue_recurring_task event" do
202+
recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } }
203+
dispatcher = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task)
204+
205+
events = subscribed("enqueue_recurring_task.solid_queue") do
206+
dispatcher.start
207+
sleep 1.01
208+
dispatcher.stop
209+
end
210+
211+
assert events.size >= 1
212+
event = events.last
213+
214+
assert_event event, "enqueue_recurring_task", task: :example_task, active_job_id: SolidQueue::Job.last.active_job_id
215+
assert event.last[:at].present?
216+
assert_nil event.last[:other_adapter]
217+
end
218+
219+
test "skipping a recurring task is reflected in the enqueue_recurring_task event" do
220+
recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } }
221+
dispatchers = 2.times.collect { SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task) }
222+
223+
events = subscribed("enqueue_recurring_task.solid_queue") do
224+
dispatchers.each(&:start)
225+
sleep 1.01
226+
dispatchers.each(&:stop)
227+
end
228+
229+
assert events.size >= 2
230+
events.each do |event|
231+
assert_event event, "enqueue_recurring_task", task: :example_task
232+
end
233+
234+
active_job_ids = SolidQueue::Job.all.map(&:active_job_id)
235+
events.group_by { |event| event.last[:at] }.each do |_, events_by_time|
236+
if events_by_time.many?
237+
assert events_by_time.any? { |e| e.last[:active_job_id].nil? }
238+
assert events_by_time.any? { |e| e.last[:active_job_id].in? active_job_ids }
239+
end
240+
end
241+
end
242+
200243
private
201244
def subscribed(name, &block)
202245
[].tap do |events|

0 commit comments

Comments
 (0)