Skip to content

Commit 284310f

Browse files
mfpvbmithr
authored andcommitted
Ocsigen_http_com: avoid O(response) allocation by using Lwt_io, 2X speedup.
Lwt_chan (deprecated) is now built atop Lwt_io and allocates a buffer on every write. Refer to #49. This makes ocsigenserver over 2X faster at serving large files, and also gives a measurable improvement (~20%) for smaller ones (in the 10 KB range).
1 parent d3e4711 commit 284310f

File tree

2 files changed

+34
-32
lines changed

2 files changed

+34
-32
lines changed

opam

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ depends: [
2323
"base-threads"
2424
"react"
2525
"ssl"
26-
"lwt" {>= "2.4.6"}
26+
"lwt" {>= "2.5.0"}
2727
"ocamlnet" {>= "4.0.2"}
2828
"pcre"
2929
"cryptokit"

src/http/ocsigen_http_com.ml

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ let create_waiter block =
9898
type connection =
9999
{ id : int;
100100
fd : Lwt_ssl.socket;
101-
chan : Lwt_chan.out_channel;
101+
chan : Lwt_io.output_channel;
102102
timeout : Lwt_timeout.t;
103103
r_mode : mode;
104104
closed : unit Lwt.t * unit Lwt.u;
@@ -127,11 +127,13 @@ let create_receiver timeout mode fd =
127127
{ id = new_id ();
128128
fd = fd;
129129
chan =
130-
Lwt_chan.make_out_channel
130+
Lwt_io.make
131+
~mode:Lwt_io.output
132+
~buffer:(Lwt_bytes.create buffer_size)
131133
(fun buf pos len ->
132134
Lwt_timeout.start timeout;
133135
Lwt.try_bind
134-
(fun () -> Lwt_ssl.write fd buf pos len)
136+
(fun () -> Lwt_ssl.write_bytes fd buf pos len)
135137
(fun l -> Lwt_timeout.stop timeout; Lwt.return l)
136138
(fun e -> Lwt_timeout.stop timeout;
137139
Lwt.fail (convert_io_error e)));
@@ -505,7 +507,7 @@ let get_http_frame ?(head = false) receiver =
505507

506508
type slot =
507509
{ sl_waiter : waiter;
508-
sl_chan : Lwt_chan.out_channel;
510+
sl_chan : Lwt_io.output_channel;
509511
sl_ssl : bool (* for secure cookies only *)}
510512

511513
let create_slot conn =
@@ -534,7 +536,7 @@ let start_processing conn f =
534536
(*XXX It would be clearer to put this code at the end of the sender function,
535537
but we don't have access to [next_slot] there *)
536538
if not next_waiter.w_did_wait then
537-
Lwt_chan.flush conn.chan
539+
Lwt_io.flush conn.chan
538540
else
539541
Lwt.return ()))
540542
(fun () ->
@@ -565,7 +567,7 @@ let wait_all_senders conn =
565567
(*XXX Do we need a flush here? Are we properly flushing in case of an error? *)
566568
(fun () ->
567569
conn.senders.w_wait >>= fun () ->
568-
Lwt_chan.flush conn.chan)
570+
Lwt_io.flush conn.chan)
569571
(fun e -> match e with Aborted -> Lwt.return () | _ -> Lwt.fail e))
570572
(fun () ->
571573
Lwt_timeout.stop conn.timeout;
@@ -623,16 +625,16 @@ let default_sender = create_sender ~server_name:Ocsigen_config.server_name ()
623625
Ocsigen_stream.next stream >>= fun e ->
624626
match e with
625627
Ocsigen_stream.Finished _ ->
626-
Lwt_chan.output_string out_ch "0\r\n\r\n"
628+
Lwt_io.write out_ch "0\r\n\r\n"
627629
| Ocsigen_stream.Cont (s, next) ->
628630
let l = String.length s in
629631
begin if l = 0 then
630632
(* It is incorrect to send an empty chunk *)
631633
Lwt.return ()
632634
else begin
633-
Lwt_chan.output_string out_ch (Format.sprintf "%x\r\n" l) >>= fun () ->
634-
Lwt_chan.output_string out_ch s >>= fun () ->
635-
Lwt_chan.output_string out_ch "\r\n"
635+
Lwt_io.write out_ch (Format.sprintf "%x\r\n" l) >>= fun () ->
636+
Lwt_io.write out_ch s >>= fun () ->
637+
Lwt_io.write out_ch "\r\n"
636638
end end >>= fun () ->
637639
write_stream_chunked out_ch next
638640
*)
@@ -643,7 +645,7 @@ let default_sender = create_sender ~server_name:Ocsigen_config.server_name ()
643645
We bufferise them before creating a thunk.
644646
Benchmarks cannot prove that it is better, but at least the network stream
645647
is readable ...
646-
It is then buffered again by Lwt_chan.
648+
It is then buffered again by Lwt_io.
647649
Is there a way to have only one buffer?
648650
*)
649651
let write_stream_chunked out_ch stream =
@@ -656,38 +658,38 @@ let write_stream_chunked out_ch stream =
656658
| Ocsigen_stream.Finished _ ->
657659
(if len > 0 then begin
658660
(* It is incorrect to send an empty chunk *)
659-
Lwt_chan.output_string
661+
Lwt_io.write
660662
out_ch (Format.sprintf "%x\r\n" len) >>= fun () ->
661-
Lwt_chan.output out_ch buffer 0 len >>= fun () ->
662-
Lwt_chan.output_string out_ch "\r\n"
663+
Lwt_io.write_from_exactly out_ch buffer 0 len >>= fun () ->
664+
Lwt_io.write out_ch "\r\n"
663665
end else
664666
Lwt.return ()) >>= fun () ->
665-
Lwt_chan.output_string out_ch "0\r\n\r\n"
667+
Lwt_io.write out_ch "0\r\n\r\n"
666668
| Ocsigen_stream.Cont (s, next) ->
667669
let l = String.length s in
668670
if l = 0 then
669671
aux next len
670672
else
671673
if l >= size_for_not_buffering then begin
672674
(if len > 0 then begin
673-
Lwt_chan.output_string
675+
Lwt_io.write
674676
out_ch (Format.sprintf "%x\r\n" len) >>= fun () ->
675-
Lwt_chan.output out_ch buffer 0 len >>= fun () ->
676-
Lwt_chan.output_string out_ch "\r\n"
677+
Lwt_io.write_from_exactly out_ch buffer 0 len >>= fun () ->
678+
Lwt_io.write out_ch "\r\n"
677679
end else Lwt.return ()) >>= fun () ->
678-
Lwt_chan.output_string
680+
Lwt_io.write
679681
out_ch (Format.sprintf "%x\r\n" l) >>= fun () ->
680-
Lwt_chan.output out_ch s 0 l >>= fun () ->
681-
Lwt_chan.output_string out_ch "\r\n" >>= fun () ->
682+
Lwt_io.write_from_exactly out_ch s 0 l >>= fun () ->
683+
Lwt_io.write out_ch "\r\n" >>= fun () ->
682684
aux next 0
683685
end else (* Will not work if l is very large: *)
684686
let available = buf_size - len in
685687
if l > available then begin
686-
Lwt_chan.output_string
688+
Lwt_io.write
687689
out_ch (Format.sprintf "%x\r\n" buf_size) >>= fun () ->
688-
Lwt_chan.output out_ch buffer 0 len >>= fun () ->
689-
Lwt_chan.output out_ch s 0 available >>= fun () ->
690-
Lwt_chan.output_string out_ch "\r\n" >>= fun () ->
690+
Lwt_io.write_from_exactly out_ch buffer 0 len >>= fun () ->
691+
Lwt_io.write_from_exactly out_ch s 0 available >>= fun () ->
692+
Lwt_io.write out_ch "\r\n" >>= fun () ->
691693
let newlen = l - available in
692694
String.blit s available buffer 0 newlen;
693695
aux next newlen
@@ -705,7 +707,7 @@ let rec write_stream_raw out_ch stream =
705707
| Ocsigen_stream.Finished _ ->
706708
Lwt.return ()
707709
| Ocsigen_stream.Cont (s, next) ->
708-
Lwt_chan.output_string out_ch s >>= fun () ->
710+
Lwt_io.write out_ch s >>= fun () ->
709711
write_stream_raw out_ch next
710712

