Skip to content

Commit b64178a

Browse files
committed
Support passing kwargs as last element of the arguments array
If we don't explicitly add a ruby2_keywords flag, Active Job will any hash included in the arguments array with keys as `_aj_symbol_keys`, and when deserialized, it'd be treated always as a hash argument instead of keyword arguments. Depending on the job, this might work fine, but if the job uses keyword arguments, trying to execute the job with deserialized arguments will fail. However, the opposite is not true: if the job accepts a hash argument and we pass a hash with the ruby2_keywords flag, it'll work just fine as Active Job will serialize that with keys as `_aj_ruby2_keywords`, so we take advantage of that to simplify the task definition and not having to distinguish between args and kwargs.
1 parent b2f74a3 commit b64178a

File tree

6 files changed

+183
-8
lines changed

6 files changed

+183
-8
lines changed

lib/solid_queue/dispatcher/recurring_task.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ def valid?
4242
end
4343

4444
def to_s
45-
"#{class_name}.perform_later(#{arguments.map(&:inspect).join(",")}) [ #{schedule.original} ]"
45+
"#{class_name}.perform_later(#{arguments.map(&:inspect).join(",")}) [ #{parsed_schedule} ]"
46+
end
47+
48+
def parsed_schedule
49+
schedule.original.to_s
4650
end
4751

4852
private
@@ -55,7 +59,15 @@ def perform_later_and_record(run_at:)
5559
end
5660

5761
def perform_later
58-
job_class.perform_later(*arguments)
62+
job_class.perform_later(*arguments_with_kwargs)
63+
end
64+
65+
def arguments_with_kwargs
66+
if arguments.last.is_a?(Hash)
67+
arguments[0...-1] + [ Hash.ruby2_keywords_hash(arguments.last) ]
68+
else
69+
arguments
70+
end
5971
end
6072

6173
def job_class
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
class StoreResultJob < ApplicationJob
22
queue_as :background
33

4-
def perform(value, pause: nil, exception: nil, exit: nil)
4+
def perform(value, status: :completed, pause: nil, exception: nil, exit: nil)
55
result = JobResult.create!(queue_name: queue_name, status: "started", value: value)
66

77
sleep(pause) if pause
88
raise exception.new if exception
99
exit! if exit
1010

11-
result.update!(status: "completed")
11+
result.update!(status: status)
1212
end
1313
end

test/dummy/config/solid_queue.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ default: &default
88
- polling_interval: 1
99
batch_size: 500
1010
recurring_tasks:
11-
periodic-add-to-buffer:
12-
class: AddToBufferJob
13-
args: 42
11+
periodic_store_result:
12+
class: StoreResultJob
13+
args: [ 42, { status: "custom_status" } ]
1414
schedule: every second
1515

1616
development:
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# frozen_string_literal: true
2+
require "test_helper"
3+
4+
class RecurringTasksTest < ActiveSupport::TestCase
5+
self.use_transactional_tests = false
6+
7+
setup do
8+
@pid = run_supervisor_as_fork(mode: :all)
9+
# 1 supervisor + 2 workers + 1 dispatcher
10+
wait_for_registered_processes(4, timeout: 3.second)
11+
end
12+
13+
teardown do
14+
terminate_process(@pid) if process_exists?(@pid)
15+
16+
SolidQueue::Process.destroy_all
17+
SolidQueue::Job.destroy_all
18+
JobResult.delete_all
19+
end
20+
21+
test "enqueue and process periodic tasks" do
22+
wait_for_jobs_to_be_enqueued(2, timeout: 2.seconds)
23+
wait_for_jobs_to_finish_for(2.seconds)
24+
25+
terminate_process(@pid)
26+
27+
skip_active_record_query_cache do
28+
assert SolidQueue::Job.count >= 2
29+
SolidQueue::Job.all.each do |job|
30+
assert_equal "periodic_store_result", job.recurring_execution.task_key
31+
assert_equal "StoreResultJob", job.class_name
32+
end
33+
34+
assert_equal 2, JobResult.count
35+
JobResult.all.each do |result|
36+
assert_equal "custom_status", result.status
37+
assert_equal "42", result.value
38+
end
39+
end
40+
end
41+
42+
private
43+
def wait_for_jobs_to_be_enqueued(count, timeout: 1.second)
44+
wait_while_with_timeout(timeout) { SolidQueue::Job.count < count }
45+
end
46+
end

test/unit/dispatcher_test.rb

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ class DispatcherTest < ActiveSupport::TestCase
1212

1313
teardown do
1414
@dispatcher.stop
15+
SolidQueue::Job.delete_all
16+
SolidQueue::Process.delete_all
1517
end
1618

1719
test "dispatcher is registered as process" do
@@ -79,7 +81,7 @@ class DispatcherTest < ActiveSupport::TestCase
7981
SolidQueue.silence_polling = old_silence_polling
8082
end
8183

82-
test "run more than one instance of the dispatcher" do
84+
test "run more than one instance of the dispatcher without recurring tasks" do
8385
15.times do
8486
AddToBufferJob.set(wait: 0.2).perform_later("I'm scheduled")
8587
end
@@ -97,4 +99,23 @@ class DispatcherTest < ActiveSupport::TestCase
9799

