Skip to content

Commit 2c45c1e

Browse files
committed
WIP
1 parent 0921912 commit 2c45c1e

File tree

13 files changed

+377
-23
lines changed

13 files changed

+377
-23
lines changed

bench/bench_two_stack_queue.ml

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
open Kcas_data
2+
open Bench
3+
4+
let run_single ?(factor = 1) ?(n_msgs = 50 * factor * Util.iter_factor) () =
5+
let t = Two_stack_queue.create () in
6+
7+
let init _ = () in
8+
let work _ () =
9+
for i = 1 to n_msgs do
10+
Two_stack_queue.push t i;
11+
Two_stack_queue.pop t |> ignore
12+
done
13+
in
14+
15+
let times = Times.record ~n_domains:1 ~init ~work () in
16+
17+
let name metric = Printf.sprintf "%s/single-domain" metric in
18+
19+
List.concat
20+
[
21+
Stats.of_times times
22+
|> Stats.scale (1_000_000_000.0 /. Float.of_int n_msgs)
23+
|> Stats.to_json ~name:(name "time per message")
24+
~description:
25+
"Time to transmit one message from one domain to another"
26+
~units:"ns";
27+
Times.invert times |> Stats.of_times
28+
|> Stats.scale (Float.of_int n_msgs /. 1_000_000.0)
29+
|> Stats.to_json
30+
~name:(name "messages over time")
31+
~description:
32+
"Number of messages transmitted over time using all domains"
33+
~units:"M/s";
34+
]
35+
36+
let run_one ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1)
37+
?(n_msgs = 50 * factor * Util.iter_factor) () =
38+
let n_domains = n_adders + n_takers in
39+
40+
let t = Two_stack_queue.create () in
41+
42+
let n_msgs_to_take = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in
43+
let n_msgs_to_add = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in
44+
45+
let init _ = () in
46+
let work i () =
47+
if i < n_adders then
48+
let rec work () =
49+
let n = Util.alloc n_msgs_to_add in
50+
if 0 < n then begin
51+
for i = 1 to n do
52+
Two_stack_queue.push t i
53+
done;
54+
work ()
55+
end
56+
in
57+
work ()
58+
else
59+
let rec work () =
60+
let n = Util.alloc n_msgs_to_take in
61+
if n <> 0 then begin
62+
for _ = 1 to n do
63+
while Option.is_none (Two_stack_queue.pop_opt t) do
64+
Domain.cpu_relax ()
65+
done
66+
done;
67+
work ()
68+
end
69+
in
70+
work ()
71+
in
72+
let after () =
73+
Atomic.set n_msgs_to_take n_msgs;
74+
Atomic.set n_msgs_to_add n_msgs
75+
in
76+
77+
let times = Times.record ~n_domains ~init ~work ~after () in
78+
79+
let name metric =
80+
let format role blocking n =
81+
Printf.sprintf "%d %s%s%s" n
82+
(if blocking then "" else "nb ")
83+
role
84+
(if n = 1 then "" else "s")
85+
in
86+
Printf.sprintf "%s/%s, %s" metric
87+
(format "adder" false n_adders)
88+
(format "taker" false n_takers)
89+
in
90+
91+
List.concat
92+
[
93+
Stats.of_times times
94+
|> Stats.scale (1_000_000_000.0 /. Float.of_int n_msgs)
95+
|> Stats.to_json ~name:(name "time per message")
96+
~description:
97+
"Time to transmit one message from one domain to another"
98+
~units:"ns";
99+
Times.invert times |> Stats.of_times
100+
|> Stats.scale (Float.of_int (n_msgs * n_domains) /. 1_000_000.0)
101+
|> Stats.to_json
102+
~name:(name "messages over time")
103+
~description:
104+
"Number of messages transmitted over time using all domains"
105+
~units:"M/s";
106+
]
107+
108+
let run_suite ~factor =
109+
run_single ~factor ()
110+
@ (Util.cross [ 1; 2 ] [ 1; 2 ]
111+
|> List.concat_map @@ fun (n_adders, n_takers) ->
112+
run_one ~n_adders ~n_takers ~factor ())

bench/main.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ let benchmarks =
99
("Kcas_data Hashtbl", Bench_hashtbl.run_suite);
1010
("Kcas_data Mvar", Bench_mvar.run_suite);
1111
("Kcas_data Queue", Bench_queue.run_suite);
12+
("Kcas_data Two_stack_queue", Bench_two_stack_queue.run_suite);
1213
("Kcas_data Stack", Bench_stack.run_suite);
1314
]
1415

