@@ -325,56 +325,97 @@ end
325325module Chan = Domainslib. Chan
326326
327327module Domain_pair = struct
328- type 'a promise = 'a Chan .t
328+ type 'a promise =
329+ { mutable result : 'a option ;
330+ mutex : Mutex .t ;
331+ fulfilled : Condition .t ;
332+ }
333+
334+ type command = Task : (unit -> 'a ) * 'a promise -> command
329335
330- type command = Command : (unit -> 'a ) * 'a Chan .t -> command
336+ type task_mvar =
337+ { mutable task : command option ;
338+ task_mutex : Mutex .t ;
339+ new_task : Condition .t ;
340+ promise_mutex : Mutex .t ;
341+ promise_fulfilled : Condition .t ;
342+ }
331343
332344 type t =
333- { d1 : unit Domain .t ;
345+ { task1 : task_mvar ;
346+ task2 : task_mvar ;
347+ d1 : unit Domain .t ;
334348 d2 : unit Domain .t ;
335- d1_commands : command Chan .t ;
336- d2_commands : command Chan .t ;
337- d1_continue : bool Atomic .t ;
338- d2_continue : bool Atomic .t ;
349+ done_ : bool Atomic .t ;
339350 }
340351
341- let async commands_chan f =
342- let res_chan = Chan. make_bounded 0 in
343- assert (Chan. send_poll commands_chan (Command (f, res_chan)));
344- res_chan
352+ let async runner f =
353+ let promise =
354+ { result = None ; mutex = runner.promise_mutex; fulfilled = runner.promise_fulfilled }
355+ in
356+ Mutex. lock runner.task_mutex;
357+ runner.task < - (Some (Task (f, promise)));
358+ Condition. signal runner.new_task;
359+ Mutex. unlock runner.task_mutex;
360+ promise
345361
346362 let async_d1 pair f =
347- async pair.d1_commands f
363+ async pair.task1 f
348364
349365 let async_d2 pair f =
350- async pair.d2_commands f
351-
352- let await = Chan. recv
353-
354- let domain_fun continue commands_chan =
355- while Atomic. get continue do
356- match Chan. recv commands_chan with
357- | Command (f , res_chan ) ->
358- Chan. send res_chan (f () );
359- ()
366+ async pair.task2 f
367+
368+ let await promise =
369+ Mutex. lock promise.mutex;
370+ while Option. is_none promise.result do
371+ Condition. wait promise.fulfilled promise.mutex
372+ done ;
373+ let result = Option. get promise.result in
374+ promise.result < - None ;
375+ Mutex. unlock promise.mutex;
376+ result
377+
378+ let domain_fun done_ runner =
379+ while not (Atomic. get done_) do
380+ Mutex. lock runner.task_mutex;
381+ while Option. is_none runner.task do
382+ Condition. wait runner.new_task runner.task_mutex
383+ done ;
384+ let Task (f, promise) = Option. get runner.task in
385+ runner.task < - None ;
386+ Mutex. unlock runner.task_mutex;
387+ Mutex. lock promise.mutex;
388+ promise.result < - Some (f () );
389+ Condition. signal promise.fulfilled;
390+ Mutex. unlock promise.mutex;
360391 done
361392
362393 let init () =
363- let d1_continue = Atomic. make true in
364- let d2_continue = Atomic. make true in
365- let d1_commands = Chan. make_bounded 1 in
366- let d2_commands = Chan. make_bounded 1 in
367- { d1 = Domain. spawn (fun () -> domain_fun d1_continue d1_commands);
368- d2 = Domain. spawn (fun () -> domain_fun d2_continue d2_commands);
369- d1_commands;
370- d2_commands;
371- d1_continue;
372- d2_continue;
394+ let done_ = Atomic. make false in
395+ let task1 =
396+ { task = None ;
397+ task_mutex = Mutex. create () ;
398+ new_task = Condition. create () ;
399+ promise_mutex = Mutex. create () ;
400+ promise_fulfilled = Condition. create () ;
401+ }
402+ and task2 =
403+ { task = None ;
404+ task_mutex = Mutex. create () ;
405+ new_task = Condition. create () ;
406+ promise_mutex = Mutex. create () ;
407+ promise_fulfilled = Condition. create () ;
408+ }
409+ in
410+ { task1;
411+ task2;
412+ done_;
413+ d1 = Domain. spawn (fun () -> domain_fun done_ task1);
414+ d2 = Domain. spawn (fun () -> domain_fun done_ task2);
373415 }
374416
375417 let takedown pair =
376- Atomic. set pair.d1_continue false ;
377- Atomic. set pair.d2_continue false ;
418+ Atomic. set pair.done_ true ;
378419 Domain. join pair.d1;
379420 Domain. join pair.d2
380421
0 commit comments