Skip to content

Commit a63d025

Browse files
committed
Bug fix: change file ingestion ordering to chronological (oldest first)
1 parent 311addf commit a63d025

23 files changed

+196
-153
lines changed

src/KeeperData.Bridge.PerformanceTests/PerformanceTests.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public async Task InitializeAsync()
6363
_localStackContainer = new LocalStackBuilder()
6464
.WithImage("localstack/localstack:3.0")
6565
.Build();
66-
66+
6767
await _localStackContainer.StartAsync();
6868
_output.WriteLine(" LocalStack container started");
6969

@@ -85,7 +85,7 @@ public async Task InitializeAsync()
8585
_mongoDbContainer = new MongoDbBuilder()
8686
.WithImage("mongo:7.0")
8787
.Build();
88-
88+
8989
await _mongoDbContainer.StartAsync();
9090
_output.WriteLine(" MongoDB container started");
9191

@@ -131,7 +131,7 @@ public async Task DisposeAsync()
131131
[Fact]
132132
public async Task FullPipeline_SamCPHHolding_NRecords_ShouldCompleteSuccessfully()
133133
{
134-
const int TotalToProcess = 300_000;
134+
const int TotalToProcess = 300_000;
135135

136136
_output.WriteLine("╔══════════════════════════════════════════════════════════════╗");
137137
_output.WriteLine("║ PERFORMANCE TEST: SAM CPH Holding - 1000 Records Pipeline ║");
@@ -177,9 +177,9 @@ public async Task FullPipeline_SamCPHHolding_NRecords_ShouldCompleteSuccessfully
177177

178178
// Step 10: Verify data integrity
179179
VerifyDataIntegrity(queryResults, recordCount);
180-
180+
181181
stopwatch.Stop();
182-
182+
183183
_output.WriteLine($" Step 10: Data integrity verified - {stopwatch.ElapsedMilliseconds}ms\n");
184184

185185
_output.WriteLine("╔══════════════════════════════════════════════════════════════╗");
@@ -198,7 +198,7 @@ public async Task FullPipeline_SamCPHHolding_NRecords_ShouldCompleteSuccessfully
198198

199199
var cacheDir = Path.Combine(Path.GetTempPath(), "KeeperDataPerformanceTests");
200200
Directory.CreateDirectory(cacheDir);
201-
201+
202202
var cachedFilePath = Path.Combine(cacheDir, cachedDataFileName);
203203

204204
if (File.Exists(cachedFilePath))
@@ -212,17 +212,17 @@ public async Task FullPipeline_SamCPHHolding_NRecords_ShouldCompleteSuccessfully
212212

213213
_output.WriteLine($"Generating new test data ({totalToProcess} records)...");
214214
var (csvContent, recordCount) = GenerateSamCPHHoldingData(totalToProcess);
215-
215+
216216
await File.WriteAllTextAsync(cachedFilePath, csvContent);
217217
_output.WriteLine($"Cached test data to: {cachedFilePath}");
218-
218+
219219
return (csvContent, recordCount);
220220
}
221221

