Skip to content

Commit 3d981ce

Browse files
authored
Merge pull request #90 from gasche/parallel_find
parallel_find : ... -> (unit -> 'a option) -> 'a option
2 parents a43b54a + 4c770d3 commit 3d981ce

File tree

5 files changed

+100
-0
lines changed

5 files changed

+100
-0
lines changed

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## Next release
2+
3+
* `parallel_find` function that stops early (#129, #130)
4+
15
## v0.5.0
26

37
This release includes:

lib/task.ml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,3 +281,35 @@ let parallel_scan pool op elements =
281281

282282
prefix_s
283283
end
284+
285+
(* [parallel_find ?chunk_size ~start ~finish ~body pool] looks for an index
   [i] in [start..finish] (both inclusive) such that [body i] returns
   [Some _], splitting the range into tasks run on [pool].

   Returns the value stored by a successful call to [body], or [None] if no
   call succeeded. When several calls succeed concurrently, the last one to
   write wins, so which result is returned is unspecified. *)
let parallel_find (type a) ?(chunk_size=0) ~start ~finish ~body pool =
  let pd = get_pool_data pool in
  (* Shared slot for the result; every task polls it to stop early. *)
  let result : a option Atomic.t = Atomic.make None in
  let already_found () = Option.is_some (Atomic.get result) in
  (* A non-positive [chunk_size] means "choose one": the whole range when
     there is a single domain, otherwise about 8 chunks per domain. *)
  let chunk_size =
    if chunk_size > 0 then chunk_size
    else
      let n_domains = Array.length pd.domains + 1 in
      let n_tasks = finish - start + 1 in
      if n_domains = 1 then n_tasks
      else max 1 (n_tasks / (8 * n_domains))
  in
  (* Sequentially try the indices [i..hi], giving up as soon as some task
     has published a result. *)
  let rec scan i hi =
    if i <= hi && not (already_found ()) then begin
      (match body i with
       | Some _ as hit -> Atomic.set result hit
       | None -> ());
      scan (i + 1) hi
    end
  in
  (* Divide and conquer: ranges smaller than [chunk_size] are scanned in
     place; larger ones are halved, the lower half running as a separate
     task while the current task handles the upper half. *)
  let rec search lo hi =
    if hi - lo < chunk_size then scan lo hi
    else if not (already_found ()) then begin
      let mid = lo + ((hi - lo) / 2) in
      let lower_half = async pool (fun _ -> search lo mid) in
      search (mid + 1) hi;
      await pool lower_half
    end
  in
  search start finish;
  Atomic.get result

lib/task.mli

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,20 @@ val parallel_scan : pool -> ('a -> 'a -> 'a) -> 'a array -> 'a array
8686
8787
Must be called with a call to {!run} in the dynamic scope to handle the
8888
internal algebraic effects for task synchronization. *)
89+
90+
val parallel_find : ?chunk_size:int -> start:int -> finish:int ->
91+
body:(int -> 'a option) -> pool -> 'a option
92+
(** [parallel_find ?chunk_size ~start ~finish ~body pool] calls [body] in
    parallel on the indices from [start] to [finish] (both inclusive), in
    any order, until at least one call returns [Some v].

    Returns [Some v] for one such call, or [None] if no call to [body]
    returned [Some _]. When several indices would yield [Some _], which
    of the results is returned is unspecified.

    The search stops once a value is found, but there is no guarantee
    that it stops as early as possible: other calls to [body] may still
    happen in parallel or afterwards.

    See {!parallel_for} for the description of the [chunk_size]
    parameter and the scheduling strategy.

    Must be called with a call to {!run} in the dynamic scope to
    handle the internal algebraic effects for task synchronization. *)

test/dune

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@
8787
(modules test_task)
8888
(modes native))
8989

90+
(test
91+
(name test_parallel_find)
92+
(libraries domainslib)
93+
(modules test_parallel_find)
94+
(modes native))
95+
9096
(test
9197
(name test_deadlock)
9298
(libraries domainslib)

test/test_parallel_find.ml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
(* Number of cells in the haystack that gets searched. *)
let len = 1_000_000

(* Number of random positions drawn; draws may coincide, so there can be
   fewer distinct needles than this. *)
let nb_needles = 4

(* Fixed seed, so every run draws the same positions. *)
let () = Random.init 42

(* Positions of the needles, each drawn from [0, len). *)
let needles =
  let draw _ = Random.int len in
  Array.init nb_needles draw
8+
9+
(* The haystack: [len] booleans, [true] exactly at the needle positions. *)
let input =
  let haystack = Array.make len false in
  Array.iter (fun pos -> haystack.(pos) <- true) needles;
  haystack
15+
16+
open Domainslib
17+
18+
let search_needle pool ~chunk_size =
19+
Task.parallel_find pool ~chunk_size ~start:0 ~finish:(len - 1) ~body:(fun i ->
20+
if input.(i) then Some i
21+
else None
22+
)
23+
24+
(* Check that a search with the given [chunk_size] finds something, and
   that what it finds is one of the planted needles. *)
let test_search pool ~chunk_size =
  let is_planted pos = Array.exists (fun n -> pos = n) needles in
  match search_needle pool ~chunk_size with
  | Some needle -> assert (is_planted needle)
  | None -> assert false
29+
30+
(* Entry point: run the search test once per chunk size on a fresh pool,
   then tear the pool down and report success on stderr. *)
let () =
  (* [num_domains] counts only the *new* domains spawned by the pool; the
     current domain also takes part in the computation. *)
  let num_domains = Domain.recommended_domain_count () - 1 in
  Printf.eprintf "test_parallel_find on %d domains.\n" (num_domains + 1);
  let pool = Task.setup_pool ~num_domains ~name:"pool" () in
  (* A chunk size of 0 exercises the default chunking. *)
  let chunk_sizes = [0; 16; 32; 1000] in
  Task.run pool (fun () ->
    List.iter (fun chunk_size -> test_search pool ~chunk_size) chunk_sizes);
  Task.teardown_pool pool;
  prerr_endline "Success."

0 commit comments

Comments
 (0)