Skip to content

Commit cfab551

Browse files
authored
Merge pull request #409 from JuliaParallel/jps/mutable-auto-remote
at-mutable: Add remote execution like shard
2 parents a64b7ff + 0489464 commit cfab551

File tree

4 files changed

+58
-37
lines changed

4 files changed

+58
-37
lines changed

docs/src/data-management.md

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,24 @@ argument to a task, the task will be forced to execute on the same worker that
4141
`@mutable` was called on. For example:
4242

4343
```julia
44-
x = remotecall_fetch(2) do
45-
Dagger.@mutable Threads.Atomic{Int}(0)
46-
end
44+
Dagger.@mutable worker=2 Threads.Atomic{Int}(0)
4745
x::Dagger.Chunk # The result is always a `Chunk`
4846

4947
# x is now considered mutable, and may only be accessed on worker 2:
50-
fetch(Dagger.@spawn Threads.atomic_add!(x, 3)) # Always executed on worker 2
51-
fetch(Dagger.@spawn single=1 Threads.atomic_add!(x, 3)) # SchedulingException
48+
wait(Dagger.@spawn Threads.atomic_add!(x, 1)) # Always executed on worker 2
49+
wait(Dagger.@spawn scope=Dagger.scope(worker=1) Threads.atomic_add!(x, 1)) # SchedulingException
5250
```
5351

54-
`@mutable`, when called as above, gain a scope of `ProcessorScope(myid())`,
55-
which means that any processor on that worker is allowed to execute tasks that
56-
use the object (subject to the usual scheduling rules).
52+
`@mutable`, when called as above, is constructed on worker 2, and the data
53+
gains a scope of `ProcessScope(myid())`, which means that any processor on that
54+
worker is allowed to execute tasks that use the object (subject to the usual
55+
scheduling rules).
5756

58-
`@mutable` also has two other forms, allowing the processor and scope to be
59-
manually supplied:
57+
`@mutable` also allows the scope to be manually supplied, if more specific
58+
restrictions are desirable:
6059

6160
```julia
62-
proc1 = Dagger.ThreadProc(myid(), 3)
63-
proc2 = Dagger.ThreadProc(myid(), 4)
64-
scope = Dagger.UnionScope(ExactScope.([proc1, proc2]))
65-
x = @mutable OSProc() scope rand(100)
61+
x = @mutable scope=Dagger.scope(worker=1, threads=[3,4]) rand(100)
6662
# x is now scoped to threads 3 and 4 on worker `myid()`
6763
```
6864

docs/src/index.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ A = Dagger.spawn() do
104104
end
105105
```
106106

107+
or by specifying the `worker` argument to `@mutable`:
108+
109+
```julia
110+
A = Dagger.@mutable worker=2 rand(1000, 1000)
111+
```
112+
107113
### Parallel reduction
108114

109115
Reductions are often parallelized by reducing a set of partitions on each

src/chunks.jl

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -115,23 +115,42 @@ affinity(r::FileRef) = OSProc(1)=>r.size
115115

116116
### Mutation
117117

118-
"Wraps `x` in a `Chunk` on `proc`, scoped to `scope`, which allows `x` to be mutated by tasks that use it."
119-
macro mutable(proc, scope, x)
120-
:(Dagger.tochunk($(esc(x)), $(esc(proc)), $(esc(scope))))
118+
function _mutable_inner(@nospecialize(f), proc, scope)
119+
result = f()
120+
return Ref(Dagger.tochunk(result, proc, scope))
121121
end
122-
"Creates a mutable `Chunk` on `proc`, scoped to exactly `proc`."
123-
macro mutable(proc, x)
124-
quote
125-
let proc = $(esc(proc))
126-
let scope = proc isa OSProc ? Dagger.ProcessScope(proc.pid) : Dagger.ExactScope(proc)
127-
Dagger.@mutable proc scope $(esc(x))
128-
end
122+
123+
"""
124+
mutable(f::Base.Callable; worker, processor, scope) -> Chunk
125+
@mutable [worker=1] [processor=OSProc()] [scope=ProcessorScope()] f()
126+
127+
Calls `f()` on the specified worker or processor, returning a `Chunk`
128+
referencing the result with the specified scope `scope`.
129+
"""
130+
function mutable(@nospecialize(f); worker=nothing, processor=nothing, scope=nothing)
131+
if processor === nothing
132+
if worker === nothing
133+
processor = OSProc()
134+
else
135+
processor = OSProc(worker)
129136
end
137+
else
138+
@assert worker === nothing "mutable: Can't mix worker and processor"
139+
end
140+
if scope === nothing
141+
scope = processor isa OSProc ? ProcessScope(processor) : ExactScope(processor)
130142
end
143+
return fetch(Dagger.@spawn scope=scope _mutable_inner(f, processor, scope))[]
131144
end
132-
"Creates a mutable `Chunk` on the current worker."
133-
macro mutable(x)
134-
:(Dagger.@mutable OSProc() Dagger.ProcessScope() $(esc(x)))
145+
146+
macro mutable(exs...)
147+
opts = esc.(exs[1:end-1])
148+
ex = exs[end]
149+
quote
150+
let f = @noinline ()->$(esc(ex))
151+
$mutable(f; $(opts...))
152+
end
153+
end
135154
end
136155

137156
"""
@@ -181,14 +200,11 @@ function shard(@nospecialize(f); procs=nothing, workers=nothing, per_thread=fals
181200
shard_dict = Dict{Processor,Chunk}()
182201
for proc in procs
183202
scope = proc isa OSProc ? ProcessScope(proc) : ExactScope(proc)
184-
thunk = Dagger.@spawn scope=scope _shard_inner(f, proc, scope)
203+
thunk = Dagger.@spawn scope=scope _mutable_inner(f, proc, scope)
185204
shard_dict[proc] = fetch(thunk)[]
186205
end
187206
return Shard(shard_dict)
188207
end
189-
function _shard_inner(@nospecialize(f), proc, scope)
190-
Ref(Dagger.@mutable proc scope f())
191-
end
192208

193209
"Creates a `Shard`. See [`Dagger.shard`](@ref) for details."
194210
macro shard(exs...)

test/mutation.jl

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,19 @@
3737
end
3838
end
3939

40+
@everywhere mutable_update!(x) = x[] = myid()
41+
4042
@testset "Mutation" begin
4143

4244
@testset "@mutable" begin
43-
w = first(workers())
44-
@assert w != 1 "Not enough workers to test mutability"
45-
x = remotecall_fetch(w) do
46-
Dagger.@mutable Ref{Int}()
45+
w1 = first(workers())
46+
@assert w1 != 1 "Not enough workers to test mutability"
47+
for (w, wo) in [(1, w1), (w1, 1)]
48+
x = Dagger.@mutable worker=w Ref{Int}()
49+
@test fetch(Dagger.@spawn mutable_update!(x)) == w
50+
wo_scope = Dagger.ProcessScope(wo)
51+
@test_throws_unwrap Dagger.ThunkFailedException fetch(Dagger.@spawn scope=wo_scope mutable_update!(x))
4752
end
48-
@test fetch(Dagger.@spawn (x->x[] = myid())(x)) == w
49-
@test_throws_unwrap Dagger.ThunkFailedException fetch(Dagger.@spawn single=1 (x->x[] = myid())(x))
5053
end # @testset "@mutable"
5154

5255
@testset "Shard" begin

0 commit comments

Comments
 (0)