@@ -59,128 +59,77 @@ let gzip_header =
5959type output_buffer =
6060 { stream : Zlib .stream
6161 ; buf : bytes
62- ; mutable pos : int
63- ; mutable avail : int
62+ ; flush : string -> unit Lwt .t
6463 ; mutable size : int32
65- ; mutable crc : int32
66- ; mutable add_trailer : bool }
64+ ; mutable crc : int32 }
6765
68- let write_int32 oz n =
66+ let write_int32 buf offset n =
6967 for i = 0 to 3 do
70- Bytes. set oz. buf (oz.pos + i)
68+ Bytes. set buf (offset + i)
7169 (Char. chr (Int32. to_int (Int32. shift_right_logical n (8 * i)) land 0xff ))
72- done ;
73- oz.pos < - oz.pos + 4 ;
74- oz.avail < - oz.avail - 4 ;
75- assert (oz.avail > = 0 )
70+ done
7671
77- (* puts in oz the content of buf, from pos to pos + len ;
78- * f is the continuation of the current stream *)
79- let rec output oz f buf pos len =
80- assert (pos > = 0 && len > = 0 && pos + len < = String. length buf);
81- if oz.avail = 0
82- then (
83- let cont () = output oz f buf pos len in
84- Lwt_log. ign_info ~section " Flushing because output buffer is full" ;
85- flush oz cont)
86- else if len = 0
87- then next_cont oz f
72+ let compress_flush oz used_out = oz.flush (Bytes. sub_string oz.buf 0 used_out)
73+
74+ (* gzip trailer *)
75+ let write_trailer oz =
76+ write_int32 oz.buf 0 oz.crc;
77+ write_int32 oz.buf 4 oz.size;
78+ compress_flush oz 8
79+
80+ (* puts in oz the content of buf, from pos to pos + len ; *)
81+ let rec compress_output oz inbuf pos len =
82+ if len = 0
83+ then Lwt. return_unit
8884 else
8985 let (_ : bool ), used_in, used_out =
9086 try
91- Zlib. deflate oz.stream
92- (Bytes. unsafe_of_string buf)
93- pos len oz.buf oz.pos oz.avail Zlib. Z_NO_FLUSH
87+ Zlib. deflate_string oz.stream inbuf pos len oz.buf 0
88+ (Bytes. length oz.buf) Zlib. Z_NO_FLUSH
9489 with Zlib. Error (s , s' ) ->
9590 raise
9691 (Ocsigen_stream. Stream_error
9792 (" Error during compression: " ^ s ^ " " ^ s'))
9893 in
99- oz.pos < - oz.pos + used_out;
100- oz.avail < - oz.avail - used_out;
101- oz.size < - Int32. add oz.size (Int32. of_int used_in);
102- oz.crc < - Zlib. update_crc_string oz.crc buf pos used_in;
103- output oz f buf (pos + used_in) (len - used_in)
94+ compress_flush oz used_out >> = fun () ->
95+ compress_output oz inbuf (pos + used_in) (len - used_in)
10496
105- (* Flush oz, ie. produces a new_stream with the content of oz, cleans it
106- * and returns the continuation of the stream *)
107- and flush oz cont =
108- let len = oz.pos in
109- if len = 0
110- then cont ()
111- else
112- let buf_len = Bytes. length oz.buf in
113- let s =
114- if len = buf_len
115- then Bytes. to_string oz.buf
116- else Bytes. sub_string oz.buf 0 len
117- in
118- Lwt_log. ign_info ~section " Flushing!" ;
119- oz.pos < - 0 ;
120- oz.avail < - buf_len;
121- Ocsigen_stream. cont s cont
122-
123- and next_cont oz stream =
124- Ocsigen_stream. next (stream : string Ocsigen_stream.stream ) >> = fun e ->
125- match e with
126- | Ocsigen_stream. Finished None ->
127- Lwt_log. ign_info ~section " End of stream: big cleaning for zlib" ;
128- (* loop until there is nothing left to compress and flush *)
129- let rec finish () =
130- (* buffer full *)
131- if oz.avail = 0
132- then flush oz finish
133- else
134- (* no more input, deflates only what were left because output buffer
135- * was full *)
136- let finished, (_ : int ), used_out =
137- Zlib. deflate oz.stream oz.buf 0 0 oz.buf oz.pos oz.avail
138- Zlib. Z_FINISH
139- in
140- oz.pos < - oz.pos + used_out;
141- oz.avail < - oz.avail - used_out;
142- if not finished then finish () else write_trailer ()
143- and write_trailer () =
144- if oz.add_trailer && oz.avail < 8
145- then flush oz write_trailer
146- else (
147- if oz.add_trailer then (write_int32 oz oz.crc; write_int32 oz oz.size);
148- Lwt_log. ign_info ~section " Zlib.deflate finished, last flush" ;
149- flush oz (fun () -> Ocsigen_stream. empty None ))
150- in
151- finish ()
152- | Ocsigen_stream. Finished (Some s ) -> next_cont oz s
153- | Ocsigen_stream. Cont (s , f ) -> output oz f s 0 (String. length s)
97+ let rec compress_finish oz =
98+ (* loop until there is nothing left to compress and flush *)
99+ let finished, (_ : int ), used_out =
100+ Zlib. deflate oz.stream oz.buf 0 0 oz.buf 0 (Bytes. length oz.buf)
101+ Zlib. Z_FINISH
102+ in
103+ compress_flush oz used_out >> = fun () ->
104+ if not finished then compress_finish oz else Lwt. return_unit
154105
155106(* deflate param : true = deflate ; false = gzip (no header in this case) *)
156- let compress deflate stream : string Ocsigen_stream.t =
107+ let compress_body deflate body =
108+ fun flush ->
157109 let zstream = Zlib. deflate_init ! compress_level deflate in
158- let finalize status =
159- Ocsigen_stream. finalize stream status >> = fun _e ->
160- (try Zlib. deflate_end zstream
161- with
162- (* ignore errors, deflate_end cleans everything anyway *)
163- | Zlib. Error _ ->
164- () );
165- Lwt. return (Lwt_log. ign_info ~section " Zlib stream closed" )
166- in
167110 let oz =
168111 let buffer_size = ! buffer_size in
169112 { stream = zstream
170113 ; buf = Bytes. create buffer_size
171- ; pos = 0
172- ; avail = buffer_size
114+ ; flush
173115 ; size = 0l
174- ; crc = 0l
175- ; add_trailer = not deflate }
116+ ; crc = 0l }
176117 in
177- let new_stream () = next_cont oz (Ocsigen_stream. get stream) in
178- Lwt_log. ign_info ~section " Zlib stream initialized" ;
179- if deflate
180- then Ocsigen_stream. make ~finalize new_stream
181- else
182- Ocsigen_stream. make ~finalize (fun () ->
183- Ocsigen_stream. cont gzip_header new_stream)
118+ (if deflate then Lwt. return_unit else flush gzip_header) >> = fun () ->
119+ body (fun inbuf ->
120+ let len = String. length inbuf in
121+ oz.size < - Int32. add oz.size (Int32. of_int len);
122+ oz.crc < - Zlib. update_crc_string oz.crc inbuf 0 len;
123+ compress_output oz inbuf 0 len)
124+ >> = fun () ->
125+ compress_finish oz >> = fun () ->
126+ (if deflate then Lwt. return_unit else write_trailer oz) >> = fun () ->
127+ (try Zlib. deflate_end zstream
128+ with
129+ (* ignore errors, deflate_end cleans everything anyway *)
130+ | Zlib. Error _ ->
131+ () );
132+ Lwt. return_unit
184133
185134(* We implement Content-Encoding, not Transfer-Encoding *)
186135type encoding = Deflate | Gzip | Id | Star | Not_acceptable
@@ -249,8 +198,8 @@ let stream_filter contentencoding url deflate choice res =
249198 match Ocsigen_header.Mime_type. parse contenttype with
250199 | None , _ | _ , None -> Lwt. return res
251200 | Some a , Some b when should_compress (a, b) url choice ->
252- let response, body = Ocsigen_response. to_cohttp res in
253201 let response =
202+ let response = Ocsigen_response. response res in
254203 let headers = Cohttp.Response. headers response in
255204 let headers =
256205 let name = Ocsigen_header.Name. (to_string etag) in
@@ -270,10 +219,10 @@ let stream_filter contentencoding url deflate choice res =
270219 Cohttp.Response. headers
271220 ; Cohttp.Response. encoding = Cohttp.Transfer. Chunked }
272221 and body =
273- Cohttp_lwt .Body.to_stream body
274- |> Ocsigen_stream. of_lwt_stream |> compress deflate
275- |> Ocsigen_stream. to_lwt_stream
276- |> Cohttp_lwt.Body. of_stream
222+ Ocsigen_response .Body.make Cohttp.Transfer. Chunked
223+ (compress_body deflate
224+ ( Ocsigen_response.Body. write
225+ ( Ocsigen_response. body res)))
277226 in
278227 Lwt. return (Ocsigen_response. update res ~body ~response )
279228 | _ -> Lwt. return res)
0 commit comments