Skip to content

Commit db8d97f

Browse files
committed
Add some docs, comments, refactor some names
1 parent 40f0b9f commit db8d97f

File tree

3 files changed

+29
-13
lines changed

3 files changed

+29
-13
lines changed

src/manifold/stream/core.clj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
(definterface+ IEventStream
1212
(description [])
13+
; Is the underlying class synchronous by default? NB: async usage is still possible, but requires wrapping
1314
(isSynchronous [])
1415
(downstream [])
1516
(weakHandle [reference-queue])

src/manifold/stream/default.clj

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -289,22 +289,21 @@
289289
(doto acc
290290
(.add
291291
(or
292-
293-
;; see if there are any unclaimed consumers left
292+
;; send to all unclaimed consumers, if any
294293
(loop [^Consumer c (.poll consumers)]
295294
(when c
296295
(if-let [token (d/claim! (.deferred c))]
297296
(Production. (.deferred c) msg token)
298297
(recur (.poll consumers)))))
299298

300-
;; see if we can enqueue into the buffer
299+
;; otherwise, see if we can enqueue into the buffer
301300
(and
302301
messages
303302
(when (< (.size messages) capacity)
304303
(.offer messages (de-nil msg)))
305304
t-d)
306305

307-
;; add to the producers queue
306+
;; otherwise, add to the producers queue
308307
(do
309308
(when (> (.getAndIncrement dirty-puts) max-dirty-puts)
310309
(cleanup-expired-deferreds producers)

src/manifold/stream/graph.clj

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@
7676
;;;
7777

7878
(defn- async-send
79+
"Returns an AsyncPut with the result of calling a non-blocking .put() on a sink.
80+
If it times out, returns the sink itself as the timeout value."
7981
[^Downstream dwn msg dsts]
8082
(let [^IEventSink sink (.sink dwn)]
8183
(let [x (if (== (.timeout dwn) -1)
@@ -101,13 +103,16 @@
101103
(when (and (identical? ::timeout x) (.downstream? dwn))
102104
(s/close! sink))))
103105

104-
(defn- handle-async-put [^AsyncPut x val source]
106+
(defn- handle-async-put
107+
"Handle a successful async put"
108+
[^AsyncPut x val source]
105109
(let [d (.deferred x)
106-
val (if (instance? IEventSink val)
110+
val (if (instance? IEventSink val) ; it timed out, in which case the val is set to the sink
107111
(do
108112
(s/close! val)
109113
false)
110114
val)]
115+
;; if sink failed or timed out, remove and maybe close source
111116
(when (false? val)
112117
(let [^CopyOnWriteArrayList l (.dsts x)]
113118
(.remove l (.dst x))
@@ -125,11 +130,15 @@
125130
(.remove handle->downstreams (s/weak-handle source)))))
126131

127132
(defn- async-connect
133+
"Connects downstreams to an async source.
134+
135+
Puts to sync sinks are delayed until all async sinks have been successfully put to"
128136
[^IEventSource source
129137
^CopyOnWriteArrayList dsts]
130138
(let [sync-sinks (LinkedList.)
131-
deferreds (LinkedList.)
139+
put-deferreds (LinkedList.)
132140

141+
;; asynchronously .put to all synchronous sinks, using callbacks and trampolines as needed
133142
sync-propagate (fn this [recur-point msg]
134143
(loop []
135144
(let [^Downstream dwn (.poll sync-sinks)]
@@ -140,8 +149,8 @@
140149
val (d/success-value d ::none)]
141150
(if (identical? val ::none)
142151
(d/on-realized d
143-
(fn [v]
144-
(handle-async-put x v source)
152+
(fn [val]
153+
(handle-async-put x val source)
145154
(trampoline #(this recur-point msg)))
146155
(fn [e]
147156
(handle-async-error x e source)
@@ -150,12 +159,14 @@
150159
(handle-async-put x val source)
151160
(recur))))))))
152161

162+
;; handle all the async puts, using callbacks and trampolines as needed
163+
;; then handle all sync puts once asyncs are done
153164
async-propagate (fn this [recur-point msg]
154165
(loop []
155-
(let [^AsyncPut x (.poll deferreds)]
166+
(let [^AsyncPut x (.poll put-deferreds)]
156167
(if (nil? x)
157168

158-
;; iterator over sync-sinks
169+
;; iterator over sync-sinks when deferreds list is empty
159170
(if (.isEmpty sync-sinks)
160171
recur-point
161172
#(sync-propagate recur-point msg))
@@ -226,23 +237,28 @@
226237
:else
227238
(let [i (.iterator dsts)]
228239
(if (not (.hasNext i))
229-
240+
;; close source if no downstreams
230241
(do
231242
(s/close! source)
232243
(.remove handle->downstreams (s/weak-handle source)))
233244

245+
;; otherwise:
246+
;; 1. add all sync downstreams into a list
247+
;; 2. attempt to .put() all async downstreams and collect AsyncPuts in a list
248+
;; 3. call async-propagate
234249
(do
235250
(loop []
236251
(when (.hasNext i)
237252
(let [^Downstream dwn (.next i)]
238253
(if (s/synchronous? (.sink dwn))
239254
(.add sync-sinks dwn)
240-
(.add deferreds (async-send dwn msg dsts)))
255+
(.add put-deferreds (async-send dwn msg dsts)))
241256
(recur))))
242257

243258
(async-propagate this msg))))))))))
244259

245260
(defn- sync-connect
261+
"Connects downstreams to a sync source"
246262
[^IEventSource source
247263
^CopyOnWriteArrayList dsts]
248264
(utils/future-with (ex/wait-pool)

0 commit comments

Comments
 (0)