Skip to content

Commit 23250c4

Browse files
committed
Bug fix: use pipe (|) delimiters rather than commas when reading and generating CSV files
1 parent 2c39818 commit 23250c4

File tree

5 files changed

+49
-102
lines changed

5 files changed

+49
-102
lines changed

src/KeeperData.Core/ETL/Impl/IngestionPipeline.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1+
using CsvHelper;
2+
using CsvHelper.Configuration;
3+
using KeeperData.Core.Database;
14
using KeeperData.Core.ETL.Abstract;
25
using KeeperData.Core.Reporting;
36
using KeeperData.Core.Reporting.Dtos;
47
using KeeperData.Core.Storage;
58
using KeeperData.Core.Storage.Dtos;
69
using Microsoft.Extensions.Logging;
7-
using System.Diagnostics;
8-
using MongoDB.Driver;
9-
using MongoDB.Bson;
10-
using CsvHelper;
11-
using System.Globalization;
12-
using CsvHelper.Configuration;
1310
using Microsoft.Extensions.Options;
14-
using KeeperData.Core.Database;
11+
using MongoDB.Bson;
12+
using MongoDB.Driver;
1513
using System.Collections.Immutable;
14+
using System.Diagnostics;
15+
using System.Globalization;
1616

1717
namespace KeeperData.Core.ETL.Impl;
1818

@@ -276,7 +276,8 @@ private async Task<CsvContext> OpenCsvFileAsync(
276276
{
277277
HasHeaderRecord = true,
278278
TrimOptions = TrimOptions.Trim,
279-
BadDataFound = null // Ignore bad data
279+
BadDataFound = null, // Ignore bad data
280+
Delimiter = "|" // Use pipe delimiter for CSV files
280281
});
281282

282283
return new CsvContext(stream, reader, csv);

src/KeeperData.Crypto.Tool/Services/CsvDataGenerator.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ private async Task<List<string>> GenerateMainFileAsync(
6363
var config = new CsvConfiguration(CultureInfo.InvariantCulture)
6464
{
6565
HasHeaderRecord = true,
66+
Delimiter = "|" // Use pipe delimiter to match production format
6667
};
6768

6869
await using var writer = new StreamWriter(filePath);
@@ -116,6 +117,7 @@ private async Task GenerateDeltaFileAsync(
116117
var config = new CsvConfiguration(CultureInfo.InvariantCulture)
117118
{
118119
HasHeaderRecord = true,
120+
Delimiter = "|" // Use pipe delimiter to match production format
119121
};
120122

121123
await using var writer = new StreamWriter(filePath);

tests/KeeperData.Bridge.Tests.Integration/Core/ETL/IngestionPipelineCompositeKeyTests.cs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -209,10 +209,10 @@ public async Task StartAsync_WithCompositeKey_ContainingSpecialCharacters_Should
209209

210210
// CPH contains slashes, HOLDER_ID contains alphanumeric
211211
// Generate CSV with correct headers matching the DataSetDefinition
212-
var csvContent = "CPH,HOLDER_ID,NAME,CHANGE_TYPE\n" +
213-
"12/345/6789,H001,Holder One,I\n" +
214-
"98/765/4321,H002,Holder Two,I\n" +
215-
"11/222/3333,H123ABC,Holder Three,I\n";
212+
var csvContent = "CPH|HOLDER_ID|NAME|CHANGE_TYPE\n"
213+
+ "12/345/6789|H001|Holder One|I\n"
214+
+ "98/765/4321|H002|Holder Two|I\n"
215+
+ "11/222/3333|H123ABC|Holder Three|I\n";
216216

217217
var fileName = $"TEST_SPECIAL_{testDate:yyyyMMdd}120000.csv";
218218
var fileKey = $"{DestinationFolder}/{fileName}";
@@ -379,7 +379,7 @@ public async Task StartAsync_WithCompositeKey_MissingOneKeyColumn_ShouldThrowExc
379379
Accumulators: []);
380380

381381
// CSV missing FARM_ID column
382-
var csvContent = "REGION,NAME,CHANGE_TYPE\nNORTH,Farm Alpha,I\n";
382+
var csvContent = "REGION|NAME|CHANGE_TYPE\nNORTH|Farm Alpha|I\n";
383383

384384
var fileName = $"TEST_MISSING_{testDate:yyyyMMdd}120000.csv";
385385
var fileKey = $"{DestinationFolder}/{fileName}";
@@ -410,10 +410,10 @@ public async Task StartAsync_WithCompositeKey_NullKeyPart_ShouldCreateIdWithEmpt
410410
Accumulators: []);
411411

