Skip to content

Commit 4eafd32

Browse files
committed
fix(streams): IStream.flatMap replaced by flatMapOrdered/flatMapUnordered
The Reaktive based implementation had the semantics of flatMapUnordered while all other implementations had the one of flatMapOrdered.
1 parent a068256 commit 4eafd32

File tree

12 files changed

+61
-19
lines changed

12 files changed

+61
-19
lines changed

model-api/src/commonMain/kotlin/org/modelix/model/api/async/AsyncNode.kt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,17 @@ class AsyncNode(
8585

8686
override fun getDescendants(includeSelf: Boolean): IStream.Many<IAsyncNode> {
8787
return if (includeSelf) {
88-
getStreamExecutor()
8988
IStream.of(IStream.of(this), getDescendants(false)).flatten()
9089
} else {
91-
getAllChildren().flatMap { it.getDescendants(true) }
90+
getAllChildren().flatMapOrdered { it.getDescendants(true) }
91+
}
92+
}
93+
94+
override fun getDescendantsUnordered(includeSelf: Boolean): IStream.Many<IAsyncNode> {
95+
return if (includeSelf) {
96+
IStream.of(IStream.of(this), getDescendantsUnordered(false)).flatten()
97+
} else {
98+
getAllChildren().flatMapUnordered { it.getDescendantsUnordered(true) }
9299
}
93100
}
94101
}

model-api/src/commonMain/kotlin/org/modelix/model/api/async/IAsyncNode.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ interface IAsyncNode : IStreamExecutorProvider {
3333
fun getAllReferenceTargets(): IStream.Many<Pair<IReferenceLinkReference, IAsyncNode>>
3434

3535
fun getDescendants(includeSelf: Boolean): IStream.Many<IAsyncNode>
36+
fun getDescendantsUnordered(includeSelf: Boolean): IStream.Many<IAsyncNode> = getDescendants(includeSelf)
3637
}
3738

3839
interface INodeWithAsyncSupport : INode {

model-server/src/test/kotlin/org/modelix/model/server/LazyLoadingTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ private object DepthFirstSearchPattern : AccessPattern {
148148
private object StreamBasedApi : AccessPattern {
149149
override suspend fun runPattern(rootNode: INode) {
150150
val asyncNode = rootNode.asAsyncNode()
151-
asyncNode.querySuspending { asyncNode.getDescendants(true).count() }
151+
asyncNode.querySuspending { asyncNode.getDescendantsUnordered(true).count() }
152152
}
153153
}
154154

streams/src/commonMain/kotlin/org/modelix/streams/CollectionAsStream.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@ open class CollectionAsStream<E>(val collection: Collection<E>) : IStream.Many<E
2727
return SingleValueStream(collection.firstOrNull())
2828
}
2929

30-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
31-
return convertLater().flatMap(mapper)
30+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
31+
return convertLater().flatMapOrdered(mapper)
32+
}
33+
34+
override fun <R> flatMapUnordered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
35+
return convertLater().flatMapUnordered(mapper)
3236
}
3337

3438
override fun <R> flatMapIterable(mapper: (E) -> Iterable<R>): IStream.Many<R> {

streams/src/commonMain/kotlin/org/modelix/streams/DeferredStreamBuilder.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,15 @@ class DeferredStreamBuilder : IStreamBuilder {
162162
return ConvertibleMany { convert(it).map(mapper) }
163163
}
164164

165-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
165+
override fun <R> flatMapUnordered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
166166
return ConvertibleMany { converter ->
167-
convert(converter).flatMap { mapper(it).convert(converter) }
167+
convert(converter).flatMapUnordered { mapper(it).convert(converter) }
168+
}
169+
}
170+
171+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
172+
return ConvertibleMany { converter ->
173+
convert(converter).flatMapOrdered { mapper(it).convert(converter) }
168174
}
169175
}
170176

streams/src/commonMain/kotlin/org/modelix/streams/EmptyStream.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class EmptyStream<E> : IStreamInternal.ZeroOrOne<E> {
6969
@DelicateModelixApi
7070
override fun iterateBlocking(visitor: (E) -> Unit) {}
7171

72-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
72+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
7373
return EmptyStream()
7474
}
7575

streams/src/commonMain/kotlin/org/modelix/streams/FlowStreamBuilder.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import kotlinx.coroutines.flow.emptyFlow
1313
import kotlinx.coroutines.flow.filter
1414
import kotlinx.coroutines.flow.firstOrNull
1515
import kotlinx.coroutines.flow.flatMapConcat
16+
import kotlinx.coroutines.flow.flatMapMerge
1617
import kotlinx.coroutines.flow.flow
1718
import kotlinx.coroutines.flow.flowOf
1819
import kotlinx.coroutines.flow.fold
@@ -187,7 +188,11 @@ class FlowStreamBuilder() : IStreamBuilder {
187188
return Wrapper(wrapped.flatMapConcat { convert(mapper(it)) })
188189
}
189190

