@@ -16,6 +16,7 @@ package org.modelix.model.client2
16
16
import io.ktor.client.HttpClient
17
17
import io.ktor.client.HttpClientConfig
18
18
import io.ktor.client.call.body
19
+ import io.ktor.client.plugins.ClientRequestException
19
20
import io.ktor.client.plugins.HttpRequestRetry
20
21
import io.ktor.client.plugins.HttpTimeout
21
22
import io.ktor.client.plugins.ResponseException
@@ -55,10 +56,12 @@ import org.modelix.model.api.INode
55
56
import org.modelix.model.api.IdGeneratorDummy
56
57
import org.modelix.model.api.TreePointer
57
58
import org.modelix.model.api.getRootNode
59
+ import org.modelix.model.api.runSynchronized
58
60
import org.modelix.model.client.IdGenerator
59
61
import org.modelix.model.lazy.BranchReference
60
62
import org.modelix.model.lazy.CLTree
61
63
import org.modelix.model.lazy.CLVersion
64
+ import org.modelix.model.lazy.IDeserializingKeyValueStore
62
65
import org.modelix.model.lazy.ObjectStoreCache
63
66
import org.modelix.model.lazy.RepositoryId
64
67
import org.modelix.model.lazy.computeDelta
@@ -74,6 +77,8 @@ import org.modelix.modelql.core.IMonoStep
74
77
import kotlin.time.Duration
75
78
import kotlin.time.Duration.Companion.seconds
76
79
80
+ class VersionNotFoundException (val versionHash : String ) : Exception(" Version $versionHash not found" )
81
+
77
82
class ModelClientV2 (
78
83
private val httpClient : HttpClient ,
79
84
val baseUrl : String ,
@@ -82,14 +87,20 @@ class ModelClientV2(
82
87
private var clientId: Int = 0
83
88
private var idGenerator: IIdGenerator = IdGeneratorDummy ()
84
89
private var serverProvidedUserId: String? = null
85
- private val kvStore = MapBasedStore ()
86
- val store = ObjectStoreCache (kvStore) // TODO the store will accumulate garbage
90
+
91
+ // TODO the store will accumulate garbage
92
+ private val storeForRepository: MutableMap <RepositoryId ?, IDeserializingKeyValueStore > = HashMap ()
87
93
88
94
suspend fun init () {
89
95
updateClientId()
90
96
updateUserId()
91
97
}
92
98
99
+ private fun getStore (repository : RepositoryId ? ) = runSynchronized(storeForRepository) { storeForRepository.getOrPut(repository) { ObjectStoreCache (MapBasedStore ()) } }
100
+ private fun getRepository (store : IDeserializingKeyValueStore ): RepositoryId ? {
101
+ return storeForRepository.asSequence().first { it.value == store }.key
102
+ }
103
+
93
104
private suspend fun updateClientId () {
94
105
this .clientId = httpClient.post {
95
106
url {
@@ -127,15 +138,24 @@ class ModelClientV2(
127
138
override fun getUserId (): String? = clientProvidedUserId ? : serverProvidedUserId
128
139
129
140
override suspend fun initRepository (repository : RepositoryId , useRoleIds : Boolean ): IVersion {
141
+ return initRepository(repository, useRoleIds = useRoleIds, legacyGlobalStorage = false )
142
+ }
143
+
144
+ override suspend fun initRepositoryWithLegacyStorage (repository : RepositoryId ): IVersion {
145
+ return initRepository(repository, useRoleIds = false , legacyGlobalStorage = true )
146
+ }
147
+
148
+ suspend fun initRepository (repository : RepositoryId , useRoleIds : Boolean = false, legacyGlobalStorage : Boolean = false): IVersion {
130
149
return httpClient.preparePost {
131
150
url {
132
151
parameter(" useRoleIds" , useRoleIds)
133
152
takeFrom(baseUrl)
134
153
appendPathSegmentsEncodingSlash(" repositories" , repository.id, " init" )
154
+ if (legacyGlobalStorage) parameters[" legacyGlobalStorage" ] = legacyGlobalStorage.toString()
135
155
}
136
156
useVersionStreamFormat()
137
157
}.execute { response ->
138
- createVersion(null , response.readVersionDelta())
158
+ createVersion(getStore(repository), null , response.readVersionDelta())
139
159
}
140
160
}
141
161
@@ -193,36 +213,55 @@ class ModelClientV2(
193
213
@Deprecated(" repository ID is required for permission checks" )
194
214
@DeprecationInfo(" 3.7.0" , " May be removed with the next major release. Also remove the endpoint from the model-server." )
195
215
override suspend fun loadVersion (versionHash : String , baseVersion : IVersion ? ): IVersion {
196
- return httpClient.prepareGet {
197
- url {
198
- takeFrom(baseUrl)
199
- appendPathSegments(" versions" , versionHash)
200
- if (baseVersion != null ) {
201
- parameters[" lastKnown" ] = (baseVersion as CLVersion ).getContentHash()
216
+ val repositoryIdFromBaseVersion = (baseVersion as ? CLVersion )?.let { getRepository(it.store) }
217
+ if (repositoryIdFromBaseVersion != null ) {
218
+ return doLoadVersion(repositoryIdFromBaseVersion, versionHash, baseVersion)
219
+ } else {
220
+ // try finding the version in any repository
221
+ for (repositoryId in listRepositories() + null ) {
222
+ try {
223
+ return doLoadVersion(repositoryId, versionHash, baseVersion)
224
+ } catch (ex: ClientRequestException ) {
225
+ when (ex.response.status) {
226
+ HttpStatusCode .NotFound -> {}
227
+ HttpStatusCode .Unauthorized -> {}
228
+ HttpStatusCode .Forbidden -> {}
229
+ else -> throw ex
230
+ }
202
231
}
203
232
}
204
- useVersionStreamFormat()
205
- }.execute { response ->
206
- createVersion(baseVersion as CLVersion ? , response.readVersionDelta())
233
+ throw VersionNotFoundException (versionHash)
207
234
}
208
235
}
209
236
210
237
override suspend fun loadVersion (
211
238
repositoryId : RepositoryId ,
212
239
versionHash : String ,
213
240
baseVersion : IVersion ? ,
241
+ ): IVersion {
242
+ return doLoadVersion(repositoryId, versionHash, baseVersion)
243
+ }
244
+
245
+ private suspend fun doLoadVersion (
246
+ repositoryId : RepositoryId ? ,
247
+ versionHash : String ,
248
+ baseVersion : IVersion ? ,
214
249
): IVersion {
215
250
return httpClient.prepareGet {
216
251
url {
217
252
takeFrom(baseUrl)
218
- appendPathSegments(" repositories" , repositoryId.id, " versions" , versionHash)
253
+ if (repositoryId == null ) {
254
+ appendPathSegments(" versions" , versionHash)
255
+ } else {
256
+ appendPathSegments(" repositories" , repositoryId.id, " versions" , versionHash)
257
+ }
219
258
if (baseVersion != null ) {
220
259
parameters[" lastKnown" ] = (baseVersion as CLVersion ).getContentHash()
221
260
}
222
261
}
223
262
useVersionStreamFormat()
224
263
}.execute { response ->
225
- createVersion(baseVersion as CLVersion ? , response.readVersionDelta())
264
+ createVersion(getStore(repositoryId), baseVersion as CLVersion ? , response.readVersionDelta())
226
265
}
227
266
}
228
267
@@ -249,7 +288,7 @@ class ModelClientV2(
249
288
contentType(ContentType .Application .Json )
250
289
setBody(delta)
251
290
}.execute { response ->
252
- createVersion(version, response.readVersionDelta())
291
+ createVersion(getStore(branch.repositoryId), version, response.readVersionDelta())
253
292
}
254
293
}
255
294
@@ -285,7 +324,7 @@ class ModelClientV2(
285
324
}
286
325
useVersionStreamFormat()
287
326
}.execute { response ->
288
- val receivedVersion = createVersion(lastKnownVersion, response.readVersionDelta())
327
+ val receivedVersion = createVersion(getStore(branch.repositoryId), lastKnownVersion, response.readVersionDelta())
289
328
LOG .debug { " ${clientId.toString(16 )} .pull($branch , $lastKnownVersion ) -> $receivedVersion " }
290
329
receivedVersion
291
330
}
@@ -302,7 +341,7 @@ class ModelClientV2(
302
341
}.execute { response ->
303
342
val receivedVersion = when (response.status) {
304
343
HttpStatusCode .NotFound -> null
305
- HttpStatusCode .OK -> createVersion(null , response.readVersionDelta())
344
+ HttpStatusCode .OK -> createVersion(getStore(branch.repositoryId), null , response.readVersionDelta())
306
345
else -> throw ResponseException (response, response.bodyAsText())
307
346
}
308
347
LOG .debug { " ${clientId.toString(16 )} .pullIfExists($branch ) -> $receivedVersion " }
@@ -352,7 +391,7 @@ class ModelClientV2(
352
391
}
353
392
useVersionStreamFormat()
354
393
}.execute { response ->
355
- val receivedVersion = createVersion(lastKnownVersion, response.readVersionDelta())
394
+ val receivedVersion = createVersion(getStore(branch.repositoryId), lastKnownVersion, response.readVersionDelta())
356
395
LOG .debug { " ${clientId.toString(16 )} .poll($branch , $lastKnownVersion ) -> $receivedVersion " }
357
396
receivedVersion
358
397
}
@@ -378,7 +417,7 @@ class ModelClientV2(
378
417
httpClient.close()
379
418
}
380
419
381
- private suspend fun createVersion (baseVersion : CLVersion ? , delta : VersionDeltaStream ): CLVersion {
420
+ private suspend fun createVersion (store : IDeserializingKeyValueStore , baseVersion : CLVersion ? , delta : VersionDeltaStream ): CLVersion {
382
421
delta.getObjectsAsFlow().collect {
383
422
HashUtil .checkObjectHash(it.first, it.second)
384
423
store.keyValueStore.put(it.first, it.second)
@@ -393,7 +432,7 @@ class ModelClientV2(
393
432
}
394
433
}
395
434
396
- private suspend fun createVersion (baseVersion : CLVersion ? , delta : Flow <String >): CLVersion {
435
+ private suspend fun createVersion (store : IDeserializingKeyValueStore , baseVersion : CLVersion ? , delta : Flow <String >): CLVersion {
397
436
var firstHash: String? = null
398
437
var isHash = true
399
438
var lastHash: String? = null
@@ -586,7 +625,7 @@ suspend fun <T> IModelClientV2.runWriteOnBranch(branchRef: BranchReference, body
586
625
.takeIf { it != branchRef }
587
626
?.let { client.pullIfExists(it) } // master branch
588
627
? : client.initRepository(branchRef.repositoryId)
589
- val branch = OTBranch (TreePointer (baseVersion.getTree(), client.getIdGenerator()), client.getIdGenerator(), (client as ModelClientV2 ).store)
628
+ val branch = OTBranch (TreePointer (baseVersion.getTree(), client.getIdGenerator()), client.getIdGenerator(), (baseVersion as CLVersion ).store)
590
629
val result = branch.computeWrite {
591
630
body(branch)
592
631
}
0 commit comments