Skip to content

Commit 0589844

Browse files
committed
Ingestion progress tracking and reporting + tests
Wildcard index bug fix
1 parent 6ebc696 commit 0589844

File tree

12 files changed

+1014
-6
lines changed

12 files changed

+1014
-6
lines changed

src/KeeperData.Core/ETL/Impl/IngestionPipeline.cs

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using CsvHelper.Configuration;
33
using KeeperData.Core.Database;
44
using KeeperData.Core.ETL.Abstract;
5+
using KeeperData.Core.ETL.Utils;
56
using KeeperData.Core.Reporting;
67
using KeeperData.Core.Reporting.Dtos;
78
using KeeperData.Core.Storage;
@@ -25,12 +26,14 @@ public class IngestionPipeline(
2526
IMongoClient mongoClient,
2627
IOptions<IDatabaseConfig> databaseConfig,
2728
IImportReportingService reportingService,
29+
CsvRowCounter csvRowCounter,
2830
ILogger<IngestionPipeline> logger) : IIngestionPipeline
2931
{
3032
private const int BatchSize = 1000;
3133
private const int LogInterval = 100;
3234
private const int LineageEventBatchSize = 500;
3335
private readonly IDatabaseConfig _databaseConfig = databaseConfig.Value;
36+
private readonly CsvRowCounter _rowCounter = csvRowCounter;
3437

3538
// MongoDB field name constants
3639
private const string FieldId = "_id";
@@ -112,7 +115,8 @@ private async Task UpdateIngestionPhaseStartedAsync(Guid importId, CancellationT
112115
FilesProcessed = 0,
113116
RecordsCreated = 0,
114117
RecordsUpdated = 0,
115-
RecordsDeleted = 0
118+
RecordsDeleted = 0,
119+
CurrentFileStatus = null
116120
}, ct);
117121
}
118122

@@ -151,11 +155,12 @@ CancellationToken ct
151155

152156
totals = totals.Add(fileResult);
153157