190-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
191+
override fun <R> flatMapUnordered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
192+
return Wrapper(wrapped.flatMapMerge { convert(mapper(it)) })
193+
}
194+
195+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
191196
return Wrapper(wrapped.flatMapConcat { convert(mapper(it)) })
192197
}
193198

streams/src/commonMain/kotlin/org/modelix/streams/IStream.kt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,22 @@ interface IStream<out E> {
2727
fun filter(predicate: (E) -> Boolean): Many<E>
2828
fun <R> map(mapper: (E) -> R): Many<R>
2929
fun <R : Any> mapNotNull(mapper: (E) -> R?): Many<R> = map(mapper).filterNotNull()
30-
fun <R> flatMap(mapper: (E) -> Many<R>): Many<R>
30+
31+
@Deprecated("Use flatMapOrdered or flatMapUnordered")
32+
fun <R> flatMap(mapper: (E) -> Many<R>): Many<R> = flatMapOrdered(mapper)
33+
34+
/**
35+
* Output elements are only emitted after all output elements of the previous input are emitted.
36+
* Can have a lower performance than [flatMapUnordered].
37+
*/
38+
fun <R> flatMapOrdered(mapper: (E) -> Many<R>): Many<R>
39+
40+
/**
41+
* Output elements are emitted as soon as possible.
42+
* Can have a higher performance than [flatMapOrdered].
43+
*/
44+
fun <R> flatMapUnordered(mapper: (E) -> Many<R>): Many<R> = flatMapOrdered(mapper)
45+
3146
fun <R> flatMapIterable(mapper: (E) -> Iterable<R>): Many<R> = flatMap { IStream.many(mapper(it)) }
3247
fun concat(other: Many<@UnsafeVariance E>): Many<E>
3348
fun concat(other: OneOrMany<@UnsafeVariance E>): OneOrMany<E>

streams/src/commonMain/kotlin/org/modelix/streams/ReaktiveStreamBuilder.kt

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,12 @@ class ReaktiveStreamBuilder() : IStreamBuilder {
261261
throw UnsupportedOperationException("Use IStreamExecutor.iterate")
262262
}
263263

264-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
265-
return WrapperMany(wrapped.flatMap { mapper(it).toReaktive() })
264+
override fun <R> flatMapUnordered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
265+
return WrapperMany(wrapped.flatMap(maxConcurrency = Int.MAX_VALUE) { mapper(it).toReaktive() })
266+
}
267+
268+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
269+
return WrapperMany(wrapped.flatMap(maxConcurrency = 1) { mapper(it).toReaktive() })
266270
}
267271

268272
override fun concat(other: IStream.Many<E>): IStream.Many<E> {
@@ -356,7 +360,7 @@ class ReaktiveStreamBuilder() : IStreamBuilder {
356360
}
357361

358362
override fun <R> flatMapOne(mapper: (E) -> IStream.One<R>): OneOrMany<R> {
359-
return WrapperOneOrMany(wrapped.flatMapSingle { mapper(it).toReaktive() })
363+
return WrapperOneOrMany(wrapped.flatMapSingle(maxConcurrency = 1) { mapper(it).toReaktive() })
360364
}
361365

362366
override fun onErrorReturn(valueSupplier: (Throwable) -> E): OneOrMany<E> {
@@ -439,7 +443,7 @@ class ReaktiveStreamBuilder() : IStreamBuilder {
439443
return WrapperMaybe(wrapped.flatMapMaybe { mapper(it).toReaktive() })
440444
}
441445

442-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
446+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
443447
return WrapperMany(wrapped.flatMapObservable { mapper(it).toReaktive() })
444448
}
445449

@@ -565,7 +569,7 @@ class ReaktiveStreamBuilder() : IStreamBuilder {
565569
throw UnsupportedOperationException("Use IStreamExecutor.iterate")
566570
}
567571

568-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
572+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
569573
return WrapperMany(wrapped.flatMapObservable { mapper(it).toReaktive() })
570574
}
571575

streams/src/commonMain/kotlin/org/modelix/streams/SequenceAsStream.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ open class SequenceAsStream<E>(val wrapped: Sequence<E>) : IStream.Many<E>, IStr
4949
return SequenceAsStreamOneOrMany(wrapped.ifEmpty { sequenceOf(alternative()) })
5050
}
5151

52-
override fun <R> flatMap(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
53-
return convertLater().flatMap(mapper)
52+
override fun <R> flatMapOrdered(mapper: (E) -> IStream.Many<R>): IStream.Many<R> {
53+
return convertLater().flatMapOrdered(mapper)
5454
}
5555

5656
override fun concat(other: IStream.Many<E>): IStream.Many<E> {

0 commit comments

Comments
 (0)