Skip to content

Commit f43c98d

Browse files
committed
WIP: Change to use Picos for scheduler interop
1 parent 4c16f6b commit f43c98d

File tree

10 files changed

+53
-80
lines changed

10 files changed

+53
-80
lines changed

.ocamlformat

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
profile = default
2-
version = 0.26.0
2+
version = 0.26.1
33

44
exp-grouping=preserve

bench/bench.ml

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,37 +30,11 @@ module Times = struct
3030
results.(domain_i)
3131
done
3232
in
33-
let prepare_for_await () =
34-
let open struct
35-
type state = Init | Released | Awaiting of { mutable released : bool }
36-
end in
37-
let state = Atomic.make Init in
38-
let release () =
39-
if Multicore_magic.fenceless_get state != Released then
40-
match Atomic.exchange state Released with
41-
| Awaiting r -> r.released <- true
42-
| _ -> ()
43-
in
44-
let await () =
45-
if Multicore_magic.fenceless_get state != Released then
46-
let awaiting = Awaiting { released = false } in
47-
if Atomic.compare_and_set state Init awaiting then
48-
match awaiting with
49-
| Awaiting r ->
50-
(* Avoid sleeping *)
51-
while not r.released do
52-
Domain.cpu_relax ()
53-
done
54-
| _ -> ()
55-
in
56-
Domain_local_await.{ release; await }
33+
let domains =
34+
Array.init n_domains @@ fun domain_i ->
35+
Domain.spawn @@ fun () -> main domain_i
5736
in
58-
Domain_local_await.using ~prepare_for_await ~while_running:(fun () ->
59-
let domains =
60-
Array.init n_domains @@ fun domain_i ->
61-
Domain.spawn @@ fun () -> main domain_i
62-
in
63-
Array.iter Domain.join domains);
37+
Array.iter Domain.join domains;
6438
let n = Stack.length results.(0) in
6539
let times = Array.create_float n in
6640
for run_i = 0 to n - 1 do

