Skip to content

Commit 8890288

Browse files
Merge pull request #127 from JuliaLang/ib/fix_ambiguity
2 parents 2c0e979 + 85ab2a2 commit 8890288

File tree

5 files changed

+70
-11
lines changed

5 files changed

+70
-11
lines changed

Project.toml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,19 @@ Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
77
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
88
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
99

10+
[compat]
11+
Aqua = "0.8.10"
12+
LinearAlgebra = "1"
13+
Random = "1"
14+
Serialization = "1"
15+
Sockets = "1"
16+
Test = "1"
17+
julia = "1"
18+
1019
[extras]
20+
Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595"
1121
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
1222
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
1323

1424
[targets]
15-
test = ["LinearAlgebra", "Test"]
25+
test = ["Aqua", "LinearAlgebra", "Test"]

src/workerpool.jl

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ function remotecall_pool(rc_f::typeof(remotecall), f, pool::AbstractWorkerPool,
149149

150150
t = Threads.@spawn Threads.threadpool() try
151151
wait(x)
152+
catch # just wait, ignore errors here
152153
finally
153154
put!(pool, worker)
154155
end
@@ -400,3 +401,29 @@ function remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...)
400401
put!(pool, worker)
401402
end
402403
end
404+
405+
# Specialization for remotecall. We have to wait for the Future it returns
406+
# before putting the worker back in the pool.
407+
function remotecall_pool(rc_f::typeof(remotecall), f, pool::CachingPool, args...; kwargs...)
408+
worker = take!(pool)
409+
f_ref = get(pool.map_obj2ref, (worker, f), (f, RemoteChannel(worker)))
410+
isa(f_ref, Tuple) && (pool.map_obj2ref[(worker, f)] = f_ref[2]) # Add to tracker
411+
412+
local x
413+
try
414+
x = rc_f(exec_from_cache, worker, f_ref, args...; kwargs...)
415+
catch
416+
put!(pool, worker)
417+
rethrow()
418+
end
419+
420+
t = Threads.@spawn Threads.threadpool() try
421+
wait(x)
422+
catch # just wait, ignore errors here
423+
finally
424+
put!(pool, worker)
425+
end
426+
errormonitor(t)
427+
428+
return x
429+
end

test/aqua.jl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using Aqua
2+
using Distributed
3+
4+
Aqua.test_all(
5+
Distributed,
6+
# This should be excluded, but it's not clear how to do that on Aqua's API
7+
# given it's not-defined. (The Julia Base ambiguity test does it something like this)
8+
# ambiguities=(exclude=[GlobalRef(Distributed, :cluster_manager)])
9+
)

test/distributed_exec.jl

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,8 @@ wp = WorkerPool(workers())
718718
@test nworkers() == length(unique(remotecall_fetch(wp->pmap(_->myid(), wp, 1:100), id_other, wp)))
719719
wp = WorkerPool(2:3)
720720
@test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3]
721+
@test fetch(remotecall(myid, wp)) in wp.workers
722+
@test_throws RemoteException fetch(remotecall(error, wp))
721723

722724
# wait on worker pool
723725
wp = WorkerPool(2:2)
@@ -743,6 +745,8 @@ status = timedwait(() -> isready(f), 10)
743745
# CachingPool tests
744746
wp = CachingPool(workers())
745747
@test [1:100...] == pmap(x->x, wp, 1:100)
748+
@test fetch(remotecall(myid, wp)) in wp.workers
749+
@test_throws RemoteException fetch(remotecall(error, wp))
746750

747751
clear!(wp)
748752
@test length(wp.map_obj2ref) == 0
@@ -1092,16 +1096,18 @@ end
10921096
# Test the behaviour of remotecall(f, ::AbstractWorkerPool), this should
10931097
# keep the worker out of the pool until the underlying remotecall has
10941098
# finished.
1095-
let
1096-
remotechan = RemoteChannel(wrkr1)
1097-
pool = WorkerPool([wrkr1])
1098-
put_future = remotecall(() -> wait(remotechan), pool)
1099-
@test !isready(pool)
1100-
put!(remotechan, 1)
1101-
wait(put_future)
1102-
# The task that waits on the future to put it back into the pool runs
1103-
# asynchronously so we use timedwait() to check when the worker is back in.
1104-
@test timedwait(() -> isready(pool), 10) === :ok
1099+
for PoolType in (WorkerPool, CachingPool)
1100+
let
1101+
remotechan = RemoteChannel(wrkr1)
1102+
pool = PoolType([wrkr1])
1103+
put_future = remotecall(() -> wait(remotechan), pool)
1104+
@test !isready(pool)
1105+
put!(remotechan, 1)
1106+
wait(put_future)
1107+
# The task that waits on the future to put it back into the pool runs
1108+
# asynchronously so we use timedwait() to check when the worker is back in.
1109+
@test timedwait(() -> isready(pool), 10) === :ok
1110+
end
11051111
end
11061112

11071113
# Test calling @everywhere from a module not defined on the workers

test/runtests.jl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
# This file is a part of Julia. License is MIT: https://julialang.org/license
22

33
using Test
4+
using Distributed
5+
6+
if !Base.get_bool_env("BUILDKITE", false)
7+
@testset "Aqua.jl tests" begin
8+
include("aqua.jl")
9+
end
10+
end
411

512
# Run the distributed test outside of the main driver since it needs its own
613
# set of dedicated workers.

0 commit comments

Comments
 (0)