Skip to content

Commit 6665af8

Browse files
committed
Merge branch 'master' of github.com:ztellman/aleph
2 parents f2d2ec8 + c833381 commit 6665af8

File tree

3 files changed

+211
-31
lines changed

3 files changed

+211
-31
lines changed

src/aleph/http/client.clj

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@
3333
Channel
3434
ChannelHandler ChannelHandlerContext
3535
ChannelPipeline]
36-
[io.netty.handler.stream ChunkedWriteHandler]
37-
[io.netty.handler.codec.http FullHttpRequest]
36+
[io.netty.handler.codec
37+
TooLongFrameException]
38+
[io.netty.handler.stream
39+
ChunkedWriteHandler]
40+
[io.netty.handler.codec.http
41+
FullHttpRequest]
3842
[io.netty.handler.codec.http.websocketx
3943
CloseWebSocketFrame
4044
PingWebSocketFrame
@@ -159,7 +163,13 @@
159163

160164
:exception-caught
161165
([_ ctx ex]
162-
(when-not (instance? IOException ex)
166+
(cond
167+
; could happens when io.netty.handler.codec.http.HttpObjectAggregator
168+
; is part of the pipeline
169+
(instance? TooLongFrameException ex)
170+
(s/put! response-stream ex)
171+
172+
(not (instance? IOException ex))
163173
(log/warn ex "error in HTTP client")))
164174

165175
:channel-inactive
@@ -176,6 +186,18 @@
176186

177187
(cond
178188

189+
; happens when io.netty.handler.codec.http.HttpObjectAggregator is part of the pipeline
190+
(instance? FullHttpResponse msg)
191+
(let [^FullHttpResponse rsp msg
192+
content (.content rsp)
193+
c (d/deferred)
194+
s (netty/buffered-source (netty/channel ctx) #(alength ^bytes %) buffer-capacity)]
195+
(s/on-closed s #(d/success! c true))
196+
(s/put! s (netty/buf->array content))
197+
(netty/release content)
198+
(handle-response rsp c s)
199+
(s/close! s))
200+
179201
(instance? HttpResponse msg)
180202
(let [rsp msg]
181203
(if (HttpUtil/isTransferEncodingChunked rsp)

src/aleph/http/multipart.clj

Lines changed: 125 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,41 @@
11
(ns aleph.http.multipart
22
(:require
3-
[clojure.core :as cc]
4-
[byte-streams :as bs]
5-
[aleph.http.encoding :refer [encode]]
6-
[aleph.netty :as netty])
3+
[clojure.core :as cc]
4+
[byte-streams :as bs]
5+
[aleph.http.encoding :refer [encode]]
6+
[aleph.http.core :as http-core]
7+
[aleph.netty :as netty]
8+
[manifold.stream :as s]
9+
[clojure.tools.logging :as log]
10+
[manifold.deferred :as d])
711
(:import
8-
[java.util
9-
Locale]
10-
[java.io
11-
File]
12-
[java.nio
13-
ByteBuffer]
14-
[java.nio.charset
15-
Charset]
16-
[java.net
17-
URLConnection]
18-
[io.netty.util.internal
19-
ThreadLocalRandom]
20-
[io.netty.handler.codec.http
21-
DefaultHttpRequest
22-
FullHttpRequest
23-
HttpConstants]
24-
[io.netty.handler.codec.http.multipart
25-
HttpPostRequestEncoder
26-
MemoryAttribute]))
12+
[java.util
13+
Locale]
14+
[java.io
15+
File]
16+
[java.nio
17+
ByteBuffer]
18+
[java.nio.charset
19+
Charset]
20+
[java.net
21+
URLConnection]
22+
[io.netty.util.internal
23+
ThreadLocalRandom]
24+
[io.netty.handler.codec.http
25+
DefaultHttpContent
26+
DefaultHttpRequest
27+
FullHttpRequest
28+
HttpConstants]
29+
[io.netty.handler.codec.http.multipart
30+
Attribute
31+
MemoryAttribute
32+
FileUpload
33+
HttpDataFactory
34+
DefaultHttpDataFactory
35+
HttpPostRequestDecoder
36+
HttpPostRequestEncoder
37+
InterfaceHttpData
38+
InterfaceHttpData$HttpDataType]))
2739

2840
(defn boundary []
2941
(-> (ThreadLocalRandom/current) .nextLong Long/toHexString .toLowerCase))
@@ -146,3 +158,93 @@
146158
(.addBodyHttpData encoder attr))))
147159
(let [req' (.finalizeRequest encoder)]
148160
[req' (when (.isChunked encoder) encoder)])))
161+
162+
(defmulti http-data->map
163+
(fn [^InterfaceHttpData data]
164+
(.getHttpDataType data)))
165+
166+
(defmethod http-data->map InterfaceHttpData$HttpDataType/Attribute
167+
[^Attribute attr]
168+
(let [content (.getValue attr)]
169+
{:part-name (.getName attr)
170+
:content content
171+
:name nil
172+
:charset (-> attr .getCharset .toString)
173+
:mime-type nil
174+
:transfer-encoding nil
175+
:memory? (.isInMemory attr)
176+
:file? false
177+
:file nil
178+
:size (count content)}))
179+
180+
(defmethod http-data->map InterfaceHttpData$HttpDataType/FileUpload
181+
[^FileUpload data]
182+
(let [memory? (.isInMemory data)]
183+
{:part-name (.getName data)
184+
:content (when memory?
185+
(bs/to-input-stream (netty/acquire (.content data))))
186+
:name (.getFilename data)
187+
:charset (-> data .getCharset .toString)
188+
:mime-type (.getContentType data)
189+
:transfer-encoding (.getContentTransferEncoding data)
190+
:memory? memory?
191+
:file? true
192+
:file (when-not memory? (.getFile data))
193+
:size (.length data)}))
194+
195+
(defn- read-attributes [^HttpPostRequestDecoder decoder parts]
196+
(while (.hasNext decoder)
197+
(s/put! parts (http-data->map (.next decoder)))))
198+
199+
(defn decode-request
200+
"Takes a ring request and returns a manifold stream which yields
201+
parts of the mutlipart/form-data encoded body. In case the size of
202+
a part content exceeds `:memory-limit` limit (16KB by default),
203+
corresponding payload would be written to a temp file. Check `:memory?`
204+
flag to know whether content might be read directly from `:content` or
205+
should be fetched from the file specified in `:file`.
206+
207+
Note, that if your handler works with multipart requests only,
208+
it's better to set `:raw-stream?` to `true` to avoid additional
209+
input stream coercion."
210+
([req] (decode-request req {}))
211+
([{:keys [body] :as req}
212+
{:keys [body-buffer-size
213+
memory-limit]
214+
:or {body-buffer-size 65536
215+
memory-limit DefaultHttpDataFactory/MINSIZE}}]
216+
(let [body (if (s/stream? body)
217+
body
218+
(netty/to-byte-buf-stream body body-buffer-size))
219+
destroyed? (atom false)
220+
req' (http-core/ring-request->netty-request req)
221+
factory (DefaultHttpDataFactory. (long memory-limit))
222+
decoder (HttpPostRequestDecoder. factory req')
223+
parts (s/stream)]
224+
225+
;; on each HttpContent chunk, put it into the decoder
226+
;; and resume our attempts to get the next attribute available
227+
(s/connect-via
228+
body
229+
(fn [chunk]
230+
(let [content (DefaultHttpContent. chunk)]
231+
(.offer decoder content)
232+
(read-attributes decoder parts)
233+
;; note, that releasing chunk right here relies on
234+
;; the internals of the decoder. in case those
235+
;; internal are changed in future, this flow of
236+
;; manipulations should be also reconsidered
237+
(netty/release chunk)
238+
(d/success-deferred true)))
239+
parts)
240+
241+
(s/on-closed
242+
parts
243+
(fn []
244+
(when (compare-and-set! destroyed? false true)
245+
(try
246+
(.destroy decoder)
247+
(catch Exception e
248+
(log/warn e "exception when cleaning up multipart decoder"))))))
249+
250+
parts)))

test/aleph/http/multipart_test.clj

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
[aleph.http :as http]
66
[aleph.http.multipart :as mp]
77
[byte-streams :as bs]
8-
[manifold.deferred :as d])
8+
[manifold.deferred :as d]
9+
[manifold.stream :as s]
10+
[clojure.string :as str]
11+
[clojure.edn :as edn])
912
(:import
1013
[java.io
1114
File]))
@@ -113,8 +116,12 @@
113116
(is (.contains body-str "Content-Type: application/png\r\n"))
114117
(is (.contains body-str "Content-Transfer-Encoding: base64\r\n"))))
115118

116-
(def port 26003)
117-
(def url (str "http://localhost:" port))
119+
(def port1 26023)
120+
(def port2 26024)
121+
(def port3 26025)
122+
(def url1 (str "http://localhost:" port1))
123+
(def url2 (str "http://localhost:" port2))
124+
(def url3 (str "http://localhost:" port3))
118125

119126
(def parts [{:part-name "#0-string"
120127
:content "CONTENT1"}
@@ -137,9 +144,9 @@
137144
:body body})
138145

139146
(deftest test-send-multipart-request
140-
(let [s (http/start-server echo-handler {:port port})
147+
(let [s (http/start-server echo-handler {:port port1})
141148
^String resp @(d/chain'
142-
(http/post url {:multipart parts})
149+
(http/post url1 {:multipart parts})
143150
:body
144151
bs/to-string)]
145152
;; part names
@@ -161,3 +168,52 @@
161168
(is (.contains resp "filename=\"text-file-to-send.txt\""))
162169

163170
(.close ^java.io.Closeable s)))
171+
172+
(defn- pack-chunk [{:keys [content] :as chunk}]
173+
(cond-> (dissoc chunk :file)
174+
(not (string? content))
175+
(dissoc :content)))
176+
177+
(defn- decode-handler [req]
178+
(let [chunks (-> req
179+
mp/decode-request
180+
s/stream->seq)]
181+
{:status 200
182+
:body (pr-str (map pack-chunk chunks))}))
183+
184+
(defn- test-decoder [port url raw-stream?]
185+
(let [s (http/start-server decode-handler {:port port
186+
:raw-stream? raw-stream?})
187+
chunks (-> (http/post url {:multipart parts})
188+
(deref 1e3 {:body "timeout"})
189+
:body
190+
bs/to-string
191+
clojure.edn/read-string
192+
vec)]
193+
(is (= 6 (count chunks)))
194+
195+
;; part-names
196+
(is (= (map :part-name parts)
197+
(map :part-name chunks)))
198+
199+
;; content
200+
(is (= "CONTENT1" (get-in chunks [0 :content])))
201+
202+
;; mime type
203+
(is (= "text/plain" (get-in chunks [2 :mime-type])))
204+
(is (= "application/png" (get-in chunks [3 :mime-type])))
205+
206+
;; filename
207+
(is (= "file.txt" (get-in chunks [3 :name])))
208+
(is (= "text-file-to-send.txt" (get-in chunks [4 :name])))
209+
210+
;; charset
211+
(is (= "ISO-8859-1" (get-in chunks [5 :charset])))
212+
213+
(.close ^java.io.Closeable s)))
214+
215+
(deftest test-mutlipart-request-decode-with-ring-handler
216+
(test-decoder port2 url2 false))
217+
218+
(deftest test-mutlipart-request-decode-with-raw-handler
219+
(test-decoder port3 url3 true))

0 commit comments

Comments
 (0)