bench/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
(package kcas_data)
44
(libraries
55
kcas_data
6-
domain-local-await
6+
picos-release
77
multicore-magic
88
yojson
99
domain_shims

doc/scheduler-interop.md

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ implementations that are conveniently provided by
2121
```ocaml
2222
# #thread
2323
# #require "kcas_data"
24+
# #require "picos-release"
2425
# open Kcas_data
2526
# open Kcas
2627
```
@@ -36,45 +37,29 @@ module Scheduler : sig
3637
val fiber : t -> (unit -> 'a) -> 'a Promise.t
3738
end = struct
3839
open Effect.Deep
39-
type _ Effect.t +=
40-
| Suspend : (('a, unit) continuation -> unit) -> 'a Effect.t
4140
type t = {
4241
queue: (unit -> unit) Queue.t;
4342
domain: unit Domain.t
4443
}
4544
let spawn () =
46-
let queue = Queue.create () in
45+
let queue: (unit -> unit) Queue.t = Queue.create () in
4746
let rec scheduler work =
4847
let effc (type a) : a Effect.t -> _ = function
49-
| Suspend ef -> Some ef
50-
| _ -> None in
48+
| Release.Await release ->
49+
Some (fun (k: (a, _) continuation) ->
50+
if not (Release.attach release () @@ fun _ () ->
51+
Queue.add (continue k) queue) then
52+
continue k ())
53+
| _ ->
54+
None in
5155
try_with work () { effc };
5256
match Queue.take_opt queue with
5357
| Some work -> scheduler work
5458
| None -> () in
55-
let prepare_for_await _ =
56-
let state = Atomic.make `Init in
57-
let release () =
58-
if Atomic.get state != `Released then
59-
match Atomic.exchange state `Released with
60-
| `Awaiting k ->
61-
Queue.add (continue k) queue
62-
| _ -> () in
63-
let await () =
64-
if Atomic.get state != `Released then
65-
Effect.perform @@ Suspend (fun k ->
66-
if not (Atomic.compare_and_set state `Init
67-
(`Awaiting k)) then
68-
continue k ())
69-
in
70-
Domain_local_await.{ release; await } in
7159
let domain = Domain.spawn @@ fun () ->
7260
try
7361
while true do
74-
let work = Queue.take_blocking queue in
75-
Domain_local_await.using
76-
~prepare_for_await
77-
~while_running:(fun () -> scheduler work)
62+
scheduler (Queue.take_blocking queue)
7863
done
7964
with Exit -> () in
8065
{ queue; domain }

dune-project

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
(depends
1414
(ocaml (>= 4.13.0))
1515
(backoff (>= 0.1.0))
16-
(domain-local-await (>= 1.0.0))
16+
picos-release
1717
(domain-local-timeout (>= 1.0.0))
1818
(multicore-magic (>= 2.0.0))
1919
(domain_shims (and (>= 0.1.0) :with-test))
@@ -26,7 +26,7 @@
2626
(depends
2727
(kcas (= :version))
2828
(multicore-magic (>= 2.0.0))
29-
(domain-local-await (and (>= 1.0.0) :with-test))
29+
(picos-release :with-test)
3030
(domain_shims (and (>= 0.1.0) :with-test))
3131
(mtime (and (>= 2.0.0) :with-test))
3232
(alcotest (and (>= 1.7.0) :with-test))

kcas.opam

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ depends: [
1616
"dune" {>= "3.8"}
1717
"ocaml" {>= "4.13.0"}
1818
"backoff" {>= "0.1.0"}
19-
"domain-local-await" {>= "1.0.0"}
19+
"picos-release"
2020
"domain-local-timeout" {>= "1.0.0"}
2121
"multicore-magic" {>= "2.0.0"}
2222
"domain_shims" {>= "0.1.0" & with-test}
@@ -40,3 +40,7 @@ build: [
4040
]
4141
dev-repo: "git+https://github.com/ocaml-multicore/kcas.git"
4242
doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/"
43+
pin-depends: [
44+
[ "foundation-mutex-and-condition.dev" "git+https://github.com/ocaml-multicore/picos#d4e127d27477b9bfbb90377986eb3a7b802bd4a1"]
45+
[ "picos-release.dev" "git+https://github.com/ocaml-multicore/picos#d4e127d27477b9bfbb90377986eb3a7b802bd4a1"]
46+
]

kcas.opam.template

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
11
doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/"
2+
pin-depends: [
3+
[ "foundation-mutex-and-condition.dev" "git+https://github.com/ocaml-multicore/picos#d4e127d27477b9bfbb90377986eb3a7b802bd4a1"]
4+
[ "picos-release.dev" "git+https://github.com/ocaml-multicore/picos#d4e127d27477b9bfbb90377986eb3a7b802bd4a1"]
5+
]

kcas_data.opam

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ depends: [
1616
"dune" {>= "3.8"}
1717
"kcas" {= version}
1818
"multicore-magic" {>= "2.0.0"}
19-
"domain-local-await" {>= "1.0.0" & with-test}
19+
"picos-release" {with-test}
2020
"domain_shims" {>= "0.1.0" & with-test}
2121
"mtime" {>= "2.0.0" & with-test}
2222
"alcotest" {>= "1.7.0" & with-test}

src/kcas/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(library
22
(name kcas)
33
(public_name kcas)
4-
(libraries domain-local-await domain-local-timeout backoff multicore-magic))
4+
(libraries picos-release domain-local-timeout backoff multicore-magic))
55

66
(mdx
77
(package kcas)

src/kcas/kcas.ml

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ module Timeout = struct
6161
let[@inline never] await state release =
6262
match fenceless_get state with
6363
| Call _ as alive ->
64-
if Atomic.compare_and_set state alive (Call release) then alive
64+
if
65+
Atomic.compare_and_set state alive
66+
(Call (fun () -> Release.signal release |> ignore))
67+
then alive
6568
else timeout ()
6669
| Unset | Elapsed -> timeout ()
6770

@@ -114,9 +117,9 @@ end = struct
114117
x
115118
end
116119

117-
type awaiter = unit -> unit
120+
type awaiter = [ `Signal ] Release.t
118121

119-
let[@inline] resume_awaiter awaiter = awaiter ()
122+
let[@inline] resume_awaiter awaiter = Release.signal awaiter |> ignore
120123

121124
let[@inline] resume_awaiters = function
122125
| [] -> ()
@@ -405,10 +408,11 @@ let add_awaiter loc before awaiter =
405408
(* Fenceless is safe as we have fence after. *)
406409
let state_old = fenceless_get (as_atomic loc) in
407410
let state_new =
408-
let awaiters = awaiter :: state_old.awaiters in
411+
let awaiters = (awaiter :> [ `Signal ] Release.t) :: state_old.awaiters in
409412
{ before; after = before; casn = casn_after; awaiters }
410413
in
411-
before == eval state_old
414+
Release.is_initial awaiter
415+
&& before == eval state_old
412416
&& Atomic.compare_and_set (as_atomic loc) state_old state_new
413417

414418
let[@tail_mod_cons] rec remove_first x' removed = function
@@ -422,19 +426,21 @@ let rec remove_awaiter loc before awaiter =
422426
let state_old = fenceless_get (as_atomic loc) in
423427
if before == eval state_old then
424428
let removed = ref true in
425-
let awaiters = remove_first awaiter removed state_old.awaiters in
429+
let awaiters =
430+
remove_first (awaiter :> [ `Signal ] Release.t) removed state_old.awaiters
431+
in
426432
if !removed then
427433
let state_new = { before; after = before; casn = casn_after; awaiters } in
428434
if not (Atomic.compare_and_set (as_atomic loc) state_old state_new) then
429435
remove_awaiter loc before awaiter
430436

431437
let block timeout loc before =
432-
let t = Domain_local_await.prepare_for_await () in
433-
let alive = Timeout.await timeout t.release in
434-
if add_awaiter loc before t.release then begin
435-
try t.await ()
438+
let t = Release.create () in
439+
let alive = Timeout.await timeout t in
440+
if add_awaiter loc before t then begin
441+
try Release.await t
436442
with cancellation_exn ->
437-
remove_awaiter loc before t.release;
443+
remove_awaiter loc before t;
438444
Timeout.cancel_alive alive;
439445
raise cancellation_exn
440446
end;
@@ -969,22 +975,22 @@ module Xt = struct
969975
commit (Backoff.once backoff) mode (reset_quick xt) tx
970976
| exception Retry.Later -> begin
971977
if xt.cass == NIL then invalid_retry ();
972-
let t = Domain_local_await.prepare_for_await () in
973-
let alive = Timeout.await (timeout_as_atomic xt) t.release in
974-
match add_awaiters t.release xt.casn xt.cass with
978+
let t = Release.create () in
979+
let alive = Timeout.await (timeout_as_atomic xt) t in
980+
match add_awaiters t xt.casn xt.cass with
975981
| NIL -> begin
976-
match t.await () with
982+
match Release.await t with
977983
| () ->
978-
remove_awaiters t.release xt.casn NIL xt.cass;
984+
remove_awaiters t xt.casn NIL xt.cass;
979985
Timeout.unawait (timeout_as_atomic xt) alive;
980986
commit (Backoff.reset backoff) mode (reset_quick xt) tx
981987
| exception cancellation_exn ->
982-
remove_awaiters t.release xt.casn NIL xt.cass;
988+
remove_awaiters t xt.casn NIL xt.cass;
983989
Timeout.cancel_alive alive;
984990
raise cancellation_exn
985991
end
986992
| CASN _ as stop ->
987-
remove_awaiters t.release xt.casn stop xt.cass;
993+
remove_awaiters t xt.casn stop xt.cass;
988994
Timeout.unawait (timeout_as_atomic xt) alive;
989995
commit (Backoff.once backoff) mode (reset_quick xt) tx
990996
end

0 commit comments

Comments
 (0)