Skip to content

Commit e5c9d05

Browse files
committed
fix(streams): wrapper for Single didn't consume some of its inputs
The output was correct, but the side effects where not applied.
1 parent 037c8be commit e5c9d05

File tree

1 file changed

+27
-60
lines changed

1 file changed

+27
-60
lines changed

streams/src/commonMain/kotlin/org/modelix/streams/ReaktiveStreamBuilder.kt

Lines changed: 27 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import com.badoo.reaktive.maybe.asCompletable
1313
import com.badoo.reaktive.maybe.asObservable
1414
import com.badoo.reaktive.maybe.asSingle
1515
import com.badoo.reaktive.maybe.asSingleOrError
16-
import com.badoo.reaktive.maybe.defaultIfEmpty
1716
import com.badoo.reaktive.maybe.doOnAfterSubscribe
1817
import com.badoo.reaktive.maybe.doOnBeforeError
1918
import com.badoo.reaktive.maybe.filter
@@ -121,14 +120,39 @@ class ReaktiveStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder,
121120
abstract fun wrappedAsObservable(): Observable<E>
122121
}
123122

124-
abstract inner class ReaktiveWrapper<E> : Wrapper<E>(), IStream<E> {
123+
abstract inner class ReaktiveWrapper<E> : Wrapper<E>(), IStream.Many<E> {
125124
abstract val wrapped: Source<*>
126125
override fun iterateSynchronous(visitor: (E) -> Unit) {
127126
throw UnsupportedOperationException("Use IStreamExecutor.iterate")
128127
}
129128
override suspend fun iterateSuspending(visitor: suspend (E) -> Unit) {
130129
throw UnsupportedOperationException("Use IStreamExecutor.iterateSuspending")
131130
}
131+
132+
override fun skip(count: Long): IStream.Many<E> {
133+
require(count >= 0L)
134+
return WrapperMany(wrappedAsObservable().skip(count))
135+
}
136+
137+
override fun count(): IStream.One<Int> {
138+
return WrapperSingle(wrappedAsObservable().count())
139+
}
140+
141+
override fun take(n: Int): IStream.Many<E> {
142+
return WrapperMany(wrappedAsObservable().take(n))
143+
}
144+
145+
override fun toList(): IStream.One<List<E>> {
146+
return WrapperSingle(wrappedAsObservable().toList())
147+
}
148+
149+
override fun isEmpty(): IStream.One<Boolean> {
150+
return WrapperSingle(wrappedAsObservable().isEmpty())
151+
}
152+
153+
override fun exactlyOne(): IStream.One<E> {
154+
return WrapperSingle(wrappedAsSingle())
155+
}
132156
}
133157

134158
inner class WrapperCompletable(val wrapped: Completable) :
@@ -219,10 +243,6 @@ class ReaktiveStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder,
219243
throw UnsupportedOperationException("Use IStreamExecutor.iterate")
220244
}
221245

222-
override fun toList(): IStream.One<List<E>> {
223-
return WrapperSingle(wrapped.toList())
224-
}
225-
226246
override fun iterateSynchronous(visitor: (E) -> Unit) {
227247
throw UnsupportedOperationException("Use IStreamExecutor.iterate")
228248
}
@@ -269,18 +289,6 @@ class ReaktiveStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder,
269289
return merger(WrapperMany(a), WrapperMany(b))
270290
}
271291

272-
override fun skip(count: Long): IStream.Many<E> {
273-
return WrapperMany(wrapped.skip(count))
274-
}
275-
276-
override fun exactlyOne(): IStream.One<E> {
277-
return WrapperSingle(wrappedAsSingle())
278-
}
279-
280-
override fun count(): IStream.One<Int> {
281-
return WrapperSingle(wrapped.count())
282-
}
283-
284292
override fun filterBySingle(condition: (E) -> IStream.One<Boolean>): IStream.Many<E> {
285293
return WrapperMany(wrapped.filterBySingle { condition(it).toReaktive() })
286294
}
@@ -289,10 +297,6 @@ class ReaktiveStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder,
289297
return WrapperSingle(wrapped.firstOrDefault(defaultValue))
290298
}
291299

