Skip to content

Commit 83388a3

Browse files
authored
Merge pull request #484 from alexander-yakushev/server-aggregator
Add support for HttpObjectAggregator in server pipeline
2 parents b994040 + 1c13623 commit 83388a3

File tree

2 files changed

+71
-1
lines changed

2 files changed

+71
-1
lines changed

src/aleph/http/server.clj

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
TooLongFrameException]
3939
[io.netty.handler.codec.http
4040
DefaultFullHttpResponse
41+
FullHttpRequest
4142
HttpContent HttpHeaders HttpUtil
4243
HttpContentCompressor
4344
HttpRequest HttpResponse
@@ -278,6 +279,16 @@
278279
(handle-request ctx req s))
279280
(reset! request req)))
280281

282+
process-full-request
283+
(fn [ctx ^FullHttpRequest req]
284+
;; HttpObjectAggregator disables chunked encoding, no need to check for it.
285+
(let [content (.content req)
286+
body (when (pos? (.readableBytes content))
287+
(netty/buf->array content))]
288+
;; Don't release content as it will happen automatically once whole
289+
;; request is released.
290+
(handle-request ctx req body)))
291+
281292
process-last-content
282293
(fn [ctx ^HttpContent msg]
283294
(let [content (.content msg)]
@@ -356,6 +367,12 @@
356367
([_ ctx msg]
357368
(cond
358369

370+
;; Happens when io.netty.handler.codec.http.HttpObjectAggregator is part of the pipeline.
371+
(instance? FullHttpRequest msg)
372+
(if (invalid-request? msg)
373+
(reject-invalid-request ctx msg)
374+
(process-full-request ctx msg))
375+
359376
(instance? HttpRequest msg)
360377
(if (invalid-request? msg)
361378
(reject-invalid-request ctx msg)
@@ -404,6 +421,21 @@
404421
([_ ctx msg]
405422
(cond
406423

424+
;; Happens when io.netty.handler.codec.http.HttpObjectAggregator is part of the pipeline.
425+
(instance? FullHttpRequest msg)
426+
(if (invalid-request? msg)
427+
(reject-invalid-request ctx msg)
428+
(let [^FullHttpRequest req msg
429+
content (.content req)
430+
ch (netty/channel ctx)
431+
s (netty/source ch)]
432+
(when-not (zero? (.readableBytes content))
433+
;; Retain the content of FullHttpRequest one extra time to
434+
;; compensate for it being released together with the request.
435+
(netty/put! ch s (netty/acquire content)))
436+
(s/close! s)
437+
(handle-request ctx req s)))
438+
407439
(instance? HttpRequest msg)
408440
(if (invalid-request? msg)
409441
(reject-invalid-request ctx msg)

test/aleph/http_test.clj

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
(:import
1717
(aleph.utils ConnectionTimeoutException RequestTimeoutException)
1818
(io.netty.channel ChannelHandlerContext ChannelOutboundHandlerAdapter ChannelPipeline ChannelPromise)
19-
(io.netty.handler.codec.http HttpMessage)
19+
(io.netty.handler.codec.http HttpMessage HttpObjectAggregator)
2020
(java.io File)
2121
(java.util.concurrent TimeoutException)
2222
(java.util.zip GZIPInputStream ZipException)
@@ -601,3 +601,41 @@
601601
(.write ctx msg p)))))})
602602
(let [resp @(http-get (str "http://localhost:" port "/string"))]
603603
(is (= test-header-val (get (:headers resp) test-header-name)))))))
604+
605+
(defn add-http-object-aggregator [^ChannelPipeline pipeline]
606+
(let [max-content-length 5]
607+
(.addBefore pipeline
608+
"request-handler"
609+
"http-object-aggregator"
610+
(HttpObjectAggregator. max-content-length))))
611+
612+
(deftest test-http-object-aggregator-support
613+
(with-server (http/start-server
614+
basic-handler
615+
{:port port
616+
:pipeline-transform add-http-object-aggregator})
617+
(let [rsp @(http-put (str "http://localhost:" port "/echo")
618+
{:body "hello"})]
619+
(is (= "hello" (bs/to-string (:body rsp))))
620+
(is (= 200 (:status rsp))))
621+
622+
(let [rsp @(http-put (str "http://localhost:" port "/echo")
623+
{:body "hello, world!"})]
624+
(is (= 413 (:status rsp)))
625+
(is (empty? (bs/to-string (:body rsp)))))))
626+
627+
(deftest test-http-object-aggregator-raw-stream-support
628+
(with-server (http/start-server
629+
basic-handler
630+
{:port port
631+
:raw-stream? true
632+
:pipeline-transform add-http-object-aggregator})
633+
(let [rsp @(http-put (str "http://localhost:" port "/echo")
634+
{:body "hello"})]
635+
(is (= "hello" (bs/to-string (:body rsp))))
636+
(is (= 200 (:status rsp))))
637+
638+
(let [rsp @(http-put (str "http://localhost:" port "/echo")
639+
{:body "hello, world!"})]
640+
(is (= 413 (:status rsp)))
641+
(is (empty? (bs/to-string (:body rsp)))))))

0 commit comments

Comments
 (0)