Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bench/bench_binaries.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ let paths =
lib "picos";
lib "picos.domain";
lib "picos.thread";
lib "picos_aux.adaptive_backoff";
lib "picos_aux.htbl";
lib "picos_aux.mpmcq";
lib "picos_aux.mpscq";
Expand Down
4 changes: 1 addition & 3 deletions bench/bench_mpmcq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ let run_one ~budgetf ~n_adders ~n_takers () =
if 0 < n then begin
match Mpmcq.pop_exn t with
| _ -> loop (n - 1)
| exception Mpmcq.Empty ->
Backoff.once Backoff.default |> ignore;
loop n
| exception Mpmcq.Empty -> loop n
end
else work ()
in
Expand Down
4 changes: 1 addition & 3 deletions bench/bench_mpscq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ let run_one ~budgetf ~n_adders () =
if 0 < n then
match Mpscq.pop_exn t with
| _ -> loop (n - 1)
| exception Mpscq.Empty ->
Backoff.once Backoff.default |> ignore;
loop n
| exception Mpscq.Empty -> loop n
in
loop n_msgs
in
Expand Down
24 changes: 24 additions & 0 deletions lib/picos_aux.adaptive_backoff/adaptive_backoff.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
let has_domains = 1 < Domain.recommended_domain_count ()
let n = 128 * 4
let counters = Array.init n (fun _ -> Atomic.make 0)

let[@inline never] once_at counter ~log_scale =
if has_domains then begin
let n_contending_threads = Atomic.fetch_and_add counter 1 + 1 in
let n = ref (Random.int ((n_contending_threads lsl log_scale) + 0)) in
while 0 <= !n do
Domain.cpu_relax ();
decr n
done;
Atomic.decr counter
end

let[@inline never] once ~random_key ~log_scale =
let i = random_key land (n - 1) in
let counter = Array.unsafe_get counters i in
once_at counter ~log_scale

let[@inline] once_unless_alone ~random_key ~log_scale =
let i = random_key land (n - 1) in
let counter = Array.unsafe_get counters i in
if 0 <> Atomic.get counter then once_at counter ~log_scale
2 changes: 2 additions & 0 deletions lib/picos_aux.adaptive_backoff/adaptive_backoff.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
val once : random_key:int -> log_scale:int -> unit
val once_unless_alone : random_key:int -> log_scale:int -> unit
2 changes: 2 additions & 0 deletions lib/picos_aux.adaptive_backoff/domain.ocaml4.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
let cpu_relax = Fun.id
let recommended_domain_count () = 1
13 changes: 13 additions & 0 deletions lib/picos_aux.adaptive_backoff/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
(library
(name picos_aux_adaptive_backoff)
(public_name picos_aux.adaptive_backoff))

(rule
(package picos_aux)
(targets domain.ml)
(deps domain.ocaml4.ml)
(enabled_if
(< %{ocaml_version} 5.0.0))
(action
(progn
(copy domain.ocaml4.ml domain.ml))))
2 changes: 1 addition & 1 deletion lib/picos_aux.mpmcq/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(library
(name picos_aux_mpmcq)
(public_name picos_aux.mpmcq)
(libraries backoff multicore-magic))
(libraries picos_aux.adaptive_backoff multicore-magic))

(mdx
(package picos_meta)
Expand Down
165 changes: 106 additions & 59 deletions lib/picos_aux.mpmcq/picos_aux_mpmcq.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
open Picos_aux_adaptive_backoff
module Atomic = Multicore_magic.Transparent_atomic

type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t }
type 'a t = {
random_key : int;
head : 'a head Atomic.t;
tail : 'a tail Atomic.t;
}