98100
another_dispatcher.stop
99101
end
102+
103+
test "run more than one instance of the dispatcher with recurring tasks" do
104+
recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } }
105+
dispatchers = 2.times.collect do
106+
SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task)
107+
end
108+
109+
dispatchers.each(&:start)
110+
sleep 2
111+
dispatchers.each(&:stop)
112+
113+
assert_equal SolidQueue::Job.count, SolidQueue::RecurringExecution.count
114+
assert SolidQueue::Job.count < 4
115+
116+
run_at_times = SolidQueue::RecurringExecution.all.map(&:run_at).sort
117+
0.upto(run_at_times.length - 2) do |i|
118+
assert_equal 1, run_at_times[i + 1] - run_at_times[i]
119+
end
120+
end
100121
end

test/unit/recurring_task_test.rb

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
require "test_helper"
2+
3+
class RecurringTaskTest < ActiveSupport::TestCase
4+
class JobWithoutArguments < ApplicationJob
5+
def perform
6+
JobBuffer.add "job_without_arguments"
7+
end
8+
end
9+
10+
class JobWithRegularHashArguments < ApplicationJob
11+
def perform(value, options)
12+
JobBuffer.add [ value, options[:value] ]
13+
end
14+
end
15+
16+
class JobWithKeywordArgument < ApplicationJob
17+
def perform(value, value_kwarg:)
18+
JobBuffer.add [ value, value_kwarg ]
19+
end
20+
end
21+
22+
class JobWithMultipleTypeArguments < ApplicationJob
23+
def perform(value, options = {}, **kwargs)
24+
JobBuffer.add [ value, options[:value], kwargs[:value_kwarg] ]
25+
end
26+
end
27+
28+
setup do
29+
@worker = SolidQueue::Worker.new(queues: "*")
30+
@worker.mode = :inline
31+
end
32+
33+
test "job without arguments" do
34+
task = recurring_task_with(class_name: "JobWithoutArguments")
35+
enqueue_and_assert_performed_with_result task, "job_without_arguments"
36+
end
37+
38+
test "job with regular hash argument" do
39+
task = recurring_task_with(class_name: "JobWithRegularHashArguments", args: [ "regular_hash_argument", { value: 42, not_used: 24 } ])
40+
41+
enqueue_and_assert_performed_with_result task, [ "regular_hash_argument", 42 ]
42+
end
43+
44+
test "job with keyword argument" do
45+
task = recurring_task_with(class_name: "JobWithKeywordArgument", args: [ "keyword_argument", { value_kwarg: [ 42, 24 ] } ])
46+
enqueue_and_assert_performed_with_result task, [ "keyword_argument", [ 42, 24 ] ]
47+
end
48+
49+
test "job with arguments of multiple types" do
50+
task = recurring_task_with(class_name: "JobWithMultipleTypeArguments", args:
51+
[ "multiple_types", { value: "regular_hash_value", not_used: 28 }, value_kwarg: 42, not_used: 32 ])
52+
enqueue_and_assert_performed_with_result task, [ "multiple_types", "regular_hash_value", 42 ]
53+
end
54+
55+
test "job with arguments of multiple types ignoring optional regular hash" do
56+
task = recurring_task_with(class_name: "JobWithMultipleTypeArguments", args:
57+
[ "multiple_types", value: "regular_hash_value", value_kwarg: 42, not_used: 32 ])
58+
enqueue_and_assert_performed_with_result task, [ "multiple_types", nil, 42 ]
59+
end
60+
61+
test "valid and invalid schedules" do
62+
assert_not recurring_task_with(class_name: "JobWithoutArguments", schedule: "once a year").valid?
63+
assert_not recurring_task_with(class_name: "JobWithoutArguments", schedule: "tomorrow").valid?
64+
65+
task = recurring_task_with(class_name: "JobWithoutArguments", schedule: "every Thursday at 1 AM")
66+
assert task.valid?
67+
# At 1 AM on the 4th day of the week
68+
assert_equal "0 1 * * 4", task.parsed_schedule
69+
70+
task = recurring_task_with(class_name: "JobWithoutArguments", schedule: "every month")
71+
assert task.valid?
72+
# At 12:00 AM, on day 1 of the month
73+
assert_equal "0 0 1 * *", task.parsed_schedule
74+
75+
task = recurring_task_with(class_name: "JobWithoutArguments", schedule: "every second")
76+
assert task.valid?
77+
assert_equal "* * * * * *", task.parsed_schedule
78+
end
79+
80+
private
81+
def enqueue_and_assert_performed_with_result(task, result)
82+
assert_difference [ -> { SolidQueue::Job.count }, -> { SolidQueue::ReadyExecution.count } ], +1 do
83+
task.enqueue(at: Time.now)
84+
end
85+
86+
assert_difference -> { JobBuffer.size }, +1 do
87+
@worker.start
88+
end
89+
90+
assert_equal result, JobBuffer.last_value
91+
end
92+
93+
def recurring_task_with(class_name:, schedule: "every hour", args: nil)
94+
SolidQueue::Dispatcher::RecurringTask.from_configuration("task-id", class: "RecurringTaskTest::#{class_name}", schedule: schedule, args: args)
95+
end
96+
end

0 commit comments

Comments
 (0)