@@ -21,21 +21,28 @@ import io.ktor.client.plugins.HttpTimeout
21
21
import io.ktor.client.plugins.ResponseException
22
22
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
23
23
import io.ktor.client.plugins.expectSuccess
24
+ import io.ktor.client.request.HttpRequestBuilder
24
25
import io.ktor.client.request.get
25
26
import io.ktor.client.request.post
26
27
import io.ktor.client.request.put
27
28
import io.ktor.client.request.setBody
29
+ import io.ktor.client.statement.HttpResponse
30
+ import io.ktor.client.statement.bodyAsChannel
28
31
import io.ktor.client.statement.bodyAsText
29
32
import io.ktor.http.ContentType
33
+ import io.ktor.http.HttpHeaders
30
34
import io.ktor.http.HttpStatusCode
31
35
import io.ktor.http.URLBuilder
32
36
import io.ktor.http.appendPathSegments
33
37
import io.ktor.http.contentType
34
38
import io.ktor.http.takeFrom
35
39
import io.ktor.serialization.kotlinx.json.json
40
+ import io.ktor.utils.io.readUTF8Line
36
41
import kotlinx.coroutines.CoroutineScope
42
+ import kotlinx.coroutines.flow.Flow
43
+ import kotlinx.coroutines.flow.emptyFlow
44
+ import kotlinx.coroutines.flow.flow
37
45
import kotlinx.coroutines.launch
38
- import kotlinx.serialization.json.Json
39
46
import org.modelix.kotlin.utils.DeprecationInfo
40
47
import org.modelix.model.IVersion
41
48
import org.modelix.model.api.IIdGenerator
@@ -54,6 +61,8 @@ import org.modelix.model.operations.OTBranch
54
61
import org.modelix.model.persistent.HashUtil
55
62
import org.modelix.model.persistent.MapBasedStore
56
63
import org.modelix.model.server.api.v2.VersionDelta
64
+ import org.modelix.model.server.api.v2.VersionDeltaStream
65
+ import org.modelix.model.server.api.v2.asStream
57
66
import org.modelix.modelql.client.ModelQLClient
58
67
import org.modelix.modelql.core.IMonoStep
59
68
import kotlin.time.Duration
@@ -117,9 +126,9 @@ class ModelClientV2(
117
126
takeFrom(baseUrl)
118
127
appendPathSegmentsEncodingSlash(" repositories" , repository.id, " init" )
119
128
}
129
+ useVersionStreamFormat()
120
130
}
121
- val delta = response.body<VersionDelta >()
122
- return createVersion(null , delta)
131
+ return createVersion(null , response.readVersionDelta())
123
132
}
124
133
125
134
override suspend fun listRepositories (): List <RepositoryId > {
@@ -151,9 +160,9 @@ class ModelClientV2(
151
160
parameters[" lastKnown" ] = (baseVersion as CLVersion ).getContentHash()
152
161
}
153
162
}
163
+ useVersionStreamFormat()
154
164
}
155
- val delta = Json .decodeFromString<VersionDelta >(response.bodyAsText())
156
- return createVersion(baseVersion as CLVersion ? , delta)
165
+ return createVersion(baseVersion as CLVersion ? , response.readVersionDelta())
157
166
}
158
167
159
168
override suspend fun loadVersion (
@@ -169,9 +178,9 @@ class ModelClientV2(
169
178
parameters[" lastKnown" ] = (baseVersion as CLVersion ).getContentHash()
170
179
}
171
180
}
181
+ useVersionStreamFormat()
172
182
}
173
- val delta = Json .decodeFromString<VersionDelta >(response.bodyAsText())
174
- return createVersion(baseVersion as CLVersion ? , delta)
183
+ return createVersion(baseVersion as CLVersion ? , response.readVersionDelta())
175
184
}
176
185
177
186
override suspend fun push (branch : BranchReference , version : IVersion , baseVersion : IVersion ? ): IVersion {
@@ -193,11 +202,11 @@ class ModelClientV2(
193
202
takeFrom(baseUrl)
194
203
appendPathSegmentsEncodingSlash(" repositories" , branch.repositoryId.id, " branches" , branch.branchName)
195
204
}
205
+ useVersionStreamFormat()
196
206
contentType(ContentType .Application .Json )
197
207
setBody(delta)
198
208
}
199
- val mergedVersionDelta = response.body<VersionDelta >()
200
- return createVersion(version, mergedVersionDelta)
209
+ return createVersion(version, response.readVersionDelta())
201
210
}
202
211
203
212
private suspend fun uploadObjects (repository : RepositoryId , objects : Sequence <Pair <String , String >>) {
@@ -224,8 +233,9 @@ class ModelClientV2(
224
233
parameters[" lastKnown" ] = lastKnownVersion.hash
225
234
}
226
235
}
236
+ useVersionStreamFormat()
227
237
}
228
- val receivedVersion = createVersion(lastKnownVersion, response.body ())
238
+ val receivedVersion = createVersion(lastKnownVersion, response.readVersionDelta ())
229
239
LOG .debug { " ${clientId.toString(16 )} .pull($branch , $lastKnownVersion ) -> $receivedVersion " }
230
240
return receivedVersion
231
241
}
@@ -237,11 +247,12 @@ class ModelClientV2(
237
247
takeFrom(baseUrl)
238
248
appendPathSegmentsEncodingSlash(" repositories" , branch.repositoryId.id, " branches" , branch.branchName)
239
249
}
250
+ useVersionStreamFormat()
240
251
}
241
252
242
253
val receivedVersion = when (response.status) {
243
254
HttpStatusCode .NotFound -> null
244
- HttpStatusCode .OK -> createVersion(null , response.body ())
255
+ HttpStatusCode .OK -> createVersion(null , response.readVersionDelta ())
245
256
else -> throw ResponseException (response, response.bodyAsText())
246
257
}
247
258
LOG .debug { " ${clientId.toString(16 )} .pullIfExists($branch ) -> $receivedVersion " }
@@ -284,8 +295,9 @@ class ModelClientV2(
284
295
parameters[" lastKnown" ] = lastKnownVersion.hash
285
296
}
286
297
}
298
+ useVersionStreamFormat()
287
299
}
288
- val receivedVersion = createVersion(lastKnownVersion, response.body ())
300
+ val receivedVersion = createVersion(lastKnownVersion, response.readVersionDelta ())
289
301
LOG .debug { " ${clientId.toString(16 )} .poll($branch , $lastKnownVersion ) -> $receivedVersion " }
290
302
return receivedVersion
291
303
}
@@ -310,24 +322,78 @@ class ModelClientV2(
310
322
httpClient.close()
311
323
}
312
324
313
- private fun createVersion (baseVersion : CLVersion ? , delta : VersionDelta ): CLVersion {
325
+ private suspend fun HttpResponse.readVersionDelta (): VersionDeltaStream {
326
+ return if (contentType()?.match(VersionDeltaStream .CONTENT_TYPE ) == true ) {
327
+ val content = bodyAsChannel()
328
+ val versionHash = checkNotNull(content.readUTF8Line()) { " No objects received" }
329
+ val versionObject = content.readUTF8Line()
330
+ return if (versionObject == null ) {
331
+ VersionDeltaStream (versionHash, emptyFlow())
332
+ } else {
333
+ VersionDeltaStream (
334
+ versionHash,
335
+ flow {
336
+ emit(versionHash to versionObject)
337
+ while (true ) {
338
+ val key = content.readUTF8Line() ? : break
339
+ val value = checkNotNull(content.readUTF8Line()) { " Object missing for hash $key " }
340
+ emit(key to value)
341
+ }
342
+ },
343
+ )
344
+ }
345
+ } else {
346
+ body<VersionDelta >().asStream()
347
+ }
348
+ }
349
+
350
+ private suspend fun createVersion (baseVersion : CLVersion ? , delta : VersionDeltaStream ): CLVersion {
351
+ delta.getObjectsAsFlow().collect {
352
+ HashUtil .checkObjectHash(it.first, it.second)
353
+ store.keyValueStore.put(it.first, it.second)
354
+ }
355
+ return if (baseVersion == null ) {
356
+ CLVersion (delta.versionHash, store)
357
+ } else if (delta.versionHash == baseVersion.getContentHash()) {
358
+ baseVersion
359
+ } else {
360
+ require(baseVersion.store == store) { " baseVersion was not created by this client" }
361
+ CLVersion (delta.versionHash, store)
362
+ }
363
+ }
364
+
365
+ private suspend fun createVersion (baseVersion : CLVersion ? , delta : Flow <String >): CLVersion {
366
+ var firstHash: String? = null
367
+ var isHash = true
368
+ var lastHash: String? = null
369
+ delta.collect {
370
+ if (isHash) {
371
+ lastHash = it
372
+ if (firstHash == null ) {
373
+ firstHash = it
374
+ }
375
+ } else {
376
+ val value = it
377
+ store.keyValueStore.put(lastHash!! , value)
378
+ }
379
+ isHash = ! isHash
380
+ }
381
+ val versionHash = checkNotNull(firstHash) { " No objects received" }
382
+
314
383
return if (baseVersion == null ) {
315
- CLVersion (
316
- delta.versionHash,
317
- store.also { it.keyValueStore.putAll(delta.getAllObjects()) },
318
- )
319
- } else if (delta.versionHash == baseVersion.hash) {
384
+ CLVersion (versionHash, store)
385
+ } else if (versionHash == baseVersion.getContentHash()) {
320
386
baseVersion
321
387
} else {
322
388
require(baseVersion.store == store) { " baseVersion was not created by this client" }
323
- store.keyValueStore.putAll(delta.getAllObjects())
324
- CLVersion (
325
- delta.versionHash,
326
- baseVersion.store,
327
- )
389
+ CLVersion (versionHash, store)
328
390
}
329
391
}
330
392
393
+ private fun HttpRequestBuilder.useVersionStreamFormat () {
394
+ headers.set(HttpHeaders .Accept , VersionDeltaStream .CONTENT_TYPE .toString())
395
+ }
396
+
331
397
companion object {
332
398
private val LOG = mu.KotlinLogging .logger {}
333
399
fun builder (): ModelClientV2Builder = ModelClientV2PlatformSpecificBuilder ()
0 commit comments