Skip to content

Commit 935fc5e

Browse files
committed
eio: Use buffered IO by default
Construct values of `Eio.Buf_read.t` and `Eio.Buf_write.t` when converting `Lwt_io` code to Eio. Code using `Lwt_io.read_into` is in-between using unbuffered and buffered IO and should be changed. A comment is inserted.
1 parent 501063c commit 935fc5e

File tree

3 files changed

+94
-74
lines changed

3 files changed

+94
-74
lines changed

bin/lwt_to_direct_style/concurrency_backend.ml

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,24 @@ let eio ~eio_sw_as_fiber_var ~eio_env_as_fiber_var add_comment =
5151
in
5252
Exp.send env_exp (mk_loc field)
5353
in
54+
let buf_read_of_flow flow =
55+
mk_apply_ident
56+
[ "Eio"; "Buf_read"; "of_flow" ]
57+
[
58+
(Labelled (mk_loc "max_size"), mk_const_int "1_000_000"); (Nolabel, flow);
59+
]
60+
in
61+
let buf_write_of_flow flow =
62+
add_comment
63+
"Write operations to buffered IO should be moved inside [with_flow].";
64+
mk_apply_simple
65+
[ "Eio"; "Buf_write"; "with_flow" ]
66+
[
67+
flow;
68+
mk_fun ~arg_name:"outbuf" (fun _outbuf ->
69+
mk_variant_exp "Move_writing_code_here");
70+
]
71+
in
5472
let import_socket_stream ~r_or_w fd =
5573
(* Used by [input_io] and [output_io]. *)
5674
Exp.constraint_
@@ -191,6 +209,12 @@ let eio ~eio_sw_as_fiber_var ~eio_env_as_fiber_var add_comment =
191209
method io_read input buffer buf_offset buf_len =
192210
add_comment "[%s] should be a [Cstruct.t]."
193211
(Ocamlformat_utils.format_expression buffer);
212+
add_comment
213+
"[Eio.Flow.single_read] operates on a [Flow.source] but [%s] is likely \
214+
of type [Eio.Buf_read.t]. Rewrite this code to use [Buf_read] (which \
215+
contains an internal buffer) or change the call to \
216+
[Eio.Buf_read.of_flow] used to create the buffer."
217+
(Ocamlformat_utils.format_expression input);
194218
add_comment_dropped_exp ~label:"buffer offset" buf_offset;
195219
add_comment_dropped_exp ~label:"buffer length" buf_len;
196220
mk_apply_simple [ "Eio"; "Flow"; "single_read" ] [ input; buffer ]
@@ -234,48 +258,39 @@ let eio ~eio_sw_as_fiber_var ~eio_env_as_fiber_var add_comment =
234258

235259
method input_io =
236260
function
237-
| `Of_fd fd ->
238-
add_comment
239-
"This creates a closeable [Flow.source] resource but read \
240-
operations are rewritten to calls to [Buf_read].";
241-
import_socket_stream ~r_or_w:"R" fd
261+
| `Of_fd fd -> buf_read_of_flow (import_socket_stream ~r_or_w:"R" fd)
242262
| `Fname fname ->
243-
mk_apply_ident
244-
[ "Eio"; "Path"; "open_in" ]
245-
[
246-
get_current_switch_arg ();
247-
( Nolabel,
248-
mk_apply_simple [ "Eio"; "Path"; "/" ] [ env "cwd"; fname ] );
249-
]
263+
buf_read_of_flow
264+
@@ mk_apply_ident
265+
[ "Eio"; "Path"; "open_in" ]
266+
[
267+
get_current_switch_arg ();
268+
( Nolabel,
269+
mk_apply_simple [ "Eio"; "Path"; "/" ] [ env "cwd"; fname ]
270+
);
271+
]
250272

251273
method output_io =
252274
function
253-
| `Of_fd fd ->
254-
add_comment
255-
"This creates a closeable [Flow.sink] resource but write \
256-
operations are rewritten to calls to [Buf_write]. You might want \
257-
to use [Buf_write.with_flow sink (fun buf_write -> ...)].";
258-
import_socket_stream ~r_or_w:"W" fd
275+
| `Of_fd fd -> buf_write_of_flow (import_socket_stream ~r_or_w:"W" fd)
259276
| `Fname fname ->
260277
add_comment
261278
"[flags] and [perm] arguments were dropped. The [~create] was \
262279
added by default and might not match the previous flags. Use \
263280
[~append:true] for [O_APPEND].";
264-
mk_apply_ident
265-
[ "Eio"; "Path"; "open_out" ]
266-
[
267-
get_current_switch_arg ();
268-
( Labelled (mk_loc "create"),
269-
mk_variant_exp ~arg:(mk_const_int "0o666") "If_missing" );
270-
( Nolabel,
271-
mk_apply_simple [ "Eio"; "Path"; "/" ] [ env "cwd"; fname ] );
272-
]
281+
buf_write_of_flow
282+
@@ mk_apply_ident
283+
[ "Eio"; "Path"; "open_out" ]
284+
[
285+
get_current_switch_arg ();
286+
( Labelled (mk_loc "create"),
287+
mk_variant_exp ~arg:(mk_const_int "0o666") "If_missing" );
288+
( Nolabel,
289+
mk_apply_simple [ "Eio"; "Path"; "/" ] [ env "cwd"; fname ]
290+
);
291+
]
273292

