Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
_build
_opam
*.tmp
*.pdf
36 changes: 36 additions & 0 deletions src/core/fut.ml
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,42 @@ let for_list ~on l f : unit t =
let futs = List.rev_map (fun x -> spawn ~on (fun () -> f x)) l in
wait_list futs

type 'a iter = ('a -> unit) -> unit

let for_iter ~on (it : _ iter) f : unit t =
let fut, promise = make () in

(* start at one for the task that traverses [it] *)
let missing = A.make 1 in

(* callback called when a future is resolved *)
let on_res = function
| None ->
let n = A.fetch_and_add missing (-1) in
if n = 1 then
(* last future, we know they all succeeded, so resolve [fut] *)
fulfill promise (Ok ())
| Some e_bt ->
(* immediately cancel all other [on_res] *)
let n = A.exchange missing 0 in
if n > 0 then
(* we're the only one to set to 0, so we can fulfill [fut]
with an error. *)
fulfill promise (Error e_bt)
in

let fut_iter =
spawn ~on (fun () ->
it (fun item ->
A.incr missing;
let fut = spawn ~on (fun () -> f item) in
on_result_ignore fut on_res))
in

on_result_ignore fut_iter on_res;

fut

(* ### blocking ### *)

let push_queue_ _tr q () = Bb_queue.push q ()
Expand Down
8 changes: 8 additions & 0 deletions src/core/fut.mli
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,14 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
(** [for_list ~on l f] is like [for_array ~on (Array.of_list l) f].
@since 0.2 *)

type 'a iter = ('a -> unit) -> unit
(** ['a iter] is an iterator on ['a].
@since NEXT_RELEASE *)

val for_iter : on:Runner.t -> 'a iter -> ('a -> unit) -> unit t
(** [for_iter ~on iter f] runs [f] on every item in [iter] in parallel.
@since NEXT_RELEASE *)

(** {2 Await}

This suspends the current task using an OCaml 5 algebraic effect, and makes
Expand Down
21 changes: 21 additions & 0 deletions test/t_futs1.ml
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,24 @@ let () =
in

List.iter run_for [ 1; 10; 50; 1_000 ]

let () =
let n_items = Atomic.make 0 in
let fut =
Fut.for_iter ~on:pool (fun _yield -> ()) (fun _item -> Atomic.incr n_items)
in
Fut.wait_block_exn fut;
assert (Atomic.get n_items = 0)

let () =
let run_for n =
let l = List.init n (fun x -> x) in
let sum = Atomic.make 0 in
let iter = fun yield -> List.iter yield l in
Fut.for_iter ~on:pool iter (fun x ->
ignore (Atomic.fetch_and_add sum x : int))
|> Fut.wait_block_exn;
assert (Atomic.get sum = List.fold_left ( + ) 0 l)
in

List.iter run_for [ 0; 1; 2; 3; 10; 50; 1_000 ]