|
58 | 58 | .iterator
|
59 | 59 | iterator-seq
|
60 | 60 | (map
|
61 |
| - (fn [^Downstream d] |
62 |
| - [(.description d) (.sink d)])))))) |
| 61 | + (fn [^Downstream dwn] |
| 62 | + [(.description dwn) (.sink dwn)])))))) |
63 | 63 |
|
64 | 64 | (defn pop-connected! [stream]
|
65 | 65 | (when-let [handle (s/weak-handle stream)]
|
|
76 | 76 | ;;;
|
77 | 77 |
|
78 | 78 | (defn- async-send
|
79 |
| - [^Downstream d msg dsts] |
80 |
| - (let [^IEventSink sink (.sink d)] |
81 |
| - (let [x (if (== (.timeout d) -1) |
| 79 | + [^Downstream dwn msg dsts] |
| 80 | + (let [^IEventSink sink (.sink dwn)] |
| 81 | + (let [x (if (== (.timeout dwn) -1) |
82 | 82 | (.put sink msg false)
|
83 |
| - (.put sink msg false (.timeout d) (if (.downstream? d) sink false)))] |
84 |
| - (AsyncPut. x dsts d (.upstream? d))))) |
| 83 | + (.put sink msg false (.timeout dwn) (if (.downstream? dwn) sink false)))] |
| 84 | + (AsyncPut. x dsts dwn (.upstream? dwn))))) |
85 | 85 |
|
86 | 86 | (defn- sync-send
|
87 |
| - [^Downstream d msg ^CopyOnWriteArrayList dsts ^IEventSink upstream] |
88 |
| - (let [^IEventSink sink (.sink d) |
| 87 | + [^Downstream dwn msg ^CopyOnWriteArrayList dsts ^IEventSink upstream] |
| 88 | + (let [^IEventSink sink (.sink dwn) |
89 | 89 | x (try
|
90 |
| - (if (== (.timeout d) -1) |
| 90 | + (if (== (.timeout dwn) -1) |
91 | 91 | (.put sink msg true)
|
92 |
| - (.put sink msg true (.timeout d) ::timeout)) |
| 92 | + (.put sink msg true (.timeout dwn) ::timeout)) |
93 | 93 | (catch Throwable e
|
94 | 94 | (log/error e "error in message propagation")
|
95 | 95 | (s/close! sink)
|
96 | 96 | false))]
|
97 | 97 | (when (false? x)
|
98 |
| - (.remove dsts d) |
| 98 | + (.remove dsts dwn) |
99 | 99 | (when upstream
|
100 | 100 | (s/close! upstream)))
|
101 |
| - (when (and (identical? ::timeout x) (.downstream? d)) |
| 101 | + (when (and (identical? ::timeout x) (.downstream? dwn)) |
102 | 102 | (s/close! sink))))
|
103 | 103 |
|
104 | 104 | (defn- handle-async-put [^AsyncPut x val source]
|
|
132 | 132 |
|
133 | 133 | sync-propagate (fn this [recur-point msg]
|
134 | 134 | (loop []
|
135 |
| - (let [^Downstream d (.poll sync-sinks)] |
136 |
| - (if (nil? d) |
| 135 | + (let [^Downstream dwn (.poll sync-sinks)] |
| 136 | + (if (nil? dwn) |
137 | 137 | recur-point
|
138 |
| - (let [^AsyncPut x (async-send d msg dsts) |
| 138 | + (let [^AsyncPut x (async-send dwn msg dsts) |
139 | 139 | d (.deferred x)
|
140 | 140 | val (d/success-value d ::none)]
|
141 | 141 | (if (identical? val ::none)
|
|
197 | 197 | (let [i (.iterator dsts)]
|
198 | 198 | (loop []
|
199 | 199 | (when (.hasNext i)
|
200 |
| - (let [^Downstream d (.next i)] |
201 |
| - (when (.downstream? d) |
202 |
| - (s/close! (.sink d))) |
| 200 | + (let [^Downstream dwn (.next i)] |
| 201 | + (when (.downstream? dwn) |
| 202 | + (s/close! (.sink dwn))) |
203 | 203 | (recur))))))
|
204 | 204 |
|
205 | 205 | (== 1 (.size dsts))
|
|
234 | 234 | (do
|
235 | 235 | (loop []
|
236 | 236 | (when (.hasNext i)
|
237 |
| - (let [^Downstream d (.next i)] |
238 |
| - (if (s/synchronous? (.sink d)) |
239 |
| - (.add sync-sinks d) |
240 |
| - (.add deferreds (async-send d msg dsts))) |
| 237 | + (let [^Downstream dwn (.next i)] |
| 238 | + (if (s/synchronous? (.sink dwn)) |
| 239 | + (.add sync-sinks dwn) |
| 240 | + (.add deferreds (async-send dwn msg dsts))) |
241 | 241 | (recur))))
|
242 | 242 |
|
243 | 243 | (async-propagate this msg))))))))))
|
|
259 | 259 | (.remove handle->downstreams (s/weak-handle source))
|
260 | 260 | (loop []
|
261 | 261 | (when (.hasNext i)
|
262 |
| - (let [^Downstream d (.next i)] |
263 |
| - (when (.downstream? d) |
264 |
| - (s/close! (.sink d))))))) |
| 262 | + (let [^Downstream dwn (.next i)] |
| 263 | + (when (.downstream? dwn) |
| 264 | + (s/close! (.sink dwn))))))) |
265 | 265 |
|
266 | 266 | (do
|
267 | 267 | (loop []
|
268 | 268 | (when (.hasNext i)
|
269 |
| - (let [^Downstream d (.next i)] |
270 |
| - (if (s/synchronous? (.sink d)) |
271 |
| - (.add sync-sinks d) |
272 |
| - (.add deferreds (async-send d msg dsts))) |
| 269 | + (let [^Downstream dwn (.next i)] |
| 270 | + (if (s/synchronous? (.sink dwn)) |
| 271 | + (.add sync-sinks dwn) |
| 272 | + (.add deferreds (async-send dwn msg dsts))) |
273 | 273 | (recur))))
|
274 | 274 |
|
275 | 275 | (loop []
|
|
284 | 284 | (recur)))))
|
285 | 285 |
|
286 | 286 | (loop []
|
287 |
| - (let [^Downstream d (.poll sync-sinks)] |
288 |
| - (if (nil? d) |
| 287 | + (let [^Downstream dwn (.poll sync-sinks)] |
| 288 | + (if (nil? dwn) |
289 | 289 | nil
|
290 | 290 | (do
|
291 |
| - (sync-send d msg dsts (when (.upstream? d) source)) |
| 291 | + (sync-send dwn msg dsts (when (.upstream? dwn) source)) |
292 | 292 | (recur)))))
|
293 | 293 |
|
294 | 294 | (recur))))
|
|
309 | 309 | downstream? true}
|
310 | 310 | :as opts}]
|
311 | 311 | (locking src
|
312 |
| - (let [d (Downstream. |
313 |
| - timeout |
314 |
| - (boolean (and upstream? (instance? IEventSink src))) |
315 |
| - downstream? |
316 |
| - dst |
317 |
| - description) |
| 312 | + (let [dwn (Downstream. |
| 313 | + timeout |
| 314 | + (boolean (and upstream? (instance? IEventSink src))) |
| 315 | + downstream? |
| 316 | + dst |
| 317 | + description) |
318 | 318 | k (.weakHandle ^IEventStream src ref-queue)]
|
319 | 319 | (if-let [dsts (.get handle->downstreams k)]
|
320 |
| - (.add ^CopyOnWriteArrayList dsts d) |
| 320 | + (.add ^CopyOnWriteArrayList dsts dwn) |
321 | 321 | (let [dsts (CopyOnWriteArrayList.)]
|
322 | 322 | (if-let [dsts' (.putIfAbsent handle->downstreams k dsts)]
|
323 |
| - (.add ^CopyOnWriteArrayList dsts' d) |
| 323 | + (.add ^CopyOnWriteArrayList dsts' dwn) |
324 | 324 | (do
|
325 |
| - (.add ^CopyOnWriteArrayList dsts d) |
| 325 | + (.add ^CopyOnWriteArrayList dsts dwn) |
326 | 326 | (if (s/synchronous? src)
|
327 | 327 | (sync-connect src dsts)
|
328 | 328 | (async-connect src dsts))))))))))
|
0 commit comments