Skip to content

Commit 9437f6d

Browse files
authored
Merge pull request rails#41495 from eileencodes/make-executor-configurable
Allow async executor to be configurable
2 parents 5b2fb8a + d9639a2 commit 9437f6d

File tree

9 files changed

+394
-122
lines changed

9 files changed

+394
-122
lines changed

activerecord/CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
* Allow applications to configure the thread pool for async queries
2+
3+
Some applications may want one thread pool per database whereas others want to use
4+
a single global thread pool for all queries. By default Rails will set `async_query_executor`
5+
to `:immediate` and create a `Concurrent::ImmediateExecutor` object which is essentially a no-op.
6+
To create one thread pool for all database connections to use applications can set
7+
`config.active_record.async_query_executor` to `:global_thread_pool` and optionally define
8+
`config.active_record.global_executor_concurrency`. This defaults to 4. For applications that want
9+
to have a thread pool for each database connection, `config.active_record.async_query_executor` can
10+
be set to `:multi_thread_pool`. The configuration for each thread pool is set in the database
11+
configuration.
12+
13+
*Eileen M. Uchitelle*
14+
115
* Allow new syntax for `enum` to avoid leading `_` from reserved options.
216

317
Before:

activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,7 @@ def initialize(pool_config)
144144

145145
@lock_thread = false
146146

147-
@async_executor = Concurrent::ThreadPoolExecutor.new(
148-
min_threads: 0,
149-
max_threads: @size,
150-
max_queue: @size * 4,
151-
fallback_policy: :caller_runs
152-
)
147+
@async_executor = build_async_executor
153148

154149
@reaper = Reaper.new(self, db_config.reaping_frequency)
155150
@reaper.run
@@ -463,6 +458,22 @@ def schedule_query(future_result) # :nodoc:
463458
end
464459

465460
private
461+
def build_async_executor
462+
case Base.async_query_executor
463+
when :multi_thread_pool
464+
Concurrent::ThreadPoolExecutor.new(
465+
min_threads: @db_config.min_threads,
466+
max_threads: @db_config.max_threads,
467+
max_queue: @db_config.max_queue,
468+
fallback_policy: :caller_runs
469+
)
470+
when :global_thread_pool
471+
Base.global_thread_pool_async_query_executor
472+
else
473+
Base.immediate_query_executor
474+
end
475+
end
476+
466477
#--
467478
# this is unfortunately not concurrent
468479
def bulk_make_new_connections(num_new_conns_needed)

activerecord/lib/active_record/core.rb

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,44 @@ def self.configurations
157157

158158
mattr_accessor :application_record_class, instance_accessor: false, default: nil
159159

160+
# Sets the async_query_executor for an application. By default the thread pool executor
161+
# set to `:immediate. Options are:
162+
#
163+
# * :immediate - Initializes a single +Concurrent::ImmediateExecutor+
164+
# * :global_thread_pool - Initializes a single +Concurrent::ThreadPoolExecutor+
165+
# that uses the +async_query_concurrency+ for the +max_threads+ value.
166+
# * :multi_thread_pool - Initializes a +Concurrent::ThreadPoolExecutor+ for each
167+
# database connection. The initializer values are defined in the configuration hash.
168+
mattr_accessor :async_query_executor, instance_accessor: false, default: :immediate
169+
170+
def self.immediate_query_executor # :nodoc:
171+
@@immediate_query_executor ||= Concurrent::ImmediateExecutor.new
172+
end
173+
174+
def self.global_thread_pool_async_query_executor # :nodoc:
175+
concurrency = global_executor_concurrency || 4
176+
@@global_thread_pool_async_query_executor ||= Concurrent::ThreadPoolExecutor.new(
177+
min_threads: 0,
178+
max_threads: concurrency,
179+
max_queue: concurrency * 4,
180+
fallback_policy: :caller_runs
181+
)
182+
end
183+
184+
# Set the +global_executor_concurrency+. This configuration value can only be used
185+
# with the global thread pool async query executor.
186+
def self.global_executor_concurrency=(global_executor_concurrency)
187+
if async_query_executor == :immediate || async_query_executor == :multi_thread_pool
188+
raise ArgumentError, "`global_executor_concurrency` cannot be set when using either immediate or multiple thread pools. For multiple thread pools, please set the concurrency in your database configuration. Immediate thread pools are essentially a no-op."
189+
end
190+
191+
@@global_executor_concurrency = global_executor_concurrency
192+
end
193+
194+
def self.global_executor_concurrency # :nodoc:
195+
@@global_executor_concurrency ||= nil
196+
end
197+
160198
def self.application_record_class? # :nodoc:
161199
if Base.application_record_class
162200
self == Base.application_record_class

