Skip to content

Commit c573cfd

Browse files
committed
Reimplement into without toList
1 parent 2b597fc commit c573cfd

File tree

2 files changed

+13
-7
lines changed
  • language-adaptors/rxjava-clojure/src

2 files changed

+13
-7
lines changed

language-adaptors/rxjava-clojure/src/main/clojure/rx/lang/clojure/core.clj

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -554,10 +554,11 @@
554554
clojure.core/into
555555
rx.Observable/toList
556556
"
557-
[to ^Observable from-observable]
558-
(->> from-observable
559-
.toList
560-
(map (partial clojure.core/into to))))
557+
[to ^Observable from]
558+
; clojure.core/into uses transients if to is IEditableCollection
559+
; I don't think we have any guarantee that all on-next calls will be on the
560+
; same thread, so we can't do that here.
561+
(reduce conj to from))
561562

562563
(defn keep
563564
[f xs]

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/core_test.clj

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@
115115
sleepy-o #(f/future-generator f/default-runner [o]
116116
(doseq [x %]
117117
(Thread/sleep 10)
118-
(rx/on-next o x)))
118+
(rx/on-next o x)))
119119
make-inputs (fn [] (mapv sleepy-o expected-result))
120120
make-output (fn [r] [(keep #{1 3 5} r)
121121
(keep #{2 4 6} r)])]
@@ -338,8 +338,13 @@
338338
(b/into [] (rx/interpose \, (rx/seq->o [1 2 3]))))))
339339

340340
(deftest test-into
341-
(is (= (into [6 7 8] [9 10 [11]])
342-
(b/first (rx/into [6 7 8] (rx/seq->o [9 10 [11]]))))))
341+
(are [input to] (= (into to input)
342+
(b/single (rx/into to (rx/seq->o input))))
343+
[6 7 8] [9 10 [11]]
344+
#{} [1 2 3 2 4 5]
345+
{} [[1 2] [3 2] [4 5]]
346+
{} []
347+
'() (range 50)))
343348

344349
(deftest test-keep
345350
(is (= (into [] (keep identity [true true false]))

0 commit comments

Comments
 (0)