Skip to content

Commit 2f95bcd

Browse files
committed
fix(stream): IStream.flatMap replaced by flatMapOrdered/flatMapUnordered
The Reaktive based implementation had the semantics of flatMapUnordered while all other implementations had the one of flatMapOrdered.
1 parent a068256 commit 2f95bcd

File tree

9 files changed

+50
-16
lines changed

9 files changed

+50
-16
lines changed

streams/src/commonMain/kotlin/org/modelix/streams/CollectionAsStream.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@ open class CollectionAsStream<E>(val collection: Collection<E>) : IStream.Many<E
2727
return SingleValueStream(collection.firstOrNull())
2828
}
2929

30-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
31-
return convertLater().flatMap(mapper)
30+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
31+
return convertLater().flatMapOrdered(mapper)
32+
}
33+
34+
override fun <R> flatMapUnordered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
35+
return convertLater().flatMapUnordered(mapper)
3236
}
3337

3438
override fun <R> flatMapIterable(mapper: (E) -> Iterable<R>): IStream.Many<R> {

streams/src/commonMain/kotlin/org/modelix/streams/DeferredStreamBuilder.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,15 @@ class DeferredStreamBuilder : IStreamBuilder {
162162
return ConvertibleMany { convert(it).map(mapper) }
163163
}
164164

165-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
165+
override fun <R> flatMapUnordered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
166166
return ConvertibleMany { converter ->
167-
convert(converter).flatMap { mapper(it).convert(converter) }
167+
convert(converter).flatMapUnordered { mapper(it).convert(converter) }
168+
}
169+
}
170+
171+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
172+
return ConvertibleMany { converter ->
173+
convert(converter).flatMapOrdered { mapper(it).convert(converter) }
168174
}
169175
}
170176

streams/src/commonMain/kotlin/org/modelix/streams/EmptyStream.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class EmptyStream<E> : IStreamInternal.ZeroOrOne<E> {
6969
@DelicateModelixApi
7070
override fun iterateBlocking(visitor: (E) -> Unit) {}
7171

72-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
72+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
7373
return EmptyStream()
7474
}
7575

streams/src/commonMain/kotlin/org/modelix/streams/FlowStreamBuilder.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import kotlinx.coroutines.flow.emptyFlow
1313
import kotlinx.coroutines.flow.filter
1414
import kotlinx.coroutines.flow.firstOrNull
1515
import kotlinx.coroutines.flow.flatMapConcat
16+
import kotlinx.coroutines.flow.flatMapMerge
1617
import kotlinx.coroutines.flow.flow
1718
import kotlinx.coroutines.flow.flowOf
1819
import kotlinx.coroutines.flow.fold
@@ -187,7 +188,11 @@ class FlowStreamBuilder() : IStreamBuilder {
187188
return Wrapper(wrapped.flatMapConcat { convert(mapper(it)) })
188189
}
189190

190-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
191+
override fun <R> flatMapUnordered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
192+
return Wrapper(wrapped.flatMapMerge { convert(mapper(it)) })
193+
}
194+
195+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
191196
return Wrapper(wrapped.flatMapConcat { convert(mapper(it)) })
192197
}
193198

