File tree Expand file tree Collapse file tree 5 files changed +48
-2
lines changed
active_job/queue_adapters Expand file tree Collapse file tree 5 files changed +48
-2
lines changed Original file line number Diff line number Diff line change @@ -33,8 +33,10 @@ def enqueue_all(active_jobs) # :nodoc:
33
33
private
34
34
35
35
def select_shard
36
- if @db_shard
37
- ActiveRecord ::Base . connected_to ( shard : @db_shard ) { yield }
36
+ shard = @db_shard || SolidQueue . primary_shard
37
+
38
+ if shard
39
+ ActiveRecord ::Base . connected_to ( shard : shard ) { yield }
38
40
else
39
41
yield
40
42
end
Original file line number Diff line number Diff line change @@ -41,6 +41,8 @@ module SolidQueue
41
41
mattr_accessor :clear_finished_jobs_after , default : 1 . day
42
42
mattr_accessor :default_concurrency_control_period , default : 3 . minutes
43
43
44
+ mattr_accessor :primary_shard , :active_shard
45
+
44
46
delegate :on_start , :on_stop , to : Supervisor
45
47
46
48
def on_worker_start ( ...)
Original file line number Diff line number Diff line change @@ -6,6 +6,23 @@ class Base
6
6
include Callbacks # Defines callbacks needed by other concerns
7
7
include AppExecutor , Registrable , Procline
8
8
9
+ after_boot -> do
10
+ if SolidQueue . connects_to . key? ( :shards )
11
+ # Record the name of the primary shard, which should be used for
12
+ # adapter less jobs
13
+ if SolidQueue . primary_shard . nil?
14
+ SolidQueue . primary_shard = SolidQueue . connects_to [ :shards ] . keys . first
15
+ end
16
+
17
+ # Move active_shard to first position in connects_to[:shards] Hash to
18
+ # make it the default
19
+ if SolidQueue . active_shard . present? &&
20
+ SolidQueue . connects_to [ :shards ] . key? ( SolidQueue . active_shard )
21
+ SolidQueue ::Record . default_shard = SolidQueue . active_shard
22
+ end
23
+ end
24
+ end
25
+
9
26
attr_reader :name
10
27
11
28
def initialize ( *)
Original file line number Diff line number Diff line change @@ -31,6 +31,22 @@ class JobsLifecycleTest < ActiveSupport::TestCase
31
31
assert_equal 2 , SolidQueue ::Job . finished . count
32
32
end
33
33
34
+ test "enqueue and run jobs from different shards" do
35
+ AddToBufferJob . perform_later "hey"
36
+ ShardTwoJob . perform_later "ho"
37
+
38
+ change_active_shard_to ( :queue_shard_two ) do
39
+ @dispatcher . start
40
+ @worker . start
41
+
42
+ wait_for_jobs_to_finish_for ( 2 . seconds )
43
+ end
44
+
45
+ assert_equal [ "ho" ] , JobBuffer . values . sort
46
+ assert_equal 1 , SolidQueue ::ReadyExecution . count
47
+ assert_equal 1 , ActiveRecord ::Base . connected_to ( shard : :queue_shard_two ) { SolidQueue ::Job . finished . count }
48
+ end
49
+
34
50
test "enqueue and run jobs that fail without retries" do
35
51
RaisingJob . perform_later ( ExpectedTestError , "A" )
36
52
RaisingJob . perform_later ( ExpectedTestError , "B" )
Original file line number Diff line number Diff line change @@ -34,4 +34,13 @@ def assert_claimed_jobs(count = 1)
34
34
assert_equal count , SolidQueue ::ClaimedExecution . count
35
35
end
36
36
end
37
+
38
+ def change_active_shard_to ( new_shard_name , &block )
39
+ old_shard_name = SolidQueue . active_shard
40
+ SolidQueue . active_shard = new_shard_name
41
+ block . call
42
+ ensure
43
+ SolidQueue . active_shard = old_shard_name
44
+ SolidQueue ::Record . connects_to ( **SolidQueue . connects_to ) if SolidQueue . connects_to
45
+ end
37
46
end
You can’t perform that action at this time.
0 commit comments