222222
private (string CsvContent, int RecordCount) GenerateSamCPHHoldingData(int recordCount)
223223
{
224224
var csv = new StringBuilder();
225-
225+
226226
// Write header based on samCPHHolding definition
227227
csv.AppendLine("CPH|ADDRESS_PK|DISEASE_TYPE|INTERVAL|INTERVAL_UNIT_OF_TIME|CPH_RELATIONSHIP_TYPE|SECONDARY_CPH|ANIMAL_SPECIES_CODE|ANIMAL_PRODUCTION_USAGE_CODE|CHANGETYPE");
228228

@@ -263,7 +263,7 @@ private string GenerateRandomCode(Random random, int length)
263263
private DataSetDefinition CreateSamCPHHoldingDefinition()
264264
{
265265
var today = DateOnly.FromDateTime(DateTime.UtcNow);
266-
266+
267267
return new DataSetDefinition(
268268
Name: "sam_cph_holdings",
269269
FilePrefixFormat: "LITP_SAMCPHHOLDING_{0}",
@@ -291,7 +291,7 @@ private ServiceProvider ConfigureServices(DataSetDefinition dataSetDefinition)
291291
builder.AddConsole();
292292
builder.SetMinimumLevel(LogLevel.Warning); // Reduce noise in performance tests
293293
});
294-
294+
295295
services.AddSingleton(TimeProvider.System);
296296

297297
var configuration = new ConfigurationBuilder()
@@ -495,7 +495,7 @@ private async Task VerifyImportCompletedAsync(Guid importId)
495495
if (report.Status == ImportStatus.Completed)
496496
{
497497
_output.WriteLine("Import completed successfully!");
498-
498+
499499
report.AcquisitionPhase.Should().NotBeNull();
500500
report.AcquisitionPhase!.Status.Should().Be(PhaseStatus.Completed);
501501
report.AcquisitionPhase.FilesProcessed.Should().BeGreaterThan(0);
@@ -506,7 +506,7 @@ private async Task VerifyImportCompletedAsync(Guid importId)
506506

507507
_output.WriteLine($" Files Processed: {report.AcquisitionPhase.FilesProcessed}");
508508
_output.WriteLine($" Records Created: {report.IngestionPhase.RecordsCreated}");
509-
509+
510510
return;
511511
}
512512

src/KeeperData.Bridge/Controllers/ExternalCatalogueController.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -232,15 +232,15 @@ private static string GenerateReport(string sourceType, IReadOnlyList<FileSet> f
232232
if (fileSet.Files.Any())
233233
{
234234
report.AppendLine(" Files:");
235-
foreach (var file in fileSet.Files.OrderByDescending(f => f.LastModified))
235+
foreach (var file in fileSet.Files.OrderBy(f => f.Timestamp))
236236
{
237-
var sizeKB = file.Size / 1024.0;
237+
var sizeKB = file.StorageObject.Size / 1024.0;
238238
var sizeDisplay = sizeKB < 1024 ? $"{sizeKB:F1} KB" : $"{sizeKB / 1024:F1} MB";
239239

240-
report.AppendLine($" - {file.Key}");
240+
report.AppendLine($" - {file.StorageObject.Key}");
241241
report.AppendLine($" Size: {sizeDisplay}");
242-
report.AppendLine($" Last Modified: {file.LastModified:yyyy-MM-dd HH:mm:ss} UTC");
243-
report.AppendLine($" ETag: {file.ETag}");
242+
report.AppendLine($" Last Modified: {file.StorageObject.LastModified:yyyy-MM-dd HH:mm:ss} UTC");
243+
report.AppendLine($" ETag: {file.StorageObject.ETag}");
244244
}
245245
}
246246
else
@@ -288,7 +288,7 @@ private static bool IsFileNameMatchingDefinition(string fileNameWithoutExtension
288288
try
289289
{
290290
var prefixPattern = definition.FilePrefixFormat.Replace("{0}", "");
291-
var dateTimePattern = StandardDataSetDefinitionsBuilder.DateTimePattern;
291+
var dateTimePattern = EtlFileTimestampPatterns.DateTimePattern;
292292

293293
var expectedPattern = prefixPattern + dateTimePattern;
294294

@@ -312,7 +312,7 @@ private static bool IsFileNameMatchingDefinition(string fileNameWithoutExtension
312312

313313
return DateTime.TryParseExact(
314314
dateTimeString,
315-
StandardDataSetDefinitionsBuilder.DateTimePattern,
315+
EtlFileTimestampPatterns.DateTimePattern,
316316
CultureInfo.InvariantCulture,
317317
DateTimeStyles.None,
318318
out _);
@@ -326,7 +326,7 @@ private static bool IsFileNameMatchingDefinition(string fileNameWithoutExtension
326326
private static string FormatExampleFileName(DataSetDefinition definition)
327327
{
328328
var prefix = definition.FilePrefixFormat.Replace("{0}", "");
329-
return $"{prefix}{StandardDataSetDefinitionsBuilder.DateTimePattern}.csv (e.g., {prefix}20241201123045.csv)";
329+
return $"{prefix}{EtlFileTimestampPatterns.DateTimePattern}.csv (e.g., {prefix}20241201123045.csv)";
330330
}
331331

332332
private ValidationProblemDetails CreateValidationProblem(string errorMessage, string fieldName)

src/KeeperData.Core/ETL/Impl/AcquisitionPipeline.cs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ await UpdateAcquisitionPhaseCompletedAsync(
7777
{
7878
logger.LogInformation("Step 1: Discovering files for ImportId: {ImportId}", importId);
7979

80-
var fileSets = await catalogueService.GetFileSetsAsync(20, ct);
80+
var fileSets = await catalogueService.GetFileSetsAsync(100, ct);
8181
var totalFiles = fileSets.Sum(fs => fs.Files.Length);
8282

8383
logger.LogInformation("Discovered {FileSetCount} file set(s) containing {TotalFileCount} file(s) for ImportId: {ImportId}",
@@ -150,7 +150,7 @@ private async Task UpdateAcquisitionPhaseStartedAsync(Guid importId, int totalFi
150150
private async Task<bool> ProcessSingleFileAsync(
151151
Guid importId,
152152
FileSet fileSet,
153-
StorageObjectInfo file,
153+
EtlFile file,
154154
int currentFileNumber,
155155
int totalFiles,
156156
IBlobStorageServiceReadOnly sourceBlobs,
@@ -162,15 +162,15 @@ private async Task<bool> ProcessSingleFileAsync(
162162
logger.LogInformation("Processing file {CurrentFile}/{TotalFiles}: {FileKey} for ImportId: {ImportId}",
163163
currentFileNumber,
164164
totalFiles,
165-
file.Key,
165+
file.StorageObject.Key,
166166
importId);
167167

168168
try
169169
{
170170
var fileContext = await PrepareFileContextAsync(file, sourceBlobs, destinationBlobs, ct);
171171

172172
var transferDecision = await DetermineFileTransferRequirementAsync(
173-
file.Key,
173+
file.StorageObject.Key,
174174
fileContext.EncryptedMetadata.ContentLength,
175175
destinationBlobs,
176176
importId,
@@ -184,7 +184,7 @@ private async Task<bool> ProcessSingleFileAsync(
184184

185185
fileStopwatch.Stop();
186186

187-
await CheckForDuplicateProcessingAsync(file.Key, acquisitionResult.Md5Hash, importId, ct);
187+
await CheckForDuplicateProcessingAsync(file.StorageObject.Key, acquisitionResult.Md5Hash, importId, ct);
188188

189189
await RecordSuccessfulAcquisitionAsync(
190190
importId,
@@ -200,7 +200,7 @@ await RecordSuccessfulAcquisitionAsync(
200200
{
201201
fileStopwatch.Stop();
202202
logger.LogError(ex, "Failed to process file: {FileKey} after {Duration}ms for ImportId: {ImportId}",
203-
file.Key,
203+
file.StorageObject.Key,
204204
fileStopwatch.ElapsedMilliseconds,
205205
importId);
206206

@@ -211,21 +211,21 @@ await RecordSuccessfulAcquisitionAsync(
211211
}
212212

213213
private async Task<FileContext> PrepareFileContextAsync(
214-
StorageObjectInfo file,
214+
EtlFile file,
215215
IBlobStorageServiceReadOnly sourceBlobs,
216216
IBlobStorageService destinationBlobs,
217217
CancellationToken ct)
218218
{
219-
var encryptedStream = await sourceBlobs.OpenReadAsync(file.Key, ct);
220-
var encryptedMetadata = await sourceBlobs.GetMetadataAsync(file.Key, ct);
221-
var credentials = passwordSalt.Get(file.Key);
219+
var encryptedStream = await sourceBlobs.OpenReadAsync(file.StorageObject.Key, ct);
220+
var encryptedMetadata = await sourceBlobs.GetMetadataAsync(file.StorageObject.Key, ct);
221+
var credentials = passwordSalt.Get(file.StorageObject.Key);
222222

223223
logger.LogDebug("Loaded file context: {FileKey}, ContentLength: {ContentLength} bytes",
224-
file.Key,
224+
file.StorageObject.Key,
225225
encryptedMetadata.ContentLength);
226226

227227
return new FileContext(
228-
file.Key,
228+
file.StorageObject.Key,
229229
encryptedStream,
230230
encryptedMetadata,
231231
credentials.Password,
@@ -350,19 +350,19 @@ private async Task CheckForDuplicateProcessingAsync(
350350
private async Task RecordSuccessfulAcquisitionAsync(
351351
Guid importId,
352352
FileSet fileSet,
353-
StorageObjectInfo file,
353+
EtlFile file,
354354
FileAcquisitionResult acquisitionResult,
355355
long durationMs,
356356
CancellationToken ct)
357357
{
358358
await reportingService.RecordFileAcquisitionAsync(importId, new FileAcquisitionRecord
359359
{
360-
FileName = Path.GetFileName(file.Key),
361-
FileKey = file.Key,
360+
FileName = Path.GetFileName(file.StorageObject.Key),
361+
FileKey = file.StorageObject.Key,
362362
DatasetName = fileSet.Definition.Name,
363363
Md5Hash = acquisitionResult.Md5Hash,
364364
FileSize = acquisitionResult.FileSize,
365-
SourceKey = file.Key,
365+
SourceKey = file.StorageObject.Key,
366366
DecryptionDurationMs = durationMs,
367367
AcquiredAtUtc = DateTime.UtcNow,
368368
Status = FileProcessingStatus.Acquired
@@ -372,7 +372,7 @@ private async Task RecordSuccessfulAcquisitionAsync(
372372
private async Task RecordFailedAcquisitionAsync(
373373
Guid importId,
374374
FileSet fileSet,
375-
StorageObjectInfo file,
375+
EtlFile file,
376376
long durationMs,
377377
Exception ex,
378378
CancellationToken ct)
@@ -381,12 +381,12 @@ private async Task RecordFailedAcquisitionAsync(
381381
{
382382
await reportingService.RecordFileAcquisitionAsync(importId, new FileAcquisitionRecord
383383
{
384-
FileName = Path.GetFileName(file.Key),
385-
FileKey = file.Key,
384+
FileName = Path.GetFileName(file.StorageObject.Key),
385+
FileKey = file.StorageObject.Key,
386386
DatasetName = fileSet.Definition.Name,
387387
Md5Hash = string.Empty,
388388
FileSize = 0,
389-
SourceKey = file.Key,
389+
SourceKey = file.StorageObject.Key,
390390
DecryptionDurationMs = durationMs,
391391
AcquiredAtUtc = DateTime.UtcNow,
392392
Status = FileProcessingStatus.Failed,
@@ -395,7 +395,7 @@ private async Task RecordFailedAcquisitionAsync(
395395
}
396396
catch (Exception reportEx)
397397
{
398-
logger.LogError(reportEx, "Failed to record acquisition failure for file: {FileKey}", file.Key);
398+
logger.LogError(reportEx, "Failed to record acquisition failure for file: {FileKey}", file.StorageObject.Key);
399399
}
400400
}
401401

src/KeeperData.Core/ETL/Impl/DataSetDefinitions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
namespace KeeperData.Core.ETL.Impl;
55

66

7-
public record DataSetDefinition(string Name, string FilePrefixFormat, string DatePattern, string[] PrimaryKeyHeaderNames, string ChangeTypeHeaderName, string[] Accumulators);
7+
public record DataSetDefinition(string Name, string FilePrefixFormat, string[] PrimaryKeyHeaderNames, string ChangeTypeHeaderName, string[] Accumulators, string DatePattern = EtlFileTimestampPatterns.DatePattern, string DateTimePattern = EtlFileTimestampPatterns.DateTimePattern);
88

99
public class DataSetDefinitions : IDataSetDefinitions
1010
{
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace KeeperData.Core.ETL.Impl;
2+
3+
public static class EtlFileTimestampPatterns
4+
{
5+
public const string DatePattern = "yyyyMMdd";
6+
public const string DateTimePattern = "yyyyMMddHHmmss";
7+
}

src/KeeperData.Core/ETL/Impl/ExternalCatalogueService.cs

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
namespace KeeperData.Core.ETL.Impl;
88

9-
public record FileSet(DataSetDefinition Definition, StorageObjectInfo[] Files);
9+
public record EtlFile(StorageObjectInfo StorageObject, DateTimeOffset Timestamp);
10+
11+
public record FileSet(DataSetDefinition Definition, EtlFile[] Files);
1012

1113
public class ExternalCatalogueService(IBlobStorageServiceReadOnly sourceBlobs,
1214
TimeProvider timeProvider,
@@ -56,7 +58,7 @@ public async Task<ImmutableList<FileSet>> GetFileSetsAsync(ImmutableArray<DataSe
5658

5759
// project into new list ordering the files chronologically (oldest first, by filename timestamp)
5860
var files = groupedByDefinition.Select(x => new FileSet(x.Key,
59-
[.. x.SelectMany(y => y.Files).OrderByDescending(x => x.LastModified)]))
61+
[.. x.SelectMany(y => y.Files).OrderBy(x => x.Timestamp)]))
6062
.ToImmutableList();
6163

6264
return files;
@@ -110,7 +112,47 @@ public async Task<FileSet> GetFileSetAsync(DataSetDefinition definition, DateOnl
110112
{
111113
var prefix = GetBlobKeyPrefix(definition, date);
112114
var blobs = await sourceBlobs.ListAsync(prefix, ct);
113-
return new FileSet(definition, [.. blobs]);
115+
var etlFiles = blobs.Select(blob => new EtlFile(blob, ExtractTimestampFromFileName(definition, blob.Key))).ToArray();
116+
117+
return new FileSet(definition, [.. etlFiles]);
118+
}
119+
120+
private DateTimeOffset ExtractTimestampFromFileName(DataSetDefinition definition, string key)
121+
{
122+
ArgumentException.ThrowIfNullOrEmpty(key, nameof(key));
123+
ArgumentNullException.ThrowIfNull(definition, nameof(definition));
124+
125+
var parts = key.Split(".").First().Split('_');
126+
127+
if (parts.Length == 0)
128+
{
129+
throw new InvalidOperationException($"Cannot extract timestamp from blob key '{key}' for dataset '{definition.Name}'");
130+
}
131+
132+
var timestampPart = parts.Last();
133+
134+
if (timestampPart.Length >= definition.DateTimePattern.Length && long.TryParse(timestampPart.AsSpan(0, 14), out _))
135+
{
136+
var dateTimeString = timestampPart.Substring(0, 14);
137+
138+
if (DateTime.TryParseExact(dateTimeString, definition.DateTimePattern,
139+
System.Globalization.CultureInfo.InvariantCulture,
140+
System.Globalization.DateTimeStyles.None,
141+
out var parsedDateTime))
142+
{
143+
// Create DateTimeOffset from the parsed DateTime, explicitly specifying UTC offset
144+
var utcDateTime = DateTime.SpecifyKind(parsedDateTime, DateTimeKind.Utc);
145+
return new DateTimeOffset(utcDateTime, TimeSpan.Zero);
146+
}
147+
else
148+
{
149+
throw new InvalidOperationException($"Cannot parse timestamp '{dateTimeString}' from blob key '{key}' for dataset '{definition.Name}'");
150+
}
151+
}
152+
else
153+
{
154+
throw new InvalidOperationException($"Cannot extract timestamp from blob key '{key}' for dataset '{definition.Name}'");
155+
}
114156
}
115157

116158
private static string GetBlobKeyPrefix(DataSetDefinition definition, DateOnly date)

0 commit comments

Comments
 (0)