Skip to content

Commit fc206a9

Browse files
authored
Improved batching of migration jobs (#1238)
* Handle duplicate Dialogporten migration jobs * Batch the batching. It takes about a minute to create 10k jobs. * Increase max cache entry size * Error handling in sync * Maybe actually scheduling it is a good idea too * Ai * Delete file
1 parent 2bbff9d commit fc206a9

File tree

8 files changed

+431
-11
lines changed

8 files changed

+431
-11
lines changed

Test/Altinn.Correspondence.Tests/TestingHandler/MigrateCorrespondenceHandlerTests.cs

Lines changed: 373 additions & 0 deletions
Large diffs are not rendered by default.

src/Altinn.Correspondence.API/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ static void ConfigureServices(IServiceCollection services, IConfiguration config
122122
options.InstanceName = "redisCache";
123123
});
124124
#pragma warning disable EXTEXP0018
125-
services.AddHybridCache();
125+
services.AddHybridCache(options => options.MaximumPayloadBytes = 1000 * 1000 * 10L);
126126
#pragma warning restore EXTEXP0018
127127
services.AddSingleton<IHybridCacheWrapper, HybridCacheWrapper>();
128128
services.ConfigureAuthentication(config, hostEnvironment);

src/Altinn.Correspondence.Application/MigrateCorrespondence/MakeCorrespondenceAvailableRequest.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ public class MakeCorrespondenceAvailableRequest
1010
public List<Guid>? CorrespondenceIds { get; set; }
1111
public bool AsyncProcessing { get; set; }
1212
public int? BatchSize { get; set; }
13+
public int? BatchOffset { get; set; }
1314
}

src/Altinn.Correspondence.Application/MigrateCorrespondence/MigrateCorrespondenceHandler.cs

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,17 +111,53 @@ public async Task<OneOf<MakeCorrespondenceAvailableResponse, Error>> MakeCorresp
111111
}
112112
else if (request.BatchSize is not null)
113113
{
114-
var correspondences = await correspondenceRepository.GetCandidatesForMigrationToDialogporten(request.BatchSize ?? 0, cancellationToken);
115-
foreach(var correspondence in correspondences)
114+
if (!request.AsyncProcessing)
116115
{
117-
if (request.AsyncProcessing)
116+
var correspondences = await correspondenceRepository.GetCandidatesForMigrationToDialogporten(request.BatchSize ?? 0, request.BatchOffset ?? 0, cancellationToken);
117+
foreach (var correspondence in correspondences)
118118
{
119-
backgroundJobClient.Enqueue<MigrateCorrespondenceHandler>("migration", (handler) => handler.MakeCorrespondenceAvailableInDialogportenAndApi(correspondence.Id));
120-
}
121-
else
119+
try
120+
{
121+
dialogId = await MakeCorrespondenceAvailableInDialogportenAndApi(correspondence.Id, cancellationToken);
122+
response.Statuses.Add(new(correspondence.Id, null, dialogId, true));
123+
}
124+
catch (Exception ex)
125+
{
126+
response.Statuses.Add(new(correspondence.Id, ex.ToString()));
127+
}
128+
}
129+
return response;
130+
}
131+
var batchLimit = 10000;
132+
if (request.BatchSize > batchLimit)
133+
{
134+
var remainingCount = request.BatchSize;
135+
var alreadyAdded = 0;
136+
while(remainingCount > 0)
137+
{
138+
int currentBatch = batchLimit;
139+
remainingCount -= batchLimit;
140+
if (remainingCount < 0)
141+
{
142+
currentBatch = (int)(remainingCount + batchLimit);
143+
}
144+
var migrateRequest = new MakeCorrespondenceAvailableRequest()
145+
{
146+
AsyncProcessing = true,
147+
BatchOffset = alreadyAdded,
148+
BatchSize = currentBatch,
149+
CreateEvents = request.CreateEvents
150+
};
151+
backgroundJobClient.Enqueue<MigrateCorrespondenceHandler>((handler) => handler.MakeCorrespondenceAvailable(migrateRequest, CancellationToken.None));
152+
alreadyAdded += currentBatch;
153+
}
154+
}
155+
else
156+
{
157+
var correspondences = await correspondenceRepository.GetCandidatesForMigrationToDialogporten(request.BatchSize ?? 0, request.BatchOffset ?? 0, cancellationToken);
158+
foreach(var correspondence in correspondences)
122159
{
123-
dialogId = await MakeCorrespondenceAvailableInDialogportenAndApi(correspondence.Id, cancellationToken);
124-
response.Statuses.Add(new(correspondence.Id, null, dialogId, true));
160+
backgroundJobClient.Enqueue<MigrateCorrespondenceHandler>(handler => handler.MakeCorrespondenceAvailableInDialogportenAndApi(correspondence.Id));
125161
}
126162
}
127163
}

