File tree Expand file tree Collapse file tree 4 files changed +94
-0
lines changed Expand file tree Collapse file tree 4 files changed +94
-0
lines changed Original file line number Diff line number Diff line change @@ -250,3 +250,35 @@ let parallel_scan pool op elements =
250250
251251 prefix_s
252252 end
253+
254+ let parallel_find (type a ) ?(chunk_size =0 ) ~start ~finish ~body pool =
255+ let pd = get_pool_data pool in
256+ let found : a option Atomic.t = Atomic. make None in
257+ let chunk_size = if chunk_size > 0 then chunk_size
258+ else begin
259+ let n_domains = (Array. length pd.domains) + 1 in
260+ let n_tasks = finish - start + 1 in
261+ if n_domains = 1 then n_tasks
262+ else max 1 (n_tasks/ (8 * n_domains))
263+ end
264+ in
265+ let rec work pool fn s e =
266+ if e - s < chunk_size then
267+ let i = ref s in
268+ while ! i < = e && Option. is_none (Atomic. get found) do
269+ begin match fn ! i with
270+ | None -> ()
271+ | Some _ as some -> Atomic. set found some
272+ end ;
273+ incr i;
274+ done
275+ else if Option. is_some (Atomic. get found) then ()
276+ else begin
277+ let d = s + ((e - s) / 2 ) in
278+ let left = async pool (fun _ -> work pool fn s d) in
279+ work pool fn (d+ 1 ) e;
280+ await pool left
281+ end
282+ in
283+ work pool body start finish;
284+ Atomic. get found
Original file line number Diff line number Diff line change @@ -87,3 +87,20 @@ val parallel_scan : pool -> ('a -> 'a -> 'a) -> 'a array -> 'a array
8787
8888 Must be called with a call to {!run} in the dynamic scope to handle the
8989 internal algebraic effects for task synchronization. *)
90+
91+ val parallel_find : ?chunk_size : int -> start :int -> finish :int ->
92+ body :(int -> 'a option ) -> pool -> 'a option
93+ (* * [parallel_find ~start ~finish ~body pool] calls [body] in parallel
94+ on the indices from [start] to [finish], in any order, until at
95+ least one of them returns [Some v].
96+
97+ Search stops when a value is found, but there is no guarantee that
98+ it stops as early as possible, other calls to [body] may happen in
99+ parallel or afterwards.
100+
101+ See {!parallel_for} for the description of the [chunk_size]
102+ parameter and the scheduling strategy.
103+
104+ Must be called with a call to {!run} in the dynamic scope to
105+ handle the internal algebraic effects for task synchronization.
106+ *)
Original file line number Diff line number Diff line change 8181 (modules test_task)
8282 (modes native))
8383
84+ (test
85+ (name test_parallel_find)
86+ (libraries domainslib)
87+ (modules test_parallel_find)
88+ (modes native))
89+
8490(test
8591 (name test_deadlock)
8692 (libraries domainslib)
Original file line number Diff line number Diff line change 1+ let len = 1_000_000
2+ let nb_needles = 4
3+
4+ let () = Random. init 42
5+
6+ let needles =
7+ Array. init nb_needles (fun _ -> Random. int len)
8+
9+ let input =
10+ let t = Array. make len false in
11+ needles |> Array. iter (fun needle ->
12+ t.(needle) < - true
13+ );
14+ t
15+
16+ open Domainslib
17+
18+ let search_needle pool ~chunk_size =
19+ Task. parallel_find pool ~chunk_size ~start: 0 ~finish: (len - 1 ) ~body: (fun i ->
20+ if input.(i) then Some i
21+ else None
22+ )
23+
24+ let test_search pool ~chunk_size =
25+ match search_needle pool ~chunk_size with
26+ | None -> assert false
27+ | Some needle ->
28+ assert (Array. exists ((= ) needle) needles)
29+
30+ let () =
31+ let num_domains = Domain. recommended_domain_count () in
32+ Printf. eprintf " test_parallel_find on %d domains.\n " num_domains;
33+ let pool = Task. setup_pool ~num_domains ~name: " pool" () in
34+ Task. run pool begin fun () ->
35+ [0 ; 16 ; 32 ; 1000 ] |> List. iter (fun chunk_size ->
36+ test_search pool ~chunk_size )
37+ end;
38+ Task. teardown_pool pool;
39+ prerr_endline " Success." ;
You can’t perform that action at this time.
0 commit comments