Skip to content

Commit a7706c1

Browse files
committed
unbreak backpressure in stream -> input-stream conversion, mark 0.2.2-alpha1
1 parent 5177edb commit a7706c1

File tree

2 files changed

+15
-10
lines changed

2 files changed

+15
-10
lines changed

project.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
(defproject byte-streams "0.2.1"
1+
(defproject byte-streams "0.2.2-alpha1"
22
:description "A simple way to handle the menagerie of Java byte represenations."
33
:license {:name "MIT License"
44
:url "http://opensource.org/licenses/MIT"}
55
:dependencies [[primitive-math "0.1.5"]
66
[clj-tuple "0.2.2"]
7-
[manifold "0.1.2"]]
7+
[manifold "0.1.3"]]
88
:profiles {:dev {:dependencies [[org.clojure/clojure "1.8.0"]
99
[org.clojure/test.check "0.9.0"]
1010
[codox-md "0.2.0" :exclusions [org.clojure/clojure]]]}}

src/byte_streams.clj

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -340,11 +340,13 @@
340340
(d/chain (s/take! s ::none)
341341
(fn [^bytes msg]
342342
(if (identical? ::none msg)
343-
(ps/close ps)
344-
(
345-
do
346-
(ps/put-array ps msg 0 (alength msg))
347-
(d/recur))))))
343+
(do
344+
(ps/close ps)
345+
false)
346+
(ps/put-array ps msg 0 (alength msg))))
347+
(fn [result]
348+
(when result
349+
(d/recur)))))
348350
(ps/->input-stream ps)))
349351

350352
(def-conversion ^{:cost 0} [(stream-of ByteBuffer) InputStream]
@@ -354,10 +356,13 @@
354356
(d/chain (s/take! s ::none)
355357
(fn [^ByteBuffer msg]
356358
(if (identical? ::none msg)
357-
(ps/close ps)
358359
(do
359-
(ps/put-buffer ps (.duplicate msg))
360-
(d/recur))))))
360+
(ps/close ps)
361+
false)
362+
(ps/put-buffer ps (.duplicate msg))))
363+
(fn [result]
364+
(when result
365+
(d/recur)))))
361366
(ps/->input-stream ps)))
362367

363368
;; byte-array => byte-buffer

0 commit comments

Comments
 (0)