Skip to content

Commit 6ebc696

Browse files
committed
More ingestion metrics
1 parent 23250c4 commit 6ebc696

File tree

2 files changed

+128
-32
lines changed

2 files changed

+128
-32
lines changed

src/KeeperData.Core/ETL/Impl/IngestionPipeline.cs

Lines changed: 125 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ private async Task<FileIngestionMetrics> IngestFileAsync(
226226
StorageObjectInfo file,
227227
CancellationToken ct)
228228
{
229-
var stopwatch = Stopwatch.StartNew();
229+
var overallStopwatch = Stopwatch.StartNew();
230230
var collectionName = fileSet.Definition.Name;
231231

232232
logger.LogInformation("Starting ingestion of file {FileKey} into collection {CollectionName}",
@@ -235,42 +235,114 @@ private async Task<FileIngestionMetrics> IngestFileAsync(
235235
var collection = await EnsureCollectionExistsAsync(collectionName, ct);
236236
await EnsureWildcardIndexExistsAsync(collection, ct);
237237

238-
var csvContext = await OpenCsvFileAsync(blobs, file.Key, ct);
238+
string? tempFilePath = null;
239+
try
240+
{
241+
// Track S3 download time
242+
var downloadStopwatch = Stopwatch.StartNew();
243+
tempFilePath = await DownloadToTempFileAsync(blobs, file.Key, ct);
244+
downloadStopwatch.Stop();
239245

240-
var headers = await ReadAndValidateHeadersAsync(
241-
csvContext.Csv,
242-
file.Key,
243-
fileSet.Definition.PrimaryKeyHeaderNames,
244-
fileSet.Definition.ChangeTypeHeaderName);
246+
logger.LogInformation("Downloaded file {FileKey} to temp storage: {TempPath} in {DownloadDuration}ms",
247+
file.Key, tempFilePath, downloadStopwatch.ElapsedMilliseconds);
245248

246-
var metrics = await ProcessCsvRecordsAsync(
247-
importId,
248-
collection,
249-
csvContext.Csv,
250-
headers,
251-
file.Key,
252-
collectionName,
253-
fileSet.Definition,
254-
ct);
249+
// Track MongoDB ingestion time
250+
var mongoIngestionStopwatch = Stopwatch.StartNew();
255251

256-
stopwatch.Stop();
257-
logger.LogInformation("Completed ingestion of file {FileKey}. Total records: {TotalRecords}, Created: {Created}, Updated: {Updated}, Deleted: {Deleted}, Duration: {Duration}ms",
258-
file.Key,
259-
metrics.RecordsProcessed,
260-
metrics.RecordsCreated,
261-
metrics.RecordsUpdated,
262-
metrics.RecordsDeleted,
263-
stopwatch.ElapsedMilliseconds);
252+
var csvContext = await OpenCsvFileFromDiskAsync(tempFilePath, ct);
253+
254+
var headers = await ReadAndValidateHeadersAsync(
255+
csvContext.Csv,
256+
file.Key,
257+
fileSet.Definition.PrimaryKeyHeaderNames,
258+
fileSet.Definition.ChangeTypeHeaderName);
259+
260+
var metrics = await ProcessCsvRecordsAsync(
261+
importId,
262+
collection,
263+
csvContext.Csv,
264+
headers,
265+
file.Key,
266+
collectionName,
267+
fileSet.Definition,
268+
ct);
269+
270+
await csvContext.DisposeAsync();
264271

265-
return metrics;
272+
mongoIngestionStopwatch.Stop();
273+
overallStopwatch.Stop();
274+
275+
logger.LogInformation("Completed ingestion of file {FileKey}. Total records: {TotalRecords}, Created: {Created}, Updated: {Updated}, Deleted: {Deleted}, S3 Download: {DownloadDuration}ms, MongoDB Ingestion: {MongoIngestionDuration}ms, Total Duration: {TotalDuration}ms, Avg Record Processing: {AvgMs:F2}ms/record",
276+
file.Key,
277+
metrics.RecordsProcessed,
278+
metrics.RecordsCreated,
279+
metrics.RecordsUpdated,
280+
metrics.RecordsDeleted,
281+
downloadStopwatch.ElapsedMilliseconds,
282+
mongoIngestionStopwatch.ElapsedMilliseconds,
283+
overallStopwatch.ElapsedMilliseconds,
284+
metrics.AverageMongoIngestionMs);
285+
286+
return metrics with
287+
{
288+
S3DownloadDurationMs = downloadStopwatch.ElapsedMilliseconds,
289+
MongoIngestionDurationMs = mongoIngestionStopwatch.ElapsedMilliseconds
290+
};
291+
}
292+
finally
293+
{
294+
// Ensure temp file is always cleaned up
295+
if (tempFilePath != null && File.Exists(tempFilePath))
296+
{
297+
try
298+
{
299+
File.Delete(tempFilePath);
300+
logger.LogDebug("Deleted temp file: {TempPath}", tempFilePath);
301+
}
302+
catch (Exception ex)
303+
{
304+
logger.LogWarning(ex, "Failed to delete temp file: {TempPath}", tempFilePath);
305+
}
306+
}
307+
}
266308
}
267309

268-
private async Task<CsvContext> OpenCsvFileAsync(
310+
private async Task<string> DownloadToTempFileAsync(
269311
IBlobStorageService blobs,
270312
string fileKey,
271313
CancellationToken ct)
272314
{
273-
var stream = await blobs.OpenReadAsync(fileKey, ct);
315+
var tempFilePath = Path.Combine(Path.GetTempPath(), $"keeper_import_{Guid.NewGuid():N}.csv");
316+
317+
logger.LogDebug("Downloading {FileKey} to {TempPath}", fileKey, tempFilePath);
318+
319+
await using var sourceStream = await blobs.OpenReadAsync(fileKey, ct);
320+
await using var fileStream = new FileStream(
321+
tempFilePath,
322+
FileMode.Create,
323+
FileAccess.Write,
324+
FileShare.None,
325+
bufferSize: 81920, // 80KB buffer
326+
useAsync: true);
327+
328+
await sourceStream.CopyToAsync(fileStream, ct);
329+
await fileStream.FlushAsync(ct);
330+
331+
return tempFilePath;
332+
}
333+
334+
private async Task<CsvContext> OpenCsvFileFromDiskAsync(
335+
string filePath,
336+
CancellationToken ct)
337+
{
338+
var stream = new FileStream(
339+
filePath,
340+
FileMode.Open,
341+
FileAccess.Read,
342+
FileShare.Read,
343+
bufferSize: 81920, // 80KB buffer
344+
useAsync: true);
345+
274346
var reader = new StreamReader(stream);
275347
var csv = new CsvReader(reader, new CsvConfiguration(CultureInfo.InvariantCulture)
276348
{
@@ -351,6 +423,7 @@ private async Task<FileIngestionMetrics> ProcessCsvRecordsAsync(
351423
var metrics = new RecordMetricsAccumulator();
352424
var batch = new List<(BsonDocument Document, string ChangeType)>();
353425
var lineageEvents = new List<RecordLineageEvent>();
426+
var totalMongoProcessingMs = 0L;
354427

355428
while (await csv.ReadAsync())
356429
{
@@ -370,6 +443,8 @@ private async Task<FileIngestionMetrics> ProcessCsvRecordsAsync(
370443

371444
if (batch.Count >= BatchSize)
372445
{
446+
var batchStopwatch = Stopwatch.StartNew();
447+
373448
var batchMetrics = await ProcessBatchAsync(
374449
importId,
375450
collection,
@@ -380,6 +455,9 @@ private async Task<FileIngestionMetrics> ProcessCsvRecordsAsync(
380455
lineageEvents,
381456
ct);
382457

458+
batchStopwatch.Stop();
459+
totalMongoProcessingMs += batchStopwatch.ElapsedMilliseconds;
460+
383461
metrics.AddBatch(batchMetrics);
384462

385463
LogProgressIfNeeded(metrics.RecordsProcessed, fileKey);
@@ -409,7 +487,12 @@ private async Task<FileIngestionMetrics> ProcessCsvRecordsAsync(
409487
// Flush remaining lineage events
410488
await FlushLineageEventsAsync(lineageEvents, ct);
411489

412-
return metrics.ToFileMetrics();
490+
// Calculate average MongoDB ingestion time per record
491+
var avgMongoMs = metrics.RecordsProcessed > 0
492+
? (double)totalMongoProcessingMs / metrics.RecordsProcessed
493+
: 0;
494+
495+
return metrics.ToFileMetrics(avgMongoMs);
413496
}
414497

415498
private bool IsValidChangeType(string changeType)
@@ -818,6 +901,9 @@ private async Task RecordSuccessfulIngestionAsync(
818901
RecordsUpdated = metrics.RecordsUpdated,
819902
RecordsDeleted = metrics.RecordsDeleted,
820903
IngestionDurationMs = durationMs,
904+
AverageRecordIngestionMs = metrics.AverageMongoIngestionMs,
905+
S3DownloadDurationMs = metrics.S3DownloadDurationMs,
906+
MongoIngestionDurationMs = metrics.MongoIngestionDurationMs,
821907
IngestedAtUtc = DateTime.UtcNow,
822908
Status = FileProcessingStatus.Ingested
823909
}, ct);
@@ -826,9 +912,9 @@ private async Task RecordSuccessfulIngestionAsync(
826912
private async Task RecordFailedIngestionAsync(
827913
Guid importId,
828914
StorageObjectInfo file,
829-
long durationMs,
915+
long durationMs,
830916
Exception ex,
831-
CancellationToken ct)
917+
CancellationToken ct)
832918
{
833919
try
834920
{
@@ -840,6 +926,9 @@ private async Task RecordFailedIngestionAsync(
840926
RecordsUpdated = 0,
841927
RecordsDeleted = 0,
842928
IngestionDurationMs = durationMs,
929+
AverageRecordIngestionMs = 0,
930+
S3DownloadDurationMs = 0,
931+
MongoIngestionDurationMs = 0,
843932
IngestedAtUtc = DateTime.UtcNow,
844933
Status = FileProcessingStatus.Failed,
845934
Error = ex.Message
@@ -949,12 +1038,13 @@ public void AddBatch(BatchProcessingMetrics batchMetrics)
9491038
RecordsDeleted += batchMetrics.RecordsDeleted;
9501039
}
9511040

952-
public FileIngestionMetrics ToFileMetrics() => new()
1041+
public FileIngestionMetrics ToFileMetrics(double avgMongoIngestionMs) => new()
9531042
{
9541043
RecordsProcessed = RecordsProcessed,
9551044
RecordsCreated = RecordsCreated,
9561045
RecordsUpdated = RecordsUpdated,
957-
RecordsDeleted = RecordsDeleted
1046+
RecordsDeleted = RecordsDeleted,
1047+
AverageMongoIngestionMs = avgMongoIngestionMs
9581048
};
9591049
}
9601050

@@ -980,6 +1070,9 @@ private record FileIngestionMetrics
9801070
public int RecordsCreated { get; init; }
9811071
public int RecordsUpdated { get; init; }
9821072
public int RecordsDeleted { get; init; }
1073+
public double AverageMongoIngestionMs { get; init; }
1074+
public long S3DownloadDurationMs { get; init; }
1075+
public long MongoIngestionDurationMs { get; init; }
9831076
}
9841077

9851078
private record BatchProcessingMetrics

src/KeeperData.Core/Reporting/Dtos/FileIngestionRecord.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ public record FileIngestionRecord
88
public int RecordsUpdated { get; init; }
99
public int RecordsDeleted { get; init; }
1010
public long IngestionDurationMs { get; init; }
11+
public double AverageRecordIngestionMs { get; init; }
12+
public long S3DownloadDurationMs { get; init; }
13+
public long MongoIngestionDurationMs { get; init; }
1114
public DateTime IngestedAtUtc { get; init; }
1215
public FileProcessingStatus Status { get; init; }
1316
public string? Error { get; init; }

0 commit comments

Comments
 (0)