Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,19 @@ Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"

[compat]
Aqua = "0.8.10"
LinearAlgebra = "1"
Random = "1"
Serialization = "1"
Sockets = "1"
Test = "1"
julia = "1"

[extras]
Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["LinearAlgebra", "Test"]
test = ["Aqua", "LinearAlgebra", "Test"]
27 changes: 27 additions & 0 deletions src/workerpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@

t = Threads.@spawn Threads.threadpool() try
wait(x)
catch # just wait, ignore errors here
finally
put!(pool, worker)
end
Expand Down Expand Up @@ -400,3 +401,29 @@
put!(pool, worker)
end
end

# Specialization for remotecall. We have to wait for the Future it returns
# before putting the worker back in the pool.
function remotecall_pool(rc_f::typeof(remotecall), f, pool::CachingPool, args...; kwargs...)
worker = take!(pool)
f_ref = get(pool.map_obj2ref, (worker, f), (f, RemoteChannel(worker)))
isa(f_ref, Tuple) && (pool.map_obj2ref[(worker, f)] = f_ref[2]) # Add to tracker

local x
try
x = rc_f(exec_from_cache, worker, f_ref, args...; kwargs...)
catch
put!(pool, worker)
rethrow()

Check warning on line 417 in src/workerpool.jl

View check run for this annotation

Codecov / codecov/patch

src/workerpool.jl#L416-L417

Added lines #L416 - L417 were not covered by tests
end

t = Threads.@spawn Threads.threadpool() try
wait(x)
catch # just wait, ignore errors here
finally
put!(pool, worker)
end
errormonitor(t)

return x
end
9 changes: 9 additions & 0 deletions test/aqua.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Aqua
using Distributed

Aqua.test_all(
Distributed,
# This should be excluded, but it's not clear how to do that on Aqua's API
# given it's not-defined. (The Julia Base ambiguity test does it something like this)
# ambiguities=(exclude=[GlobalRef(Distributed, :cluster_manager)])
)
26 changes: 16 additions & 10 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,8 @@ wp = WorkerPool(workers())
@test nworkers() == length(unique(remotecall_fetch(wp->pmap(_->myid(), wp, 1:100), id_other, wp)))
wp = WorkerPool(2:3)
@test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3]
@test fetch(remotecall(myid, wp)) in wp.workers
@test_throws RemoteException fetch(remotecall(error, wp))

# wait on worker pool
wp = WorkerPool(2:2)
Expand All @@ -743,6 +745,8 @@ status = timedwait(() -> isready(f), 10)
# CachingPool tests
wp = CachingPool(workers())
@test [1:100...] == pmap(x->x, wp, 1:100)
@test fetch(remotecall(myid, wp)) in wp.workers
@test_throws RemoteException fetch(remotecall(error, wp))

clear!(wp)
@test length(wp.map_obj2ref) == 0
Expand Down Expand Up @@ -1092,16 +1096,18 @@ end
# Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should
# keep the worker out of the pool until the underlying remotecall has
# finished.
let
remotechan = RemoteChannel(wrkr1)
pool = WorkerPool([wrkr1])
put_future = remotecall(() -> wait(remotechan), pool)
@test !isready(pool)
put!(remotechan, 1)
wait(put_future)
# The task that waits on the future to put it back into the pool runs
# asynchronously so we use timedwait() to check when the worker is back in.
@test timedwait(() -> isready(pool), 10) === :ok
for PoolType in (WorkerPool, CachingPool)
let
remotechan = RemoteChannel(wrkr1)
pool = PoolType([wrkr1])
put_future = remotecall(() -> wait(remotechan), pool)
@test !isready(pool)
put!(remotechan, 1)
wait(put_future)
# The task that waits on the future to put it back into the pool runs
# asynchronously so we use timedwait() to check when the worker is back in.
@test timedwait(() -> isready(pool), 10) === :ok
end
end

# Test calling @everywhere from a module not defined on the workers
Expand Down
7 changes: 7 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Test
using Distributed

if !Base.get_bool_env("BUILDKITE", false)
@testset "Aqua.jl tests" begin
include("aqua.jl")
end
end

# Run the distributed test outside of the main driver since it needs its own
# set of dedicated workers.
Expand Down
Loading