Skip to content

Commit c39b982

Browse files
authored
Merge pull request #228 from clj-commons/bugfix/update-stream-docs
Update stream docs
2 parents 34ed55d + 3db7368 commit c39b982

File tree

5 files changed

+76
-60
lines changed

5 files changed

+76
-60
lines changed

doc/stream.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ true
118118

119119
Here, we create a source stream `s`, and map `inc` and `dec` over it. When we put our message into `s` it immediately is accepted, since `a` and `b` are downstream. All messages put into `s` will be propagated into *both* `a` and `b`.
120120

121-
If `s` is closed, both `a` and `b` will be closed, as will any other downstream sources we've created. Likewise, if everything downstream of `s` is closed, `s` will also be closed. This is almost always desirable, as failing to do this will simply cause `s` to exert backpressure on everything upstream of it. However, If we wish to avoid this behavior, we can create a `(permanent-stream)`, which cannot be closed.
121+
If `s` is closed, both `a` and `b` will be closed, as will any other downstream sources we've created. Likewise, if everything downstream of `s` is closed, `s` will also be closed, once it's unable to `put!` anywhere. This is almost always desirable, as failing to do this will simply cause `s` to exert backpressure on everything upstream of it. However, if we wish to avoid this behavior, we can create a stream using `stream*` and `:permanent? true`, which cannot be closed.
122122

123123
For any Clojure operation that doesn't have an equivalent in `manifold.stream`, we can use `manifold.stream/transform` with a transducer:
124124

src/manifold/stream.clj

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,10 @@
305305
Optionally takes a map of parameters:
306306
307307
|:---|:---
308-
| `upstream?` | if closing the sink should always close the source, even if there are other sinks downstream of the source. Defaults to `false`. Note that if the sink is the only thing downstream of the source, the source will always be closed, unless it is permanent.
309-
| `downstream?` | if closing the source will close the sink. Defaults to `true`.
310-
| `timeout` | if defined, the maximum time, in milliseconds, that will be spent trying to put a message into the sink before closing it. Useful when there are multiple sinks downstream of a source, and you want to avoid a single backed up sink from blocking all the others.
311-
| `description` | describes the connection, useful for traversing the stream topology via `downstream`."
308+
| `upstream?` | Whether closing the sink should always close the source, even if there are other sinks downstream of the source. Defaults to `false`. Note that if the sink is the only thing downstream of the source, the source will eventually be closed, unless it is permanent.
309+
| `downstream?` | Whether closing the source will close the sink. Defaults to `true`.
310+
| `timeout` | If defined, the maximum time, in milliseconds, that will be spent trying to put a message into the sink before closing it. Useful when there are multiple sinks downstream of a source, and you want to avoid a single backed-up sink from blocking all the others.
311+
| `description` | Describes the connection, useful for traversing the stream topology via `downstream`."
312312
{:arglists
313313
'[[source sink]
314314
[source
@@ -336,7 +336,7 @@
336336

337337
(defn stream
338338
"Returns a Manifold stream with a configurable `buffer-size`. If a capacity is specified,
339-
`put!` will yield `true` when the message is in the buffer. Otherwise it will only yield
339+
`put!` will yield `true` when the message is in the buffer. Otherwise, it will only yield
340340
`true` once it has been consumed.
341341
342342
`xform` is an optional transducer, which will transform all messages that are enqueued

src/manifold/stream/core.clj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
(definterface+ IEventStream
1212
(description [])
13+
; Is the underlying class synchronous by default? NB: async usage is still possible, but requires wrapping
1314
(isSynchronous [])
1415
(downstream [])
1516
(weakHandle [reference-queue])

src/manifold/stream/default.clj

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -289,22 +289,21 @@
289289
(doto acc
290290
(.add
291291
(or
292-
293-
;; see if there are any unclaimed consumers left
292+
;; send to all unclaimed consumers, if any
294293
(loop [^Consumer c (.poll consumers)]
295294
(when c
296295
(if-let [token (d/claim! (.deferred c))]
297296
(Production. (.deferred c) msg token)
298297
(recur (.poll consumers)))))
299298

300-
;; see if we can enqueue into the buffer
299+
;; otherwise, see if we can enqueue into the buffer
301300
(and
302301
messages
303302
(when (< (.size messages) capacity)
304303
(.offer messages (de-nil msg)))
305304
t-d)
306305

307-
;; add to the producers queue
306+
;; otherwise, add to the producers queue
308307
(do
309308
(when (> (.getAndIncrement dirty-puts) max-dirty-puts)
310309
(cleanup-expired-deferreds producers)

src/manifold/stream/graph.clj

Lines changed: 66 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@
5858
.iterator
5959
iterator-seq
6060
(map
61-
(fn [^Downstream d]
62-
[(.description d) (.sink d)]))))))
61+
(fn [^Downstream dwn]
62+
[(.description dwn) (.sink dwn)]))))))
6363

