Skip to content

Commit 390047b

Browse files
committed
Make AbstractWorkerPool methods thread-safe and more consistent
Previously they did not handle dead workers in the same way. In particular, `take!` would remove dead workers but none of the other methods did, leading to cases where `isready` might return true but `take!` would still block. Now they should all be consistent with each other, except for `wait`, which will block if the pool is empty, unlike `take!`, which will throw an exception. This seems like a reasonable tradeoff to minimize breakage while still ensuring that 'take!() will block if wait() blocks' holds. In theory one could put the dead worker checks in other methods like `length` and `put!`, but the checks would still need to be in `take!`/`isready` etc., so it seems simpler to just acknowledge the lack of thread-safety in these methods upfront.
1 parent 0cca4d3 commit 390047b

File tree

3 files changed

+88
-14
lines changed

3 files changed

+88
-14
lines changed

docs/src/_changelog.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ This documents notable changes in DistributedNext.jl. The format is based on
1111

1212
### Fixed
1313
- Fixed a cause of potential hangs when exiting the process ([#16]).
14+
- Modified the default implementations of methods like `take!` and `wait` on
15+
[`AbstractWorkerPool`](@ref) to be thread-safe and behave more consistently
16+
with each other. This is technically breaking, but it's a strict bugfix to
17+
correct previous inconsistent behaviour so it will still land in a minor
18+
release.
1419

1520
### Added
1621
- A watcher mechanism has been added to detect when both the Distributed stdlib

src/workerpool.jl

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ The default implementations of the above (on a `AbstractWorkerPool`) require fie
1616
- `channel::Channel{Int}`
1717
- `workers::Set{Int}`
1818
where `channel` contains free worker pids and `workers` is the set of all workers associated with this pool.
19+
20+
The default implementations of the above handle dead workers by removing them
21+
from the pool. Be aware that since workers could die at any time, depending on
22+
the results of functions like `length` or `isready` is not thread-safe.
1923
"""
2024
abstract type AbstractWorkerPool end
2125

@@ -71,7 +75,43 @@ deserialize(S::AbstractSerializer, t::Type{T}) where {T<:WorkerPool} = T(deseria
7175

7276
wp_local_push!(pool::AbstractWorkerPool, w::Int) = (push!(pool.workers, w); put!(pool.channel, w); pool)
7377
wp_local_length(pool::AbstractWorkerPool) = length(pool.workers)
74-
wp_local_isready(pool::AbstractWorkerPool) = isready(pool.channel)
78+
79+
function check_valid_worker!(pool::AbstractWorkerPool, worker)
80+
if !id_in_procs(worker)
81+
# We abuse the Channel lock to provide thread-safety when we modify the
82+
# worker set.
83+
@lock pool.channel delete!(pool.workers, worker)
84+
return false
85+
else
86+
return true
87+
end
88+
end
89+
90+
function default_and_empty(pool::AbstractWorkerPool)
91+
length(pool) == 0 && pool === default_worker_pool()
92+
end
93+
94+
function wp_local_isready(pool::AbstractWorkerPool)
95+
if default_and_empty(pool)
96+
# This state wouldn't block take!() so we return true
97+
return true
98+
end
99+
100+
# Otherwise we lock the channel to prevent anyone else from touching it and
101+
# take!() until we either run out of workers or get a valid one. Locking is
102+
# necessary to avoid blocking on take!() or fetch().
103+
@lock pool.channel begin
104+
while isready(pool.channel)
105+
worker = take!(pool.channel)
106+
if check_valid_worker!(pool, worker)
107+
put!(pool.channel, worker)
108+
break
109+
end
110+
end
111+
112+
return isready(pool.channel)
113+
end
114+
end
75115

76116
function wp_local_put!(pool::AbstractWorkerPool, w::Int)
77117
# In case of default_worker_pool, the master is implicitly considered a worker, i.e.,
@@ -101,29 +141,39 @@ function wp_local_take!(pool::AbstractWorkerPool)
101141
# Find an active worker
102142
worker = 0
103143
while true
104-
if length(pool) == 0
105-
if pool === default_worker_pool()
106-
# No workers, the master process is used as a worker
107-
worker = 1
108-
break
109-
else
110-
throw(ErrorException("No active worker available in pool"))
111-
end
144+
if default_and_empty(pool)
145+
# No workers, the master process is used as a worker
146+
worker = 1
147+
break
148+
elseif length(pool) == 0
149+
throw(ErrorException("No active worker available in pool"))
112150
end
113151

114152
worker = take!(pool.channel)
115-
if id_in_procs(worker)
153+
if check_valid_worker!(pool, worker)
116154
break
117-
else
118-
delete!(pool.workers, worker) # Remove invalid worker from pool
119155
end
120156
end
121157
return worker
122158
end
123159

124160
function wp_local_wait(pool::AbstractWorkerPool)
125-
wait(pool.channel)
126-
return nothing
161+
if default_and_empty(pool)
162+
# This state wouldn't block take!() so we return
163+
return nothing
164+
end
165+
166+
while true
167+
# We don't use take!(::AbstractWorkerPool) because that will throw if
168+
# the pool is empty. take!(::Channel) instead blocks until a worker becomes
169+
# available.
170+
worker = take!(pool.channel)
171+
172+
if check_valid_worker!(pool, worker)
173+
put!(pool.channel, worker)
174+
return nothing
175+
end
176+
end
127177
end
128178

129179
function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...)

test/distributed_exec.jl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,25 @@ end
744744
status = timedwait(() -> isready(f), 10)
745745
@test status == :ok
746746

747+
# Test behaviour with missing workers. Note that pool_workers is assigned
748+
# such that the FIFO behaviour of Channels will ensure that all the tested
749+
# methods will see the bad_worker first.
750+
bad_worker = maximum(workers()) + 1
751+
pool_workers = [bad_worker, 1]
752+
753+
wp = WorkerPool(pool_workers)
754+
@test take!(wp) == 1 # Test take!()
755+
@test !isready(wp)
756+
@test bad_worker ∉ wp.workers
757+
758+
@test !isready(WorkerPool([bad_worker]))
759+
760+
wp = WorkerPool(pool_workers)
761+
# This should not hang, and it should end up removing the dead worker
762+
wait(wp)
763+
@test isready(wp)
764+
@test bad_worker ∉ wp.workers
765+
747766
# CachingPool tests
748767
wp = CachingPool(workers())
749768
@test [1:100...] == pmap(x->x, wp, 1:100)

0 commit comments

Comments
 (0)