diff --git a/bench/bench_binaries.ml b/bench/bench_binaries.ml index b8adee3b..953d41b6 100644 --- a/bench/bench_binaries.ml +++ b/bench/bench_binaries.ml @@ -15,6 +15,7 @@ let paths = lib "picos"; lib "picos.domain"; lib "picos.thread"; + lib "picos_aux.adaptive_backoff"; lib "picos_aux.htbl"; lib "picos_aux.mpmcq"; lib "picos_aux.mpscq"; diff --git a/bench/bench_mpmcq.ml b/bench/bench_mpmcq.ml index 508562fc..4e4f5dbb 100644 --- a/bench/bench_mpmcq.ml +++ b/bench/bench_mpmcq.ml @@ -56,9 +56,7 @@ let run_one ~budgetf ~n_adders ~n_takers () = if 0 < n then begin match Mpmcq.pop_exn t with | _ -> loop (n - 1) - | exception Mpmcq.Empty -> - Backoff.once Backoff.default |> ignore; - loop n + | exception Mpmcq.Empty -> loop n end else work () in diff --git a/bench/bench_mpscq.ml b/bench/bench_mpscq.ml index a1a46e4e..54ab406f 100644 --- a/bench/bench_mpscq.ml +++ b/bench/bench_mpscq.ml @@ -51,9 +51,7 @@ let run_one ~budgetf ~n_adders () = if 0 < n then match Mpscq.pop_exn t with | _ -> loop (n - 1) - | exception Mpscq.Empty -> - Backoff.once Backoff.default |> ignore; - loop n + | exception Mpscq.Empty -> loop n in loop n_msgs in diff --git a/lib/picos_aux.adaptive_backoff/adaptive_backoff.ml b/lib/picos_aux.adaptive_backoff/adaptive_backoff.ml new file mode 100644 index 00000000..92cfa7c3 --- /dev/null +++ b/lib/picos_aux.adaptive_backoff/adaptive_backoff.ml @@ -0,0 +1,24 @@ +let has_domains = 1 < Domain.recommended_domain_count () +let n = 128 * 4 +let counters = Array.init n (fun _ -> Atomic.make 0) + +let[@inline never] once_at counter ~log_scale = + if has_domains then begin + let n_contending_threads = Atomic.fetch_and_add counter 1 + 1 in + let n = ref (Random.int ((n_contending_threads lsl log_scale) + 0)) in + while 0 <= !n do + Domain.cpu_relax (); + decr n + done; + Atomic.decr counter + end + +let[@inline never] once ~random_key ~log_scale = + let i = random_key land (n - 1) in + let counter = Array.unsafe_get counters i in + once_at counter ~log_scale + +let[@inline] once_unless_alone ~random_key ~log_scale = + let i = random_key land (n - 1) in + let counter = Array.unsafe_get counters i in + if 0 <> Atomic.get counter then once_at counter ~log_scale diff --git a/lib/picos_aux.adaptive_backoff/adaptive_backoff.mli b/lib/picos_aux.adaptive_backoff/adaptive_backoff.mli new file mode 100644 index 00000000..99a473dc --- /dev/null +++ b/lib/picos_aux.adaptive_backoff/adaptive_backoff.mli @@ -0,0 +1,2 @@ +val once : random_key:int -> log_scale:int -> unit +val once_unless_alone : random_key:int -> log_scale:int -> unit diff --git a/lib/picos_aux.adaptive_backoff/domain.ocaml4.ml b/lib/picos_aux.adaptive_backoff/domain.ocaml4.ml new file mode 100644 index 00000000..d21eb739 --- /dev/null +++ b/lib/picos_aux.adaptive_backoff/domain.ocaml4.ml @@ -0,0 +1,2 @@ +let cpu_relax = Fun.id +let recommended_domain_count () = 1 diff --git a/lib/picos_aux.adaptive_backoff/dune b/lib/picos_aux.adaptive_backoff/dune new file mode 100644 index 00000000..ffef448b --- /dev/null +++ b/lib/picos_aux.adaptive_backoff/dune @@ -0,0 +1,13 @@ +(library + (name picos_aux_adaptive_backoff) + (public_name picos_aux.adaptive_backoff)) + +(rule + (package picos_aux) + (targets domain.ml) + (deps domain.ocaml4.ml) + (enabled_if + (< %{ocaml_version} 5.0.0)) + (action + (progn + (copy domain.ocaml4.ml domain.ml)))) diff --git a/lib/picos_aux.mpmcq/dune b/lib/picos_aux.mpmcq/dune index fe0406d6..ea89b6e0 100644 --- a/lib/picos_aux.mpmcq/dune +++ b/lib/picos_aux.mpmcq/dune @@ -1,7 +1,7 @@ (library (name picos_aux_mpmcq) (public_name picos_aux.mpmcq) - (libraries backoff multicore-magic)) + (libraries picos_aux.adaptive_backoff multicore-magic)) (mdx (package picos_meta) diff --git a/lib/picos_aux.mpmcq/picos_aux_mpmcq.ml b/lib/picos_aux.mpmcq/picos_aux_mpmcq.ml index 84d69125..9ada2bda 100644 --- a/lib/picos_aux.mpmcq/picos_aux_mpmcq.ml +++ b/lib/picos_aux.mpmcq/picos_aux_mpmcq.ml @@ -1,6 +1,11 @@ +open Picos_aux_adaptive_backoff module Atomic = Multicore_magic.Transparent_atomic -type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t } +type 'a t = { + random_key : int; + head : 'a head Atomic.t; + tail : 'a tail Atomic.t; +} and ('a, _) tdt = | Cons : { @@ -27,6 +32,7 @@ and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed] and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed] let create ?padded () = + let random_key = Int64.to_int (Random.bits64 ()) in let head = Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as ?padded in @@ -34,7 +40,7 @@ let create ?padded () = Atomic.make (T (Tail { counter = 0; move = Used })) |> Multicore_magic.copy_as ?padded in - Multicore_magic.copy_as ?padded { head; tail } + Multicore_magic.copy_as ?padded { random_key; head; tail } let rec rev (suffix : (_, [< `Cons ]) tdt) = function | T (Snoc { counter; prefix; value }) -> @@ -47,117 +53,160 @@ let rev = function (Cons { counter; value; suffix = H (Head { counter = counter + 1 }) }) prefix -let rec push t value backoff = function +let log_scale = 10 + +let[@inline] backoff { random_key; _ } = + Adaptive_backoff.once ~random_key ~log_scale + +let[@inline] backoff_unless_alone { random_key; _ } = + Adaptive_backoff.once_unless_alone ~random_key ~log_scale + +let rec push t value = function | T (Snoc snoc_r) as prefix -> let after = Snoc { counter = snoc_r.counter + 1; prefix; value } in - if not (Atomic.compare_and_set t.tail prefix (T after)) then - let backoff = Backoff.once backoff in - push t value backoff (Atomic.fenceless_get t.tail) + if not (Atomic.compare_and_set t.tail prefix (T after)) then begin + backoff t; + push t value (Atomic.fenceless_get t.tail) + end | T (Tail tail_r) as prefix -> begin match tail_r.move with | Used -> let after = Snoc { counter = tail_r.counter + 1; prefix; value } in - if not (Atomic.compare_and_set t.tail prefix (T after)) then - let backoff = Backoff.once backoff in - push t value backoff (Atomic.fenceless_get t.tail) + if not (Atomic.compare_and_set t.tail prefix (T after)) then begin + backoff t; + push t value (Atomic.fenceless_get t.tail) + end | Snoc move_r as move -> begin match Atomic.get t.head with | H (Head head_r as head) when head_r.counter < move_r.counter -> let after = rev move in - if - Atomic.fenceless_get t.head == H head - && Atomic.compare_and_set t.head (H head) (H after) - then tail_r.move <- Used + if Atomic.fenceless_get t.head == H head then + if Atomic.compare_and_set t.head (H head) (H after) then + tail_r.move <- Used + else backoff t | _ -> tail_r.move <- Used end; - push t value backoff (Atomic.get t.tail) + push t value (Atomic.get t.tail) end exception Empty -let rec pop t backoff = function +let rec pop t = function | H (Cons cons_r as cons) -> if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value - else - let backoff = Backoff.once backoff in - pop t backoff (Atomic.fenceless_get t.head) + else begin + backoff t; + pop t (Atomic.fenceless_get t.head) + end | H (Head head_r as head) -> begin match Atomic.get t.tail with | T (Snoc snoc_r as move) -> if head_r.counter = snoc_r.counter then if Atomic.compare_and_set t.tail (T move) snoc_r.prefix then snoc_r.value - else pop t backoff (Atomic.fenceless_get t.head) + else begin + backoff t; + pop t (Atomic.fenceless_get t.head) + end else let (Tail tail_r as tail : (_, [ `Tail ]) tdt) = Tail { counter = snoc_r.counter; move } in let new_head = Atomic.get t.head in - if new_head != H head then pop t backoff new_head + if new_head != H head then begin + (* backoff t; *) + pop t new_head + end else if Atomic.compare_and_set t.tail (T move) (T tail) then let (Cons cons_r) = rev move in let after = cons_r.suffix in let new_head = Atomic.get t.head in - if new_head != H head then pop t backoff new_head + if new_head != H head then begin + (* backoff t; *) + pop t new_head + end else if Atomic.compare_and_set t.head (H head) after then begin tail_r.move <- Used; cons_r.value end - else - let backoff = Backoff.once backoff in - pop t backoff (Atomic.fenceless_get t.head) - else pop t backoff (Atomic.fenceless_get t.head) + else begin + backoff t; + pop t (Atomic.fenceless_get t.head) + end + else begin + (* backoff t; *) + pop t (Atomic.fenceless_get t.head) + end | T (Tail tail_r) -> begin match tail_r.move with | Used -> let new_head = Atomic.get t.head in - if new_head != H head then pop t backoff new_head - else raise_notrace Empty + if new_head != H head then begin + (* backoff t; *) + pop t new_head + end + else begin + backoff_unless_alone t; + raise_notrace Empty + end | Snoc move_r as move -> if head_r.counter < move_r.counter then let (Cons cons_r) = rev move in let after = cons_r.suffix in let new_head = Atomic.get t.head in - if new_head != H head then pop t backoff new_head + if new_head != H head then begin + (* backoff t; *) + pop t new_head + end else if Atomic.compare_and_set t.head (H head) after then begin tail_r.move <- Used; cons_r.value end - else - let backoff = Backoff.once backoff in - pop t backoff (Atomic.fenceless_get t.head) + else begin + backoff t; + pop t (Atomic.fenceless_get t.head) + end else let new_head = Atomic.get t.head in - if new_head != H head then pop t backoff new_head - else raise_notrace Empty + if new_head != H head then begin + (* backoff t; *) + pop t new_head + end + else begin + backoff_unless_alone t; + raise_notrace Empty + end end end -let rec push_head t value backoff = +let rec push_head t value = match Atomic.get t.head with | H (Cons cons_r) as suffix -> let after = Cons { counter = cons_r.counter - 1; value; suffix } in - if not (Atomic.compare_and_set t.head suffix (H after)) then - push_head t value (Backoff.once backoff) + if not (Atomic.compare_and_set t.head suffix (H after)) then begin + backoff t; + push_head t value + end | H (Head head_r) as head -> begin match Atomic.get t.tail with | T (Snoc snoc_r as move) -> - if Atomic.get t.head != head then push_head t value backoff + if Atomic.get t.head != head then push_head t value else if head_r.counter = snoc_r.counter then begin let prefix = T (Snoc { snoc_r with value }) in let after = Snoc { snoc_r with counter = snoc_r.counter + 1; prefix } in - if not (Atomic.compare_and_set t.tail (T move) (T after)) then - push_head t value (Backoff.once backoff) + if not (Atomic.compare_and_set t.tail (T move) (T after)) then begin + backoff t; + push_head t value + end end - else + else begin let tail = Tail { counter = snoc_r.counter; move } in - let backoff = - if Atomic.compare_and_set t.tail (T move) (T tail) then backoff - else Backoff.once backoff - in - push_head t value backoff + if not (Atomic.compare_and_set t.tail (T move) (T tail)) then + backoff t; + push_head t value + end | T (Tail tail_r) as prefix -> begin match tail_r.move with | Used -> @@ -165,21 +214,23 @@ let rec push_head t value backoff = let tail = Snoc { counter = tail_r.counter + 1; value; prefix } in - if not (Atomic.compare_and_set t.tail prefix (T tail)) then - push_head t value (Backoff.once backoff) + if not (Atomic.compare_and_set t.tail prefix (T tail)) then begin + backoff t; + push_head t value + end end - else push_head t value backoff + else push_head t value | Snoc move_r as move -> begin match Atomic.get t.head with | H (Head head_r as head) when head_r.counter < move_r.counter -> let after = rev move in - if - Atomic.fenceless_get t.head == H head - && Atomic.compare_and_set t.head (H head) (H after) - then tail_r.move <- Used + if Atomic.fenceless_get t.head == H head then + if Atomic.compare_and_set t.head (H head) (H after) then + tail_r.move <- Used + else backoff t | _ -> tail_r.move <- Used end; - push_head t value backoff + push_head t value end end @@ -204,9 +255,5 @@ let[@inline] length t = tail_at - head_at + 1 let[@inline] is_empty t = length t == 0 -let[@inline] pop_exn t = pop t Backoff.default (Atomic.fenceless_get t.head) - -let[@inline] push t value = - push t value Backoff.default (Atomic.fenceless_get t.tail) - -let[@inline] push_head t value = push_head t value Backoff.default +let[@inline] pop_exn t = pop t (Atomic.fenceless_get t.head) +let[@inline] push t value = push t value (Atomic.fenceless_get t.tail) diff --git a/lib/picos_aux.mpscq/dune b/lib/picos_aux.mpscq/dune index 94d15584..c1b14356 100644 --- a/lib/picos_aux.mpscq/dune +++ b/lib/picos_aux.mpscq/dune @@ -1,7 +1,7 @@ (library (name picos_aux_mpscq) (public_name picos_aux.mpscq) - (libraries backoff multicore-magic)) + (libraries picos_aux.adaptive_backoff multicore-magic)) (mdx (package picos_meta) diff --git a/lib/picos_aux.mpscq/picos_aux_mpscq.ml b/lib/picos_aux.mpscq/picos_aux_mpscq.ml index 4e8ecef4..ee8a0e33 100644 --- a/lib/picos_aux.mpscq/picos_aux_mpscq.ml +++ b/lib/picos_aux.mpscq/picos_aux_mpscq.ml @@ -1,4 +1,10 @@ -type 'a t = { tail : 'a tail Atomic.t; head : 'a head Atomic.t } +open Picos_aux_adaptive_backoff + +type 'a t = { + random_key : int; + tail : 'a tail Atomic.t; + head : 'a head Atomic.t; +} and ('a, _) tdt = | Head : ('a, [> `Head ]) tdt @@ -14,51 +20,57 @@ exception Empty let[@inline never] impossible () = invalid_arg "multiple consumers not allowed" let create ?padded () = + let random_key = Int64.to_int (Random.bits64 ()) in let tail = Multicore_magic.copy_as ?padded @@ Atomic.make (T Tail) in let head = Multicore_magic.copy_as ?padded @@ Atomic.make (H Head) in - Multicore_magic.copy_as ?padded { tail; head } + Multicore_magic.copy_as ?padded { random_key; tail; head } + +let log_scale = 8 + +let[@inline] backoff { random_key; _ } = + Adaptive_backoff.once ~random_key ~log_scale -let rec push_head head (Cons r as after : (_, [< `Cons ]) tdt) backoff = - let before = Atomic.get head in +let[@inline] backoff_unless_alone { random_key; _ } = + Adaptive_backoff.once_unless_alone ~random_key ~log_scale + +let rec push_head t (Cons r as after : (_, [< `Cons ]) tdt) = + backoff t; + let before = Atomic.get t.head in r.next <- before; - if not (Atomic.compare_and_set head before (H after)) then - push_head head after (Backoff.once backoff) + if not (Atomic.compare_and_set t.head before (H after)) then push_head t after let push_head t value = - let head = t.head in - let before = Atomic.get head in + let before = Atomic.get t.head in let after = Cons { value; next = before } in - if not (Atomic.compare_and_set head before (H after)) then - push_head head after Backoff.default + if not (Atomic.compare_and_set t.head before (H after)) then push_head t after let rec append_to (Cons cons_r : (_, [< `Cons ]) tdt) tail = match cons_r.next with | H Head -> cons_r.next <- tail | H (Cons _ as head) -> append_to head tail -let rec push tail (Snoc r as after : (_, [< `Snoc ]) tdt) backoff = - let before = Atomic.get tail in +let rec push t (Snoc r as after : (_, [< `Snoc ]) tdt) = + backoff t; + let before = Atomic.get t.tail in r.prev <- before; - if not (Atomic.compare_and_set tail before (T after)) then - push tail after (Backoff.once backoff) + if not (Atomic.compare_and_set t.tail before (T after)) then push t after let push t value = - let tail = t.tail in - let before = Atomic.get tail in + let before = Atomic.get t.tail in let after = Snoc { prev = before; value } in - if not (Atomic.compare_and_set tail before (T after)) then - push tail after Backoff.default + if not (Atomic.compare_and_set t.tail before (T after)) then push t after let rec rev_to head (Snoc r : (_, [< `Snoc ]) tdt) = let head = Cons { value = r.value; next = H head } in match r.prev with T Tail -> head | T (Snoc _ as prev) -> rev_to head prev -let rec pop_exn t backoff = function +let rec pop_exn t = function | H (Cons head_r as head) -> if Atomic.compare_and_set t.head (H head) head_r.next then head_r.value - else - let backoff = Backoff.once backoff in - pop_exn t backoff (Atomic.get t.head) + else begin + backoff t; + pop_exn t (Atomic.get t.head) + end | H Head -> begin match Atomic.get t.tail with | T (Snoc tail_r) -> begin @@ -79,8 +91,7 @@ let rec pop_exn t backoff = function | H Head -> snoc_r.value | H (Cons _ as head) -> let next = Cons { value = snoc_r.value; next = H Head } in - append_to head (H next); - pop_head_exn t backoff head + append_to_and_pop_head_exn t next head end | T (Snoc _ as prev) -> begin let next = Cons { value = snoc_r.value; next = H Head } in @@ -92,25 +103,38 @@ let rec pop_exn t backoff = function else match Atomic.get t.head with | H Head -> impossible () - | H (Cons _ as head) -> - append_to head (H next); - pop_head_exn t backoff head + | H (Cons _ as head) -> append_to_and_pop_head_exn t next head end end | T Tail -> begin match Atomic.get t.head with - | H Head -> raise_notrace Empty - | H (Cons _ as head) -> pop_head_exn t backoff head + | H Head -> + backoff_unless_alone t; + raise_notrace Empty + | H (Cons head_r as head) -> + (* We assume [push_head] is rare. *) + if Atomic.compare_and_set t.head (H head) head_r.next then + head_r.value + else begin + backoff t; + pop_exn t (Atomic.get t.head) + end end end -and pop_head_exn t backoff (Cons head_r as head : (_, [< `Cons ]) tdt) = - if Atomic.compare_and_set t.head (H head) head_r.next then head_r.value - else - let backoff = Backoff.once backoff in - pop_exn t backoff (Atomic.get t.head) +and append_to_and_pop_head_exn t next + (Cons head_r as head : (_, [< `Cons ]) tdt) = + append_to head (H next); + (* We assume [push_head] is rare. *) + let new_head = Atomic.get t.head in + if new_head != H head then pop_exn t new_head + else if Atomic.compare_and_set t.head (H head) head_r.next then head_r.value + else begin + backoff t; + pop_exn t (Atomic.get t.head) + end -let[@inline] pop_exn t = pop_exn t Backoff.default (Atomic.get t.head) +let[@inline] pop_exn t = pop_exn t (Atomic.get t.head) let rec prepend_to_seq t tl = match t with diff --git a/lib/picos_aux/index.mld b/lib/picos_aux/index.mld index 69dbf5d0..6f919046 100644 --- a/lib/picos_aux/index.mld +++ b/lib/picos_aux/index.mld @@ -4,6 +4,7 @@ This package contains auxiliary libraries used in the implementation of other Picos libraries. {!modules: + Picos_aux_adaptive_backoff Picos_aux_htbl Picos_aux_mpmcq Picos_aux_mpscq diff --git a/test/dune b/test/dune index dd98e136..b65740ed 100644 --- a/test/dune +++ b/test/dune @@ -198,7 +198,11 @@ (modules test_mpmcq_dscheck picos_aux_mpmcq) (build_if (>= %{ocaml_version} 5)) - (libraries backoff multicore-magic-dscheck dscheck alcotest) + (libraries + picos_aux.adaptive_backoff + multicore-magic-dscheck + dscheck + alcotest) (flags (:standard -open Multicore_magic_dscheck)))