Skip to content

Commit 52bf102

Browse files
committed
lwt: handle fibers in moonpool_lwt
1 parent fc5fd99 commit 52bf102

File tree

2 files changed

+6
-4
lines changed

2 files changed

+6
-4
lines changed

src/lwt/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
(>= %{ocaml_version} 5.0))
66
(libraries
77
(re_export moonpool)
8+
moonpool.fib
89
picos
910
(re_export lwt)
1011
lwt.unix))

src/lwt/moonpool_lwt.ml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ module Scheduler_state = struct
4949
let[@inline never] add_action_from_another_thread_ (self : st) f : unit =
5050
Mutex.lock self.mutex;
5151
Queue.push f self.actions_from_other_threads;
52+
Mutex.unlock self.mutex;
5253
if not (Atomic.exchange self.has_notified true) then
53-
Lwt_unix.send_notification self.notification;
54-
Mutex.unlock self.mutex
54+
Lwt_unix.send_notification self.notification
5555

5656
let[@inline] on_lwt_thread_ (self : st) : bool =
5757
Thread.id (Thread.self ()) = self.thread
@@ -73,9 +73,10 @@ module Scheduler_state = struct
7373
failwith "moonpool-lwt: cleanup from the wrong thread";
7474
Option.iter Lwt_main.Enter_iter_hooks.remove st.enter_hook;
7575
Option.iter Lwt_main.Leave_iter_hooks.remove st.leave_hook;
76+
Lwt_unix.stop_notification st.notification;
7677

7778
Atomic.set cur_st None
78-
| _ -> ()
79+
| None -> failwith "moonpool-lwt: cleanup failed (no current active state)"
7980
end
8081

8182
module Ops = struct
@@ -290,7 +291,7 @@ let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
290291
let spawn_lwt f : _ Lwt.t =
291292
let st = Main_state.get_st () in
292293
let lwt_fut, lwt_prom = Lwt.wait () in
293-
M.Runner.run_async st.as_runner (fun () ->
294+
Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () ->
294295
try
295296
let x = f () in
296297
Lwt.wakeup lwt_prom x

0 commit comments

Comments
 (0)