412412
// CSV with null/empty value in one key part
413-
var csvContent = "REGION,FARM_ID,NAME,CHANGE_TYPE\n"
414-
+ "NORTH,F001,Farm Alpha,I\n"
415-
+ ",F002,Farm Beta,I\n"
416-
+ "SOUTH,,Farm Gamma,I\n";
413+
var csvContent = "REGION|FARM_ID|NAME|CHANGE_TYPE\n"
414+
+ "NORTH|F001|Farm Alpha|I\n"
415+
+ "|F002|Farm Beta|I\n"
416+
+ "SOUTH||Farm Gamma|I\n";
417417

418418
var fileName = $"TEST_NULL_{testDate:yyyyMMdd}120000.csv";
419419
var fileKey = $"{DestinationFolder}/{fileName}";
@@ -461,8 +461,8 @@ public async Task StartAsync_WithCompositeKey_AndAccumulators_ShouldWorkTogether
461461
Accumulators: ["DISEASE_TYPE", "ANIMAL_CODE"]);
462462

463463
// First import
464-
var csvContent1 = "REGION,FARM_ID,NAME,DISEASE_TYPE,ANIMAL_CODE,CHANGE_TYPE\n"
465-
+ "NORTH,F001,Farm Alpha,BVD,BOVINE,I\n";
464+
var csvContent1 = "REGION|FARM_ID|NAME|DISEASE_TYPE|ANIMAL_CODE|CHANGE_TYPE\n"
465+
+ "NORTH|F001|Farm Alpha|BVD|BOVINE|I\n";
466466
var fileName1 = $"TEST_COMP_ACC_{testDate:yyyyMMdd}120000.csv";
467467
await UploadCsvToS3($"{DestinationFolder}/{fileName1}", csvContent1);
468468
await IngestWithCustomDefinition(Guid.NewGuid(), dataSetDefinition);
@@ -476,8 +476,8 @@ await _localStackFixture.S3Client.DeleteObjectAsync(new DeleteObjectRequest
476476
_createdTestFileKeys.Remove($"{DestinationFolder}/{fileName1}");
477477

478478
// Second import - same composite key, different accumulator values
479-
var csvContent2 = "REGION,FARM_ID,NAME,DISEASE_TYPE,ANIMAL_CODE,CHANGE_TYPE\n" +
480-
"NORTH,F001,Farm Alpha Updated,IBR,OVINE,U\n";
479+
var csvContent2 = "REGION|FARM_ID|NAME|DISEASE_TYPE|ANIMAL_CODE|CHANGE_TYPE\n" +
480+
"NORTH|F001|Farm Alpha Updated|IBR|OVINE|U\n";
481481
var testDate2 = testDate.AddDays(-1);
482482
var fileName2 = $"TEST_COMP_ACC_{testDate2:yyyyMMdd}120000.csv";
483483
await UploadCsvToS3($"{DestinationFolder}/{fileName2}", csvContent2);
@@ -611,11 +611,11 @@ private IExternalCatalogueServiceFactory CreateExternalCatalogueServiceFactory(I
611611
private string GenerateCompositeKeyCsv((string key1, string key2, string name, string changeType)[] records)
612612
{
613613
var sb = new StringBuilder();
614-
sb.AppendLine("REGION,FARM_ID,NAME,CHANGE_TYPE");
614+
sb.AppendLine("REGION|FARM_ID|NAME|CHANGE_TYPE");
615615

616616
foreach (var (key1, key2, name, changeType) in records)
617617
{
618-
sb.AppendLine($"{key1},{key2},{name},{changeType}");
618+
sb.AppendLine($"{key1}|{key2}|{name}|{changeType}");
619619
}
620620

621621
return sb.ToString();
@@ -624,11 +624,11 @@ private string GenerateCompositeKeyCsv((string key1, string key2, string name, s
624624
private string GenerateTripleKeyCsv((string key1, string key2, string key3, string name, string changeType)[] records)
625625
{
626626
var sb = new StringBuilder();
627-
sb.AppendLine("COUNTRY,REGION,FARM_ID,NAME,CHANGE_TYPE");
627+
sb.AppendLine("COUNTRY|REGION|FARM_ID|NAME|CHANGE_TYPE");
628628

629629
foreach (var (key1, key2, key3, name, changeType) in records)
630630
{
631-
sb.AppendLine($"{key1},{key2},{key3},{name},{changeType}");
631+
sb.AppendLine($"{key1}|{key2}|{key3}|{name}|{changeType}");
632632
}
633633

634634
return sb.ToString();

tests/KeeperData.Bridge.Tests.Integration/Core/ETL/IngestionPipelineIntegrationTests.cs

Lines changed: 19 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -354,9 +354,9 @@ public async Task StartAsync_WithNullValues_ShouldStoreBsonNull()
354354
{
355355
// Arrange
356356
var testDate = new DateOnly(2024, 12, 15);
357-
var csvContent = "CPH,FarmName,Owner,Address,CHANGE_TYPE\n" +
358-
"CPH001,Farm One,,,I\n" +
359-
"CPH002,,Owner B,Address 2,I\n";
357+
var csvContent = "CPH|FarmName|Owner|Address|CHANGE_TYPE\n" +
358+
"CPH001|Farm One|||I\n" +
359+
"CPH002||Owner B|Address 2|I\n";
360360

361361
var fileName = $"LITP_SAMCPHHOLDING_{testDate:yyyyMMdd}120000.csv";
362362
await UploadCsvToS3($"{DestinationFolder}/{fileName}", csvContent);
@@ -389,7 +389,7 @@ public async Task StartAsync_WithEmptyFile_ShouldNotFail()
389389
{
390390
// Arrange
391391
var testDate = new DateOnly(2024, 12, 15);
392-
var csvContent = "CPH,FarmName,Owner,Address,CHANGE_TYPE\n"; // Headers only
392+
var csvContent = "CPH|FarmName|Owner|Address|CHANGE_TYPE\n"; // Headers only
393393

394394
var fileName = $"LITP_SAMCPHHOLDING_{testDate:yyyyMMdd}120000.csv";
395395
await UploadCsvToS3($"{DestinationFolder}/{fileName}", csvContent);
@@ -415,8 +415,8 @@ public async Task StartAsync_WithMissingPrimaryKey_ShouldThrowException()
415415
{
416416
// Arrange
417417
var testDate = new DateOnly(2024, 12, 15);
418-
var csvContent = "FarmName,Owner,Address,CHANGE_TYPE\n" + // Missing CPH column
419-
"Farm One,Owner A,Address 1,I\n";
418+
var csvContent = "FarmName|Owner|Address|CHANGE_TYPE\n" + // Missing CPH column
419+
"Farm One|Owner A|Address 1|I\n";
420420

421421
var fileName = $"LITP_SAMCPHHOLDING_{testDate:yyyyMMdd}120000.csv";
422422
await UploadCsvToS3($"{DestinationFolder}/{fileName}", csvContent);
@@ -439,9 +439,9 @@ public async Task StartAsync_WithQuotedHeaders_ShouldProcessCorrectly()
439439
// Arrange
440440
var testDate = new DateOnly(2024, 12, 15);
441441
// CSV with quoted headers (common in some CSV exports)
442-
var csvContent = "\"CPH\",\"FarmName\",\"Owner\",\"Address\",\"CHANGE_TYPE\"\n"
443-
+ "CPH001,Farm One,Owner A,Address 1,I\n"
444-
+ "CPH002,Farm Two,Owner B,Address 2,I\n";
442+
var csvContent = "\"CPH\"|\"FarmName\"|\"Owner\"|\"Address\"|\"CHANGE_TYPE\"\n"
443+
+ "CPH001|Farm One|Owner A|Address 1|I\n"
444+
+ "CPH002|Farm Two|Owner B|Address 2|I\n";
445445

446446
var fileName = $"LITP_SAMCPHHOLDING_{testDate:yyyyMMdd}120000.csv";
447447
await UploadCsvToS3($"{DestinationFolder}/{fileName}", csvContent);
@@ -474,8 +474,8 @@ public async Task StartAsync_WithMixedQuotedAndUnquotedHeaders_ShouldProcessCorr
474474
// Arrange
475475
var testDate = new DateOnly(2024, 12, 15);
476476
// CSV with some headers quoted and some not (edge case but should be handled)
477-
var csvContent = "\"CPH\",FarmName,\"Owner\",Address,\"CHANGE_TYPE\"\n"
478-
+ "CPH001,Farm One,Owner A,Address 1,I\n";
477+
var csvContent = "\"CPH\"|FarmName|\"Owner\"|Address|\"CHANGE_TYPE\"\n"
478+
+ "CPH001|Farm One|Owner A|Address 1|I\n";
479479

480480
var fileName = $"LITP_SAMCPHHOLDING_{testDate:yyyyMMdd}120000.csv";
481481
await UploadCsvToS3($"{DestinationFolder}/{fileName}", csvContent);
@@ -741,8 +741,8 @@ public async Task StartAsync_WithNullAccumulatorValues_ShouldNotAddToArray()
741741
_createdTestFileKeys.Remove($"{DestinationFolder}/{fileName1}");
742742

743743
// Second import with null/empty accumulator values
744-
var csvContent2 = "CPH,FarmName,ADDRESS_PK,DISEASE_TYPE,ANIMAL_SPECIES_CODE,CHANGE_TYPE\n" +
745-
"CPH001,Farm One Updated,,,CAPRINE,U\n";
744+
var csvContent2 = "CPH|FarmName|ADDRESS_PK|DISEASE_TYPE|ANIMAL_SPECIES_CODE|CHANGE_TYPE\n"
745+
+ "CPH001|Farm One Updated|||CAPRINE|U\n";
746746
var testDate2 = testDate.AddDays(-1);
747747
var fileName2 = $"LITP_SAMCPHHOLDING_{testDate2:yyyyMMdd}120000.csv";
748748
await UploadCsvToS3($"{DestinationFolder}/{fileName2}", csvContent2);
@@ -776,8 +776,8 @@ public async Task StartAsync_WithEmptyAccumulatorFieldsOnFirstImport_ShouldCreat
776776
{
777777
// Arrange
778778
var testDate = new DateOnly(2024, 12, 15);
779-
var csvContent = "CPH,FarmName,ADDRESS_PK,DISEASE_TYPE,ANIMAL_SPECIES_CODE,CHANGE_TYPE\n"
780-
+ "CPH001,Farm One,,,,I\n";
779+
var csvContent = "CPH|FarmName|ADDRESS_PK|DISEASE_TYPE|ANIMAL_SPECIES_CODE|CHANGE_TYPE\n"
780+
+ "CPH001|Farm One||||I\n";
781781
var fileName = $"LITP_SAMCPHHOLDING_{testDate:yyyyMMdd}120000.csv";
782782
await UploadCsvToS3($"{DestinationFolder}/{fileName}", csvContent);
783783

@@ -806,62 +806,6 @@ public async Task StartAsync_WithEmptyAccumulatorFieldsOnFirstImport_ShouldCreat
806806
_testOutputHelper.WriteLine("Successfully created empty arrays for null accumulator fields on first import");
807807
}
808808

809-
[Fact]
810-
public async Task StartAsync_WithMixedAccumulatorAndNonAccumulatorUpdates_ShouldHandleCorrectly()
811-
{
812-
// Arrange
813-
var testDate = new DateOnly(2024, 12, 15);
814-
815-
// First import
816-
var csvContent1 = GenerateSampleCsvContentWithAccumulators(new[]
817-
{
818-
("CPH001", "Farm One", "ADDR001", "BVD", "BOVINE", "I"),
819-
("CPH002", "Farm Two", "ADDR002", "IBR", "OVINE", "I")
820-
});
821-
var fileName1 = $"LITP_SAMCPHHOLDING_{testDate:yyyyMMdd}120000.csv";
822-
await UploadCsvToS3($"{DestinationFolder}/{fileName1}", csvContent1);
823-
await _ingestionPipeline.StartAsync(Guid.NewGuid(), CancellationToken.None);
824-
await _localStackFixture.S3Client.DeleteObjectAsync(new DeleteObjectRequest { BucketName = LocalStackFixture.TestBucket, Key = $"{DestinationFolder}/{fileName1}" });
825-
_createdTestFileKeys.Remove($"{DestinationFolder}/{fileName1}");
826-
827-
// Second import - update one, create new
828-
var csvContent2 = GenerateSampleCsvContentWithAccumulators(new[]
829-
{
830-
("CPH001", "Farm One Updated", "ADDR003", "FMD", "PORCINE", "U"),
831-
("CPH003", "Farm Three", "ADDR004", "BSE", "CAPRINE", "I")
832-
});
833-
var testDate2 = testDate.AddDays(-1);
834-
var fileName2 = $"LITP_SAMCPHHOLDING_{testDate2:yyyyMMdd}120000.csv";
835-
await UploadCsvToS3($"{DestinationFolder}/{fileName2}", csvContent2);
836-
837-
// Act
838-
await _ingestionPipeline.StartAsync(Guid.NewGuid(), CancellationToken.None);
839-
840-
// Assert
841-
var database = _mongoClient.GetDatabase(_testDatabaseName);
842-
var collection = database.GetCollection<BsonDocument>("sam_cph_holdings");
843-
844-
// CPH001: Updated - should have accumulated values and overwritten FarmName
845-
var doc1 = await collection.Find(d => d["_id"] == "CPH001").FirstOrDefaultAsync();
846-
doc1["FarmName"].AsString.Should().Be("Farm One Updated");
847-
doc1["ADDRESS_PK"].AsBsonArray.Should().HaveCount(2);
848-
doc1["ADDRESS_PK"].AsBsonArray.Select(v => v.AsString).Should().BeEquivalentTo(new[] { "ADDR001", "ADDR003" });
849-
850-
// CPH002: Unchanged - should still have original values
851-
var doc2 = await collection.Find(d => d["_id"] == "CPH002").FirstOrDefaultAsync();
852-
doc2["FarmName"].AsString.Should().Be("Farm Two");
853-
doc2["ADDRESS_PK"].AsBsonArray.Should().HaveCount(1);
854-
doc2["ADDRESS_PK"].AsBsonArray[0].AsString.Should().Be("ADDR002");
855-
856-
// CPH003: New - should have initial values as arrays
857-
var doc3 = await collection.Find(d => d["_id"] == "CPH003").FirstOrDefaultAsync();
858-
doc3["FarmName"].AsString.Should().Be("Farm Three");
859-
doc3["ADDRESS_PK"].AsBsonArray.Should().HaveCount(1);
860-
doc3["ADDRESS_PK"].AsBsonArray[0].AsString.Should().Be("ADDR004");
861-
862-
_testOutputHelper.WriteLine("Successfully handled mixed accumulator and non-accumulator updates");
863-
}
864-
865809
#endregion
866810

867811
private IBlobStorageServiceFactory CreateBlobStorageFactory()
@@ -926,11 +870,11 @@ private IExternalCatalogueServiceFactory CreateExternalCatalogueServiceFactory(I
926870
private string GenerateSampleCsvContent((string cph, string farmName, string owner, string address)[] records)
927871
{
928872
var sb = new StringBuilder();
929-
sb.AppendLine("CPH,FarmName,Owner,Address,CHANGE_TYPE");
873+
sb.AppendLine("CPH|FarmName|Owner|Address|CHANGE_TYPE");
930874

931875
foreach (var (cph, farmName, owner, address) in records)
932876
{
933-
sb.AppendLine($"{cph},{farmName},{owner},{address},{ChangeType.Insert}");
877+
sb.AppendLine($"{cph}|{farmName}|{owner}|{address}|{ChangeType.Insert}");
934878
}
935879

936880
return sb.ToString();
@@ -939,11 +883,11 @@ private string GenerateSampleCsvContent((string cph, string farmName, string own
939883
private string GenerateSampleCsvContentWithAccumulators((string cph, string farmName, string addressPk, string diseaseType, string animalSpeciesCode, string changeType)[] records)
940884
{
941885
var sb = new StringBuilder();
942-
sb.AppendLine("CPH,FarmName,ADDRESS_PK,DISEASE_TYPE,ANIMAL_SPECIES_CODE,CHANGE_TYPE");
886+
sb.AppendLine("CPH|FarmName|ADDRESS_PK|DISEASE_TYPE|ANIMAL_SPECIES_CODE|CHANGE_TYPE");
943887

944888
foreach (var (cph, farmName, addressPk, diseaseType, animalSpeciesCode, changeType) in records)
945889
{
946-
sb.AppendLine($"{cph},{farmName},{addressPk},{diseaseType},{animalSpeciesCode},{changeType}");
890+
sb.AppendLine($"{cph}|{farmName}|{addressPk}|{diseaseType}|{animalSpeciesCode}|{changeType}");
947891
}
948892

949893
return sb.ToString();

tests/KeeperData.Bridge.Tests.Integration/Helpers/TestDataGenerator.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ public static (string CsvContent, List<PersonRecord> Records) GeneratePersonCsv(
2626
var records = faker.Generate(recordCount);
2727

2828
var csv = new StringBuilder();
29-
csv.AppendLine($"{primaryKeyHeaderName},FirstName,LastName,Email,PhoneNumber,DateOfBirth,Address,City,PostalCode,Country,Salary,Department,IsActive,CHANGETYPE");
29+
csv.AppendLine($"{primaryKeyHeaderName}|FirstName|LastName|Email|PhoneNumber|DateOfBirth|Address|City|PostalCode|Country|Salary|Department|IsActive|CHANGETYPE");
3030

3131
foreach (var record in records)
3232
{
33-
csv.AppendLine($"\"{record.PersonId}\",\"{record.FirstName}\",\"{record.LastName}\",\"{record.Email}\",\"{record.PhoneNumber}\",\"{record.DateOfBirth}\",\"{EscapeCsv(record.Address)}\",\"{record.City}\",\"{record.PostalCode}\",\"{record.Country}\",{record.Salary},\"{record.Department}\",{record.IsActive.ToString().ToLower()},{record.ChangeType}");
33+
csv.AppendLine($"\"{record.PersonId}\"|\"{record.FirstName}\"|\"{record.LastName}\"|\"{record.Email}\"|\"{record.PhoneNumber}\"|\"{record.DateOfBirth}\"|\"{EscapeCsv(record.Address)}\"|\"{record.City}\"|\"{record.PostalCode}\"|\"{record.Country}\"|{record.Salary}|\"{record.Department}\"|{record.IsActive.ToString().ToLower()}|{record.ChangeType}");
3434
}
3535

3636
return (csv.ToString(), records);

0 commit comments

Comments
 (0)