Skip to content

Commit 98827ed

Browse files
authored
Merge pull request #19 from Julow/eio_buf_read
Eio: Use buffered IO
2 parents a0886b1 + 05cc457 commit 98827ed

File tree

6 files changed

+283
-84
lines changed

6 files changed

+283
-84
lines changed

bin/lwt_to_direct_style/ast_rewrite.ml

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -149,17 +149,22 @@ let lwt_io_mode_of_ast ~state mode =
149149
| _ -> None)
150150
| _ -> None
151151

152-
let lwt_io_of_fd ~backend ~state ~mode fd =
152+
let lwt_io_open ~backend ~state ~mode src =
153153
match lwt_io_mode_of_ast ~state mode with
154-
| Some `Input -> Some (backend#input_io_of_fd fd)
155-
| Some `Output -> Some (backend#output_io_of_fd fd)
154+
| Some `Input -> Some (backend#input_io src)
155+
| Some `Output -> Some (backend#output_io src)
156156
| None ->
157157
add_comment state
158158
"Couldn't translate this call to [Lwt_io.of_fd] because the [~mode] \
159159
argument couldn't be decoded. Directly use [Lwt_io.input] or \
160160
[Lwt_io.output].";
161161
None
162162

163+
let lwt_io_read ~backend ~state:_ count in_chan =
164+
match count with
165+
| Some count_arg -> backend#io_read_string_count in_chan count_arg
166+
| None -> Some (backend#io_read_all in_chan)
167+
163168
let mk_cstr c = Some (mk_constr_exp [ c ])
164169

165170
(* Rewrite calls to functions from the [Lwt] module. See [rewrite_apply] for
@@ -322,10 +327,9 @@ let rewrite_apply ~backend ~state full_ident args =
322327
take @@ fun d ->
323328
take @@ fun f -> return (Some (backend#with_timeout d f))
324329
| "Lwt_unix", "of_unix_file_descr" ->
325-
take @@ fun fd ->
326330
take_lblopt "blocking" @@ fun blocking ->
327-
ignore_lblarg "set_flags"
328-
@@ return (Some (backend#of_unix_file_descr ?blocking fd))
331+
ignore_lblarg "set_flags" @@ take
332+
@@ fun fd -> return (Some (backend#of_unix_file_descr ?blocking fd))
329333
| "Lwt_unix", "close" -> take @@ fun fd -> return (Some (backend#fd_close fd))
330334
(* [Lwt_unix] contains functions exactly equivalent to functions of the same
331335
name in [Unix]. *)
@@ -334,6 +338,10 @@ let rewrite_apply ~backend ~state full_ident args =
334338
"This call to [Unix.%s] was [Lwt_unix.%s] before the rewrite." fname
335339
fname;
336340
transparent [ "Unix"; fname ]
341+
| "Lwt_unix", "stat" ->
342+
take @@ fun path -> return (Some (backend#path_stat ~follow:true path))
343+
| "Lwt_unix", "lstat" ->
344+
take @@ fun path -> return (Some (backend#path_stat ~follow:false path))
337345
| "Lwt_condition", "create" ->
338346
take @@ fun _unit -> return (Some (backend#condition_create ()))
339347
| "Lwt_condition", "wait" ->
@@ -359,10 +367,23 @@ let rewrite_apply ~backend ~state full_ident args =
359367
@@ ignore_lblarg ~cmt:"Will behave as if it was [true]." "close"
360368
@@ take_lbl "mode"
361369
@@ fun mode ->
362-
take @@ fun fd -> return (lwt_io_of_fd ~backend ~state ~mode fd)
370+
take @@ fun fd -> return (lwt_io_open ~backend ~state ~mode (`Of_fd fd))
371+
| "Lwt_io", "open_file" ->
372+
ignore_lblarg "buffer" @@ ignore_lblarg "flags" @@ ignore_lblarg "perm"
373+
@@ take_lbl "mode"
374+
@@ fun mode ->
375+
take @@ fun fname ->
376+
return (lwt_io_open ~backend ~state ~mode (`Fname fname))
377+
| "Lwt_io", "read_line" ->
378+
take @@ fun in_chan -> return (Some (backend#io_read_line in_chan))
379+
| "Lwt_io", "read" ->
380+
take_lblopt "count" @@ fun count ->
381+
take @@ fun in_chan -> return (lwt_io_read ~backend ~state count in_chan)
363382
| "Lwt_io", "write" ->
364383
take @@ fun chan ->
365384
take @@ fun str -> return (Some (backend#io_write_str chan str))
385+
| "Lwt_io", "length" -> take @@ fun fd -> return (Some (backend#io_length fd))
386+
| "Lwt_io", "close" -> take @@ fun fd -> return (Some (backend#io_close fd))
366387
| "Lwt_main", "run" ->
367388
take @@ fun promise -> return (Some (backend#main_run promise))
368389
| _ -> return None

bin/lwt_to_direct_style/concurrency_backend.ml

Lines changed: 122 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ let eio ~eio_sw_as_fiber_var ~eio_env_as_fiber_var add_comment =
1212
in
1313
let fiber_ident = eio_std_ident "Fiber"
1414
and promise_ident = eio_std_ident "Promise"
15-
and switch_ident = eio_std_ident "Switch" in
15+
and switch_ident = eio_std_ident "Switch"
16+
and std_ident i =
17+
used_eio_std := true;
18+
[ i ]
19+
in
1620
let add_comment fmt = Format.kasprintf add_comment fmt in
1721
let add_comment_dropped_exp ~label exp =
1822
add_comment "Dropped expression (%s): [%s]." label
@@ -47,6 +51,39 @@ let eio ~eio_sw_as_fiber_var ~eio_env_as_fiber_var add_comment =
4751
in
4852
Exp.send env_exp (mk_loc field)
4953
in
54+
let buf_read_of_flow flow =
55+
mk_apply_ident
56+
[ "Eio"; "Buf_read"; "of_flow" ]
57+
[
58+
(Labelled (mk_loc "max_size"), mk_const_int "1_000_000"); (Nolabel, flow);
59+
]
60+
in
61+
let buf_write_of_flow flow =
62+
add_comment
63+
"Write operations to buffered IO should be moved inside [with_flow].";
64+
mk_apply_simple
65+
[ "Eio"; "Buf_write"; "with_flow" ]
66+
[
67+
flow;
68+
mk_fun ~arg_name:"outbuf" (fun _outbuf ->
69+
mk_variant_exp "Move_writing_code_here");
70+
]
71+
in
72+
let import_socket_stream ~r_or_w fd =
73+
(* Used by [input_io] and [output_io]. *)
74+
Exp.constraint_
75+
(mk_apply_ident
76+
[ "Eio_unix"; "Net"; "import_socket_stream" ]
77+
[
78+
get_current_switch_arg ();
79+
(Labelled (mk_loc "close_unix"), mk_constr_exp [ "true" ]);
80+
(Nolabel, fd);
81+
])
82+
(mk_typ_constr
83+
~params:
84+
[ mk_poly_variant [ (r_or_w, []); ("Flow", []); ("Close", []) ] ]
85+
(std_ident "r"))
86+
in
5087
object
5188
method both ~left ~right =
5289
mk_apply_simple (fiber_ident "pair") [ left; right ]
@@ -150,31 +187,51 @@ let eio ~eio_sw_as_fiber_var ~eio_env_as_fiber_var add_comment =
150187

151188
method direct_style_type param = param
152189

153-
method of_unix_file_descr ?blocking fd =
154-
let blocking_arg =
155-
let lbl = mk_loc "blocking" in
156-
match blocking with
157-
| Some (expr, `Lbl) -> [ (Labelled lbl, expr) ]
158-
| Some (expr, `Opt) -> [ (Optional lbl, expr) ]
159-
| None -> []
160-
in
161-
mk_apply_ident
162-
[ "Eio_unix"; "Fd"; "of_unix" ]
163-
([ get_current_switch_arg () ]
164-
@ blocking_arg
165-
@ [
166-
(Labelled (mk_loc "close_unix"), mk_constr_exp [ "true" ]);
167-
(Nolabel, fd);
168-
])
190+
method of_unix_file_descr ?blocking:_ fd =
191+
(* TODO: We don't use [Eio_unix.Fd.t] because there is no conversion to [Flow.sink]. *)
192+
(* let blocking_arg = *)
193+
(* let lbl = mk_loc "blocking" in *)
194+
(* match blocking with *)
195+
(* | Some (expr, `Lbl) -> [ (Labelled lbl, expr) ] *)
196+
(* | Some (expr, `Opt) -> [ (Optional lbl, expr) ] *)
197+
(* | None -> [] *)
198+
(* in *)
199+
(* mk_apply_ident *)
200+
(* [ "Eio_unix"; "Fd"; "of_unix" ] *)
201+
(* ([ get_current_switch_arg () ] *)
202+
(* @ blocking_arg *)
203+
(* @ [ *)
204+
(* (Labelled (mk_loc "close_unix"), mk_constr_exp [ "true" ]); *)
205+
(* (Nolabel, fd); *)
206+
(* ]) *)
207+
fd
169208

170209
method io_read input buffer buf_offset buf_len =
171210
add_comment "[%s] should be a [Cstruct.t]."
172211
(Ocamlformat_utils.format_expression buffer);
212+
add_comment
213+
"[Eio.Flow.single_read] operates on a [Flow.source] but [%s] is likely \
214+
of type [Eio.Buf_read.t]. Rewrite this code to use [Buf_read] (which \
215+
contains an internal buffer) or change the call to \
216+
[Eio.Buf_read.of_flow] used to create the buffer."
217+
(Ocamlformat_utils.format_expression input);
173218
add_comment_dropped_exp ~label:"buffer offset" buf_offset;
174219
add_comment_dropped_exp ~label:"buffer length" buf_len;
175220
mk_apply_simple [ "Eio"; "Flow"; "single_read" ] [ input; buffer ]
176221

177-
method fd_close fd = mk_apply_simple [ "Eio_unix"; "Fd" ] [ fd ]
222+
method io_read_all input =
223+
mk_apply_simple [ "Eio"; "Buf_read"; "take_all" ] [ input ]
224+
225+
method io_read_string_count _input _count_arg =
226+
add_comment
227+
"Eio doesn't have a direct equivalent of [Lwt_io.read ~count]. Rewrite \
228+
the code using [Eio.Buf_read]'s lower level API or switch to \
229+
unbuffered IO.";
230+
None
231+
232+
method fd_close fd =
233+
(* TODO: See [of_unix_file_descr]. mk_apply_simple [ "Eio_unix"; "Fd" ] [ fd ] *)
234+
mk_apply_simple [ "Unix"; "close" ] [ fd ]
178235

179236
method main_run promise =
180237
let with_binding var_ident x body =
@@ -209,29 +266,56 @@ let eio ~eio_sw_as_fiber_var ~eio_env_as_fiber_var add_comment =
209266
wrap_env_fiber_var env (wrap_sw_fiber_var promise));
210267
]
211268

212-
method input_io_of_fd fd =
213-
Exp.constraint_
214-
(mk_apply_simple [ "Eio_unix"; "Net"; "import_socket_stream" ] [ fd ])
215-
(mk_typ_constr
216-
~params:
217-
[ mk_poly_variant [ ("R", []); ("Flow", []); ("Close", []) ] ]
218-
[ "Std"; "r" ])
219-
220-
method output_io_of_fd fd =
221-
add_comment
222-
"This creates a closeable [Flow.sink] resource but write operations \
223-
are rewritten to calls to [Buf_write].\n\
224-
\ You might want to use [Buf_write.with_flow sink (fun \
225-
buf_write -> ...)].";
226-
Exp.constraint_
227-
(mk_apply_simple [ "Eio_unix"; "Net"; "import_socket_stream" ] [ fd ])
228-
(mk_typ_constr
229-
~params:
230-
[ mk_poly_variant [ ("W", []); ("Flow", []); ("Close", []) ] ]
231-
[ "Std"; "r" ])
269+
method input_io =
270+
function
271+
| `Of_fd fd -> buf_read_of_flow (import_socket_stream ~r_or_w:"R" fd)
272+
| `Fname fname ->
273+
buf_read_of_flow
274+
@@ mk_apply_ident
275+
[ "Eio"; "Path"; "open_in" ]
276+
[
277+
get_current_switch_arg ();
278+
( Nolabel,
279+
mk_apply_simple [ "Eio"; "Path"; "/" ] [ env "cwd"; fname ]
280+
);
281+
]
282+
283+
method output_io =
284+
function
285+
| `Of_fd fd -> buf_write_of_flow (import_socket_stream ~r_or_w:"W" fd)
286+
| `Fname fname ->
287+
add_comment
288+
"[flags] and [perm] arguments were dropped. The [~create] was \
289+
added by default and might not match the previous flags. Use \
290+
[~append:true] for [O_APPEND].";
291+
buf_write_of_flow
292+
@@ mk_apply_ident
293+
[ "Eio"; "Path"; "open_out" ]
294+
[
295+
get_current_switch_arg ();
296+
( Labelled (mk_loc "create"),
297+
mk_variant_exp ~arg:(mk_const_int "0o666") "If_missing" );
298+
( Nolabel,
299+
mk_apply_simple [ "Eio"; "Path"; "/" ] [ env "cwd"; fname ]
300+
);
301+
]
302+
303+
method io_read_line chan =
304+
mk_apply_simple [ "Eio"; "Buf_read"; "line" ] [ chan ]
305+
306+
(* This is of type [Optint.Int63.t] instead of [int] with Lwt. *)
307+
method io_length fd = mk_apply_simple [ "Eio"; "File"; "size" ] [ fd ]
232308

233309
method io_write_str chan str =
234310
mk_apply_simple [ "Eio"; "Buf_write"; "string" ] [ chan; str ]
235311

312+
method io_close fd = mk_apply_simple [ "Eio"; "Resource"; "close" ] [ fd ]
236313
method type_out_channel = mk_typ_constr [ "Eio"; "Buf_write"; "t" ]
314+
315+
method path_stat ~follow path =
316+
mk_apply_ident [ "Eio"; "Path"; "stat" ]
317+
[
318+
(Labelled (mk_loc "follow"), mk_constr_of_bool follow);
319+
(Nolabel, mk_apply_simple [ "Eio"; "Path"; "/" ] [ env "cwd"; path ]);
320+
]
237321
end

lib/ocamlformat_utils/ast_utils.ml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ let mk_poly_variant ?(open_ = false) ?labels ?(inherit_ = []) vars =
6565
let constrs = List.map mk_rtag vars @ List.map mk_inherit inherit_ in
6666
Typ.variant constrs flag labels
6767

68+
let mk_constr_of_bool b = mk_constr_exp [ (if b then "true" else "false") ]
69+
6870
(* Exp *)
6971

7072
let mk_const_string s = Exp.constant (Const.string s)

test/lwt_to_direct_style/eio-switch.t/run.t

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,7 @@ Make a writable directory tree:
55
$ dune build @ocaml-index
66
$ lwt-to-direct-style --migrate --eio-sw-as-fiber-var Fiber_var.sw --eio-env-as-fiber-var Fiber_var.env
77
Formatted 1 files
8-
Warning: main.ml: 5 occurrences have not been rewritten.
9-
Lwt_io.open_file (line 8 column 13)
10-
Lwt_io.input (line 8 column 36)
11-
Lwt_io.close (line 9 column 3)
12-
Lwt_io.read (line 15 column 12)
8+
Warning: main.ml: 1 occurrences have not been rewritten.
139
Lwt_io.printf (line 16 column 3)
1410

1511
$ cat main.ml
@@ -22,25 +18,27 @@ Make a writable directory tree:
2218
(Stdlib.Option.get (Fiber.get Fiber_var.env))#mono_clock 1.0 (fun () -> 42)
2319

2420
let _f fname =
25-
let fd = Lwt_io.open_file ~mode:Lwt_io.input fname in
26-
Lwt_io.close fd
21+
let fd =
22+
Eio.Buf_read.of_flow ~max_size:1_000_000
23+
(Eio.Path.open_in
24+
~sw:(Stdlib.Option.get (Fiber.get Fiber_var.sw))
25+
(Eio.Path.( / ) (Stdlib.Option.get (Fiber.get Fiber_var.env))#cwd fname))
26+
in
27+
Eio.Resource.close fd
2728
2829
let main () =
2930
Fiber.fork
3031
~sw:(Stdlib.Option.get (Fiber.get Fiber_var.sw))
3132
(fun () -> async_process 1);
32-
let fd =
33-
fun ?blocking:x1 ?set_flags:x2 ->
34-
Eio_unix.Fd.of_unix
35-
~sw:(Stdlib.Option.get (Fiber.get Fiber_var.sw))
36-
?blocking:x1 ~close_unix:true
37-
(* TODO: lwt-to-direct-style: Labelled argument ?set_flags was dropped. *)
38-
Unix.stdin
39-
in
33+
let fd = Unix.stdin in
4034
let in_chan =
41-
(Eio_unix.Net.import_socket_stream fd : [ `R | `Flow | `Close ] Std.r)
35+
Eio.Buf_read.of_flow ~max_size:1_000_000
36+
(Eio_unix.Net.import_socket_stream
37+
~sw:(Stdlib.Option.get (Fiber.get Fiber_var.sw))
38+
~close_unix:true fd
39+
: [ `R | `Flow | `Close ] r)
4240
in
43-
let s = Lwt_io.read in_chan in
41+
let s = Eio.Buf_read.take_all in_chan in
4442
Lwt_io.printf "%s" s
4543
4644
let () =

0 commit comments

Comments
 (0)