Skip to content

Commit f406d66

Browse files
committed
datadeps: Consolidate aliasing rewrap code
1 parent fa961c5 commit f406d66

File tree

2 files changed

+77
-86
lines changed

2 files changed

+77
-86
lines changed

src/datadeps/aliasing.jl

Lines changed: 76 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,19 @@ KEY CONCEPTS:
2525
1. ALIASING ANALYSIS:
2626
- Every mutable argument is analyzed for its memory access pattern
2727
- Memory spans are computed to determine which bytes in memory are accessed
28-
- Objects that access overlapping memory spans are considered "aliasing"
28+
- Arguments that access overlapping memory spans are considered "aliasing"
2929
- Examples: An array A and view(A, 2:3, 2:3) alias each other
3030
3131
2. DATA LOCALITY TRACKING:
3232
- The system tracks where the "source of truth" for each piece of data lives
3333
- As tasks execute and modify data, the source of truth may move between workers
34-
- Each aliasing region can have its own independent source of truth location
34+
- Each argument can have its own independent source of truth location
3535
3636
3. ALIASED OBJECT MANAGEMENT:
3737
- When copying arguments between workers, the system tracks "aliased objects"
3838
- This ensures that if both an array and its view need to be copied to a worker,
3939
only one copy of the underlying array is made, with the view pointing to it
40-
- The aliased_object!() functions manage this sharing
40+
- The aliased_object!() and move_rewrap() functions manage this sharing
4141
4242
THE DISTRIBUTED ALIASING PROBLEM:
4343
---------------------------------
@@ -63,11 +63,9 @@ MULTITHREADED BEHAVIOR (WORKS):
6363
- Task dependencies ensure correct ordering (e.g., Task 1 then Task 2)
6464
6565
DISTRIBUTED BEHAVIOR (THE PROBLEM):
66-
- Tasks may be scheduled on different workers
6766
- Each argument must be copied to the destination worker
68-
- Without special handling, we would copy A to worker1 and vA to worker2
69-
- This creates two separate arrays, breaking the aliasing relationship
70-
- Updates to the view on worker2 don't affect the array on worker1
67+
- Without special handling, we would copy A and vA independently to another worker
68+
- This creates two separate arrays, breaking the aliasing relationship between A and vA
7169
7270
THE SOLUTION - PARTIAL DATA MOVEMENT:
7371
-------------------------------------
@@ -706,6 +704,32 @@ function Base.setindex!(cache::AliasedObjectCache, value::Chunk, ainfo::Abstract
706704
cache_raw[ainfo] = value
707705
return
708706
end
707+
# Look up `ainfo` in `cache` and return the cached `Chunk` if one is present.
# Otherwise call `f(x)`, store the result in `cache` under `ainfo`, and return it.
# `ainfo` defaults to `aliasing(x, identity)`; callers may pass a precomputed value.
# `f` is the first positional argument, so this can be called with `do`-block syntax.
function aliased_object!(f, cache::AliasedObjectCache, x; ainfo=aliasing(x, identity))
708+
if haskey(cache, ainfo)
709+
return cache[ainfo]
710+
else
711+
y = f(x)
712+
# `setindex!` on the cache only accepts `Chunk` values, so check the functor's result first
@assert y isa Chunk "Didn't get a Chunk from functor"
713+
cache[ainfo] = y
714+
return y
715+
end
716+
end
717+
# Move `data` from `from_proc` to `to_proc`, apply `f` to the moved value, and
# return the result wrapped by `tochunk(..., to_proc)`.
# The work runs on the worker given by `root_worker_id(to_proc)`: inline when that
# worker is the current process, otherwise through `remotecall_fetch`.
# NOTE(review): `from_space` is not read in this body, and `to_space` is only
# forwarded to the remote closure, which does not use it either.
function remotecall_endpoint(f, from_proc, to_proc, from_space, to_space, data)
718+
to_w = root_worker_id(to_proc)
719+
# Destination is this process: skip the remote call
if to_w == myid()
720+
data_converted = f(move(from_proc, to_proc, data))
721+
return tochunk(data_converted, to_proc)
722+
end
723+
# Same two steps as above, executed on worker `to_w`; `f` is captured by the closure
return remotecall_fetch(to_w, from_proc, to_proc, to_space, data) do from_proc, to_proc, to_space, data
724+
data_converted = f(move(from_proc, to_proc, data))
725+
return tochunk(data_converted, to_proc)
726+
end
727+
end
728+
# Return the `Chunk` that `cache` associates with `x`, creating it on a cache miss.
# On a miss, `x` is moved to `to_proc` by `remotecall_endpoint` with `identity` as
# the conversion, and the resulting `Chunk` is stored by `aliased_object!`.
# The cache key is `aliased_object!`'s default `ainfo`, i.e. `aliasing(x, identity)`.
function rewrap_aliased_object!(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, x)
729+
return aliased_object!(cache, x) do x
730+
return remotecall_endpoint(identity, from_proc, to_proc, from_space, to_space, x)
731+
end
732+
end
709733
function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, data::Chunk)
710734
# Unwrap so that we hit the right dispatch
711735
wid = root_worker_id(data)
@@ -721,27 +745,58 @@ function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::P
721745
return remotecall_endpoint(identity, from_proc, to_proc, from_space, to_space, data)
722746
end
723747
end
724-
function remotecall_endpoint(f, from_proc, to_proc, from_space, to_space, data)
748+
function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, v::SubArray)
725749
to_w = root_worker_id(to_proc)
726-
if to_w == myid()
727-
data_converted = f(move(from_proc, to_proc, data))
728-
return tochunk(data_converted, to_proc)
750+
p_chunk = rewrap_aliased_object!(cache, from_proc, to_proc, from_space, to_space, parent(v))
751+
inds = parentindices(v)
752+
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, p_chunk, inds) do from_proc, to_proc, from_space, to_space, p_chunk, inds
753+
p_new = move(from_proc, to_proc, p_chunk)
754+
v_new = view(p_new, inds...)
755+
return tochunk(v_new, to_proc)
729756
end
730-
return remotecall_fetch(to_w, from_proc, to_proc, to_space, data) do from_proc, to_proc, to_space, data
731-
data_converted = f(move(from_proc, to_proc, data))
732-
return tochunk(data_converted, to_proc)
757+
end
758+
# FIXME: Do this programmatically via recursive dispatch
759+
# Define one `move_rewrap` method per triangular wrapper type via `@eval`.
# Each method obtains the parent's `Chunk` through `rewrap_aliased_object!`, then
# rebuilds the same wrapper type around the moved parent on the destination worker.
# NOTE(review): the wrapper types are presumably LinearAlgebra's — confirm the import.
for wrapper in (UpperTriangular, LowerTriangular, UnitUpperTriangular, UnitLowerTriangular)
760+
@eval function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, v::$(wrapper))
761+
to_w = root_worker_id(to_proc)
762+
# Goes through the alias cache, keyed on the parent rather than on the wrapper
p_chunk = rewrap_aliased_object!(cache, from_proc, to_proc, from_space, to_space, parent(v))
763+
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, p_chunk) do from_proc, to_proc, from_space, to_space, p_chunk
764+
p_new = move(from_proc, to_proc, p_chunk)
765+
# Re-apply the wrapper type to the moved parent
v_new = $(wrapper)(p_new)
766+
return tochunk(v_new, to_proc)
767+
end
733768
end
734769
end
735-
function aliased_object!(f, cache::AliasedObjectCache, x; ainfo=aliasing(x, identity))
736-
if haskey(cache, ainfo)
737-
return cache[ainfo]
770+
# `move_rewrap` for a `Base.RefValue`.
# The referenced value `v[]` goes through the alias cache via `rewrap_aliased_object!`.
# The moved value is then wrapped in a new `Ref` on the worker that owns `to_proc`,
# and that `Ref` is returned as a `Chunk`.
function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, v::Base.RefValue)
771+
to_w = root_worker_id(to_proc)
772+
# The cache entry is for the referenced value, not for the `Ref` itself
p_chunk = rewrap_aliased_object!(cache, from_proc, to_proc, from_space, to_space, v[])
773+
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, p_chunk) do from_proc, to_proc, from_space, to_space, p_chunk
774+
p_new = move(from_proc, to_proc, p_chunk)
775+
v_new = Ref(p_new)
776+
return tochunk(v_new, to_proc)
777+
end
778+
end
779+
#=
780+
function move_rewrap_recursive(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, x::T) where T
781+
if isstructtype(T)
782+
# Check all object fields (recursive)
783+
for field in fieldnames(T)
784+
value = getfield(x, field)
785+
new_value = aliased_object!(cache, value) do value
786+
return move_rewrap_recursive(cache, from_proc, to_proc, from_space, to_space, value)
787+
end
788+
setfield!(x, field, new_value)
789+
end
790+
return x
738791
else
739-
y = f(x)
740-
@assert y isa Chunk "Didn't get a Chunk from functor"
741-
cache[ainfo] = y
742-
return y
792+
@warn "Cannot move-rewrap object of type $T"
793+
return x
743794
end
744795
end
796+
move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, x::String) = x # FIXME: Not necessarily true
797+
move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, x::Symbol) = x
798+
move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, x::Type) = x
799+
=#
745800

