Skip to content

Commit 52c9291

Browse files
committed
mark 0.2.1-alpha3, fixes #21
1 parent 4750b5b commit 52c9291

File tree

5 files changed

+132
-79
lines changed

5 files changed

+132
-79
lines changed

project.clj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
(defproject byte-streams "0.2.1-alpha2"
1+
(defproject byte-streams "0.2.1-alpha3"
22
:description "A simple way to handle the menagerie of Java byte represenations."
33
:license {:name "MIT License"
44
:url "http://opensource.org/licenses/MIT"}
55
:dependencies [[primitive-math "0.1.4"]
66
[clj-tuple "0.2.2"]
7-
[manifold "0.1.1"]]
8-
:profiles {:dev {:dependencies [[org.clojure/clojure "1.7.0"]
9-
[org.clojure/test.check "0.8.1"]
7+
[manifold "0.1.2-alpha3"]]
8+
:profiles {:dev {:dependencies [[org.clojure/clojure "1.8.0-RC5"]
9+
[org.clojure/test.check "0.9.0"]
1010
[codox-md "0.2.0" :exclusions [org.clojure/clojure]]]}}
1111
:test-selectors {:stress :stress
1212
:default (complement :stress)}

src/byte_streams.clj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,8 @@
330330

331331
(def-conversion ^{:cost 0} [(stream-of bytes) InputStream]
332332
[s options]
333-
(let [ps (ps/pushback-stream (get options :buffer-size 65536))]
334-
(s/consume
333+
(let [ps (ps/pushback-stream (get options :buffer-size 1024))]
334+
(s/consume-async
335335
(fn [^bytes ary]
336336
(ps/put-array ps ary 0 (alength ary)))
337337
s)
@@ -340,8 +340,8 @@
340340

341341
(def-conversion ^{:cost 0} [(stream-of ByteBuffer) InputStream]
342342
[s options]
343-
(let [ps (ps/pushback-stream (get options :buffer-size 65536))]
344-
(s/consume
343+
(let [ps (ps/pushback-stream (get options :buffer-size 1024))]
344+
(s/consume-async
345345
(fn [^ByteBuffer buf]
346346
(ps/put-buffer ps (.duplicate buf)))
347347
s)

src/byte_streams/pushback_stream.clj

Lines changed: 109 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
(:refer-clojure :exclude [take])
33
(:require
44
[primitive-math :as p]
5+
[byte-streams.utils :refer [doit]]
56
[manifold
67
[utils :as u]
78
[stream :as s]
@@ -32,6 +33,10 @@
3233
deferred
3334
^boolean eager?])
3435

36+
(defn trigger [^Consumption c]
37+
(let [^ByteBuffer buf (.buf c)]
38+
(d/success! (.deferred c) (.position buf))))
39+
3540
(defn put [^ByteBuffer src ^ByteBuffer dst]
3641
(let [l (.limit src)]
3742
(.limit src (p/+ (.position src) (p/min (.remaining src) (.remaining dst))))
@@ -121,37 +126,45 @@
121126
PushbackStream
122127

123128
(put [_ buf]
124-
((either
125-
[do]
126-
[u/with-lock lock])
129+
(let [[consumers d]
130+
((either
131+
[do]
132+
[u/with-lock* lock])
133+
134+
(if closed?
135+
[nil
136+
(d/success-deferred false)]
137+
138+
[(loop [acc []]
139+
(if-let [^Consumption c (.peek consumers)]
140+
(let [^ByteBuffer out (.buf c)]
141+
(put buf out)
142+
(when (or (.eager? c) (not (.hasRemaining out)))
143+
(.remove consumers)
144+
(recur (conj acc c))))
145+
acc))
127146

128-
(if closed?
129-
(d/success-deferred false)
147+
(do
148+
(when (.hasRemaining buf)
149+
(.add buffer buf)
150+
(set! buffer-size (unchecked-int (p/+ buffer-size (.remaining buf)))))
130151

131-
(do
132-
(loop []
133-
(when-let [^Consumption c (.peek consumers)]
134-
(let [^ByteBuffer out (.buf c)]
135-
(put buf out)
136-
(when (or (.eager? c) (not (.hasRemaining out)))
137-
(.remove consumers)
138-
(d/success! (.deferred c) (.position out))
139-
(recur)))))
152+
(cond
140153

141-
(when (.hasRemaining buf)
142-
(.add buffer buf)
143-
(set! buffer-size (unchecked-int (p/+ buffer-size (.remaining buf)))))
154+
deferred
155+
deferred
144156

145-
(cond
157+
(p/<= buffer-size buffer-capacity)
158+
(d/success-deferred true)
146159

147-
deferred
148-
deferred
160+
:else
161+
(set! deferred (d/deferred))))]))]
149162

150-
(p/<= buffer-size buffer-capacity)
151-
(d/success-deferred true)
163+
(when consumers
164+
(doit [c consumers]
165+
(trigger c)))
152166

153-
:else
154-
(set! deferred (d/deferred)))))))
167+
d))
155168

156169
(put [this ary offset length]
157170
(.put this
@@ -160,21 +173,28 @@
160173
(.limit (+ offset length)))))
161174

162175
(pushback [_ buf]
163-
((either
164-
[do]
165-
[u/with-lock lock])
166-
(loop []
167-
(when-let [^Consumption c (.peek consumers)]
168-
(let [^ByteBuffer out (.buf c)]
169-
(put buf out)
170-
(when (or (.eager? c) (not (.hasRemaining out)))
171-
(.remove consumers)
172-
(d/success! (.deferred c) (.position out))
173-
(recur)))))
174-
175-
(when (.hasRemaining buf)
176-
(.addLast buffer buf)
177-
(set! buffer-size (unchecked-int (p/+ buffer-size (.remaining buf)))))))
176+
(let [consumers
177+
((either
178+
[do]
179+
[u/with-lock* lock])
180+
(let [consumers
181+
(loop [acc []]
182+
(if-let [^Consumption c (.peek consumers)]
183+
(let [^ByteBuffer out (.buf c)]
184+
(put buf out)
185+
(when (or (.eager? c) (not (.hasRemaining out)))
186+
(.remove consumers)
187+
(recur (conj acc c))))
188+
acc))]
189+
190+
(when (.hasRemaining buf)
191+
(.addLast buffer buf)
192+
(set! buffer-size (unchecked-int (p/+ buffer-size (.remaining buf)))))
193+
194+
consumers))]
195+
196+
(doit [c consumers]
197+
(trigger c))))
178198

179199
(pushback [this ary offset length]
180200
(.pushback this
@@ -183,47 +203,65 @@
183203
(.limit (+ offset length)))))
184204

185205
(take [_ ary offset length eager?]
206+
186207
(let [out (-> (ByteBuffer/wrap ary)
187208
(.position offset)
188209
^ByteBuffer (.limit (+ offset length))
189-
.slice)]
190-
((either
191-
[do]
192-
[u/with-lock lock])
193-
194-
(loop []
195-
(when-let [^ByteBuffer in (.peek buffer)]
196-
(put in out)
197-
(when-not (.hasRemaining in)
198-
(.remove buffer))
199-
(when (.hasRemaining out)
200-
(recur))))
201-
202-
(set! buffer-size (unchecked-int (p/- buffer-size (.position out))))
203-
204-
(when (and (p/<= buffer-size buffer-capacity) deferred)
205-
(d/success! deferred true)
206-
(set! deferred nil))
207-
208-
(if (or closed?
209-
(and (pos? (.position out))
210-
(or eager? (not (.hasRemaining out)))))
211-
(d/success-deferred (.position out))
212-
(let [d (d/deferred)]
213-
(.add consumers (Consumption. out d eager?))
214-
d)))))
210+
.slice)
211+
212+
[put take]
213+
214+
((either
215+
[do]
216+
[u/with-lock* lock])
217+
218+
(loop []
219+
(when-let [^ByteBuffer in (.peek buffer)]
220+
(put in out)
221+
(when-not (.hasRemaining in)
222+
(.remove buffer))
223+
(when (.hasRemaining out)
224+
(recur))))
225+
226+
(set! buffer-size
227+
(unchecked-int
228+
(p/- buffer-size
229+
(p/-
230+
(.position out)
231+
offset))))
232+
233+
[(when (and (p/<= buffer-size buffer-capacity) deferred)
234+
(let [d deferred]
235+
(set! deferred nil)
236+
d))
237+
238+
(if (or closed?
239+
(and (pos? (.position out))
240+
(or eager? (not (.hasRemaining out)))))
241+
(d/success-deferred (.position out))
242+
(let [d (d/deferred)]
243+
(.add consumers (Consumption. out d eager?))
244+
d))])]
245+
246+
(when put
247+
(d/success! put true))
248+
249+
take))
215250

216251
(close [_]
217-
((either
218-
[do]
219-
[u/with-lock lock])
220-
(set! closed? true)
252+
(when ((either
253+
[do]
254+
[u/with-lock* lock])
255+
(when-not closed?
256+
(set! closed? true)
257+
true))
221258
(loop []
222259
(when-let [^Consumption c (.poll consumers)]
223260
(let [^ByteBuffer buf (.buf c)]
224261
(d/success! (.deferred c) (.position buf)))
225-
(recur)))
226-
true))))
262+
(recur))))
263+
264+
true)))
227265

