Skip to content

Commit 21e650f

Browse files
committed
Fix for memory leak when using InputStreams to obtain body
The original ch close handler holds a reference to body' even if it didn't need it. And since channels are extremely long-lived, many non-stream resources could be inadvertently held onto. Fixes #523.
1 parent a36b5e1 commit 21e650f

File tree

2 files changed

+51
-49
lines changed

2 files changed

+51
-49
lines changed

src/aleph/http/core.clj

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -308,37 +308,39 @@
308308

309309
(let [src (if (or (sequential? body') (s/stream? body'))
310310
(->> body'
311-
s/->source
312-
(s/map (fn [x]
313-
(try
314-
(netty/to-byte-buf x)
315-
(catch Throwable e
316-
(log/error e "error converting " (.getName (class x)) " to ByteBuf")
317-
(netty/close ch))))))
311+
s/->source
312+
(s/map (fn [x]
313+
(try
314+
(netty/to-byte-buf x)
315+
(catch Throwable e
316+
(log/error e "error converting " (.getName (class x)) " to ByteBuf")
317+
(netty/close ch))))))
318318
(netty/to-byte-buf-stream body' 8192))
319319

320-
sink (netty/sink ch false #(DefaultHttpContent. %))]
320+
sink (netty/sink ch false #(DefaultHttpContent. %))
321+
322+
;; mustn't close over body' if NOT a stream, can hold on to data too long when conns are keep-alive
323+
ch-close-handler (if (s/stream? body')
324+
#(s/close! body')
325+
#(s/close! src))]
321326

322327
(s/connect src sink)
323328

324329
(-> ch
325-
netty/channel
326-
.closeFuture
327-
netty/wrap-future
328-
(d/chain' (fn [_] (if (s/stream? body')
329-
(s/close! body')
330-
(s/close! src)))))
330+
netty/channel
331+
.closeFuture
332+
netty/wrap-future
333+
(d/chain' (fn [_] (ch-close-handler))))
331334

332335
(let [d (d/deferred)]
333336
(s/on-closed sink
334-
(fn []
335-
336-
(when (instance? Closeable body)
337-
(.close ^Closeable body))
337+
(fn []
338+
(when (instance? Closeable body)
339+
(.close ^Closeable body))
338340

339-
(.execute (-> ch aleph.netty/channel .eventLoop)
340-
#(d/success! d
341-
(netty/write-and-flush ch empty-last-content)))))
341+
(.execute (-> ch aleph.netty/channel .eventLoop)
342+
#(d/success! d
343+
(netty/write-and-flush ch empty-last-content)))))
342344
d))
343345

344346
(netty/write-and-flush ch empty-last-content)))
@@ -496,12 +498,12 @@
496498
handle-cleanup
497499
(fn [ch f]
498500
(-> f
499-
(d/chain'
500-
(fn [^ChannelFuture f]
501-
(if f
502-
(.addListener f ChannelFutureListener/CLOSE)
503-
(netty/close ch))))
504-
(d/catch' (fn [_]))))]
501+
(d/chain'
502+
(fn [^ChannelFuture f]
503+
(if f
504+
(.addListener f ChannelFutureListener/CLOSE)
505+
(netty/close ch))))
506+
(d/catch' (fn [_]))))]
505507

506508
(defn send-message
507509
[ch keep-alive? ssl? ^HttpMessage msg body]

src/aleph/http/server.clj

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -185,28 +185,28 @@
185185
(error-response e))))]
186186

187187
(-> previous-response
188-
(d/chain'
189-
netty/wrap-future
190-
(fn [_]
191-
(netty/release req)
192-
(-> rsp
193-
(d/catch' error-response)
194-
(d/chain'
195-
(fn [rsp]
196-
(when (not (-> req' ^AtomicBoolean (.websocket?) .get))
197-
(send-response ctx keep-alive? ssl?
198-
(cond
199-
200-
(map? rsp)
201-
(if head?
202-
(assoc rsp :body :aleph/omitted)
203-
rsp)
204-
205-
(nil? rsp)
206-
{:status 204}
207-
208-
:else
209-
(invalid-value-response req rsp))))))))))))
188+
(d/chain'
189+
netty/wrap-future
190+
(fn [_]
191+
(netty/release req)
192+
(-> rsp
193+
(d/catch' error-response)
194+
(d/chain'
195+
(fn [rsp]
196+
(when (not (-> req' ^AtomicBoolean (.websocket?) .get))
197+
(send-response ctx keep-alive? ssl?
198+
(cond
199+
200+
(map? rsp)
201+
(if head?
202+
(assoc rsp :body :aleph/omitted)
203+
rsp)
204+
205+
(nil? rsp)
206+
{:status 204}
207+
208+
:else
209+
(invalid-value-response req rsp))))))))))))
210210

211211
(defn exception-handler [ctx ex]
212212
(when-not (instance? IOException ex)

0 commit comments

Comments
 (0)