and ('a, _) tdt =
| Cons : {
Expand All @@ -27,14 +32,15 @@ and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]

let create ?padded () =
let random_key = Int64.to_int (Random.bits64 ()) in
let head =
Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as ?padded
in
let tail =
Atomic.make (T (Tail { counter = 0; move = Used }))
|> Multicore_magic.copy_as ?padded
in
Multicore_magic.copy_as ?padded { head; tail }
Multicore_magic.copy_as ?padded { random_key; head; tail }

let rec rev (suffix : (_, [< `Cons ]) tdt) = function
| T (Snoc { counter; prefix; value }) ->
Expand All @@ -47,139 +53,184 @@ let rev = function
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
prefix

let rec push t value backoff = function
let log_scale = 10

let[@inline] backoff { random_key; _ } =
Adaptive_backoff.once ~random_key ~log_scale

let[@inline] backoff_unless_alone { random_key; _ } =
Adaptive_backoff.once_unless_alone ~random_key ~log_scale

let rec push t value = function
| T (Snoc snoc_r) as prefix ->
let after = Snoc { counter = snoc_r.counter + 1; prefix; value } in
if not (Atomic.compare_and_set t.tail prefix (T after)) then
let backoff = Backoff.once backoff in
push t value backoff (Atomic.fenceless_get t.tail)
if not (Atomic.compare_and_set t.tail prefix (T after)) then begin
backoff t;
push t value (Atomic.fenceless_get t.tail)
end
| T (Tail tail_r) as prefix -> begin
match tail_r.move with
| Used ->
let after = Snoc { counter = tail_r.counter + 1; prefix; value } in
if not (Atomic.compare_and_set t.tail prefix (T after)) then
let backoff = Backoff.once backoff in
push t value backoff (Atomic.fenceless_get t.tail)
if not (Atomic.compare_and_set t.tail prefix (T after)) then begin
backoff t;
push t value (Atomic.fenceless_get t.tail)
end
| Snoc move_r as move ->
begin match Atomic.get t.head with
| H (Head head_r as head) when head_r.counter < move_r.counter ->
let after = rev move in
if
Atomic.fenceless_get t.head == H head
&& Atomic.compare_and_set t.head (H head) (H after)
then tail_r.move <- Used
if Atomic.fenceless_get t.head == H head then
if Atomic.compare_and_set t.head (H head) (H after) then
tail_r.move <- Used
else backoff t
| _ -> tail_r.move <- Used
end;
push t value backoff (Atomic.get t.tail)
push t value (Atomic.get t.tail)
end

exception Empty

let rec pop t backoff = function
let rec pop t = function
| H (Cons cons_r as cons) ->
if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value
else
let backoff = Backoff.once backoff in
pop t backoff (Atomic.fenceless_get t.head)
else begin
backoff t;
pop t (Atomic.fenceless_get t.head)
end
| H (Head head_r as head) -> begin
match Atomic.get t.tail with
| T (Snoc snoc_r as move) ->
if head_r.counter = snoc_r.counter then
if Atomic.compare_and_set t.tail (T move) snoc_r.prefix then
snoc_r.value
else pop t backoff (Atomic.fenceless_get t.head)
else begin
backoff t;
pop t (Atomic.fenceless_get t.head)
end
else
let (Tail tail_r as tail : (_, [ `Tail ]) tdt) =
Tail { counter = snoc_r.counter; move }
in
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
if new_head != H head then begin
(* backoff t; *)
pop t new_head
end
else if Atomic.compare_and_set t.tail (T move) (T tail) then
let (Cons cons_r) = rev move in
let after = cons_r.suffix in
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
if new_head != H head then begin
(* backoff t; *)
pop t new_head
end
else if Atomic.compare_and_set t.head (H head) after then begin
tail_r.move <- Used;
cons_r.value
end
else
let backoff = Backoff.once backoff in
pop t backoff (Atomic.fenceless_get t.head)
else pop t backoff (Atomic.fenceless_get t.head)
else begin
backoff t;
pop t (Atomic.fenceless_get t.head)
end
else begin
(* backoff t; *)
pop t (Atomic.fenceless_get t.head)
end
| T (Tail tail_r) -> begin
match tail_r.move with
| Used ->
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
else raise_notrace Empty
if new_head != H head then begin
(* backoff t; *)
pop t new_head
end
else begin
backoff_unless_alone t;
raise_notrace Empty
end
| Snoc move_r as move ->
if head_r.counter < move_r.counter then
let (Cons cons_r) = rev move in
let after = cons_r.suffix in
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
if new_head != H head then begin
(* backoff t; *)
pop t new_head
end
else if Atomic.compare_and_set t.head (H head) after then begin
tail_r.move <- Used;
cons_r.value
end
else
let backoff = Backoff.once backoff in
pop t backoff (Atomic.fenceless_get t.head)
else begin
backoff t;
pop t (Atomic.fenceless_get t.head)
end
else
let new_head = Atomic.get t.head in
if new_head != H head then pop t backoff new_head
else raise_notrace Empty
if new_head != H head then begin
(* backoff t; *)
pop t new_head
end
else begin
backoff_unless_alone t;
raise_notrace Empty
end
end
end

let rec push_head t value backoff =
let rec push_head t value =
match Atomic.get t.head with
| H (Cons cons_r) as suffix ->
let after = Cons { counter = cons_r.counter - 1; value; suffix } in
if not (Atomic.compare_and_set t.head suffix (H after)) then
push_head t value (Backoff.once backoff)
if not (Atomic.compare_and_set t.head suffix (H after)) then begin
backoff t;
push_head t value
end
| H (Head head_r) as head -> begin
match Atomic.get t.tail with
| T (Snoc snoc_r as move) ->
if Atomic.get t.head != head then push_head t value backoff
if Atomic.get t.head != head then push_head t value
else if head_r.counter = snoc_r.counter then begin
let prefix = T (Snoc { snoc_r with value }) in
let after =
Snoc { snoc_r with counter = snoc_r.counter + 1; prefix }
in
if not (Atomic.compare_and_set t.tail (T move) (T after)) then
push_head t value (Backoff.once backoff)
if not (Atomic.compare_and_set t.tail (T move) (T after)) then begin
backoff t;
push_head t value
end
end
else
else begin
let tail = Tail { counter = snoc_r.counter; move } in
let backoff =
if Atomic.compare_and_set t.tail (T move) (T tail) then backoff
else Backoff.once backoff
in
push_head t value backoff
if not (Atomic.compare_and_set t.tail (T move) (T tail)) then
backoff t;
push_head t value
end
| T (Tail tail_r) as prefix -> begin
match tail_r.move with
| Used ->
if Atomic.get t.head == head then begin
let tail =
Snoc { counter = tail_r.counter + 1; value; prefix }
in
if not (Atomic.compare_and_set t.tail prefix (T tail)) then
push_head t value (Backoff.once backoff)
if not (Atomic.compare_and_set t.tail prefix (T tail)) then begin
backoff t;
push_head t value
end
end
else push_head t value backoff
else push_head t value
| Snoc move_r as move ->
begin match Atomic.get t.head with
| H (Head head_r as head) when head_r.counter < move_r.counter ->
let after = rev move in
if
Atomic.fenceless_get t.head == H head
&& Atomic.compare_and_set t.head (H head) (H after)
then tail_r.move <- Used
if Atomic.fenceless_get t.head == H head then
if Atomic.compare_and_set t.head (H head) (H after) then
tail_r.move <- Used
else backoff t
| _ -> tail_r.move <- Used
end;
push_head t value backoff
push_head t value
end
end

Expand All @@ -204,9 +255,5 @@ let[@inline] length t =
tail_at - head_at + 1

let[@inline] is_empty t = length t == 0
let[@inline] pop_exn t = pop t Backoff.default (Atomic.fenceless_get t.head)

let[@inline] push t value =
push t value Backoff.default (Atomic.fenceless_get t.tail)

let[@inline] push_head t value = push_head t value Backoff.default
let[@inline] pop_exn t = pop t (Atomic.fenceless_get t.head)
let[@inline] push t value = push t value (Atomic.fenceless_get t.tail)
2 changes: 1 addition & 1 deletion lib/picos_aux.mpscq/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(library
(name picos_aux_mpscq)
(public_name picos_aux.mpscq)
(libraries backoff multicore-magic))
(libraries picos_aux.adaptive_backoff multicore-magic))

(mdx
(package picos_meta)
Expand Down
Loading