Skip to content

Commit 70d1f24

Browse files
committed
Bug fix: Prevent re-acquisition/re-ingestion of files if the files have not changed.
1 parent bc4f92d commit 70d1f24

File tree

13 files changed

+331
-109
lines changed

13 files changed

+331
-109
lines changed

src/KeeperData.Core/ETL/Impl/AcquisitionPipeline.cs

Lines changed: 118 additions & 53 deletions
Large diffs are not rendered by default.

src/KeeperData.Core/ETL/Impl/ImportOrchestrator.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public async Task StartAsync(Guid importId, string sourceType, CancellationToken
3131
report.CompletedAtUtc = DateTime.UtcNow;
3232
report.Error = ex.Message;
3333
await reportingService.UpsertImportReportAsync(report, ct);
34-
34+
3535
throw;
3636
}
3737
}

src/KeeperData.Core/ETL/Impl/IngestionPipeline.cs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public async Task StartAsync(ImportReport report, CancellationToken ct)
118118
return (fileSets, totalFiles);
119119
}
120120

121-
private async Task<IngestionTotals> ProcessAllFilesAsync(ImportReport report, ImmutableList<FileSet> fileSets, int totalFiles,
121+
private async Task<IngestionTotals> ProcessAllFilesAsync(ImportReport report, ImmutableList<FileSet> fileSets, int totalFiles,
122122
IBlobStorageService blobStorage, CancellationToken ct)
123123
{
124124
Debug.WriteLine($"[keepetl] Step 2: Processing and ingesting {totalFiles} files for ImportId: {report.ImportId}");
@@ -132,7 +132,9 @@ private async Task<IngestionTotals> ProcessAllFilesAsync(ImportReport report, Im
132132
Debug.WriteLine($"[keepetl] Processing file set: {fileSet.Definition.Name} with {fileSet.Files.Length} file(s)");
133133
logger.LogDebug("Processing file set for definition: {DefinitionName} with {FileCount} file(s) for ImportId: {ImportId}", fileSet.Definition.Name, fileSet.Files.Length, report.ImportId);
134134

135-
foreach (var file in fileSet.Files)
135+
var orderedFiles = fileSet.Files.OrderBy(f => f.Timestamp).ToArray();
136+
137+
foreach (var file in orderedFiles)
136138
{
137139
processedFileCount++;
138140

@@ -159,7 +161,7 @@ private async Task<IngestionTotals> ProcessAllFilesAsync(ImportReport report, Im
159161
return totals with { FilesProcessed = processedFileCount };
160162
}
161163

162-
private async Task<IngestionTotals> ProcessSingleFileAsync(Guid importId, FileSet fileSet, EtlFile file, int currentFileNumber,
164+
private async Task<IngestionTotals> ProcessSingleFileAsync(Guid importId, FileSet fileSet, EtlFile file, int currentFileNumber,
163165
int totalFiles, IBlobStorageService blobStorage, ImportReport report, CancellationToken ct)
164166
{
165167
var fileStopwatch = Stopwatch.StartNew();
@@ -169,17 +171,17 @@ private async Task<IngestionTotals> ProcessSingleFileAsync(Guid importId, FileSe
169171

170172
try
171173
{
172-
// Check if file has already been ingested by retrieving MD5 from S3 metadata
174+
// Check if file has already been ingested by retrieving ETag from S3 metadata
173175
var isAlreadyIngested = await IsFileAlreadyIngestedAsync(file.StorageObject.Key, blobStorage, ct);
174176
if (isAlreadyIngested)
175177
{
176178
fileStopwatch.Stop();
177179
Debug.WriteLine($"[keepetl] Skipping file {file.StorageObject.Key} - already ingested in a previous import");
178180
logger.LogInformation("Skipping file {FileKey} - already ingested in a previous import for ImportId: {ImportId}", file.StorageObject.Key, importId);
179181

180-
return new IngestionTotals()
181-
{
182-
FilesSkipped = 1
182+
return new IngestionTotals()
183+
{
184+
FilesSkipped = 1
183185
};
184186
}
185187

@@ -189,7 +191,7 @@ private async Task<IngestionTotals> ProcessSingleFileAsync(Guid importId, FileSe
189191
await RecordSuccessfulIngestionAsync(importId, file, fileMetrics, fileStopwatch.ElapsedMilliseconds, ct);
190192

191193
Debug.WriteLine($"[keepetl] Successfully ingested file: {file.StorageObject.Key} - Created: {fileMetrics.RecordsCreated}, Updated: {fileMetrics.RecordsUpdated}, Deleted: {fileMetrics.RecordsDeleted}, Duration: {fileStopwatch.ElapsedMilliseconds}ms");
192-
logger.LogInformation("Successfully ingested file: {FileKey} - Created: {Created}, Updated: {Updated}, Deleted: {Deleted}, Duration: {Duration}ms",
194+
logger.LogInformation("Successfully ingested file: {FileKey} - Created: {Created}, Updated: {Updated}, Deleted: {Deleted}, Duration: {Duration}ms",
193195
file.StorageObject.Key, fileMetrics.RecordsCreated, fileMetrics.RecordsUpdated, fileMetrics.RecordsDeleted, fileStopwatch.ElapsedMilliseconds);
194196

195197
return new IngestionTotals
@@ -221,23 +223,23 @@ private async Task<bool> IsFileAlreadyIngestedAsync(
221223
{
222224
try
223225
{
224-
// Retrieve file metadata from S3 to get the MD5 hash
226+
// Retrieve file metadata from S3 to get the ETag
225227
var metadata = await blobStorage.GetMetadataAsync(fileKey, ct);
226228

227-
if (!metadata.UserMetadata.TryGetValue("MD5Hash", out var md5Hash) || string.IsNullOrEmpty(md5Hash))
229+
if (string.IsNullOrEmpty(metadata.ETag))
228230
{
229-
Debug.WriteLine($"[keepetl] No MD5 hash found in S3 metadata for file {fileKey} - will proceed with ingestion");
230-
logger.LogDebug("No MD5 hash found in S3 metadata for file {FileKey} - will proceed with ingestion", fileKey);
231+
Debug.WriteLine($"[keepetl] No ETag found for file {fileKey} - will proceed with ingestion");
232+
logger.LogDebug("No ETag found for file {FileKey} - will proceed with ingestion", fileKey);
231233
return false;
232234
}
233235

234-
// Check if a file with this key and MD5 has been previously ingested (not just acquired)
235-
var isIngested = await reportingService.IsFileIngestedAsync(fileKey, md5Hash, ct);
236+
// Check if a file with this key and ETag has been previously ingested (not just acquired)
237+
var isIngested = await reportingService.IsFileIngestedAsync(fileKey, metadata.ETag, ct);
236238

237239
if (isIngested)
238240
{
239-
Debug.WriteLine($"[keepetl] File {fileKey} with MD5 {md5Hash} has already been ingested");
240-
logger.LogWarning("File {FileKey} with MD5 {Md5Hash} has already been ingested in a previous import", fileKey, md5Hash);
241+
Debug.WriteLine($"[keepetl] File {fileKey} with ETag {metadata.ETag} has already been ingested");
242+
logger.LogWarning("File {FileKey} with ETag {ETag} has already been ingested in a previous import", fileKey, metadata.ETag);
241243
}
242244

243245
return isIngested;
@@ -296,7 +298,7 @@ private async Task<FileIngestionMetrics> IngestFileAsync(Guid importId, IBlobSto
296298
fileSet.Definition.PrimaryKeyHeaderNames,
297299
fileSet.Definition.ChangeTypeHeaderName);
298300

299-
var metrics = await ProcessCsvRecordsAsync(importId, collection, csvContext.Csv, headers, file.StorageObject.Key,
301+
var metrics = await ProcessCsvRecordsAsync(importId, collection, csvContext.Csv, headers, file.StorageObject.Key,
300302
collectionName, fileSet.Definition, progressTracker, report, ct);
301303

302304
await csvContext.DisposeAsync();
@@ -448,8 +450,8 @@ private void ValidateChangeTypeHeader(string[] headers, string changeTypeHeaderN
448450
}
449451

450452

451-
private async Task<FileIngestionMetrics> ProcessCsvRecordsAsync(Guid importId, IMongoCollection<BsonDocument> collection, CsvReader csv,
452-
CsvHeaders headers, string fileKey, string collectionName, DataSetDefinition definition, IngestionProgressTracker progressTracker,
453+
private async Task<FileIngestionMetrics> ProcessCsvRecordsAsync(Guid importId, IMongoCollection<BsonDocument> collection, CsvReader csv,
454+
CsvHeaders headers, string fileKey, string collectionName, DataSetDefinition definition, IngestionProgressTracker progressTracker,
453455
ImportReport report, CancellationToken ct)
454456
{
455457
Debug.WriteLine($"[keepetl] Starting to process CSV records for file: {fileKey}");

src/KeeperData.Core/EtlConstants.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,9 @@ public static class EtlConstants
66
public const string DateTimePattern = "yyyyMMddHHmmss";
77
public const string CompositeKeyDelimiter = "@@";
88
public const string LineageEventIdDelimiter = "||";
9+
10+
// S3-compliant metadata keys (lowercase with x-amz-meta- prefix)
11+
// S3 automatically prefixes user metadata with "x-amz-meta-" and lowercases the keys
12+
public const string MetadataKeySourceEncryptedLength = "x-amz-meta-sourceencryptedlength";
13+
public const string MetadataKeySourceETag = "x-amz-meta-sourceetag";
914
}

src/KeeperData.Core/Reporting/Domain/ImportFileDocument.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public class ImportFileDocument
1717
public required string FileName { get; init; }
1818
public required string FileKey { get; init; }
1919
public required string DatasetName { get; init; }
20-
public required string Md5Hash { get; init; }
20+
public required string ETag { get; init; }
2121
public long FileSize { get; init; }
2222
public required string Status { get; init; }
2323

src/KeeperData.Core/Reporting/Domain/ImportReportDocument.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class AcquisitionPhaseDocument
2828
public int FilesDiscovered { get; set; }
2929
public int FilesProcessed { get; set; }
3030
public int FilesFailed { get; set; }
31+
public int FilesSkipped { get; set; }
3132
public DateTime? StartedAtUtc { get; set; }
3233
public DateTime? CompletedAtUtc { get; set; }
3334
}
@@ -36,6 +37,7 @@ public class IngestionPhaseDocument
3637
{
3738
public required string Status { get; set; }
3839
public int FilesProcessed { get; set; }
40+
public int FilesSkipped { get; set; }
3941
public int RecordsCreated { get; set; }
4042
public int RecordsUpdated { get; set; }
4143
public int RecordsDeleted { get; set; }

src/KeeperData.Core/Reporting/Dtos/FileAcquisitionRecord.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ public record FileAcquisitionRecord
55
public required string FileName { get; init; }
66
public required string FileKey { get; init; }
77
public required string DatasetName { get; init; }
8-
public required string Md5Hash { get; init; }
8+
public required string ETag { get; init; }
99
public long FileSize { get; init; }
1010
public required string SourceKey { get; init; }
1111
public long DecryptionDurationMs { get; init; }

src/KeeperData.Core/Reporting/Dtos/FileProcessingReport.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ public record FileProcessingReport
2626
public required string DatasetName { get; init; }
2727

2828
/// <summary>
29-
/// Gets the MD5 hash of the file content for integrity verification.
29+
/// Gets the ETag of the file content for integrity verification.
3030
/// </summary>
31-
public required string Md5Hash { get; init; }
31+
public required string ETag { get; init; }
3232

3333
/// <summary>
3434
/// Gets the file size in bytes.

src/KeeperData.Core/Reporting/Dtos/ImportReport.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public record IngestionPhaseReport
128128
public DateTime? CompletedAtUtc { get; set; }
129129

130130
public IngestionCurrentFileStatus? CurrentFileStatus { get; set; }
131-
131+
132132
public int FilesSkipped { get; set; }
133133
}
134134

src/KeeperData.Core/Reporting/IImportReportingService.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ public interface IImportReportingService
99
Task UpsertImportReportAsync(ImportReport report, CancellationToken ct);
1010

1111
// File-level operations
12-
Task<bool> IsFileProcessedAsync(string fileKey, string md5Hash, CancellationToken ct);
13-
Task<bool> IsFileIngestedAsync(string fileKey, string md5Hash, CancellationToken ct);
12+
Task<bool> IsFileProcessedAsync(string fileKey, string etag, CancellationToken ct);
13+
Task<bool> IsFileIngestedAsync(string fileKey, string etag, CancellationToken ct);
1414
Task RecordFileAcquisitionAsync(Guid importId, FileAcquisitionRecord record, CancellationToken ct);
1515
Task RecordFileIngestionAsync(Guid importId, FileIngestionRecord record, CancellationToken ct);
1616

0 commit comments

Comments
 (0)