src/Altinn.Correspondence.Core/Repositories/ICorrespondenceRepostitory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ Task<CorrespondenceEntity> GetCorrespondenceByAltinn2Id(
5454
Task UpdatePublished(Guid correspondenceId, DateTimeOffset published, CancellationToken cancellationToken);
5555
Task UpdateIsMigrating(Guid correspondenceId, bool isMigrating, CancellationToken cancellationToken);
5656
Task<bool> AreAllAttachmentsPublished(Guid correspondenceId, CancellationToken cancellationToken = default);
57-
Task<List<CorrespondenceEntity>> GetCandidatesForMigrationToDialogporten(int batchSize, CancellationToken cancellationToken = default);
57+
Task<List<CorrespondenceEntity>> GetCandidatesForMigrationToDialogporten(int batchSize, int offset, CancellationToken cancellationToken = default);
5858
Task<List<CorrespondenceEntity>> GetPurgedCorrespondencesWithDialogsAfter(
5959
int limit,
6060
DateTimeOffset? lastCreated,

src/Altinn.Correspondence.Integrations/Altinn/Notifications/AltinnNotificationService.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Microsoft.Extensions.Options;
66
using Altinn.Correspondence.Core.Repositories;
77
using Altinn.Correspondence.Core.Models.Notifications;
8+
using System.Text.Json;
89

910
namespace Altinn.Correspondence.Integrations.Altinn.Notifications;
1011

@@ -42,6 +43,8 @@ public AltinnNotificationService(HttpClient httpClient, IOptions<AltinnOptions>
4243
{
4344
_logger.LogInformation("Creating notification in Altinn Notification v2");
4445
var response = await _httpClient.PostAsJsonAsync("notifications/api/v1/future/orders", notificationRequest, cancellationToken);
46+
var jsonRequest = JsonSerializer.Serialize(notificationRequest);
47+
var responseJson = await response.Content.ReadAsStringAsync(cancellationToken);
4548
if (!response.IsSuccessStatusCode)
4649
{
4750
_logger.LogError("Failed to create notification in Altinn Notification v2. Status code: {StatusCode}", response.StatusCode);

src/Altinn.Correspondence.Integrations/Dialogporten/DialogportenService.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,11 @@ public async Task<string> CreateCorrespondenceDialogForMigratedCorrespondence(Gu
435435
{
436436
throw new ArgumentException($"Correspondence with id {correspondenceId} not found", nameof(correspondenceId));
437437
}
438+
if (correspondence.ExternalReferences.Any(reference => reference.ReferenceType == ReferenceType.DialogportenDialogId))
439+
{
440+
logger.LogWarning($"Duplicate job for correspondence {correspondenceId}");
441+
return correspondence.ExternalReferences.FirstOrDefault(reference => reference.ReferenceType == ReferenceType.DialogportenDialogId)?.ReferenceValue ?? string.Empty;
442+
}
438443

439444
await CreateIdempotencyKeysForCorrespondence(correspondence, cancellationToken);
440445

src/Altinn.Correspondence.Persistence/Repositories/CorrespondenceRepository.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,13 @@ public async Task<bool> AreAllAttachmentsPublished(Guid correspondenceId, Cancel
185185
.SingleOrDefaultAsync(cancellationToken);
186186
}
187187

188-
public Task<List<CorrespondenceEntity>> GetCandidatesForMigrationToDialogporten(int batchSize, CancellationToken cancellationToken = default)
188+
public Task<List<CorrespondenceEntity>> GetCandidatesForMigrationToDialogporten(int batchSize, int offset, CancellationToken cancellationToken = default)
189189
{
190190
return _context.Correspondences
191191
.Where(c => c.Altinn2CorrespondenceId != null && c.IsMigrating) // Only include correspondences that are not already migrated
192192
.OrderByDescending(c => c.Created)
193+
.ThenBy(c => c.Id)
194+
.Skip(offset)
193195
.Take(batchSize)
194196
.ToListAsync(cancellationToken);
195197
}

0 commit comments

Comments
 (0)