Skip to content

Commit 42fe0e1

Browse files
committed
fix(streams): proper usage of getBlocking
1 parent c97a190 commit 42fe0e1

File tree

35 files changed

+962
-197
lines changed

35 files changed

+962
-197
lines changed

datastructures/src/commonMain/kotlin/org/modelix/datastructures/btree/BTree.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package org.modelix.datastructures.btree
22

33
import org.modelix.datastructures.objects.IObjectGraph
4-
import org.modelix.kotlin.utils.DelicateModelixApi
54
import org.modelix.streams.IStream
5+
import org.modelix.streams.getBlocking
66

77
data class BTree<K, V>(val root: BTreeNode<K, V>) {
88
constructor(config: BTreeConfig<K, V>) : this(BTreeNodeLeaf(config, emptyList()))
@@ -12,11 +12,10 @@ data class BTree<K, V>(val root: BTreeNode<K, V>) {
1212
fun validate() {
1313
graph.getStreamExecutor().query {
1414
root.validate(true)
15-
@OptIn(DelicateModelixApi::class)
16-
check(root.getEntries().toList().getSynchronous().map { it.key }.toSet().size == root.getEntries().map { it.key }.count().getSynchronous()) {
15+
check(root.getEntries().toList().getBlocking(graph).map { it.key }.toSet().size == root.getEntries().map { it.key }.count().getBlocking(graph)) {
1716
"duplicate entries: $root"
1817
}
19-
check(root.getEntries().map { it.key }.toList().getSynchronous().sortedWith(root.config.keyConfiguration) == root.getEntries().map { it.key }.toList().getSynchronous()) {
18+
check(root.getEntries().map { it.key }.toList().getBlocking(graph).sortedWith(root.config.keyConfiguration) == root.getEntries().map { it.key }.toList().getBlocking(graph)) {
2019
"not sorted: $this"
2120
}
2221
IStream.of(Unit)

model-api/src/commonMain/kotlin/org/modelix/model/test/RandomModelChangeGenerator.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import org.modelix.model.api.async.IAsyncNode
66
import org.modelix.model.api.async.asAsyncNode
77
import org.modelix.model.api.getAncestors
88
import org.modelix.streams.IStream
9+
import org.modelix.streams.getBlocking
910
import org.modelix.streams.plus
1011
import kotlin.random.Random
1112

@@ -99,7 +100,7 @@ class RandomModelChangeGenerator(val rootNode: INode, private val rand: Random)
99100
.map { it.asRegularNode() }
100101
.filter(condition)
101102
.firstOrNull()
102-
.getSynchronous()
103+
.getBlocking(rootNode.asAsyncNode())
103104
}
104105
}
105106

model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientAsStore.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class ModelClientAsStore(client: IModelClientV2, val repositoryId: RepositoryId)
5454
).exactlyOne()
5555
}
5656

57-
override fun putAll(entries: Map<ObjectRequest<*>, IObjectData>): IStream.Zero {
57+
override fun putAll(entries: Map<ObjectRequest<*>, IObjectData>): IStream.Completable {
5858
return IStream.fromFlow<Nothing>(
5959
flow {
6060
client.pushObjects(

model-datastructure/src/commonMain/kotlin/org/modelix/datastructures/model/ModelTreeBuilder.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import org.modelix.model.TreeId
1515
import org.modelix.model.api.INodeReference
1616
import org.modelix.model.api.ITree
1717
import org.modelix.model.api.PNodeReference
18+
import org.modelix.streams.getBlocking
1819

1920
abstract class ModelTreeBuilder<NodeId> private constructor(protected val common: Common = Common()) {
2021
protected class Common {
@@ -35,13 +36,13 @@ abstract class ModelTreeBuilder<NodeId> private constructor(protected val common
3536

3637
val config = HamtNode.Config(
3738
graph = common.graph,
38-
keyConfig = nodeIdType!!,
39+
keyConfig = nodeIdType,
3940
valueConfig = ObjectReferenceDataTypeConfiguration(common.graph, NodeObjectData.Deserializer(nodeIdType, common.treeId)),
4041
)
4142
return HamtInternalNode.createEmpty(config)
4243
.put(root.data.id, root.ref, common.graph)
4344
.orNull()
44-
.getSynchronous()!!
45+
.getBlocking(common.graph)!!
4546
.let { HamtTree(it) }
4647
.autoResolveValues()
4748
.asModelTree(common.treeId)
@@ -61,7 +62,7 @@ abstract class ModelTreeBuilder<NodeId> private constructor(protected val common
6162
keyConfig = nodeIdType,
6263
valueConfig = ObjectReferenceDataTypeConfiguration(common.graph, NodeObjectData.Deserializer(nodeIdType, common.treeId)),
6364
)
64-
return PatriciaTrie(config).put(root.data.id, root.ref).getSynchronous().autoResolveValues().asModelTree(common.treeId)
65+
return PatriciaTrie(config).put(root.data.id, root.ref).getBlocking(common.graph).autoResolveValues().asModelTree(common.treeId)
6566
}
6667
}
6768

model-datastructure/src/commonMain/kotlin/org/modelix/model/async/BulkAsyncStore.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class BulkAsyncStore(
5858
return getAllAsStream(IStream.many(keys)).toMap({ it.first }, { it.second })
5959
}
6060

61-
override fun putAll(entries: Map<ObjectRequest<*>, IObjectData>): IStream.Zero {
61+
override fun putAll(entries: Map<ObjectRequest<*>, IObjectData>): IStream.Completable {
6262
return store.putAll(entries)
6363
}
6464
}

model-datastructure/src/commonMain/kotlin/org/modelix/model/async/CachingAsyncStore.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class CachingAsyncStore(val store: IAsyncObjectStore, cacheSize: Int = 100_000)
8080
}
8181
}
8282

83-
override fun putAll(entries: Map<ObjectRequest<*>, IObjectData>): IStream.Zero {
83+
override fun putAll(entries: Map<ObjectRequest<*>, IObjectData>): IStream.Completable {
8484
runSynchronized(cache) {
8585
for (entry in entries) {
8686
cache.set(entry.key, entry.value)

model-datastructure/src/commonMain/kotlin/org/modelix/model/async/IAsyncObjectStore.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ interface IAsyncObjectStore : IStreamExecutorProvider {
1818

1919
fun getAllAsStream(keys: IStream.Many<ObjectRequest<*>>): IStream.Many<Pair<ObjectRequest<*>, IObjectData?>>
2020
fun getAllAsMap(keys: List<ObjectRequest<*>>): IStream.One<Map<ObjectRequest<*>, IObjectData?>>
21-
fun putAll(entries: Map<ObjectRequest<*>, IObjectData>): IStream.Zero
21+
fun putAll(entries: Map<ObjectRequest<*>, IObjectData>): IStream.Completable
2222

2323
fun clearCache()
2424

model-datastructure/src/commonMain/kotlin/org/modelix/model/async/LegacyKeyValueStoreAsAsyncStore.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class LegacyKeyValueStoreAsAsyncStore(val store: IKeyValueStore) : IAsyncObjectS
5050
)
5151
}
5252

53-
override fun putAll(entries: Map<ObjectRequest<*>, IObjectData>): IStream.Zero {
53+
override fun putAll(entries: Map<ObjectRequest<*>, IObjectData>): IStream.Completable {
5454
store.putAll(entries.entries.associate { it.key.hash to it.value.serialize() })
5555
return IStream.zero()
5656
}

model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/CLTree.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.modelix.model.persistent.CPTree
2929
import org.modelix.streams.IStream
3030
import org.modelix.streams.IStreamExecutor
3131
import org.modelix.streams.IStreamExecutorProvider
32+
import org.modelix.streams.getBlocking
3233

3334
private fun createNewTreeData(
3435
graph: IObjectGraph,
@@ -53,7 +54,7 @@ private fun createNewTreeData(
5354
HamtInternalNode.createEmpty(config)
5455
.put(root.id, graph.fromCreated(root), graph)
5556
.orNull()
56-
.getSynchronous()!!,
57+
.getBlocking(graph)!!,
5758
),
5859
trieWithNodeRefIds = null,
5960
usesRoleIds = useRoleIds,

model-datastructure/src/commonMain/kotlin/org/modelix/model/lazy/OperationsCompressor.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import org.modelix.model.operations.SetPropertyOp
1616
import org.modelix.model.operations.SetReferenceOp
1717
import org.modelix.model.operations.UndoOp
1818
import org.modelix.model.persistent.CPTree
19+
import org.modelix.streams.getBlocking
1920

2021
class OperationsCompressor(val resultTree: Object<CPTree>) {
2122

@@ -55,7 +56,7 @@ class OperationsCompressor(val resultTree: Object<CPTree>) {
5556
}
5657

5758
for (id in createdNodes) {
58-
if (!resultTree.data.getModelTree().containsNode(id).getSynchronous()) {
59+
if (!resultTree.data.getModelTree().containsNode(id).getBlocking(resultTree.graph)) {
5960
throw RuntimeException("Tree expected to contain node $id")
6061
}
6162
}

0 commit comments

Comments
 (0)