746801
struct DataDepsSchedulerState
747802
task_to_spec::Dict{DTask,DTaskSpec}

src/datadeps/chunkview.jl

Lines changed: 1 addition & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -37,73 +37,9 @@ end
3737
memory_space(x::ChunkView) = memory_space(x.chunk)
3838
isremotehandle(x::ChunkView) = true
3939

40-
# This definition is here because it's so similar to ChunkView
41-
function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, v::SubArray)
42-
to_w = root_worker_id(to_proc)
43-
p_chunk = aliased_object!(cache, parent(v)) do p
44-
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, p) do from_proc, to_proc, from_space, to_space, p
45-
return tochunk(move(from_proc, to_proc, p), to_proc)
46-
end
47-
end
48-
inds = parentindices(v)
49-
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, p_chunk, inds) do from_proc, to_proc, from_space, to_space, p_chunk, inds
50-
p_new = move(from_proc, to_proc, p_chunk)
51-
v_new = view(p_new, inds...)
52-
return tochunk(v_new, to_proc)
53-
end
54-
end
55-
# FIXME: Do this programmatically via recursive dispatch
56-
for wrapper in (UpperTriangular, LowerTriangular, UnitUpperTriangular, UnitLowerTriangular)
57-
@eval function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, v::$(wrapper))
58-
to_w = root_worker_id(to_proc)
59-
p_chunk = aliased_object!(cache, parent(v)) do p
60-
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, p) do from_proc, to_proc, from_space, to_space, p
61-
return tochunk(move(from_proc, to_proc, p), to_proc)
62-
end
63-
end
64-
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, p_chunk) do from_proc, to_proc, from_space, to_space, p_chunk
65-
p_new = move(from_proc, to_proc, p_chunk)
66-
v_new = $(wrapper)(p_new)
67-
return tochunk(v_new, to_proc)
68-
end
69-
end
70-
end
71-
function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, v::Base.RefValue)
72-
to_w = root_worker_id(to_proc)
73-
return aliased_object!(cache, v[]) do p
74-
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, p) do from_proc, to_proc, from_space, to_space, p
75-
return tochunk(Ref(move(from_proc, to_proc, p)), to_proc)
76-
end
77-
end
78-
end
79-
#=
80-
function move_rewrap_recursive(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, x::T) where T
81-
if isstructtype(T)
82-
# Check all object fields (recursive)
83-
for field in fieldnames(T)
84-
value = getfield(x, field)
85-
new_value = aliased_object!(cache, value) do value
86-
return move_rewrap_recursive(cache, from_proc, to_proc, from_space, to_space, value)
87-
end
88-
setfield!(x, field, new_value)
89-
end
90-
return x
91-
else
92-
@warn "Cannot move-rewrap object of type $T"
93-
return x
94-
end
95-
end
96-
move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, x::String) = x # FIXME: Not necessarily true
97-
move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, x::Symbol) = x
98-
move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, x::Type) = x
99-
=#
10040
function move_rewrap(cache::AliasedObjectCache, from_proc::Processor, to_proc::Processor, from_space::MemorySpace, to_space::MemorySpace, slice::ChunkView)
10141
to_w = root_worker_id(to_proc)
102-
p_chunk = aliased_object!(cache, slice.chunk) do p_chunk
103-
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, p_chunk) do from_proc, to_proc, from_space, to_space, p_chunk
104-
return tochunk(move(from_proc, to_proc, p_chunk), to_proc)
105-
end
106-
end
42+
p_chunk = rewrap_aliased_object!(cache, from_proc, to_proc, from_space, to_space, slice.chunk)
10743
return remotecall_fetch(to_w, from_proc, to_proc, from_space, to_space, p_chunk, slice.slices) do from_proc, to_proc, from_space, to_space, p_chunk, inds
10844
p_new = move(from_proc, to_proc, p_chunk)
10945
v_new = view(p_new, inds...)

0 commit comments

Comments
 (0)