Skip to content

Commit 3014046

Browse files
committed
use lwt_direct from lwt PR
1 parent 029c558 commit 3014046

File tree

6 files changed

+85
-44
lines changed

6 files changed

+85
-44
lines changed

examples/echo_lwt.ml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
open Tiny_httpd_core
22
module Log = Tiny_httpd.Log
33
module MFD = Tiny_httpd_multipart_form_data
4-
module Task = Tiny_httpd_lwt.Task
4+
module Lwt_direct = Tiny_httpd_lwt.Lwt_direct
55

66
let now_ = Unix.gettimeofday
77

@@ -151,11 +151,11 @@ let () =
151151
let ev = new Lwt_engine.libev () in
152152
Lwt_engine.set ev;
153153

154-
Lwt_main.run @@ Task.run
154+
Lwt_main.run @@ Lwt_direct.run
155155
@@ fun () ->
156156
let server =
157157
Tiny_httpd_lwt.create ~addr:!addr ~port:!port_ ~max_connections:!j ()
158-
|> Task.await
158+
|> Lwt_direct.await
159159
in
160160

161161
Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server;
Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
module ED = Effect.Deep
22

3-
type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t
3+
type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t
44

55
(** Queue of microtasks that are ready *)
66
let tasks : (unit -> unit) Queue.t = Queue.create ()
77

88
let[@inline] push_task f : unit = Queue.push f tasks
99

10-
let on_uncaught_exn : (exn -> Printexc.raw_backtrace -> unit) ref =
11-
ref (fun exn bt ->
12-
Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!"
13-
(Printexc.to_string exn)
14-
(Printexc.raw_backtrace_to_string bt))
10+
let default_on_uncaught_exn exn bt =
11+
Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!"
12+
(Printexc.to_string exn)
13+
(Printexc.raw_backtrace_to_string bt)
1514

1615
let run_all_tasks () : unit =
1716
let n_processed = ref 0 in
@@ -22,27 +21,35 @@ let run_all_tasks () : unit =
2221
try t ()
2322
with exn ->
2423
let bt = Printexc.get_raw_backtrace () in
25-
!on_uncaught_exn exn bt
24+
default_on_uncaught_exn exn bt
2625
done;
2726
(* make sure we don't sleep forever if there's no lwt promise
2827
ready but [tasks] contains ready tasks *)
2928
if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t)
3029

31-
let () =
32-
let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in
33-
let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in
34-
()
30+
let setup_hooks =
31+
let already_done = ref false in
32+
fun () ->
33+
if not !already_done then (
34+
already_done := true;
35+
let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in
36+
let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in
37+
()
38+
)
3539

