Skip to content

Commit 051d164

Browse files
committed
Flush response body stream when writing seqs
Update the StreamableResponseBody implementation for ISeq to flush the OutputStream after each element. This prevents unnecessary delays when dealing with blocking seqs, for example, when being used to implement SSEs.
1 parent e209b2e commit 051d164

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed

ring-core-protocols/src/ring/core/protocols.clj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
(write-body-to-stream [body response output-stream]
6464
(let [writer (response-writer response output-stream)]
6565
(doseq [chunk body]
66-
(.write writer (str chunk)))
66+
(.write writer (str chunk))
67+
(.flush writer))
6768
(.close writer)))
6869
java.io.InputStream
6970
(write-body-to-stream [body _ ^OutputStream output-stream]

ring-core-protocols/test/ring/core/test/protocols.clj

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
(:require [clojure.test :refer :all]
33
[clojure.java.io :as io]
44
[ring.core.protocols :refer :all])
5-
(:import [java.io SequenceInputStream IOException InputStream OutputStream]))
5+
(:import [java.io
6+
SequenceInputStream IOException InputStream
7+
ByteArrayOutputStream OutputStream]))
68

79
(deftest test-write-body-defaults
810
(testing "byte-array"
@@ -113,3 +115,32 @@
113115
response {:body "Hello World"}]
114116
(write-body-to-stream (:body response) response output)
115117
(is (not @flushed?)))))
118+
119+
(deftest test-flushing-for-seq
120+
(testing "seqs with delayed elements"
121+
(let [output (ByteArrayOutputStream.)
122+
counter (atom 0)
123+
event-str "data: sample\n\n"
124+
gen-event (fn [] (swap! counter inc) event-str)
125+
gen-delay (fn [] (Thread/sleep 100))
126+
lazy-exec (fn lazy-exec [[f & more]]
127+
(when f
128+
(cons (f) (lazy-seq (lazy-exec more)))))
129+
continue? (atom true)
130+
resp-body (->> (repeat gen-event)
131+
(interpose gen-delay)
132+
(take-while (fn [_] @continue?))
133+
(lazy-exec))
134+
response {:body resp-body}]
135+
;; first sequence element is already evaluated by lazy-exec
136+
(is (= 1 @counter) "counter bump - first seq element already evaluated")
137+
(is (= "" (str output)) "empty output because body not written yet")
138+
(try
139+
(future ; needs to run concurrently so we can observe flushing
140+
(write-body-to-stream (:body response) response output))
141+
(Thread/sleep 150)
142+
(is (= 2 @counter) "two seq elements evaluated yet")
143+
(is (= (str event-str event-str)
144+
(str output)) "two events written to output yet")
145+
(finally
146+
(reset! continue? false))))))

0 commit comments

Comments
 (0)