Skip to content

Commit bd7de32

Browse files
authored
Merge pull request #45 from Sudha247/named_pools2
Add named pools
2 parents 85569fb + 340773a commit bd7de32

13 files changed

+119
-45
lines changed

lib/task.ml

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@ type task_msg =
88
Task : 'a task * 'a promise -> task_msg
99
| Quit : task_msg
1010

11-
type pool =
12-
{domains : unit Domain.t array;
13-
task_chan : task_msg Multi_channel.t}
11+
type pool_data = {
12+
domains : unit Domain.t array;
13+
task_chan : task_msg Multi_channel.t;
14+
name: string option
15+
}
16+
17+
type pool = pool_data option Atomic.t
1418

1519
let do_task f p =
1620
try
@@ -22,7 +26,15 @@ let do_task f p =
2226
| TasksActive -> raise e
2327
| _ -> ()
2428

25-
let setup_pool ~num_additional_domains =
29+
let named_pools = Hashtbl.create 8
30+
31+
let named_pools_mutex = Mutex.create ()
32+
33+
let setup_pool ?name ~num_additional_domains () =
34+
if num_additional_domains < 0 then
35+
raise (Invalid_argument
36+
"Task.setup_pool: num_additional_domains must be at least 0")
37+
else
2638
let task_chan = Multi_channel.make (num_additional_domains+1) in
2739
let rec worker () =
2840
match Multi_channel.recv task_chan with
@@ -32,19 +44,34 @@ let setup_pool ~num_additional_domains =
3244
worker ()
3345
in
3446
let domains = Array.init num_additional_domains (fun _ -> Domain.spawn worker) in
35-
{domains; task_chan}
47+
let p = Atomic.make (Some {domains; task_chan; name}) in
48+
begin match name with
49+
| None -> ()
50+
| Some x ->
51+
Mutex.lock named_pools_mutex;
52+
Hashtbl.add named_pools x p;
53+
Mutex.unlock named_pools_mutex
54+
end;
55+
p
56+
57+
let get_pool_data p =
58+
match Atomic.get p with
59+
| None -> raise (Invalid_argument "pool already torn down")
60+
| Some p -> p
3661

3762
let async pool task =
63+
let pd = get_pool_data pool in
3864
let p = Atomic.make None in
39-
Multi_channel.send pool.task_chan (Task(task,p));
65+
Multi_channel.send pd.task_chan (Task(task,p));
4066
p
4167

4268
let rec await pool promise =
69+
let pd = get_pool_data pool in
4370
match Atomic.get promise with
4471
| None ->
4572
begin
4673
try
47-
match Multi_channel.recv_poll pool.task_chan with
74+
match Multi_channel.recv_poll pd.task_chan with
4875
| Task (t, p) -> do_task t p
4976
| Quit -> raise TasksActive
5077
with
@@ -55,16 +82,37 @@ let rec await pool promise =
5582
| Some (Error e) -> raise e
5683

5784
let teardown_pool pool =
58-
for _i=1 to Array.length pool.domains do
59-
Multi_channel.send pool.task_chan Quit
85+
let pd = get_pool_data pool in
86+
for _i=1 to Array.length pd.domains do
87+
Multi_channel.send pd.task_chan Quit
6088
done;
6189
Multi_channel.clear_local_state ();
62-
Array.iter Domain.join pool.domains
90+
Array.iter Domain.join pd.domains;
91+
(* Remove the pool from the table *)
92+
begin match pd.name with
93+
| None -> ()
94+
| Some n ->
95+
Mutex.lock named_pools_mutex;
96+
Hashtbl.remove named_pools n;
97+
Mutex.unlock named_pools_mutex
98+
end;
99+
Atomic.set pool None
100+
101+
let lookup_pool name =
102+
Mutex.lock named_pools_mutex;
103+
let p = Hashtbl.find_opt named_pools name in
104+
Mutex.unlock named_pools_mutex;
105+
p
106+
107+
let get_num_domains pool =
108+
let pd = get_pool_data pool in
109+
Array.length pd.domains + 1
63110

64111
let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun init =
112+
let pd = get_pool_data pool in
65113
let chunk_size = if chunk_size > 0 then chunk_size
66114
else begin
67-
let n_domains = (Array.length pool.domains) + 1 in
115+
let n_domains = (Array.length pd.domains) + 1 in
68116
let n_tasks = finish - start + 1 in
69117
if n_domains = 1 then n_tasks
70118
else max 1 (n_tasks/(8*n_domains))
@@ -88,9 +136,10 @@ let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun ini
88136
reduce_fun init (work start finish)
89137

90138
let parallel_for ?(chunk_size=0) ~start ~finish ~body pool =
139+
let pd = get_pool_data pool in
91140
let chunk_size = if chunk_size > 0 then chunk_size
92141
else begin
93-
let n_domains = (Array.length pool.domains) + 1 in
142+
let n_domains = (Array.length pd.domains) + 1 in
94143
let n_tasks = finish - start + 1 in
95144
if n_domains = 1 then n_tasks
96145
else max 1 (n_tasks/(8*n_domains))
@@ -109,7 +158,7 @@ let parallel_for ?(chunk_size=0) ~start ~finish ~body pool =
109158
work pool body start finish
110159

