@@ -21,6 +21,8 @@ implementations that are conveniently provided by
2121```ocaml
2222# #thread
2323# #require "kcas_data"
24+ # #require "picos"
25+ # open Picos
2426# open Kcas_data
2527# open Kcas
2628```
@@ -36,45 +38,29 @@ module Scheduler : sig
3638 val fiber : t -> (unit -> 'a) -> 'a Promise.t
3739end = struct
3840 open Effect.Deep
39- type _ Effect.t +=
40- | Suspend : (('a, unit) continuation -> unit) -> 'a Effect.t
4141 type t = {
4242 queue: (unit -> unit) Queue.t;
4343 domain: unit Domain.t
4444 }
4545 let spawn () =
46- let queue = Queue.create () in
46+ let queue: (unit -> unit) Queue.t = Queue.create () in
4747 let rec scheduler work =
4848 let effc (type a) : a Effect.t -> _ = function
49- | Suspend ef -> Some ef
50- | _ -> None in
49+ | Trigger.Await release ->
50+ Some (fun (k: (a, _) continuation) ->
51+ if not (Trigger.on_signal release () () @@ fun _ () () ->
52+ Queue.add (fun () -> continue k None) queue) then
53+ continue k None)
54+ | _ ->
55+ None in
5156 try_with work () { effc };
5257 match Queue.take_opt queue with
5358 | Some work -> scheduler work
5459 | None -> () in
55- let prepare_for_await _ =
56- let state = Atomic.make `Init in
57- let release () =
58- if Atomic.get state != `Released then
59- match Atomic.exchange state `Released with
60- | `Awaiting k ->
61- Queue.add (continue k) queue
62- | _ -> () in
63- let await () =
64- if Atomic.get state != `Released then
65- Effect.perform @@ Suspend (fun k ->
66- if not (Atomic.compare_and_set state `Init
67- (`Awaiting k)) then
68- continue k ())
69- in
70- Domain_local_await.{ release; await } in
7160 let domain = Domain.spawn @@ fun () ->
7261 try
7362 while true do
74- let work = Queue.take_blocking queue in
75- Domain_local_await.using
76- ~prepare_for_await
77- ~while_running:(fun () -> scheduler work)
63+ scheduler (Queue.take_blocking queue)
7864 done
7965 with Exit -> () in
8066 { queue; domain }
0 commit comments