Skip to content

Commit abc850b

Browse files
author
Zach Tellman
committed
consume-async wasn't quite what I needed, mark 0.2.1-alpha4
1 parent 52c9291 commit abc850b

File tree

3 files changed

+41
-39
lines changed

3 files changed

+41
-39
lines changed

project.clj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
(defproject byte-streams "0.2.1-alpha3"
1+
(defproject byte-streams "0.2.1-alpha4"
22
:description "A simple way to handle the menagerie of Java byte represenations."
33
:license {:name "MIT License"
44
:url "http://opensource.org/licenses/MIT"}
55
:dependencies [[primitive-math "0.1.4"]
66
[clj-tuple "0.2.2"]
7-
[manifold "0.1.2-alpha3"]]
8-
:profiles {:dev {:dependencies [[org.clojure/clojure "1.8.0-RC5"]
7+
[manifold "0.1.2"]]
8+
:profiles {:dev {:dependencies [[org.clojure/clojure "1.8.0"]
99
[org.clojure/test.check "0.9.0"]
1010
[codox-md "0.2.0" :exclusions [org.clojure/clojure]]]}}
1111
:test-selectors {:stress :stress

src/byte_streams.clj

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
;;;
5555

5656
(defonce conversions (atom (g/conversion-graph)))
57+
(defonce inverse-conversions (atom (g/conversion-graph)))
5758
(defonce src->dst->transfer (atom nil))
5859

5960
(def ^:private ^:const byte-array (class (Utils/byteArray 0)))
@@ -106,17 +107,21 @@
106107
[[src dst :as conversion] params & body]
107108
(let [^Type src (normalize-type-descriptor src)
108109
dst (normalize-type-descriptor dst)]
109-
`(swap! conversions g/assoc-conversion ~src ~dst
110-
(fn [~(with-meta (first params)
111-
{:tag (when (and (instance? Class (.type src)) (not (.wrapper src)))
112-
(if (= src (normalize-type-descriptor 'bytes))
113-
'bytes
114-
(.getName ^Class (.type src))))})
115-
~(if-let [options (second params)]
116-
options
117-
`_#)]
118-
~@body)
119-
~(get (meta conversion) :cost 1))))
110+
`(let [f#
111+
(fn [~(with-meta (first params)
112+
{:tag (when (and (instance? Class (.type src)) (not (.wrapper src)))
113+
(if (= src (normalize-type-descriptor 'bytes))
114+
'bytes
115+
(.getName ^Class (.type src))))})
116+
~(if-let [options (second params)]
117+
options
118+
`_#)]
119+
~@body)
120+
121+
cost#
122+
~(get (meta conversion) :cost 1)]
123+
(swap! conversions g/assoc-conversion ~src ~dst f# cost#)
124+
(swap! inverse-conversions g/assoc-conversion ~dst ~src f# cost#))))
120125

121126
(defmacro def-transfer
122127
"Defines a byte transfer from one type to another."
@@ -331,21 +336,28 @@
331336
(def-conversion ^{:cost 0} [(stream-of bytes) InputStream]
332337
[s options]
333338
(let [ps (ps/pushback-stream (get options :buffer-size 1024))]
334-
(s/consume-async
335-
(fn [^bytes ary]
336-
(ps/put-array ps ary 0 (alength ary)))
337-
s)
338-
(s/on-drained s #(ps/close ps))
339+
(d/loop []
340+
(d/chain (s/take! s ::none)
341+
(fn [^bytes msg]
342+
(if (identical? ::none msg)
343+
(ps/close ps)
344+
(
345+
do
346+
(ps/put-array ps msg 0 (alength msg))
347+
(d/recur))))))
339348
(ps/->input-stream ps)))
340349

341350
(def-conversion ^{:cost 0} [(stream-of ByteBuffer) InputStream]
342351
[s options]
343352
(let [ps (ps/pushback-stream (get options :buffer-size 1024))]
344-
(s/consume-async
345-
(fn [^ByteBuffer buf]
346-
(ps/put-buffer ps (.duplicate buf)))
347-
s)
348-
(s/on-drained s #(ps/close ps))
353+
(d/loop []
354+
(d/chain (s/take! s ::none)
355+
(fn [^ByteBuffer msg]
356+
(if (identical? ::none msg)
357+
(ps/close ps)
358+
(do
359+
(ps/put-buffer ps (.duplicate msg))
360+
(d/recur))))))
349361
(ps/->input-stream ps)))
350362

351363
;; byte-array => byte-buffer

src/byte_streams/pushback_stream.clj

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,9 @@
9191
[lock
9292
^LinkedList consumers
9393
^long buffer-capacity
94-
(either
95-
[^:unsynchronized-mutable ^int buffer-size]
96-
[^:volatile-mutable ^int buffer-size])
97-
(either
98-
[^:unsynchronized-mutable deferred]
99-
[^:volatile-mutable deferred])
100-
(either
101-
[^:unsynchronized-mutable closed?]
102-
[^:volatile-mutable closed?])
94+
^:unsynchronized-mutable ^int buffer-size
95+
^:unsynchronized-mutable deferred
96+
^:unsynchronized-mutable closed?
10397
^LinkedList buffer]
10498

10599
InputStream$Streamable
@@ -126,6 +120,7 @@
126120
PushbackStream
127121

128122
(put [_ buf]
123+
129124
(let [[consumers d]
130125
((either
131126
[do]
@@ -223,12 +218,7 @@
223218
(when (.hasRemaining out)
224219
(recur))))
225220

226-
(set! buffer-size
227-
(unchecked-int
228-
(p/- buffer-size
229-
(p/-
230-
(.position out)
231-
offset))))
221+
(set! buffer-size (unchecked-int (p/- buffer-size (.position out))))
232222

233223
[(when (and (p/<= buffer-size buffer-capacity) deferred)
234224
(let [d deferred]

0 commit comments

Comments
 (0)