dune-project

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,7 @@
6969
(multicore-magic
7070
(>= 2.1.0))
7171
(backoff
72-
(and
73-
(>= 0.1.0)
74-
:with-test))
72+
(>= 0.1.0))
7573
(domain-local-await
7674
(and
7775
(>= 1.0.0)

kcas_data.opam

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ depends: [
1919
"dune" {>= "3.8"}
2020
"kcas" {= version}
2121
"multicore-magic" {>= "2.1.0"}
22-
"backoff" {>= "0.1.0" & with-test}
22+
"backoff" {>= "0.1.0"}
2323
"domain-local-await" {>= "1.0.0" & with-test}
2424
"domain_shims" {>= "0.1.0" & with-test}
2525
"mtime" {>= "2.0.0" & with-test}

src/kcas/kcas.ml

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -539,22 +539,14 @@ let rec exchange_no_alloc backoff loc state =
539539
end
540540
else exchange_no_alloc (Backoff.once backoff) loc state
541541

542-
let[@inline] rec cas_with_state backoff loc before state state_old =
542+
let[@inline] cas_with_state loc before state state_old =
543543
before == eval state_old
544544
&& (before == state.after
545-
||
546-
if Atomic.compare_and_set (as_atomic loc) state_old state then begin
547-
resume_awaiters state_old.awaiters;
548-
true
549-
end
550-
else
551-
(* We must retry, because compare is by value rather than by state. In
552-
other words, we should not fail spuriously due to some other thread
553-
having installed or removed a waiter.
554-
555-
Fenceless is safe as there was a fence before. *)
556-
cas_with_state (Backoff.once backoff) loc before state
557-
(fenceless_get (as_atomic loc)))
545+
|| Atomic.compare_and_set (as_atomic loc) state_old state
546+
&& begin
547+
resume_awaiters state_old.awaiters;
548+
true
549+
end)
558550

559551
let inc x = x + 1
560552
let dec x = x - 1
@@ -607,10 +599,17 @@ module Loc = struct
607599
let[@inline] get_mode loc =
608600
if (to_loc loc).id < 0 then `Lock_free else `Obstruction_free
609601

610-
let compare_and_set ?(backoff = Backoff.default) loc before after =
611-
let state = new_state after in
602+
let compare_and_set loc before after =
612603
let state_old = atomic_get (as_atomic (to_loc loc)) in
613-
cas_with_state backoff (to_loc loc) before state state_old
604+
before == eval state_old
605+
&& (before == after
606+
|| Atomic.compare_and_set
607+
(as_atomic (to_loc loc))
608+
state_old (new_state after)
609+
&& begin
610+
resume_awaiters state_old.awaiters;
611+
true
612+
end)
614613

615614
let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f =
616615
let timeout = Timeout.alloc_opt timeoutf in
@@ -910,7 +909,7 @@ module Xt = struct
910909
(* Fenceless is safe inside transactions as each log update has a
911910
fence. *)
912911
let state_old = fenceless_get (as_atomic loc) in
913-
if cas_with_state Backoff.default loc before state state_old then
912+
if cas_with_state loc before state state_old then
914913
success xt result
915914
else commit_once_reuse backoff xt tx
916915
end

src/kcas/kcas.mli

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ module Loc : sig
232232
conditional load. It is also safe for the given function [f] to raise any
233233
other exception to abort the conditional load. *)
234234

235-
val compare_and_set : ?backoff:Backoff.t -> 'a t -> 'a -> 'a -> bool
235+
val compare_and_set : 'a t -> 'a -> 'a -> bool
236236
(** [compare_and_set r before after] atomically updates the shared memory
237237
location [r] to the [after] value if the current value of [r] is the
238238
[before] value. *)

src/kcas_data/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
(public_name kcas_data)
44
(libraries
55
(re_export kcas)
6+
backoff
67
multicore-magic))
78