274293
method io_read_line chan =
275-
add_comment
276-
"Argument to [Eio.Buf_read.line] is a [Flow.source] but it should be a \
277-
[Eio.Buf_read.t]. Use [Eio.Buf_read.of_flow ~max_size:1_000_000 \
278-
source].";
279294
mk_apply_simple [ "Eio"; "Buf_read"; "line" ] [ chan ]
280295

281296
(* This is of type [Optint.Int63.t] instead of [int] with Lwt. *)

test/lwt_to_direct_style/eio-switch.t/run.t

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ Make a writable directory tree:
2020

2121
let _f fname =
2222
let fd =
23-
Eio.Path.open_in
24-
~sw:(Stdlib.Option.get (Fiber.get Fiber_var.sw))
25-
(Eio.Path.( / ) (Stdlib.Option.get (Fiber.get Fiber_var.env))#cwd fname)
23+
Eio.Buf_read.of_flow ~max_size:1_000_000
24+
(Eio.Path.open_in
25+
~sw:(Stdlib.Option.get (Fiber.get Fiber_var.sw))
26+
(Eio.Path.( / ) (Stdlib.Option.get (Fiber.get Fiber_var.env))#cwd fname))
2627
in
2728
Eio.Resource.close fd
2829
@@ -32,12 +33,11 @@ Make a writable directory tree:
3233
(fun () -> async_process 1);
3334
let fd = Unix.stdin in
3435
let in_chan =
35-
(Eio_unix.Net.import_socket_stream
36-
~sw:(Stdlib.Option.get (Fiber.get Fiber_var.sw))
37-
~close_unix:true
38-
(* TODO: lwt-to-direct-style: This creates a closeable [Flow.source] resource but read operations are rewritten to calls to [Buf_read]. *)
39-
fd
40-
: [ `R | `Flow | `Close ] r)
36+
Eio.Buf_read.of_flow ~max_size:1_000_000
37+
(Eio_unix.Net.import_socket_stream
38+
~sw:(Stdlib.Option.get (Fiber.get Fiber_var.sw))
39+
~close_unix:true fd
40+
: [ `R | `Flow | `Close ] r)
4141
in
4242
let s = Lwt_io.read in_chan in
4343
Lwt_io.printf "%s" s

test/lwt_to_direct_style/to_direct_style.t/run.t

Lines changed: 39 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -680,15 +680,16 @@ Make a writable directory tree:
680680
Unix.(openfile fname [ O_RDWR; O_NONBLOCK; O_APPEND ]) 0o660
681681
|>
682682
(* TODO: lwt-to-direct-style: [sw] (of type Switch.t) must be propagated here. *)
683-
(* TODO: lwt-to-direct-style: This creates a closeable [Flow.source] resource but read operations are rewritten to calls to [Buf_read]. *)
684683
fun x1 ->
685-
(Eio_unix.Net.import_socket_stream ~sw ~close_unix:true x1
686-
: [ `R | `Flow | `Close ] r)
684+
Eio.Buf_read.of_flow ~max_size:1_000_000
685+
(Eio_unix.Net.import_socket_stream ~sw ~close_unix:true x1
686+
: [ `R | `Flow | `Close ] r)
687687
in
688688
let buf = Bytes.create 1024 in
689689
let _n : int =
690690
Eio.Flow.single_read
691691
(* TODO: lwt-to-direct-style: [buf] should be a [Cstruct.t]. *)
692+
(* TODO: lwt-to-direct-style: [Eio.Flow.single_read] operates on a [Flow.source] but [inp] is likely of type [Eio.Buf_read.t]. Rewrite this code to use [Buf_read] (which contains an internal buffer) or change the call to [Eio.Buf_read.of_flow] used to create the buffer. *)
692693
(* TODO: lwt-to-direct-style: Dropped expression (buffer offset): [0]. *)
693694
(* TODO: lwt-to-direct-style: Dropped expression (buffer length): [1024]. *)
694695
inp buf
@@ -707,52 +708,56 @@ Make a writable directory tree:
707708
Unix.getaddrinfo
708709
709710
let _f fd =
710-
(Eio_unix.Net.import_socket_stream ~sw ~close_unix:true
711-
(* TODO: lwt-to-direct-style: [sw] (of type Switch.t) must be propagated here. *)
712-
(* TODO: lwt-to-direct-style: This creates a closeable [Flow.sink] resource but write operations are rewritten to calls to [Buf_write]. You might want to use [Buf_write.with_flow sink (fun buf_write -> ...)]. *)
713-
fd
714-
: [ `W | `Flow | `Close ] r)
711+
Eio.Buf_write.with_flow
712+
(Eio_unix.Net.import_socket_stream ~sw ~close_unix:true
713+
(* TODO: lwt-to-direct-style: [sw] (of type Switch.t) must be propagated here. *)
714+
(* TODO: lwt-to-direct-style: Write operations to buffered IO should be moved inside [with_flow]. *)
715+
fd
716+
: [ `W | `Flow | `Close ] r)
717+
(fun outbuf -> `Move_writing_code_here)
715718
716719
let _f fd =
717-
(Eio_unix.Net.import_socket_stream ~sw ~close_unix:true
718-
(* TODO: lwt-to-direct-style: [sw] (of type Switch.t) must be propagated here. *)
719-
(* TODO: lwt-to-direct-style: This creates a closeable [Flow.source] resource but read operations are rewritten to calls to [Buf_read]. *)
720-
fd
721-
: [ `R | `Flow | `Close ] r)
720+
Eio.Buf_read.of_flow ~max_size:1_000_000
721+
(Eio_unix.Net.import_socket_stream ~sw ~close_unix:true
722+
(* TODO: lwt-to-direct-style: [sw] (of type Switch.t) must be propagated here. *)
723+
fd
724+
: [ `R | `Flow | `Close ] r)
722725
723726
let _f fd =
724-
(Eio_unix.Net.import_socket_stream ~sw ~close_unix:true
725-
(* TODO: lwt-to-direct-style: [sw] (of type Switch.t) must be propagated here. *)
726-
(* TODO: lwt-to-direct-style: This creates a closeable [Flow.sink] resource but write operations are rewritten to calls to [Buf_write]. You might want to use [Buf_write.with_flow sink (fun buf_write -> ...)]. *)
727-
fd
728-
: [ `W | `Flow | `Close ] r)
727+
Eio.Buf_write.with_flow
728+
(Eio_unix.Net.import_socket_stream ~sw ~close_unix:true
729+
(* TODO: lwt-to-direct-style: [sw] (of type Switch.t) must be propagated here. *)
730+
(* TODO: lwt-to-direct-style: Write operations to buffered IO should be moved inside [with_flow]. *)
731+
fd
732+
: [ `W | `Flow | `Close ] r)
733+
(fun outbuf -> `Move_writing_code_here)
729734
730735
let _f out_chan = Eio.Buf_write.string out_chan "str"
731736
let _ : Eio.Buf_write.t = Lwt_io.stdout
732-
733-
let _f chan =
734-
Eio.Buf_read.line
735-
(* TODO: lwt-to-direct-style: Argument to [Eio.Buf_read.line] is a [Flow.source] but it should be a [Eio.Buf_read.t]. Use [Eio.Buf_read.of_flow ~max_size:1_000_000 source]. *)
736-
chan
737+
let _f chan = Eio.Buf_read.line chan
737738
738739
let _f fname =
739740
let fd =
740-
Eio.Path.open_in ~sw
741-
(Eio.Path.( / ) env#cwd
742-
(* TODO: lwt-to-direct-style: [sw] (of type Switch.t) must be propagated here. *)
743-
(* TODO: lwt-to-direct-style: [env] must be propagated from the main loop *)
744-
fname)
741+
Eio.Buf_read.of_flow ~max_size:1_000_000
742+
(Eio.Path.open_in ~sw
743+
(Eio.Path.( / ) env#cwd
744+
(* TODO: lwt-to-direct-style: [sw] (of type Switch.t) must be propagated here. *)
745+
(* TODO: lwt-to-direct-style: [env] must be propagated from the main loop *)
746+
fname))
745747
in
746748
Eio.Resource.close fd
747749
748750
let _f fname =
749751
let fd =
750-
Eio.Path.open_out ~sw ~create:(`If_missing 0o666)
751-
(Eio.Path.( / ) env#cwd
752-
(* TODO: lwt-to-direct-style: [sw] (of type Switch.t) must be propagated here. *)
753-
(* TODO: lwt-to-direct-style: [flags] and [perm] arguments were dropped. The [~create] was added by default and might not match the previous flags. Use [~append:true] for [O_APPEND]. *)
754-
(* TODO: lwt-to-direct-style: [env] must be propagated from the main loop *)
755-
fname)
752+
Eio.Buf_write.with_flow
753+
(Eio.Path.open_out ~sw ~create:(`If_missing 0o666)
754+
(Eio.Path.( / ) env#cwd
755+
(* TODO: lwt-to-direct-style: [sw] (of type Switch.t) must be propagated here. *)
756+
(* TODO: lwt-to-direct-style: [flags] and [perm] arguments were dropped. The [~create] was added by default and might not match the previous flags. Use [~append:true] for [O_APPEND]. *)
757+
(* TODO: lwt-to-direct-style: [env] must be propagated from the main loop *)
758+
(* TODO: lwt-to-direct-style: Write operations to buffered IO should be moved inside [with_flow]. *)
759+
fname))
760+
(fun outbuf -> `Move_writing_code_here)
756761
in
757762
Eio.File.size fd
758763

0 commit comments

Comments
 (0)