Skip to content

Commit 088112d

Browse files
committed
remotechannels seem to be working
1 parent 5ca5b4a commit 088112d

File tree

1 file changed

+91
-44
lines changed

1 file changed

+91
-44
lines changed

src/ParallelUtilities.jl

Lines changed: 91 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,46 @@ module ParallelUtilities
33
using Reexport
44
@reexport using Distributed
55

6-
worker_rank() = myid()-minimum(workers())+1
6+
export split_across_processors,split_product_across_processors,
7+
get_processor_id_from_split_array,
8+
procid_allmodes,mode_index_in_file,
9+
get_processor_range_from_split_array,workers_active,worker_rank,
10+
get_index_in_split_array,procid_and_mode_index,minmax_from_split_array,
11+
node_remotechannels,pmapsum,sum_at_node,pmap_onebatch_per_worker,
12+
get_nodes,get_hostnames,get_nprocs_node
13+
14+
function worker_rank()
15+
if nworkers()==1
16+
return 1
17+
end
18+
myid()-minimum(workers())+1
19+
end
720

821
function split_across_processors(num_tasks::Integer,num_procs=nworkers(),proc_id=worker_rank())
9-
if num_procs == 1
10-
return num_tasks
11-
end
22+
if num_procs == 1
23+
return num_tasks
24+
end
1225

13-
num_tasks_per_process,num_tasks_leftover = div(num_tasks,num_procs),mod(num_tasks,num_procs)
26+
num_tasks_per_process,num_tasks_leftover = div(num_tasks,num_procs),mod(num_tasks,num_procs)
1427

15-
num_tasks_on_proc = num_tasks_per_process + (proc_id <= mod(num_tasks,num_procs) ? 1 : 0 );
16-
task_start = num_tasks_per_process*(proc_id-1) + min(num_tasks_leftover+1,proc_id);
28+
num_tasks_on_proc = num_tasks_per_process + (proc_id <= mod(num_tasks,num_procs) ? 1 : 0 );
29+
task_start = num_tasks_per_process*(proc_id-1) + min(num_tasks_leftover+1,proc_id);
1730

18-
return task_start:(task_start+num_tasks_on_proc-1)
31+
return task_start:(task_start+num_tasks_on_proc-1)
1932
end
2033

2134
function split_across_processors(arr₁,num_procs=nworkers(),proc_id=worker_rank())
2235

23-
@assert(proc_id<=num_procs,"processor rank has to be less than number of workers engaged")
36+
@assert(proc_id<=num_procs,"processor rank has to be less than number of workers engaged")
2437

25-
num_tasks = length(arr₁);
38+
num_tasks = length(arr₁);
2639

27-
num_tasks_per_process,num_tasks_leftover = div(num_tasks,num_procs),mod(num_tasks,num_procs)
40+
num_tasks_per_process,num_tasks_leftover = div(num_tasks,num_procs),mod(num_tasks,num_procs)
2841

29-
num_tasks_on_proc = num_tasks_per_process + (proc_id <= mod(num_tasks,num_procs) ? 1 : 0 );
30-
task_start = num_tasks_per_process*(proc_id-1) + min(num_tasks_leftover+1,proc_id);
42+
num_tasks_on_proc = num_tasks_per_process + (proc_id <= mod(num_tasks,num_procs) ? 1 : 0 );
43+
task_start = num_tasks_per_process*(proc_id-1) + min(num_tasks_leftover+1,proc_id);
3144

32-
Iterators.take(Iterators.drop(arr₁,task_start-1),num_tasks_on_proc)
45+
Iterators.take(Iterators.drop(arr₁,task_start-1),num_tasks_on_proc)
3346
end
3447

