Skip to content

Commit f7abf69

Browse files
committed
get processor range for general iterators with n arrays
1 parent 29700c9 commit f7abf69

File tree

1 file changed

+48
-13
lines changed

1 file changed

+48
-13
lines changed

src/ParallelUtilities.jl

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ function split_across_processors(arr₁,num_procs=nworkers(),proc_id=worker_rank
3737
Iterators.take(Iterators.drop(arr₁,task_start-1),num_tasks_on_proc)
3838
end
3939

40-
function split_product_across_processors(arr₁,arr₂,num_procs::Integer=nworkers(),proc_id::Integer=worker_rank())
40+
function split_product_across_processors(arr₁::AbstractVector,arr₂::AbstractVector,
41+
num_procs::Integer=nworkers(),proc_id::Integer=worker_rank())
4142
# arr₁ will change faster
4243
split_across_processors(Iterators.product(arr₁,arr₂),num_procs,proc_id)
4344
end
@@ -46,7 +47,8 @@ function split_product_across_processors(arrs_tuple,num_procs::Integer=nworkers(
4647
return split_across_processors(Iterators.product(arrs_tuple...),num_procs,proc_id)
4748
end
4849

49-
function get_processor_id_from_split_array(arr₁,arr₂,(arr₁_value,arr₂_value)::Tuple,num_procs)
50+
function get_processor_id_from_split_array(arr₁::AbstractVector,arr₂::AbstractVector,
51+
(arr₁_value,arr₂_value)::Tuple,num_procs)
5052
# Find the closest match in arrays
5153

5254
if (arr₁_value arr₁) || (arr₂_value arr₂)
@@ -87,43 +89,76 @@ function get_processor_id_from_split_array(arr₁,arr₂,(arr₁_value,arr₂_va
8789
return proc_id
8890
end
8991

90-
function get_processor_range_from_split_array(arr₁,arr₂,modes_on_proc,num_procs)
92+
function get_processor_id_from_split_array(iter,val::Tuple,num_procs)
93+
for proc_id in 1:num_procs
94+
tasks_on_proc = split_across_processors(iter,num_procs,proc_id)
95+
if val tasks_on_proc
96+
return proc_id
97+
end
98+
end
99+
return 0
100+
end
101+
102+
function get_processor_range_from_split_array(arr₁::AbstractVector,arr₂::AbstractVector,
103+
iter_section,num_procs::Integer)
91104

92-
if isempty(modes_on_proc)
105+
if isempty(iter_section)
93106
return 0:-1 # empty range
94107
end
95108

96-
tasks_arr = collect(modes_on_proc)
109+
tasks_arr = collect(iter_section)
97110
proc_id_start = get_processor_id_from_split_array(arr₁,arr₂,first(tasks_arr),num_procs)
98111
proc_id_end = get_processor_id_from_split_array(arr₁,arr₂,last(tasks_arr),num_procs)
99112
return proc_id_start:proc_id_end
100113
end
101114

102-
function get_index_in_split_array(modes_on_proc,(arr₁_value,arr₂_value))
103-
if isnothing(modes_on_proc)
115+
function get_processor_range_from_split_array(iter,iter_section,num_procs::Integer)
116+
117+
if isempty(iter_section)
118+
return 0:-1 # empty range
119+
end
120+
121+
tasks_arr = collect(iter_section)
122+
proc_id_start = get_processor_id_from_split_array(iter,first(tasks_arr),num_procs)
123+
proc_id_end = get_processor_id_from_split_array(iter,last(tasks_arr),num_procs)
124+
return proc_id_start:proc_id_end
125+
end
126+
127+
function get_index_in_split_array(iter_section,val::Tuple)
128+
if isnothing(iter_section)
104129
return nothing
105130
end
106-
for (ind,(t1,t2)) in enumerate(modes_on_proc)
107-
if (t1==arr₁_value) && (t2 == arr₂_value)
131+
for (ind,val_ind) in enumerate(iter_section)
132+
if val_ind == val
108133
return ind
109134
end
110135
end
111136
nothing
112137
end
113138

114-
function procid_and_mode_index(arr₁,arr₂,(arr₁_value,arr₂_value),num_procs)
139+
function procid_and_mode_index(arr₁::AbstractVector,arr₂::AbstractVector,
140+
(arr₁_value,arr₂_value)::Tuple,num_procs::Integer)
115141
proc_id_mode = get_processor_id_from_split_array(arr₁,arr₂,(arr₁_value,arr₂_value),num_procs)
116142
modes_in_procid_file = split_product_across_processors(arr₁,arr₂,num_procs,proc_id_mode)
117143
mode_index = get_index_in_split_array(modes_in_procid_file,(arr₁_value,arr₂_value))
118144
return proc_id_mode,mode_index
119145
end
120146

121-
function mode_index_in_file(arr₁,arr₂,(arr₁_value,arr₂_value),num_procs,proc_id_mode)
147+
function procid_and_mode_index(iter,val::Tuple,num_procs::Integer)
148+
proc_id_mode = get_processor_id_from_split_array(iter,val,num_procs)
149+
modes_in_procid_file = split_across_processors(iter,num_procs,proc_id_mode)
150+
mode_index = get_index_in_split_array(modes_in_procid_file,val)
151+
return proc_id_mode,mode_index
152+
end
153+
154+
function mode_index_in_file(arr₁::AbstractVector,arr₂::AbstractVector,
155+
(arr₁_value,arr₂_value)::Tuple,num_procs::Integer,proc_id_mode::Integer)
122156
modes_in_procid_file = split_product_across_processors(arr₁,arr₂,num_procs,proc_id_mode)
123157
mode_index = get_index_in_split_array(modes_in_procid_file,(arr₁_value,arr₂_value))
124158
end
125159

126-
function procid_allmodes(arr₁,arr₂,iter,num_procs=nworkers_active(arr₁,arr₂))
160+
function procid_allmodes(arr₁::AbstractVector,arr₂::AbstractVector,
161+
iter,num_procs=nworkers_active(arr₁,arr₂))
127162
procid = zeros(Int64,length(iter))
128163
for (ind,mode) in enumerate(iter)
129164
procid[ind] = get_processor_id_from_split_array(arr₁,arr₂,mode,num_procs)
@@ -133,7 +168,7 @@ end
133168

134169
workers_active(arr) = workers()[1:min(length(arr),nworkers())]
135170

136-
workers_active(arr₁,arr₂) = workers_active(Iterators.product(arr₁,arr₂))
171+
workers_active(arrs...) = workers_active(Iterators.product(arrs...))
137172

138173
nworkers_active(args...) = length(workers_active(args...))
139174

0 commit comments

Comments
 (0)