89
(rule

src/kcas_data/kcas_data.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
module Hashtbl = Hashtbl
22
module Queue = Queue
3+
module Two_stack_queue = Two_stack_queue
34
module Stack = Stack
45
module Mvar = Mvar
56
module Promise = Promise

src/kcas_data/kcas_data.mli

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122

123123
module Hashtbl = Hashtbl
124124
module Queue = Queue
125+
module Two_stack_queue = Two_stack_queue
125126
module Stack = Stack
126127

127128
(** {1 Communication and synchronization primitives} *)

src/kcas_data/two_stack_queue.ml

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
open Kcas
2+
3+
type 'a t = { head : 'a head_pack Loc.t; tail : 'a tail_pack Loc.t }
4+
5+
and ('a, _) head =
6+
| Cons : {
7+
counter : int;
8+
value : 'a;
9+
suffix : 'a head_pack;
10+
}
11+
-> ('a, [> `Cons ]) head
12+
| Head : { counter : int } -> ('a, [> `Head ]) head
13+
14+
and 'a head_pack = H : ('a, [< `Cons | `Head ]) head -> 'a head_pack
15+
[@@unboxed]
16+
17+
and ('a, _) tail =
18+
| Snoc : {
19+
counter : int;
20+
prefix : 'a tail_pack;
21+
value : 'a;
22+
}
23+
-> ('a, [> `Snoc ]) tail
24+
| Tail : {
25+
counter : int;
26+
mutable move : ('a, [ `Snoc ]) tail;
27+
}
28+
-> ('a, [> `Tail ]) tail
29+
30+
and 'a tail_pack = T : ('a, [< `Snoc | `Tail ]) tail -> 'a tail_pack
31+
[@@unboxed]
32+
33+
let create () =
34+
let head = Loc.make ~padded:true (H (Head { counter = 1 })) in
35+
let tail =
36+
Loc.make ~padded:true (T (Tail { counter = 0; move = Obj.magic () }))
37+
in
38+
{ head; tail } |> Multicore_magic.copy_as_padded
39+
40+
let rec rev (suffix : (_, [< `Cons ]) head) = function
41+
| T (Snoc { counter; prefix; value }) ->
42+
rev (Cons { counter; value; suffix = H suffix }) prefix
43+
| T (Tail _) -> suffix
44+
45+
let[@inline] rev = function
46+
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tail) ->
47+
rev
48+
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
49+
prefix
50+
51+
let[@inline] counter_of_head = function
52+
| (Head r : (_, [< `Head ]) head) -> r.counter
53+
54+
let[@inline] counter_of_snoc = function
55+
| (Snoc r : (_, [< `Snoc ]) tail) -> r.counter
56+
57+
let[@inline] counter_of_tail = function
58+
| (Tail r : (_, [< `Tail ]) tail) -> r.counter
59+
60+
let clear_move = function
61+
| (Tail tail_r : (_, [< `Tail ]) tail) -> tail_r.move <- Obj.magic ()
62+
63+
let is_tail = function T (Tail _) -> true | T (Snoc _) -> false
64+
65+
let rec push backoff t value =
66+
match Loc.fenceless_get t.tail with
67+
| T (Snoc snoc_r) as prefix ->
68+
let after = T (Snoc { counter = snoc_r.counter + 1; prefix; value }) in
69+
if not (Loc.compare_and_set t.tail prefix after) then
70+
push (Backoff.once backoff) t value
71+
| T (Tail tail_r as tail) ->
72+
let move = tail_r.move in
73+
if move != Obj.magic () then
74+
match Loc.fenceless_get t.head with
75+
| H (Head _ as head) when counter_of_head head < counter_of_snoc move ->
76+
let after = rev move in
77+
if Loc.compare_and_set t.head (H head) (H after) then
78+
clear_move tail;
79+
push backoff t value
80+
| _ -> push_with backoff t (counter_of_tail tail) (T tail) value
81+
else push_with backoff t (counter_of_tail tail) (T tail) value
82+
83+
and push_with backoff t counter prefix value =
84+
let after = Snoc { counter = counter + 1; prefix; value } in
85+
if not (Loc.compare_and_set t.tail prefix (T after)) then
86+
push (Backoff.once backoff) t value
87+
88+
let[@inline] push t value = push Backoff.default t value
89+
90+
exception Empty
91+
92+
let rec pop backoff t =
93+
match Loc.get t.head with
94+
| H (Cons cons_r) as before ->
95+
let after = cons_r.suffix in
96+
if Loc.compare_and_set t.head before after then cons_r.value
97+
else pop (Backoff.once backoff) t
98+
| H (Head _ as head) -> begin
99+
match Loc.fenceless_get t.tail with
100+
| T (Snoc snoc_r as move) ->
101+
if is_tail snoc_r.prefix then begin
102+
let tail =
103+
Tail { counter = snoc_r.counter - 1; move = Obj.magic () }
104+
in
105+
if
106+
Loc.fenceless_get t.head == H head
107+
&& Loc.compare_and_set t.tail (T move) (T tail)
108+
then snoc_r.value
109+
else pop backoff t
110+
end
111+
else
112+
let tail = Tail { counter = snoc_r.counter; move } in
113+
if
114+
Loc.fenceless_get t.head == H head
115+
&& Loc.compare_and_set t.tail (T move) (T tail)
116+
then pop_moving backoff t head move tail
117+
else pop backoff t
118+
| T (Tail tail_r as tail) ->
119+
let move = tail_r.move in
120+
if move == Obj.magic () then pop_emptyish backoff t head
121+
else pop_moving backoff t head move tail
122+
end
123+
124+
and pop_moving backoff t head move tail =
125+
if counter_of_head head < counter_of_snoc move then
126+
match rev move with
127+
| Cons cons_r ->
128+
if Loc.compare_and_set t.head (H head) cons_r.suffix then begin
129+
clear_move tail;
130+
cons_r.value
131+
end
132+
else pop backoff t
133+
else pop_emptyish backoff t head
134+
135+
and pop_emptyish backoff t head =
136+
if Loc.get t.head == H head then raise_notrace Empty else pop backoff t
137+
138+
let[@inline] pop_opt t =
139+
match pop Backoff.default t with
140+
| value -> Some value
141+
| exception Empty -> None
142+
143+
let[@inline] pop t = pop Backoff.default t
144+
145+
let rec length t =
146+
let head = Loc.get t.head in
147+
let tail = Loc.get t.tail in
148+
if head != Loc.get t.head then length t
149+
else
150+
let head_at =
151+
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
152+
in
153+
let tail_at =
154+
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
155+
in
156+
tail_at - head_at + 1

0 commit comments

Comments
 (0)