292-
override fun take(n: Int): IStream.Many<E> {
293-
return WrapperMany(wrapped.take(n))
294-
}
295-
296300
override fun firstOrEmpty(): IStream.ZeroOrOne<E> {
297301
return WrapperMaybe(wrappedAsMaybe())
298302
}
@@ -469,19 +473,10 @@ class ReaktiveStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder,
469473
TODO("Not yet implemented")
470474
}
471475

472-
override fun skip(count: Long): IStream.Many<E> {
473-
require(count >= 0L)
474-
return if (count == 0L) this else IStream.empty()
475-
}
476-
477476
override fun exactlyOne(): IStream.One<E> {
478477
return this
479478
}
480479

481-
override fun count(): IStream.One<Int> {
482-
return IStream.of(1)
483-
}
484-
485480
override fun filterBySingle(condition: (E) -> IStream.One<Boolean>): IStream.Many<E> {
486481
return WrapperMany(wrapped.asObservable().filterBySingle { condition(it).toReaktive() })
487482
}
@@ -490,10 +485,6 @@ class ReaktiveStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder,
490485
return this // there is always a first element
491486
}
492487

493-
override fun take(n: Int): IStream.Many<E> {
494-
return if (n > 0) this else IStream.empty()
495-
}
496-
497488
override fun firstOrEmpty(): IStream.ZeroOrOne<E> {
498489
return this // there is always a first element
499490
}
@@ -503,7 +494,7 @@ class ReaktiveStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder,
503494
}
504495

505496
override fun isEmpty(): IStream.One<Boolean> {
506-
return IStream.of(false)
497+
return WrapperSingle(wrapped.asObservable().isEmpty())
507498
}
508499

509500
override fun withIndex(): IStream.Many<IndexedValue<E>> {
@@ -561,10 +552,6 @@ class ReaktiveStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder,
561552
throw UnsupportedOperationException("Use IStreamExecutor.iterate")
562553
}
563554

564-
override fun toList(): IStream.One<List<E>> {
565-
return WrapperSingle(wrapped.asObservable().toList())
566-
}
567-
568555
override fun iterateSynchronous(visitor: (E) -> Unit) {
569556
throw UnsupportedOperationException("Use IStreamExecutor.iterate")
570557
}
@@ -615,18 +602,6 @@ class ReaktiveStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder,
615602
TODO("Not yet implemented")
616603
}
617604

618-
override fun skip(count: Long): IStream.Many<E> {
619-
return if (count > 0) IStream.empty() else this
620-
}
621-
622-
override fun exactlyOne(): IStream.One<E> {
623-
return WrapperSingle(wrappedAsSingle())
624-
}
625-
626-
override fun count(): IStream.One<Int> {
627-
return WrapperSingle(wrapped.asObservable().count())
628-
}
629-
630605
override fun filterBySingle(condition: (E) -> IStream.One<Boolean>): IStream.Many<E> {
631606
return WrapperMany(wrapped.asObservable().filterBySingle { condition(it).toReaktive() })
632607
}
@@ -635,10 +610,6 @@ class ReaktiveStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder,
635610
return WrapperSingle(wrapped.asSingle(defaultValue))
636611
}
637612

638-
override fun take(n: Int): IStream.Many<E> {
639-
return if (n > 0) this else IStream.empty()
640-
}
641-
642613
override fun firstOrEmpty(): IStream.ZeroOrOne<E> {
643614
return this
644615
}
@@ -647,10 +618,6 @@ class ReaktiveStreamBuilder(executor: IStreamExecutorProvider) : IStreamBuilder,
647618
return WrapperMany(wrapped.asObservable().switchIfEmpty { alternative().toReaktive() })
648619
}
649620

650-
override fun isEmpty(): IStream.One<Boolean> {
651-
return WrapperSingle(wrapped.map { false }.defaultIfEmpty(true))
652-
}
653-
654621
override fun withIndex(): IStream.Many<IndexedValue<E>> {
655622
return WrapperMaybe(wrapped.map { IndexedValue(0, it) })
656623
}

0 commit comments

Comments
 (0)