3548
function split_product_across_processors(arr₁,arr₂,num_procs::Integer=nworkers(),proc_id::Integer=worker_rank())
@@ -94,16 +107,28 @@ function get_processor_range_from_split_array(arr₁,arr₂,modes_on_proc,num_pr
94107
return proc_id_start:proc_id_end
95108
end
96109

97-
function get_index_in_split_array(modes_on_proc,(arr₁_value,arr₂_value))
98-
if isnothing(modes_on_proc)
99-
return nothing
100-
end
101-
for (ind,(t1,t2)) in enumerate(modes_on_proc)
102-
if (t1==arr₁_value) && (t2 == arr₂_value)
103-
return ind
104-
end
105-
end
106-
nothing
110+
function get_processor_range_from_split_array(iter,iter_section,num_procs::Integer)
111+
112+
if isempty(iter_section)
113+
return 0:-1 # empty range
114+
end
115+
116+
tasks_arr = collect(iter_section)
117+
proc_id_start = get_processor_id_from_split_array(iter,first(tasks_arr),num_procs)
118+
proc_id_end = get_processor_id_from_split_array(iter,last(tasks_arr),num_procs)
119+
return proc_id_start:proc_id_end
120+
end
121+
122+
function get_index_in_split_array(iter_section,val::Tuple)
123+
if isnothing(iter_section)
124+
return nothing
125+
end
126+
for (ind,val_ind) in enumerate(iter_section)
127+
if val_ind == val
128+
return ind
129+
end
130+
end
131+
nothing
107132
end
108133

109134
function procid_and_mode_index(arr₁,arr₂,(arr₁_value,arr₂_value),num_procs)
@@ -113,6 +138,13 @@ function procid_and_mode_index(arr₁,arr₂,(arr₁_value,arr₂_value),num_pro
113138
return proc_id_mode,mode_index
114139
end
115140

141+
function procid_and_mode_index(iter,val::Tuple,num_procs::Integer)
142+
proc_id_mode = get_processor_id_from_split_array(iter,val,num_procs)
143+
modes_in_procid_file = split_across_processors(iter,num_procs,proc_id_mode)
144+
mode_index = get_index_in_split_array(modes_in_procid_file,val)
145+
return proc_id_mode,mode_index
146+
end
147+
116148
function mode_index_in_file(arr₁,arr₂,(arr₁_value,arr₂_value),num_procs,proc_id_mode)
117149
modes_in_procid_file = split_product_across_processors(arr₁,arr₂,num_procs,proc_id_mode)
118150
mode_index = get_index_in_split_array(modes_in_procid_file,(arr₁_value,arr₂_value))
@@ -132,16 +164,41 @@ workers_active(arr₁,arr₂) = workers_active(Iterators.product(arr₁,arr₂))
132164

133165
nworkers_active(args...) = length(workers_active(args...))
134166

135-
function minmax_from_split_array(iterable)
136-
arr₁_min,arr₂_min = first(iterable)
137-
arr₁_max,arr₂_max = arr₁_min,arr₂_min
138-
for (arr₁_value,arr₂_value) in iterable
139-
arr₁_min = min(arr₁_min,arr₁_value)
140-
arr₁_max = max(arr₁_max,arr₁_value)
141-
arr₂_min = min(arr₂_min,arr₂_value)
142-
arr₂_max = max(arr₂_max,arr₂_value)
143-
end
144-
return (arr₁_min=arr₁_min,arr₁_max=arr₁_max,arr₂_min=arr₂_min,arr₂_max=arr₂_max)
167+
function extrema_from_split_array(iterable)
168+
val_first = first(iterable)
169+
min_vals = collect(val_first)
170+
max_vals = copy(min_vals)
171+
172+
for val in iterable
173+
for (ind,vi) in enumerate(val)
174+
min_vals[ind] = min(min_vals[ind],vi)
175+
max_vals[ind] = max(max_vals[ind],vi)
176+
end
177+
end
178+
collect(zip(min_vals,max_vals))
179+
end
180+
181+
function moderanges_common_lastarray(iterable)
182+
m = extrema_from_split_array(iterable)
183+
lastvar_min = last(m)[1]
184+
lastvar_max = last(m)[2]
185+
186+
val_first = first(iterable)
187+
min_vals = collect(val_first[1:end-1])
188+
max_vals = copy(min_vals)
189+
190+
for val in iterable
191+
for (ind,vi) in enumerate(val[1:end-1])
192+
if val[end]==lastvar_min
193+
min_vals[ind] = min(min_vals[ind],vi)
194+
end
195+
if val[end]==lastvar_max
196+
max_vals[ind] = max(max_vals[ind],vi)
197+
end
198+
end
199+
end
200+
201+
[(m,lastvar_min) for m in min_vals],[(m,lastvar_max) for m in max_vals]
145202
end
146203

147204
function get_hostnames(procs_used=workers())
@@ -229,14 +286,4 @@ function pmap_onebatch_per_worker(f::Function,iterable,args...;num_workers=nothi
229286
return futures
230287
end
231288

232-
#############################################################################
233-
234-
export split_across_processors,split_product_across_processors,
235-
get_processor_id_from_split_array,
236-
procid_allmodes,mode_index_in_file,
237-
get_processor_range_from_split_array,workers_active,worker_rank,
238-
get_index_in_split_array,procid_and_mode_index,minmax_from_split_array,
239-
node_remotechannels,pmapsum,sum_at_node,pmap_onebatch_per_worker,
240-
get_nodes,get_hostnames,get_nprocs_node
241-
242289
end # module

0 commit comments

Comments
 (0)