154-
// Update progress after each file
158+
// Clear current file status after completion and update overall progress
155159
await UpdateIngestionPhaseProgressAsync(
156160
importId,
157161
processedFileCount,
158162
totals,
163+
null, // Clear current file status
159164
ct);
160165
}
161166
}
@@ -236,6 +241,8 @@ private async Task<FileIngestionMetrics> IngestFileAsync(
236241
await EnsureWildcardIndexExistsAsync(collection, ct);
237242

238243
string? tempFilePath = null;
244+
IngestionProgressTracker? progressTracker = null;
245+
239246
try
240247
{
241248
// Track S3 download time
@@ -246,6 +253,13 @@ private async Task<FileIngestionMetrics> IngestFileAsync(
246253
logger.LogInformation("Downloaded file {FileKey} to temp storage: {TempPath} in {DownloadDuration}ms",
247254
file.Key, tempFilePath, downloadStopwatch.ElapsedMilliseconds);
248255

256+
// Count rows for progress tracking
257+
var estimatedRowCount = await _rowCounter.CountRowsAsync(tempFilePath, ct);
258+
progressTracker = new IngestionProgressTracker(file.Key, estimatedRowCount);
259+
260+
logger.LogInformation("File {FileKey} has approximately {RowCount} data rows to process",
261+
file.Key, estimatedRowCount);
262+
249263
// Track MongoDB ingestion time
250264
var mongoIngestionStopwatch = Stopwatch.StartNew();
251265

@@ -265,6 +279,7 @@ private async Task<FileIngestionMetrics> IngestFileAsync(
265279
file.Key,
266280
collectionName,
267281
fileSet.Definition,
282+
progressTracker,
268283
ct);
269284

270285
await csvContext.DisposeAsync();
@@ -418,12 +433,14 @@ private async Task<FileIngestionMetrics> ProcessCsvRecordsAsync(
418433
string fileKey,
419434
string collectionName,
420435
DataSetDefinition definition,
436+
IngestionProgressTracker progressTracker,
421437
CancellationToken ct)
422438
{
423439
var metrics = new RecordMetricsAccumulator();
424440
var batch = new List<(BsonDocument Document, string ChangeType)>();
425441
var lineageEvents = new List<RecordLineageEvent>();
426442
var totalMongoProcessingMs = 0L;
443+
var totals = new IngestionTotals();
427444

428445
while (await csv.ReadAsync())
429446
{
@@ -459,8 +476,27 @@ private async Task<FileIngestionMetrics> ProcessCsvRecordsAsync(
459476
totalMongoProcessingMs += batchStopwatch.ElapsedMilliseconds;
460477

461478
metrics.AddBatch(batchMetrics);
479+
totals = totals.Add(new IngestionTotals
480+
{
481+
RecordsCreated = batchMetrics.RecordsCreated,
482+
RecordsUpdated = batchMetrics.RecordsUpdated,
483+
RecordsDeleted = batchMetrics.RecordsDeleted
484+
});
485+
486+
// Update progress tracking and report every 100 records
487+
progressTracker.UpdateProgress(metrics.RecordsProcessed);
462488

463-
LogProgressIfNeeded(metrics.RecordsProcessed, fileKey);
489+
if (metrics.RecordsProcessed % LogInterval == 0)
490+
{
491+
LogProgressIfNeeded(metrics.RecordsProcessed, fileKey);
492+
493+
var currentStatus = progressTracker.GetCurrentStatus();
494+
await UpdateIngestionPhaseProgressWithFileStatusAsync(
495+
importId,
496+
totals,
497+
currentStatus,
498+
ct);
499+
}
464500

465501
batch.Clear();
466502

@@ -482,6 +518,16 @@ private async Task<FileIngestionMetrics> ProcessCsvRecordsAsync(
482518
ct);
483519

484520
metrics.AddBatch(batchMetrics);
521+
522+
// Final progress update
523+
progressTracker.UpdateProgress(metrics.RecordsProcessed);
524+
var finalStatus = progressTracker.Complete();
525+
526+
await UpdateIngestionPhaseProgressWithFileStatusAsync(
527+
importId,
528+
totals,
529+
finalStatus,
530+
ct);
485531
}
486532

487533
// Flush remaining lineage events
@@ -597,7 +643,7 @@ private async Task CreateWildcardIndexAsync(
597643
logger.LogInformation("Creating wildcard index on collection {CollectionName}",
598644
collection.CollectionNamespace.CollectionName);
599645

600-
var wildcardIndexKeys = Builders<BsonDocument>.IndexKeys.Wildcard("$**");
646+
var wildcardIndexKeys = Builders<BsonDocument>.IndexKeys.Wildcard();
601647
var indexModel = new CreateIndexModel<BsonDocument>(wildcardIndexKeys);
602648

603649
await collection.Indexes.CreateOneAsync(indexModel, cancellationToken: ct);
@@ -944,6 +990,7 @@ private async Task UpdateIngestionPhaseProgressAsync(
944990
Guid importId,
945991
int filesProcessed,
946992
IngestionTotals totals,
993+
IngestionCurrentFileStatus? currentFileStatus,
947994
CancellationToken ct)
948995
{
949996
await reportingService.UpdateIngestionPhaseAsync(importId, new IngestionPhaseUpdate
@@ -952,7 +999,25 @@ private async Task UpdateIngestionPhaseProgressAsync(
952999
FilesProcessed = filesProcessed,
9531000
RecordsCreated = totals.RecordsCreated,
9541001
RecordsUpdated = totals.RecordsUpdated,
955-
RecordsDeleted = totals.RecordsDeleted
1002+
RecordsDeleted = totals.RecordsDeleted,
1003+
CurrentFileStatus = currentFileStatus
1004+
}, ct);
1005+
}
1006+
1007+
/// <summary>
/// Pushes an intra-file progress snapshot — running record totals plus the
/// current per-file status — to the reporting service while a single file is
/// still being ingested.
/// </summary>
private async Task UpdateIngestionPhaseProgressWithFileStatusAsync(
    Guid importId,
    IngestionTotals totals,
    IngestionCurrentFileStatus currentFileStatus,
    CancellationToken ct)
{
    var update = new IngestionPhaseUpdate
    {
        Status = PhaseStatus.Started,
        // NOTE(review): this sends 0 on every intra-file update — confirm the
        // reporting service treats 0 as "not updated during file processing"
        // rather than overwriting the completed-file count mid-run.
        FilesProcessed = 0,
        RecordsCreated = totals.RecordsCreated,
        RecordsUpdated = totals.RecordsUpdated,
        RecordsDeleted = totals.RecordsDeleted,
        CurrentFileStatus = currentFileStatus
    };

    await reportingService.UpdateIngestionPhaseAsync(importId, update, ct);
}
9581023

src/KeeperData.Core/ETL/Setup/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using KeeperData.Core.ETL.Abstract;
22
using KeeperData.Core.ETL.Impl;
3+
using KeeperData.Core.ETL.Utils;
34
using KeeperData.Core.Reporting.Setup;
45
using Microsoft.Extensions.DependencyInjection;
56
using System.Diagnostics.CodeAnalysis;
@@ -12,6 +13,7 @@ public static class ServiceCollectionExtensions
1213
public static void AddEtlDependencies(this IServiceCollection services)
1314
{
1415
services.AddSingleton<IDataSetDefinitions>(_ => StandardDataSetDefinitionsBuilder.Build());
16+
services.AddTransient<CsvRowCounter>();
1517
services.AddTransient<IExternalCatalogueServiceFactory, ExternalCatalogueServiceFactory>();
1618
services.AddTransient<IIngestionPipeline, IngestionPipeline>();
1719
services.AddTransient<IAcquisitionPipeline, AcquisitionPipeline>();
src/KeeperData.Core/ETL/Utils/CsvRowCounter.cs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
using Microsoft.Extensions.Logging;

namespace KeeperData.Core.ETL.Utils;

/// <summary>
/// Utility class for counting rows in CSV files efficiently.
/// Reads the file as raw bytes and counts line terminators, so it is fast but
/// approximate: quoted CSV fields containing embedded newlines are counted as
/// extra rows.
/// </summary>
public class CsvRowCounter
{
    private readonly ILogger<CsvRowCounter> _logger;
    private const int BufferSize = 65536; // 64KB buffer for efficient reading
    private const int MaxRetryAttempts = 3;
    private const int RetryDelayMs = 100;

    public CsvRowCounter(ILogger<CsvRowCounter> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Counts the approximate number of data rows in a CSV file (excluding the
    /// header row). Transient failures are retried up to <see cref="MaxRetryAttempts"/>
    /// times with linear backoff; cancellation is never retried.
    /// </summary>
    /// <param name="filePath">Path to the CSV file on local disk.</param>
    /// <param name="ct">Cancellation token; honored between retries and during reads.</param>
    /// <returns>The number of data rows (total lines minus one header), never negative.</returns>
    /// <exception cref="FileNotFoundException">The file does not exist.</exception>
    public async Task<int> CountRowsAsync(string filePath, CancellationToken ct)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await CountRowsInternalAsync(filePath, ct);
            }
            // Retry only genuine failures: cancellation must propagate immediately.
            catch (Exception ex) when (attempt < MaxRetryAttempts && ex is not OperationCanceledException)
            {
                _logger.LogWarning(ex, "Failed to count rows in file {FilePath} on attempt {Attempt}/{MaxAttempts}. Retrying...",
                    filePath, attempt, MaxRetryAttempts);

                // Linear backoff; Task.Delay throws if ct is cancelled while waiting.
                await Task.Delay(RetryDelayMs * attempt, ct);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                _logger.LogError(ex, "Failed to count rows in file {FilePath} after {MaxAttempts} attempts",
                    filePath, MaxRetryAttempts);
                throw;
            }
        }
    }

    /// <summary>
    /// Single-pass newline count over the raw bytes of <paramref name="filePath"/>.
    /// Handles \n, \r\n and bare \r line endings, including a \r\n pair split
    /// across two read-buffer boundaries (the previous byte is carried between
    /// buffers so the pair is never counted twice).
    /// </summary>
    private async Task<int> CountRowsInternalAsync(string filePath, CancellationToken ct)
    {
        if (!File.Exists(filePath))
        {
            throw new FileNotFoundException($"CSV file not found: {filePath}");
        }

        var fileInfo = new FileInfo(filePath);

        // Handle empty files
        if (fileInfo.Length == 0)
        {
            _logger.LogDebug("File {FilePath} is empty, returning 0 rows", filePath);
            return 0;
        }

        await using var fileStream = new FileStream(
            filePath,
            FileMode.Open,
            FileAccess.Read,
            FileShare.Read,
            BufferSize,
            useAsync: true);

        var buffer = new byte[BufferSize];
        var rowCount = 0;

        // Last byte seen; persists across buffer reads so a \r\n pair split
        // between two ReadAsync calls is counted exactly once.
        byte previous = 0;
        int bytesRead;

        while ((bytesRead = await fileStream.ReadAsync(buffer.AsMemory(0, BufferSize), ct)) > 0)
        {
            for (var i = 0; i < bytesRead; i++)
            {
                var current = buffer[i];

                if (current == (byte)'\n')
                {
                    // Terminates a line for both \n and \r\n endings.
                    rowCount++;
                }
                else if (previous == (byte)'\r')
                {
                    // A bare \r (classic Mac ending) not followed by \n also ends a line.
                    rowCount++;
                }

                previous = current;
            }
        }

        if (previous == (byte)'\r')
        {
            rowCount++; // file ends with a bare \r
        }
        else if (previous != (byte)'\n')
        {
            rowCount++; // last line has no trailing newline but is still a row
        }

        // Subtract 1 for header row (if file has at least one row)
        var dataRows = Math.Max(0, rowCount - 1);

        _logger.LogDebug("Counted approximately {RowCount} data rows in file {FilePath} (excluding header)",
            dataRows, filePath);

        return dataRows;
    }
}

0 commit comments

Comments
 (0)