@@ -59,131 +59,83 @@ let gzip_header =
5959type output_buffer =
6060 { stream : Zlib .stream
6161 ; buf : bytes
62- ; mutable pos : int
63- ; mutable avail : int
62+ ; flush : string -> unit Lwt .t
6463 ; mutable size : int32
65- ; mutable crc : int32
66- ; mutable add_trailer : bool }
64+ ; mutable crc : int32 }
6765
68- let write_int32 oz n =
66+ let write_int32 buf offset n =
6967 for i = 0 to 3 do
70- Bytes. set oz. buf (oz.pos + i)
68+ Bytes. set buf (offset + i)
7169 (Char. chr (Int32. to_int (Int32. shift_right_logical n (8 * i)) land 0xff ))
72- done ;
73- oz.pos < - oz.pos + 4 ;
74- oz.avail < - oz.avail - 4 ;
75- assert (oz.avail > = 0 )
70+ done
7671
77- (* puts in oz the content of buf, from pos to pos + len ;
78- * f is the continuation of the current stream *)
79- let rec output oz f buf pos len =
80- assert (pos > = 0 && len > = 0 && pos + len < = String. length buf);
81- if oz.avail = 0
82- then (
83- let cont () = output oz f buf pos len in
84- Logs. info ~src: section (fun fmt ->
85- fmt " Flushing because output buffer is full" );
86- flush oz cont)
87- else if len = 0
88- then next_cont oz f
72+ let compress_flush oz used_out =
73+ Logs. debug ~src: section (fun fmt -> fmt " Flushing %d bytes" used_out);
74+ if used_out > 0
75+ then oz.flush (Bytes. sub_string oz.buf 0 used_out)
76+ else Lwt. return_unit
77+
78+ (* gzip trailer *)
79+ let write_trailer oz =
80+ write_int32 oz.buf 0 oz.crc;
81+ write_int32 oz.buf 4 oz.size;
82+ compress_flush oz 8
83+
84+ (* puts in oz the content of buf, from pos to pos + len ; *)
85+ let rec compress_output oz inbuf pos len =
86+ if len = 0
87+ then Lwt. return_unit
8988 else
9089 let (_ : bool ), used_in, used_out =
9190 try
92- Zlib. deflate oz.stream
93- (Bytes. unsafe_of_string buf)
94- pos len oz.buf oz.pos oz.avail Zlib. Z_NO_FLUSH
91+ Zlib. deflate_string oz.stream inbuf pos len oz.buf 0
92+ (Bytes. length oz.buf) Zlib. Z_NO_FLUSH
9593 with Zlib. Error (s , s' ) ->
9694 raise
9795 (Ocsigen_stream. Stream_error
9896 (" Error during compression: " ^ s ^ " " ^ s'))
9997 in
100- oz.pos < - oz.pos + used_out;
101- oz.avail < - oz.avail - used_out;
102- oz.size < - Int32. add oz.size (Int32. of_int used_in);
103- oz.crc < - Zlib. update_crc_string oz.crc buf pos used_in;
104- output oz f buf (pos + used_in) (len - used_in)
98+ compress_flush oz used_out >> = fun () ->
99+ compress_output oz inbuf (pos + used_in) (len - used_in)
105100
106- (* Flush oz, ie. produces a new_stream with the content of oz, cleans it
107- * and returns the continuation of the stream *)
108- and flush oz cont =
109- let len = oz.pos in
110- if len = 0
111- then cont ()
112- else
113- let buf_len = Bytes. length oz.buf in
114- let s =
115- if len = buf_len
116- then Bytes. to_string oz.buf
117- else Bytes. sub_string oz.buf 0 len
118- in
119- Logs. info ~src: section (fun fmt -> fmt " Flushing!" );
120- oz.pos < - 0 ;
121- oz.avail < - buf_len;
122- Ocsigen_stream. cont s cont
123-
124- and next_cont oz stream =
125- Ocsigen_stream. next (stream : string Ocsigen_stream.stream ) >> = fun e ->
126- match e with
127- | Ocsigen_stream. Finished None ->
128- Logs. info ~src: section (fun fmt ->
129- fmt " End of stream: big cleaning for zlib" );
130- (* loop until there is nothing left to compress and flush *)
131- let rec finish () =
132- (* buffer full *)
133- if oz.avail = 0
134- then flush oz finish
135- else
136- (* no more input, deflates only what were left because output buffer
137- * was full *)
138- let finished, (_ : int ), used_out =
139- Zlib. deflate oz.stream oz.buf 0 0 oz.buf oz.pos oz.avail
140- Zlib. Z_FINISH
141- in
142- oz.pos < - oz.pos + used_out;
143- oz.avail < - oz.avail - used_out;
144- if not finished then finish () else write_trailer ()
145- and write_trailer () =
146- if oz.add_trailer && oz.avail < 8
147- then flush oz write_trailer
148- else (
149- if oz.add_trailer then (write_int32 oz oz.crc; write_int32 oz oz.size);
150- Logs. info ~src: section (fun fmt ->
151- fmt " Zlib.deflate finished, last flush" );
152- flush oz (fun () -> Ocsigen_stream. empty None ))
153- in
154- finish ()
155- | Ocsigen_stream. Finished (Some s ) -> next_cont oz s
156- | Ocsigen_stream. Cont (s , f ) -> output oz f s 0 (String. length s)
101+ let rec compress_finish oz =
102+ Logs. debug ~src: section (fun fmt -> fmt " Finishing" );
103+ (* loop until there is nothing left to compress and flush *)
104+ let finished, (_ : int ), used_out =
105+ Zlib. deflate oz.stream oz.buf 0 0 oz.buf 0 (Bytes. length oz.buf)
106+ Zlib. Z_FINISH
107+ in
108+ compress_flush oz used_out >> = fun () ->
109+ if not finished then compress_finish oz else Lwt. return_unit
157110
158111(* deflate param : true = deflate ; false = gzip (no header in this case) *)
159- let compress deflate stream : string Ocsigen_stream.t =
112+ let compress_body deflate body =
113+ fun flush ->
160114 let zstream = Zlib. deflate_init ! compress_level deflate in
161- let finalize status =
162- Ocsigen_stream. finalize stream status >> = fun _e ->
163- (try Zlib. deflate_end zstream
164- with
165- (* ignore errors, deflate_end cleans everything anyway *)
166- | Zlib. Error _ ->
167- () );
168- Lwt. return (Logs. info ~src: section (fun fmt -> fmt " Zlib stream closed" ))
169- in
170115 let oz =
171116 let buffer_size = ! buffer_size in
172117 { stream = zstream
173118 ; buf = Bytes. create buffer_size
174- ; pos = 0
175- ; avail = buffer_size
119+ ; flush
176120 ; size = 0l
177- ; crc = 0l
178- ; add_trailer = not deflate }
121+ ; crc = 0l }
179122 in
180- let new_stream () = next_cont oz (Ocsigen_stream. get stream) in
181- Logs. info ~src: section (fun fmt -> fmt " Zlib stream initialized" );
182- if deflate
183- then Ocsigen_stream. make ~finalize new_stream
184- else
185- Ocsigen_stream. make ~finalize (fun () ->
186- Ocsigen_stream. cont gzip_header new_stream)
123+ (if deflate then Lwt. return_unit else flush gzip_header) >> = fun () ->
124+ body (fun inbuf ->
125+ let len = String. length inbuf in
126+ oz.size < - Int32. add oz.size (Int32. of_int len);
127+ oz.crc < - Zlib. update_crc_string oz.crc inbuf 0 len;
128+ compress_output oz inbuf 0 len)
129+ >> = fun () ->
130+ compress_finish oz >> = fun () ->
131+ (if deflate then Lwt. return_unit else write_trailer oz) >> = fun () ->
132+ Logs. debug ~src: section (fun fmt -> fmt " Close stream" );
133+ (try Zlib. deflate_end zstream
134+ with
135+ (* ignore errors, deflate_end cleans everything anyway *)
136+ | Zlib. Error _ ->
137+ () );
138+ Lwt. return_unit
187139
188140(* We implement Content-Encoding, not Transfer-Encoding *)
189141type encoding = Deflate | Gzip | Id | Star | Not_acceptable
@@ -252,8 +204,8 @@ let stream_filter contentencoding url deflate choice res =
252204 match Ocsigen_header.Mime_type. parse contenttype with
253205 | None , _ | _ , None -> Lwt. return res
254206 | Some a , Some b when should_compress (a, b) url choice ->
255- let response, body = Ocsigen_response. to_cohttp res in
256207 let response =
208+ let response = Ocsigen_response. response res in
257209 let headers = Cohttp.Response. headers response in
258210 let headers =
259211 let name = Ocsigen_header.Name. (to_string etag) in
@@ -273,10 +225,10 @@ let stream_filter contentencoding url deflate choice res =
273225 Cohttp.Response. headers
274226 ; Cohttp.Response. encoding = Cohttp.Transfer. Chunked }
275227 and body =
276- Cohttp_lwt .Body.to_stream body
277- |> Ocsigen_stream. of_lwt_stream |> compress deflate
278- |> Ocsigen_stream. to_lwt_stream
279- |> Cohttp_lwt.Body. of_stream
228+ Ocsigen_response .Body.make Cohttp.Transfer. Chunked
229+ (compress_body deflate
230+ ( Ocsigen_response.Body. write
231+ ( Ocsigen_response. body res)))
280232 in
281233 Lwt. return (Ocsigen_response. update res ~body ~response )
282234 | _ -> Lwt. return res)
0 commit comments