activerecord/lib/active_record/database_configurations/database_config.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,18 @@ def pool
4848
raise NotImplementedError
4949
end
5050

51+
def min_threads
52+
raise NotImplementedError
53+
end
54+
55+
def max_threads
56+
raise NotImplementedError
57+
end
58+
59+
def max_queue
60+
raise NotImplementedError
61+
end
62+
5163
def checkout_timeout
5264
raise NotImplementedError
5365
end

activerecord/lib/active_record/database_configurations/hash_config.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,18 @@ def pool
6666
(configuration_hash[:pool] || 5).to_i
6767
end
6868

69+
def min_threads
70+
(configuration_hash[:min_threads] || 0).to_i
71+
end
72+
73+
def max_threads
74+
(configuration_hash[:max_threads] || pool).to_i
75+
end
76+
77+
def max_queue
78+
max_threads * 4
79+
end
80+
6981
def checkout_timeout
7082
(configuration_hash[:checkout_timeout] || 5).to_f
7183
end

activerecord/test/cases/adapter_test.rb

Lines changed: 0 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -343,122 +343,6 @@ def test_in_clause_length_is_deprecated
343343
end
344344
end
345345

346-
module AsynchronousQueriesSharedTests
347-
def test_async_select_failure
348-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
349-
350-
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
351-
assert_kind_of ActiveRecord::FutureResult, future_result
352-
assert_raises ActiveRecord::StatementInvalid do
353-
future_result.result
354-
end
355-
ensure
356-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
357-
end
358-
359-
def test_async_query_from_transaction
360-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
361-
362-
assert_nothing_raised do
363-
@connection.select_all "SELECT * FROM posts", async: true
364-
end
365-
366-
@connection.transaction do
367-
assert_raises AsynchronousQueryInsideTransactionError do
368-
@connection.select_all "SELECT * FROM posts", async: true
369-
end
370-
end
371-
ensure
372-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
373-
end
374-
375-
def test_async_query_cache
376-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
377-
378-
@connection.enable_query_cache!
379-
380-
@connection.select_all "SELECT * FROM posts"
381-
result = @connection.select_all "SELECT * FROM posts", async: true
382-
assert_equal Result, result.class
383-
ensure
384-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
385-
@connection.disable_query_cache!
386-
end
387-
388-
def test_async_query_foreground_fallback
389-
status = {}
390-
391-
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
392-
if event.payload[:sql] == "SELECT * FROM does_not_exists"
393-
status[:executed] = true
394-
status[:async] = event.payload[:async]
395-
end
396-
end
397-
398-
@connection.pool.stub(:schedule_query, proc { }) do
399-
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
400-
assert_kind_of ActiveRecord::FutureResult, future_result
401-
assert_raises ActiveRecord::StatementInvalid do
402-
future_result.result
403-
end
404-
end
405-
406-
assert_equal true, status[:executed]
407-
assert_equal false, status[:async]
408-
ensure
409-
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
410-
end
411-
end
412-
413-
class AsynchronousQueriesTest < ActiveRecord::TestCase
414-
self.use_transactional_tests = false
415-
416-
include AsynchronousQueriesSharedTests
417-
418-
def setup
419-
@connection = ActiveRecord::Base.connection
420-
end
421-
422-
def test_async_select_all
423-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
424-
status = {}
425-
426-
monitor = Monitor.new
427-
condition = monitor.new_cond
428-
429-
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
430-
if event.payload[:sql] == "SELECT * FROM posts"
431-
status[:executed] = true
432-
status[:async] = event.payload[:async]
433-
monitor.synchronize { condition.signal }
434-
end
435-
end
436-
437-
future_result = @connection.select_all "SELECT * FROM posts", async: true
438-
assert_kind_of ActiveRecord::FutureResult, future_result
439-
440-
monitor.synchronize do
441-
condition.wait_until { status[:executed] }
442-
end
443-
assert_kind_of ActiveRecord::Result, future_result.result
444-
assert_equal @connection.supports_concurrent_connections?, status[:async]
445-
ensure
446-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
447-
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
448-
end
449-
end
450-
451-
class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase
452-
self.use_transactional_tests = true
453-
454-
include AsynchronousQueriesSharedTests
455-
456-
def setup
457-
@connection = ActiveRecord::Base.connection
458-
@connection.materialize_transactions
459-
end
460-
end
461-
462346
class AdapterForeignKeyTest < ActiveRecord::TestCase
463347
self.use_transactional_tests = false
464348

0 commit comments

Comments
 (0)