6464
(defn pop-connected! [stream]
6565
(when-let [handle (s/weak-handle stream)]
@@ -76,38 +76,43 @@
7676
;;;
7777

7878
(defn- async-send
79-
[^Downstream d msg dsts]
80-
(let [^IEventSink sink (.sink d)]
81-
(let [x (if (== (.timeout d) -1)
79+
"Returns an AsyncPut with the result of calling a non-blocking .put() on a sink.
80+
If it times out, returns the sink itself as the timeout value."
81+
[^Downstream dwn msg dsts]
82+
(let [^IEventSink sink (.sink dwn)]
83+
(let [x (if (== (.timeout dwn) -1)
8284
(.put sink msg false)
83-
(.put sink msg false (.timeout d) (if (.downstream? d) sink false)))]
84-
(AsyncPut. x dsts d (.upstream? d)))))
85+
(.put sink msg false (.timeout dwn) (if (.downstream? dwn) sink false)))]
86+
(AsyncPut. x dsts dwn (.upstream? dwn)))))
8587

8688
(defn- sync-send
87-
[^Downstream d msg ^CopyOnWriteArrayList dsts ^IEventSink upstream]
88-
(let [^IEventSink sink (.sink d)
89+
[^Downstream dwn msg ^CopyOnWriteArrayList dsts ^IEventSink upstream]
90+
(let [^IEventSink sink (.sink dwn)
8991
x (try
90-
(if (== (.timeout d) -1)
92+
(if (== (.timeout dwn) -1)
9193
(.put sink msg true)
92-
(.put sink msg true (.timeout d) ::timeout))
94+
(.put sink msg true (.timeout dwn) ::timeout))
9395
(catch Throwable e
9496
(log/error e "error in message propagation")
9597
(s/close! sink)
9698
false))]
9799
(when (false? x)
98-
(.remove dsts d)
100+
(.remove dsts dwn)
99101
(when upstream
100102
(s/close! upstream)))
101-
(when (and (identical? ::timeout x) (.downstream? d))
103+
(when (and (identical? ::timeout x) (.downstream? dwn))
102104
(s/close! sink))))
103105

104-
(defn- handle-async-put [^AsyncPut x val source]
106+
(defn- handle-async-put
107+
"Handle a successful async put"
108+
[^AsyncPut x val source]
105109
(let [d (.deferred x)
106-
val (if (instance? IEventSink val)
110+
val (if (instance? IEventSink val) ; it timed out, in which case the val is set to the sink
107111
(do
108112
(s/close! val)
109113
false)
110114
val)]
115+
;; if sink failed or timed out, remove and maybe close source
111116
(when (false? val)
112117
(let [^CopyOnWriteArrayList l (.dsts x)]
113118
(.remove l (.dst x))
@@ -125,23 +130,27 @@
125130
(.remove handle->downstreams (s/weak-handle source)))))
126131

127132
(defn- async-connect
133+
"Connects downstreams to an async source.
134+
135+
Puts to sync sinks are delayed until all async sinks have been successfully put to"
128136
[^IEventSource source
129137
^CopyOnWriteArrayList dsts]
130138
(let [sync-sinks (LinkedList.)
131-
deferreds (LinkedList.)
139+
put-deferreds (LinkedList.)
132140

141+
;; asynchronously .put to all synchronous sinks, using callbacks and trampolines as needed
133142
sync-propagate (fn this [recur-point msg]
134143
(loop []
135-
(let [^Downstream d (.poll sync-sinks)]
136-
(if (nil? d)
144+
(let [^Downstream dwn (.poll sync-sinks)]
145+
(if (nil? dwn)
137146
recur-point
138-
(let [^AsyncPut x (async-send d msg dsts)
147+
(let [^AsyncPut x (async-send dwn msg dsts)
139148
d (.deferred x)
140149
val (d/success-value d ::none)]
141150
(if (identical? val ::none)
142151
(d/on-realized d
143-
(fn [v]
144-
(handle-async-put x v source)
152+
(fn [val]
153+
(handle-async-put x val source)
145154
(trampoline #(this recur-point msg)))
146155
(fn [e]
147156
(handle-async-error x e source)
@@ -150,12 +159,14 @@
150159
(handle-async-put x val source)
151160
(recur))))))))
152161

162+
;; handle all the async puts, using callbacks and trampolines as needed
163+
;; then handle all sync puts once asyncs are done
153164
async-propagate (fn this [recur-point msg]
154165
(loop []
155-
(let [^AsyncPut x (.poll deferreds)]
166+
(let [^AsyncPut x (.poll put-deferreds)]
156167
(if (nil? x)
157168

158-
;; iterator over sync-sinks
169+
;; iterator over sync-sinks when deferreds list is empty
159170
(if (.isEmpty sync-sinks)
160171
recur-point
161172
#(sync-propagate recur-point msg))
@@ -197,9 +208,9 @@
197208
(let [i (.iterator dsts)]
198209
(loop []
199210
(when (.hasNext i)
200-
(let [^Downstream d (.next i)]
201-
(when (.downstream? d)
202-
(s/close! (.sink d)))
211+
(let [^Downstream dwn (.next i)]
212+
(when (.downstream? dwn)
213+
(s/close! (.sink dwn)))
203214
(recur))))))
204215

205216
(== 1 (.size dsts))
@@ -226,23 +237,28 @@
226237
:else
227238
(let [i (.iterator dsts)]
228239
(if (not (.hasNext i))
229-
240+
;; close source if no downstreams
230241
(do
231242
(s/close! source)
232243
(.remove handle->downstreams (s/weak-handle source)))
233244

245+
;; otherwise:
246+
;; 1. add all sync downstreams into a list
247+
;; 2. attempt to .put() all async downstreams and collect AsyncPuts in a list
248+
;; 3. call async-propagate
234249
(do
235250
(loop []
236251
(when (.hasNext i)
237-
(let [^Downstream d (.next i)]
238-
(if (s/synchronous? (.sink d))
239-
(.add sync-sinks d)
240-
(.add deferreds (async-send d msg dsts)))
252+
(let [^Downstream dwn (.next i)]
253+
(if (s/synchronous? (.sink dwn))
254+
(.add sync-sinks dwn)
255+
(.add put-deferreds (async-send dwn msg dsts)))
241256
(recur))))
242257

243258
(async-propagate this msg))))))))))
244259

245260
(defn- sync-connect
261+
"Connects downstreams to a sync source"
246262
[^IEventSource source
247263
^CopyOnWriteArrayList dsts]
248264
(utils/future-with (ex/wait-pool)
@@ -259,17 +275,17 @@
259275
(.remove handle->downstreams (s/weak-handle source))
260276
(loop []
261277
(when (.hasNext i)
262-
(let [^Downstream d (.next i)]
263-
(when (.downstream? d)
264-
(s/close! (.sink d)))))))
278+
(let [^Downstream dwn (.next i)]
279+
(when (.downstream? dwn)
280+
(s/close! (.sink dwn)))))))
265281

266282
(do
267283
(loop []
268284
(when (.hasNext i)
269-
(let [^Downstream d (.next i)]
270-
(if (s/synchronous? (.sink d))
271-
(.add sync-sinks d)
272-
(.add deferreds (async-send d msg dsts)))
285+
(let [^Downstream dwn (.next i)]
286+
(if (s/synchronous? (.sink dwn))
287+
(.add sync-sinks dwn)
288+
(.add deferreds (async-send dwn msg dsts)))
273289
(recur))))
274290

275291
(loop []
@@ -284,11 +300,11 @@
284300
(recur)))))
285301

286302
(loop []
287-
(let [^Downstream d (.poll sync-sinks)]
288-
(if (nil? d)
303+
(let [^Downstream dwn (.poll sync-sinks)]
304+
(if (nil? dwn)
289305
nil
290306
(do
291-
(sync-send d msg dsts (when (.upstream? d) source))
307+
(sync-send dwn msg dsts (when (.upstream? dwn) source))
292308
(recur)))))
293309

294310
(recur))))
@@ -309,20 +325,20 @@
309325
downstream? true}
310326
:as opts}]
311327
(locking src
312-
(let [d (Downstream.
313-
timeout
314-
(boolean (and upstream? (instance? IEventSink src)))
315-
downstream?
316-
dst
317-
description)
328+
(let [dwn (Downstream.
329+
timeout
330+
(boolean (and upstream? (instance? IEventSink src)))
331+
downstream?
332+
dst
333+
description)
318334
k (.weakHandle ^IEventStream src ref-queue)]
319335
(if-let [dsts (.get handle->downstreams k)]
320-
(.add ^CopyOnWriteArrayList dsts d)
336+
(.add ^CopyOnWriteArrayList dsts dwn)
321337
(let [dsts (CopyOnWriteArrayList.)]
322338
(if-let [dsts' (.putIfAbsent handle->downstreams k dsts)]
323-
(.add ^CopyOnWriteArrayList dsts' d)
339+
(.add ^CopyOnWriteArrayList dsts' dwn)
324340
(do
325-
(.add ^CopyOnWriteArrayList dsts d)
341+
(.add ^CopyOnWriteArrayList dsts dwn)
326342
(if (s/synchronous? src)
327343
(sync-connect src dsts)
328344
(async-connect src dsts))))))))))

0 commit comments

Comments
 (0)