228266
(defn pushback-stream [capacity]
229267
(SynchronizedPushbackByteStream.

src/byte_streams/utils.clj

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,18 @@
55
[java.util.concurrent
66
ConcurrentHashMap]))
77

8+
(defmacro doit
9+
"A version of doseq that doesn't emit all that inline-destroying chunked-seq code."
10+
[[x it] & body]
11+
(let [it-sym (gensym "iterable")]
12+
`(let [~it-sym ~it
13+
it# (.iterator ~(with-meta it-sym {:tag "Iterable"}))]
14+
(loop []
15+
(when (.hasNext it#)
16+
(let [~x (.next it#)]
17+
~@body)
18+
(recur))))))
19+
820
(defmacro memoize-form [m f & args]
921
`(let [k# (t/vector ~@args)]
1022
(let [v# (.get ~m k#)]

test/byte_streams_test.clj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
distinct
3838
(map (partial map eval')))]
3939
(doseq [[src dst] pairwise-conversions]
40+
4041
(is (= text
4142
(-> text
4243
(convert src)
@@ -59,6 +60,7 @@
5960
distinct
6061
(map (partial map eval')))]
6162
(doseq [[src dst] pairwise-conversions]
63+
6264
(is (= (seq ary)
6365
(-> ary
6466
(convert src)
@@ -85,6 +87,7 @@
8587
(doseq [dst (->> String
8688
possible-conversions
8789
(map eval'))]
90+
8891
(let [file (temp-file)
8992
file' (temp-file)]
9093
(transfer (convert text dst) dev-null)

0 commit comments

Comments
 (0)