Skip to content

Commit ffc3322

Browse files
committed
Merge pull request #501 from ruby-concurrency/fork-in-the-road
Reset ThreadPoolExecutor on fork.
2 parents bfbe92d + d53cf37 commit ffc3322

File tree

3 files changed

+37
-37
lines changed

3 files changed

+37
-37
lines changed

doc/thread_pools.md

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ If you'd like to configure a maximum number of threads, you can use the more gen
4848

4949
A [ThreadPoolExecutor](http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadPoolExecutor.html) is a general-purpose thread pool that can be configured to have various behaviors.
5050

51-
A `ThreadPoolExecutor` will automatically adjust the pool size according to the bounds set by `min-threads` and `max-threads`.
52-
When a new task is submitted and fewer than `min-threads` threads are running, a new thread is created to handle the request, even if other worker threads are idle.
51+
A `ThreadPoolExecutor` will automatically adjust the pool size according to the bounds set by `min-threads` and `max-threads`.
52+
When a new task is submitted and fewer than `min-threads` threads are running, a new thread is created to handle the request, even if other worker threads are idle.
5353
If there are more than `min-threads` but less than `max-threads` threads running, a new thread will be created only if the queue is full.
5454

5555
The `CachedThreadPool` and `FixedThreadPool` are simply `ThreadPoolExecutors` with certain configuration pre-determined. For instance, to create a `ThreadPoolExecutor` that works just like a `FixedThreadPool.new 5`, you could:
@@ -159,3 +159,20 @@ future = Future.new(:executor => pool).execute do
159159
#work
160160
end
161161
~~~
162+
163+
## Forking
164+
165+
Some Ruby versions allow the Ruby process to be [forked](http://ruby-doc.org/core-2.3.0/Process.html#method-c-fork). Generally, mixing threading and forking is an [anti-pattern](https://en.wikipedia.org/wiki/Anti-pattern). Threading and forking are both concurrency techniques and mixing the two is rarely beneficial. Moreover, threads created before the fork become unusable ("dead") in the forked process. This aspect of forking is a significant issue for any application or library which spawns threads. It is strongly advised that applications using `ThreadPoolExecutor` do **not** also fork. Since Concurrent Ruby is a foundational library often used by gems which are in turn used by other applications, it is impossible to predict or prevent upstream forking. Concurrent Ruby therefore makes a few guarantees about the behavior of `ThreadPoolExecutor` after forking.
166+
167+
*Concurrent Ruby guarantees that jobs post on the parent process will be handled on the parent process; the child process does not inherit any jobs at the time of the fork. Concurrent Ruby also guarantees that thread pools copied to the child process will continue to function normally.*
168+
169+
When a fork occurs the `ThreadPoolExecutor` in the *forking* process takes no special actions whatsoever. It has no way of knowing that a fork occurred. It proceeds to process its jobs as normal and makes no attempt whatsoever to distribute those jobs to the forked process(es).
170+
171+
When a `ThreadPoolExecutor` in the *forked* process detects that a fork has occurred it immediately takes the following actions:
172+
173+
* Clears all pending jobs from its queue (assuming they will be handled by the *forking* process).
174+
* Deletes all worker threads (they will have died during the fork).
175+
* Resets all job counters (these counts will be reflected in the *forking* process).
176+
* Begins posting new jobs as normal.
177+
178+
These actions guarantee that all in-flight jobs are processed normally in the forking process and that thread pools, including the global thread pools, remain functional in the forked process(es).

examples/benchmark_thread_pool_implementations.rb

Lines changed: 0 additions & 32 deletions
This file was deleted.

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ def ns_initialize(opts)
131131
@scheduled_task_count = 0
132132
@completed_task_count = 0
133133
@largest_length = 0
134+
@ruby_pid = $$ # detects if Ruby has forked
134135

135136
@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
136137
@next_gc_time = Concurrent.monotonic_time + @gc_interval
@@ -143,28 +144,30 @@ def ns_limited_queue?
143144

144145
# @!visibility private
145146
def ns_execute(*args, &task)
147+
ns_reset_if_forked
148+
146149
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
147150
@scheduled_task_count += 1
148151
else
149152
handle_fallback(*args, &task)
150153
end
151154

152155
ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
153-
# raise unless @ready.empty? || @queue.empty? # assert
154156
end
155157

156158
# @!visibility private
157159
def ns_shutdown_execution
160+
ns_reset_if_forked
161+
158162
if @pool.empty?
159163
# nothing to do
160164
stopped_event.set
161165
end
166+
162167
if @queue.empty?
163168
# no more tasks will be accepted, just stop all workers
164169
@pool.each(&:stop)
165170
end
166-
167-
# raise unless @ready.empty? || @queue.empty? # assert
168171
end
169172

170173
# @!visibility private
@@ -273,6 +276,18 @@ def ns_prune_pool
273276
@next_gc_time = Concurrent.monotonic_time + @gc_interval
274277
end
275278

279+
def ns_reset_if_forked
280+
if $$ != @ruby_pid
281+
@queue.clear
282+
@ready.clear
283+
@pool.clear
284+
@scheduled_task_count = 0
285+
@completed_task_count = 0
286+
@largest_length = 0
287+
@ruby_pid = $$
288+
end
289+
end
290+
276291
# @!visibility private
277292
class Worker
278293
include Concern::Logging

0 commit comments

Comments
 (0)