streams/src/commonMain/kotlin/org/modelix/streams/IStream.kt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,22 @@ interface IStream<out E> {
2727
fun filter(predicate: (E) -> Boolean): Many<E>
2828
fun <R> map(mapper: (E) -> R): Many<R>
2929
fun <R : Any> mapNotNull(mapper: (E) -> R?): Many<R> = map(mapper).filterNotNull()
30-
fun <R> flatMap(mapper: (E) -> Many<R>): Many<R>
30+
31+
@Deprecated("Use flatMapOrdered or flatMapUnordered")
32+
fun <R> flatMap(mapper: (E) -> Many<R>): Many<R> = flatMapOrdered(mapper)
33+
34+
/**
35+
* Output elements are only emitted after all output elements of the previous input are emitted.
36+
* Can have a lower performance than [flatMapUnordered].
37+
*/
38+
fun <R> flatMapOrdered(mapper: (E) -> Many<R>): Many<R>
39+
40+
/**
41+
* Output elements are emitted as soon as possible.
42+
* Can have a higher performance than [flatMapOrdered].
43+
*/
44+
fun <R> flatMapUnordered(mapper: (E) -> Many<R>): Many<R> = flatMapOrdered(mapper)
45+
3146
fun <R> flatMapIterable(mapper: (E) -> Iterable<R>): Many<R> = flatMap { IStream.many(mapper(it)) }
3247
fun concat(other: Many<@UnsafeVariance E>): Many<E>
3348
fun concat(other: OneOrMany<@UnsafeVariance E>): OneOrMany<E>

streams/src/commonMain/kotlin/org/modelix/streams/ReaktiveStreamBuilder.kt

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,12 @@ class ReaktiveStreamBuilder() : IStreamBuilder {
261261
throw UnsupportedOperationException("Use IStreamExecutor.iterate")
262262
}
263263

264-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
265-
return WrapperMany(wrapped.flatMap { mapper(it).toReaktive() })
264+
override fun <R> flatMapUnordered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
265+
return WrapperMany(wrapped.flatMap(maxConcurrency = Int.MAX_VALUE) { mapper(it).toReaktive() })
266+
}
267+
268+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
269+
return WrapperMany(wrapped.flatMap(maxConcurrency = 1) { mapper(it).toReaktive() })
266270
}
267271

268272
override fun concat(other: IStream.Many<E>): IStream.Many<E> {
@@ -356,7 +360,7 @@ class ReaktiveStreamBuilder() : IStreamBuilder {
356360
}
357361

358362
override fun <R> flatMapOne(mapper: (E) -> IStream.One<R>): OneOrMany<R> {
359-
return WrapperOneOrMany(wrapped.flatMapSingle { mapper(it).toReaktive() })
363+
return WrapperOneOrMany(wrapped.flatMapSingle(maxConcurrency = 1) { mapper(it).toReaktive() })
360364
}
361365

362366
override fun onErrorReturn(valueSupplier: (Throwable) -> E): OneOrMany<E> {
@@ -439,7 +443,7 @@ class ReaktiveStreamBuilder() : IStreamBuilder {
439443
return WrapperMaybe(wrapped.flatMapMaybe { mapper(it).toReaktive() })
440444
}
441445

442-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
446+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
443447
return WrapperMany(wrapped.flatMapObservable { mapper(it).toReaktive() })
444448
}
445449

@@ -565,7 +569,7 @@ class ReaktiveStreamBuilder() : IStreamBuilder {
565569
throw UnsupportedOperationException("Use IStreamExecutor.iterate")
566570
}
567571

568-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
572+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
569573
return WrapperMany(wrapped.flatMapObservable { mapper(it).toReaktive() })
570574
}
571575

streams/src/commonMain/kotlin/org/modelix/streams/SequenceAsStream.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ open class SequenceAsStream<E>(val wrapped: Sequence<E>) : IStream.Many<E>, IStr
4949
return SequenceAsStreamOneOrMany(wrapped.ifEmpty { sequenceOf(alternative()) })
5050
}
5151

52-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
53-
return convertLater().flatMap(mapper)
52+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
53+
return convertLater().flatMapOrdered(mapper)
5454
}
5555

5656
override fun concat(other: IStream.Many<E>): IStream.Many<E> {

streams/src/commonMain/kotlin/org/modelix/streams/SequenceStreamBuilder.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ class SequenceStreamBuilder() : IStreamBuilder {
188188
return Wrapper(wrapped.flatMap { convert(mapper(it)) })
189189
}
190190

191-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
191+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
192192
return Wrapper(wrapped.flatMap { convert(mapper(it)) })
193193
}
194194

streams/src/commonMain/kotlin/org/modelix/streams/SingleValueStream.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class SingleValueStream<E>(val value: E) : IStreamInternal.One<E> {
8787
return this
8888
}
8989

90-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
90+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
9191
return mapper(value)
9292
}
9393

0 commit comments

Comments
 (0)