3640
let await (fut : 'a Lwt.t) : 'a =
3741
match Lwt.state fut with
3842
| Lwt.Return x -> x
3943
| Lwt.Fail exn -> raise exn
4044
| Lwt.Sleep -> Effect.perform (Await fut)
4145

46+
let yield () : unit = Effect.perform Yield
47+
4248
(** the main effect handler *)
4349
let handler : _ ED.effect_handler =
4450
let effc : type b. b Effect.t -> ((b, unit) ED.continuation -> 'a) option =
4551
function
52+
| Yield -> Some (fun k -> push_task (fun () -> ED.continue k ()))
4653
| Await fut ->
4754
Some
4855
(fun k ->
@@ -51,10 +58,10 @@ let handler : _ ED.effect_handler =
5158
(fun exn -> push_task (fun () -> ED.discontinue k exn)))
5259
| _ -> None
5360
in
54-
5561
{ effc }
5662

57-
let run_inside_effect_handler_ (type a) (promise : a Lwt.u) f () : unit =
63+
let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () :
64+
unit =
5865
let res = ref (Error (Failure "not resolved")) in
5966
let run_f_and_set_res () =
6067
(try
@@ -66,10 +73,21 @@ let run_inside_effect_handler_ (type a) (promise : a Lwt.u) f () : unit =
6673
ED.try_with run_f_and_set_res () handler
6774

6875
let run f : _ Lwt.t =
76+
setup_hooks ();
6977
let lwt, resolve = Lwt.wait () in
70-
push_task (run_inside_effect_handler_ resolve f);
78+
push_task (run_inside_effect_handler_and_resolve_ resolve f);
7179
lwt
7280

73-
let run_async f : unit = ignore (run f : unit Lwt.t)
81+
let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit =
82+
let run_f () : unit =
83+
try f ()
84+
with exn ->
85+
let bt = Printexc.get_raw_backtrace () in
86+
on_uncaught_exn exn bt
87+
in
88+
ED.try_with run_f () handler
7489

75-
(* TODO: yield, use that in loops? *)
90+
let run_in_the_background ?(on_uncaught_exn = default_on_uncaught_exn) f : unit
91+
=
92+
setup_hooks ();
93+
push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f)

src/lwt/lwt_direct.mli

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
(** Direct style control flow for Lwt. *)
2+
3+
val run : (unit -> 'a) -> 'a Lwt.t
4+
(** [run f] runs the function [f ()] in a task within
5+
the [Lwt_unix] event loop. [f ()] can create [Lwt]
6+
promises and use {!await} to wait for them. Like any promise
7+
in Lwt, [f ()] can starve the event loop if it runs long computations
8+
without yielding to the event loop.
9+
10+
When [f ()] terminates (successfully or not), the promise
11+
[run f] is resolved with [f ()]'s result, or the exception
12+
raised by [f ()]. *)
13+
14+
val run_in_the_background :
15+
?on_uncaught_exn:(exn -> Printexc.raw_backtrace -> unit) ->
16+
(unit -> unit) ->
17+
unit
18+
(** [run_in_the_background f] is similar to [ignore (run f)].
19+
The computation [f()] runs in the background in the event loop
20+
and returns no result.
21+
@param on_uncaught_exn if provided, this is called when [f()]
22+
raises an exception. *)
23+
24+
val yield : unit -> unit
25+
(** Yield to the event loop.
26+
Can only be used inside {!run} or {!run_in_the_background}. *)
27+
28+
val await : 'a Lwt.t -> 'a
29+
(** [await prom] returns the result of [prom], or re-raises the
30+
exception with which [prom] failed if it failed.
31+
If [prom] is not resolved yet, [await prom] will suspend the
32+
current task and resume it when [prom] is resolved.
33+
Can only be used inside {!run} or {!run_in_the_background}. *)

src/lwt/task.mli

Lines changed: 0 additions & 9 deletions
This file was deleted.

src/lwt/tiny_httpd_lwt.ml

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module H = Tiny_httpd.Server
33
module Pool = Tiny_httpd.Pool
44
module Slice = IO.Slice
55
module Log = Tiny_httpd.Log
6-
module Task = Task
6+
module Lwt_direct = Lwt_direct
77

88
let spf = Printf.sprintf
99
let ( let@ ) = ( @@ )
@@ -37,33 +37,33 @@ let ic_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) :
3737
assert (sl.len = 0);
3838
sl.off <- 0;
3939
let n =
40-
Lwt_unix.read fd sl.bytes 0 (Bytes.length sl.bytes) |> Task.await
40+
Lwt_unix.read fd sl.bytes 0 (Bytes.length sl.bytes) |> Lwt_direct.await
4141
in
4242
sl.len <- n
4343

4444
method close () =
4545
decr num_open;
46-
if !num_open <= 0 then Lwt_unix.close fd |> Task.await
46+
if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await
4747
end
4848

4949
let oc_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) :
5050
IO.Output.t =
5151
object
5252
inherit IO.Output.t_from_output ~bytes ()
53-
(* method flush () : unit = Lwt_io.flush oc |> Task.await *)
53+
(* method flush () : unit = Lwt_io.flush oc |> Lwt_direct.await *)
5454

5555
method private output_underlying buf i len =
5656
let i = ref i in
5757
let len = ref len in
5858
while !len > 0 do
59-
let n = Lwt_unix.write fd buf !i !len |> Task.await in
59+
let n = Lwt_unix.write fd buf !i !len |> Lwt_direct.await in
6060
i := !i + n;
6161
len := !len - n
6262
done
6363

6464
method private close_underlying () =
6565
decr num_open;
66-
if !num_open <= 0 then Lwt_unix.close fd |> Task.await
66+
if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await
6767
end
6868

6969
let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
@@ -80,7 +80,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
8080
| addr, port, None ->
8181
let addr = Option.value ~default:"127.0.0.1" addr in
8282
let sockaddr, port =
83-
match Lwt_unix.getaddrinfo addr "" [] |> Task.await, port with
83+
match Lwt_unix.getaddrinfo addr "" [] |> Lwt_direct.await, port with
8484
| { Unix.ai_addr = ADDR_INET (h, _); _ } :: _, None ->
8585
let p = 8080 in
8686
Unix.ADDR_INET (h, p), p
@@ -115,7 +115,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
115115
let port = ref port in
116116

117117
let server_loop : unit Lwt.t =
118-
let@ () = Task.run in
118+
let@ () = Lwt_direct.run in
119119
let backlog = max_connections in
120120
let sock =
121121
Lwt_unix.socket ~cloexec:true
@@ -126,7 +126,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
126126
Lwt_unix.setsockopt_optint sock Unix.SO_LINGER None;
127127
Lwt_unix.setsockopt sock Unix.SO_REUSEADDR true;
128128
Lwt_unix.setsockopt sock Unix.SO_REUSEPORT true;
129-
Lwt_unix.bind sock sockaddr |> Task.await;
129+
Lwt_unix.bind sock sockaddr |> Lwt_direct.await;
130130
Lwt_unix.listen sock backlog;
131131

132132
(* recover real port, if any *)
@@ -136,8 +136,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
136136

137137
let handle_client client_addr fd : unit =
138138
Atomic.incr active_conns;
139-
let@ () = Task.run_async in
140-
139+
Lwt_direct.run_in_the_background @@ fun () ->
141140
let cleanup () =
142141
Log.debug (fun k ->
143142
k "Tiny_httpd_lwt: client handler returned");
@@ -169,7 +168,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
169168
in
170169

171170
while Atomic.get running do
172-
let fd, addr = Lwt_unix.accept sock |> Task.await in
171+
let fd, addr = Lwt_unix.accept sock |> Lwt_direct.await in
173172
handle_client addr fd
174173
done
175174
in
@@ -181,21 +180,21 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
181180
(fun () ->
182181
Atomic.set running false;
183182
Lwt.wakeup_later set_server_done ();
184-
Task.await server_loop);
183+
Lwt_direct.await server_loop);
185184
endpoint = (fun () -> addr, !port);
186185
active_connections = (fun () -> Atomic.get active_conns);
187186
}
188187
in
189188

190189
after_init tcp_server;
191-
Task.await server_done);
190+
Lwt_direct.await server_done);
192191
}
193192
end in
194193
(module M)
195194

196195
let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size
197196
?middlewares () : H.t Lwt.t =
198-
let@ () = Task.run in
197+
let@ () = Lwt_direct.run in
199198
let backend =
200199
io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections
201200
?buf_size ()

src/lwt/tiny_httpd_lwt.mli

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
{b NOTE}: this is very experimental and will absolutely change over time,
77
@since NEXT_RELEASE *)
88

9-
module Task = Task
9+
module Lwt_direct = Lwt_direct
1010

1111
type 'a with_args =
1212
?addr:string ->

0 commit comments

Comments
 (0)