711713
(*XXX We should check the length of the stream:
@@ -742,7 +744,7 @@ let send_100_continue slot =
742744
} in
743745
Lwt_log.ign_info ~section "writing 100-continue";
744746
Lwt_log.ign_info ~section hh;
745-
Lwt_chan.output_string out_ch hh
747+
Lwt_io.write out_ch hh
746748

747749
(** Sends the HTTP frame.
748750
* The headers are merged with those of the sender, the priority
@@ -842,13 +844,13 @@ let send
842844
let hh = Framepp.string_of_header hd in
843845
Lwt_log.ign_info_f ~section "writing header\n%s" hh;
844846
observe_result hd hh >>= fun () ->
845-
Lwt_chan.output_string out_ch hh >>= fun () ->
847+
Lwt_io.write out_ch hh >>= fun () ->
846848
(if reopen <> None then
847849
(* If we want to give a possibility to reopen if
848850
it fails, we must detect the failure before
849851
beginning to read the stream
850852
*)
851-
Lwt_chan.flush out_ch
853+
Lwt_io.flush out_ch
852854
else Lwt.return ())
853855
)
854856
(fun e -> (* *** If we are doing a request,
@@ -879,8 +881,8 @@ let send
879881
Lwt_log.ign_info ~section "writing body";
880882
write_stream ~chunked out_ch (fst (Result.stream res))
881883
end) >>= fun () ->
882-
Lwt_chan.flush out_ch (* Vincent: I add this otherwise HEAD answers
883-
are not flushed by the reverse proxy *)
884+
Lwt_io.flush out_ch (* Vincent: I add this otherwise HEAD answers
885+
are not flushed by the reverse proxy *)
884886
>>= fun () ->
885887
Ocsigen_stream.finalize (fst (Result.stream res)) `Success
886888
)

0 commit comments

Comments
 (0)