Skip to content

Commit 5f91619

Browse files
authored
Ensure JSON is streamed from the request into the db (#1608)
* Import commits via a temp table, then merge the data into the commits table * Define and use a streaming JSON IAsyncEnumerable model
1 parent 0d4ab11 commit 5f91619

File tree

4 files changed

+91
-47
lines changed

4 files changed

+91
-47
lines changed

backend/FwLite/FwLiteWeb/Routes/ProjectRoutes.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public static IEndpointConventionBuilder MapProjectRoutes(this WebApplication ap
4242
async (SyncService syncService,
4343
IOptions<AuthConfig> options,
4444
string serverAuthority,
45+
[FromRoute(Name = CrdtMiniLcmApiHub.ProjectRouteKey)]string project,
4546
[FromQuery] Guid lexboxProjectId) =>
4647
{
4748
var server = options.Value.GetServerByAuthority(serverAuthority);

backend/LexBoxApi/Controllers/CrdtController.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using LexBoxApi.Auth.Attributes;
55
using LexBoxApi.GraphQL;
66
using LexBoxApi.Hub;
7+
using LexBoxApi.Models;
78
using LexBoxApi.Services;
89
using LexCore.Auth;
910
using LexCore.Entities;
@@ -12,6 +13,7 @@
1213
using Microsoft.AspNetCore.Mvc;
1314
using Microsoft.AspNetCore.SignalR;
1415
using Microsoft.EntityFrameworkCore;
16+
using Microsoft.Extensions.Options;
1517
using MiniLcm.Push;
1618

1719
namespace LexBoxApi.Controllers;
@@ -27,7 +29,8 @@ public class CrdtController(
2729
LoggedInContext loggedInContext,
2830
ProjectService projectService,
2931
CrdtCommitService crdtCommitService,
30-
LexAuthService lexAuthService) : ControllerBase
32+
LexAuthService lexAuthService
33+
) : ControllerBase
3134
{
3235
[HttpGet("{projectId}/get")]
3336
public async Task<ActionResult<SyncState>> GetSyncState(Guid projectId)
@@ -37,10 +40,11 @@ public async Task<ActionResult<SyncState>> GetSyncState(Guid projectId)
3740
}
3841

3942
[HttpPost("{projectId}/add")]
40-
public async Task<ActionResult> Add(Guid projectId, [FromBody] IAsyncEnumerable<ServerCommit> commits, Guid? clientId)
43+
44+
public async Task<ActionResult> Add(Guid projectId, StreamJsonAsyncEnumerable<ServerCommit> commits, Guid? clientId, CancellationToken token)
4145
{
4246
await permissionService.AssertCanSyncProject(projectId);
43-
await crdtCommitService.AddCommits(projectId, commits);
47+
await crdtCommitService.AddCommits(projectId, commits, token);
4448

4549
await hubContext.Clients.Group(CrdtProjectChangeHub.ProjectGroup(projectId)).OnProjectUpdated(projectId, clientId);
4650
return Ok();
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using System.Reflection;
2+
using System.Text.Json;
3+
using System.Text.Json.Serialization;
4+
using Microsoft.AspNetCore.Mvc;
5+
using Microsoft.AspNetCore.Mvc.ModelBinding;
6+
using Microsoft.Extensions.Options;
7+
8+
namespace LexBoxApi.Models;
9+
10+
/// <summary>
11+
/// By default, if you declare a parameter of type <see cref="IAsyncEnumerable{T}"/>, it will be created using JsonSerializer.DeserializeAsync.
12+
/// This will buffer the data into a list rather than streaming it. This class exists to allow streaming.
13+
/// </summary>
14+
[UseStreamingBinder]
15+
public class StreamJsonAsyncEnumerable<T>(IAsyncEnumerable<T> stream): IAsyncEnumerable<T>
16+
{
17+
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())
18+
{
19+
return stream.GetAsyncEnumerator(cancellationToken);
20+
}
21+
}
22+
23+
internal class UseStreamingBinderAttribute() : ModelBinderAttribute(typeof(StreamJsonAsyncEnumerableConverter))
24+
{
25+
public override BindingSource? BindingSource => BindingSource.Body;
26+
}
27+
public class StreamJsonAsyncEnumerableConverter(ILogger<StreamJsonAsyncEnumerableConverter> logger): IModelBinder
28+
{
29+
private static readonly MethodInfo BindMethod = new Action<ModelBindingContext>(Bind<object>).Method.GetGenericMethodDefinition();
30+
public Task BindModelAsync(ModelBindingContext bindingContext)
31+
{
32+
if (!bindingContext.ModelType.IsGenericType)
33+
{
34+
throw new InvalidOperationException("Model type must be generic");
35+
}
36+
try
37+
{
38+
var genericType = bindingContext.ModelType.GetGenericArguments()[0];
39+
var bindMethodGeneric = BindMethod.MakeGenericMethod(genericType);
40+
bindMethodGeneric.Invoke(null, [bindingContext]);
41+
}
42+
catch (Exception e)
43+
{
44+
logger.LogError(e, "problem deserializing json");
45+
bindingContext.ModelState.TryAddModelException("this", e);
46+
}
47+
return Task.CompletedTask;
48+
}
49+
50+
private static void Bind<T>(ModelBindingContext bindingContext) where T : class
51+
{
52+
var options = bindingContext.HttpContext.RequestServices.GetRequiredService<IOptions<JsonOptions>>().Value
53+
.JsonSerializerOptions;
54+
var enumerable = JsonSerializer.DeserializeAsyncEnumerable<T>(bindingContext.HttpContext.Request.BodyReader.AsStream(), options, bindingContext.HttpContext.RequestAborted);
55+
bindingContext.Result = ModelBindingResult.Success(new StreamJsonAsyncEnumerable<T>(enumerable.OfType<T>()));
56+
}
57+
}

backend/LexBoxApi/Services/CrdtCommitService.cs

Lines changed: 26 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using LexCore.Utils;
22
using LexData;
33
using LinqToDB;
4+
using LinqToDB.Data;
45
using LinqToDB.EntityFrameworkCore;
56
using LinqToDB.EntityFrameworkCore.Internal;
67
using SIL.Harmony.Core;
@@ -9,53 +10,34 @@ namespace LexBoxApi.Services;
910

1011
public class CrdtCommitService(LexBoxDbContext dbContext)
1112
{
12-
public async Task AddCommits(Guid projectId, IAsyncEnumerable<ServerCommit> commits)
13+
public async Task AddCommits(Guid projectId, IAsyncEnumerable<ServerCommit> commits, CancellationToken token = default)
1314
{
14-
await using var transaction = await dbContext.Database.BeginTransactionAsync();
15-
var commitsTable = dbContext.CreateLinqToDBContext().GetTable<ServerCommit>();
16-
const int commitsThreshold = 100;
17-
const int changeThreshold = 150;
18-
var currentChangeCount = 0;
19-
var commitBucket = new List<ServerCommit>(100);
20-
await foreach (var serverCommit in commits)
21-
{
22-
commitBucket.Add(serverCommit);
23-
currentChangeCount += serverCommit.ChangeEntities.Count;
24-
if (currentChangeCount < changeThreshold && commitBucket.Count < commitsThreshold)
15+
await using var transaction = await dbContext.Database.BeginTransactionAsync(token);
16+
var linqToDbContext = dbContext.CreateLinqToDBContext();
17+
await using var tmpTable = await linqToDbContext.CreateTempTableAsync<ServerCommit>($"tmp_crdt_commit_import_{projectId}__{Guid.NewGuid()}", cancellationToken: token);
18+
await tmpTable.BulkCopyAsync(new BulkCopyOptions{BulkCopyType = BulkCopyType.ProviderSpecific, MaxBatchSize = 10}, commits, token);
19+
20+
var commitsTable = linqToDbContext.GetTable<ServerCommit>();
21+
await commitsTable
22+
.Merge()
23+
.Using(tmpTable)
24+
.OnTargetKey()
25+
.InsertWhenNotMatched(commit => new ServerCommit(commit.Id)
2526
{
26-
continue;
27-
}
28-
29-
await FlushCommits();
30-
}
31-
32-
if (commitBucket.Count > 0) await FlushCommits();
33-
34-
await transaction.CommitAsync();
35-
36-
async Task FlushCommits()
37-
{
38-
await commitsTable
39-
.Merge()
40-
.Using(commitBucket)
41-
.OnTargetKey()
42-
.InsertWhenNotMatched(commit => new ServerCommit(commit.Id)
27+
Id = commit.Id,
28+
ClientId = commit.ClientId,
29+
HybridDateTime = new HybridDateTime(commit.HybridDateTime.DateTime, commit.HybridDateTime.Counter)
4330
{
44-
Id = commit.Id,
45-
ClientId = commit.ClientId,
46-
HybridDateTime = new HybridDateTime(commit.HybridDateTime.DateTime, commit.HybridDateTime.Counter)
47-
{
48-
DateTime = commit.HybridDateTime.DateTime, Counter = commit.HybridDateTime.Counter
49-
},
50-
ProjectId = projectId,
51-
Metadata = commit.Metadata,
52-
//without this sql cast the value will be treated as text and fail to insert into the jsonb column
53-
ChangeEntities = Sql.Expr<List<ChangeEntity<ServerJsonChange>>>($"{commit.ChangeEntities}::jsonb")
54-
})
55-
.MergeAsync();
56-
commitBucket.Clear();
57-
currentChangeCount = 0;
58-
}
31+
DateTime = commit.HybridDateTime.DateTime, Counter = commit.HybridDateTime.Counter
32+
},
33+
ProjectId = projectId,
34+
Metadata = commit.Metadata,
35+
//without this sql cast the value will be treated as text and fail to insert into the jsonb column
36+
ChangeEntities = Sql.Expr<List<ChangeEntity<ServerJsonChange>>>($"{commit.ChangeEntities}::jsonb")
37+
})
38+
.MergeAsync(token);
39+
40+
await transaction.CommitAsync(token);
5941
}
6042

6143
public IAsyncEnumerable<ServerCommit> GetMissingCommits(Guid projectId, SyncState localState, SyncState remoteState)

0 commit comments

Comments
 (0)