File tree Expand file tree Collapse file tree 2 files changed +45
-20
lines changed Expand file tree Collapse file tree 2 files changed +45
-20
lines changed Original file line number Diff line number Diff line change @@ -469,27 +469,21 @@ let rec await_batch t =
469469 );
470470 await_batch t
471471
472- let read_into t buf =
473- let iovecs = await_batch t in
474- let n, _iovecs = Cstruct. fillv ~src: iovecs ~dst: buf in
475- shift t n;
476- n
477-
478- let read_source_buffer t fn =
479- let iovecs = await_batch t in
480- shift t (fn iovecs)
481-
482- let as_flow t =
483- object
484- inherit Flow. source
485- method! read_methods = [Flow. Read_source_buffer (read_source_buffer t)]
486- method read_into = read_into t
487- end
472+ (* We have to do our own copy, because we can't [shift] until the write is complete. *)
473+ let copy t flow =
474+ let rec aux () =
475+ let iovecs = await_batch t in
476+ Flow. write flow iovecs; (* todo: add a Flow.single_write and use that. *)
477+ shift t (Cstruct. lenv iovecs);
478+ aux ()
479+ in
480+ try aux ()
481+ with End_of_file -> ()
488482
489483let with_flow ?(initial_size =0x1000 ) flow fn =
490484 Switch. run @@ fun sw ->
491485 let t = create ~sw initial_size in
492- Fiber. fork ~sw (fun () -> Flow. copy (as_flow t) flow);
486+ Fiber. fork ~sw (fun () -> copy t flow);
493487 match fn t with
494488 | x ->
495489 close t;
Original file line number Diff line number Diff line change @@ -182,7 +182,10 @@ Eio_mock.Flow.on_copy_bytes flow [
182182- : unit = ()
183183```
184184
185- Multiple flushes:
185+ Multiple flushes.
186+ Note: ideally the flushes here would complete as soon as enough data has been flushed,
187+ but currently Eio.Flow.sink doesn't allow short writes and so Buf_write has to wait for
188+ the whole batch to be flushed.
186189
187190``` ocaml
188191# Eio_mock.Backend.run @@ fun () ->
@@ -201,15 +204,43 @@ Multiple flushes:
201204 traceln "Done";;
202205+flow: wrote (rsb) ["a"]
203206+flow: wrote (rsb) ["b"; "c"]
204- +1st flush
205207+flow: wrote (rsb) ["d"; "e"]
206- +2nd flush
207208+flow: wrote (rsb) ["f"]
209+ +1st flush
210+ +2nd flush
208211+3rd flush
209212+Done
210213- : unit = ()
211214```
212215
216+ Check flush waits for the write to succeed:
217+
218+ ``` ocaml
219+ let slow_writer = object
220+ inherit Eio.Flow.sink
221+ method copy src =
222+ let buf = Cstruct.create 10 in
223+ try
224+ while true do
225+ let len = Eio.Flow.single_read src buf in
226+ Fiber.yield ();
227+ traceln "Write %S" (Cstruct.to_string buf ~len)
228+ done
229+ with End_of_file -> ()
230+ end
231+ ```
232+
233+ ``` ocaml
234+ # Eio_mock.Backend.run @@ fun () ->
235+ Write.with_flow slow_writer @@ fun t ->
236+ Write.string t "test";
237+ Write.flush t;
238+ traceln "Flush complete"
239+ +Write "test"
240+ +Flush complete
241+ - : unit = ()
242+ ```
243+
213244## Scheduling
214245
215246``` ocaml
You can’t perform that action at this time.
0 commit comments