111160
let parallel_scan pool op elements =
112-
161+
let pd = get_pool_data pool in
113162
let scan_part op elements prefix_sum start finish =
114163
assert (Array.length elements > (finish - start));
115164
for i = (start + 1) to finish do
@@ -123,7 +172,7 @@ let parallel_scan pool op elements =
123172
done
124173
in
125174
let n = Array.length elements in
126-
let p = (Array.length pool.domains) + 1 in
175+
let p = (Array.length pd.domains) + 1 in
127176
let prefix_s = Array.copy elements in
128177

129178
parallel_for pool ~chunk_size:1 ~start:0 ~finish:(p - 1)

lib/task.mli

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,26 @@ type 'a promise
77
type pool
88
(** Type of task pool *)
99

10-
val setup_pool : num_additional_domains:int -> pool
10+
val setup_pool : ?name:string -> num_additional_domains:int -> unit -> pool
1111
(** Sets up a task execution pool with [num_additional_domains + 1] domains
12-
* including the current domain *)
12+
* including the current domain. If [name] is provided, the pool is mapped to
13+
* [name] which can be looked up later with [lookup_pool name].
14+
* Raises [Invalid_argumet] when [num_additional_domains] is less than 0. *)
1315

1416
exception TasksActive
1517

1618
val teardown_pool : pool -> unit
1719
(** Tears down the task execution pool.
1820
* Raises [TasksActive] exception if any tasks are currently active. *)
1921

22+
val lookup_pool : string -> pool option
23+
(** [lookup_pool name] returns [Some pool] if [pool] is associated to [name] or
24+
* returns [None] if no value is associated to it. *)
25+
26+
val get_num_domains : pool -> int
27+
(** [get_num_domains pool] returns the total number of domains in [pool]
28+
* including the parent domain. *)
29+
2030
val async : pool -> 'a task -> 'a promise
2131
(** [async p t] runs the task [t] asynchronously in the pool [p]. The function
2232
* returns a promise [r] in which the result of the task [t] will be stored.

test/LU_decomposition_multicore.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ let lup pool (a0 : float array) =
5454
a
5555

5656
let () =
57-
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
57+
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
5858
let a = parallel_create pool
5959
(fun _ _ -> (Random.State.float (Domain.DLS.get k) 100.0) +. 1.0 ) in
6060
let lu = lup pool a in

test/enumerate_par.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ let n = try int_of_string Sys.argv.(2) with _ -> 100
44
module T = Domainslib.Task
55

66
let _ =
7-
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) in
7+
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
88
T.parallel_for p ~start:0 ~finish:(n-1) ~chunk_size:16 ~body:(fun i ->
99
print_string @@ Printf.sprintf "[%d] %d\n%!" (Domain.self () :> int) i);
1010
T.teardown_pool p

test/fib_par.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ let rec fib_par pool n =
1515
T.await pool a + T.await pool b
1616

1717
let main =
18-
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
18+
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
1919
let res = fib_par pool n in
2020
T.teardown_pool pool;
2121
Printf.printf "fib(%d) = %d\n" n res

test/game_of_life_multicore.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ let rec repeat pool n =
6262
| _-> next pool; repeat pool (n-1)
6363

6464
let ()=
65-
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
65+
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
6666
print !rg;
6767
repeat pool n_times;
6868
print !rg;

test/prefix_sum.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ let gen n = Array.make n 1 (*(fun _ -> Random.int n)*)
77
let prefix_sum pool = T.parallel_scan pool (+)
88

99
let _ =
10-
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
10+
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
1111
let arr = gen n in
1212
let t = Unix.gettimeofday() in
1313
let _ = prefix_sum pool arr in

test/spectralnorm2_multicore.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ let eval_AtA_times_u pool u v =
3434
eval_A_times_u pool u w; eval_At_times_u pool w v
3535

3636
let () =
37-
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
37+
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
3838
let u = Array.make n 1.0 and v = Array.make n 0.0 in
3939
for _i = 0 to 9 do
4040
eval_AtA_times_u pool u v; eval_AtA_times_u pool v u

test/sum_par.ml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ module T = Domainslib.Task
55

66
let _ =
77
(* use parallel_for_reduce *)
8-
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) in
8+
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
99
let sum =
1010
T.parallel_for_reduce p (+) 0 ~chunk_size:(n/(4*num_domains)) ~start:0
1111
~finish:(n-1) ~body:(fun _i -> 1)
@@ -16,7 +16,7 @@ let _ =
1616

1717
let _ =
1818
(* explictly use empty pool and default chunk_size *)
19-
let p = T.setup_pool ~num_additional_domains:0 in
19+
let p = T.setup_pool ~num_additional_domains:0 () in
2020
let sum = Atomic.make 0 in
2121
T.parallel_for p ~start:0 ~finish:(n-1)
2222
~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1));
@@ -27,7 +27,7 @@ let _ =
2727

2828
let _ =
2929
(* configured num_domains and default chunk_size *)
30-
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) in
30+
let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
3131
let sum = Atomic.make 0 in
3232
T.parallel_for p ~start:0 ~finish:(n-1)
3333
~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1));

test/summed_area_table.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ let calc_table pool mat =
2929
let _ =
3030
let m = Array.make_matrix size size 1 (*Array.init size (fun _ -> Array.init size (fun _ -> Random.int size))*)
3131
in
32-
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) in
32+
let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in
3333
let _ = calc_table pool m in
3434

3535
(* for i = 0 to size-1 do

0 commit comments

Comments
 (0)