Skip to content

Commit a7770ad

Browse files
committed
Composite keys as per HLD
Changed to SHA256-hash-based Mongo IDs for composite keys, because key data is unpredictable and any such key must be both stable and URL-safe. Added a record lineage reporting API.
1 parent 5e84c8c commit a7770ad

File tree

11 files changed

+1491
-130
lines changed

11 files changed

+1491
-130
lines changed

src/KeeperData.Bridge/Controllers/ImportController.cs

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using KeeperData.Bridge.Worker.Tasks;
22
using KeeperData.Core.Database;
33
using KeeperData.Core.ETL.Abstract;
4+
using KeeperData.Core.ETL.Utils;
45
using KeeperData.Core.Reporting;
56
using KeeperData.Core.Reporting.Dtos;
67
using KeeperData.Infrastructure.Storage;
@@ -17,6 +18,8 @@ public class ImportController(
1718
ICollectionManagementService collectionManagementService,
1819
IReportingCollectionManagementService reportingCollectionManagementService) : ControllerBase
1920
{
21+
private readonly RecordIdGenerator _recordIdGenerator = new();
22+
2023
/// <summary>
2124
/// Starts a bulk file import process asynchronously.
2225
/// Returns immediately with an import ID once the lock has been acquired.
@@ -244,6 +247,93 @@ public async Task<IActionResult> GetFileReports(Guid importId, CancellationToken
244247
}
245248
}
246249

250+
/// <summary>
/// Returns one page of lineage events for a single record.
/// The paging arguments are validated here; the lookup itself is delegated to
/// <c>importReportingService.GetRecordLineageEventsPaginatedAsync</c>, which also decides
/// the ordering of the returned events.
/// </summary>
/// <param name="collectionName">Name of the collection that holds the record (e.g., "sam_cph_holdings").</param>
/// <param name="recordId">Record identifier from the route; forwarded to the reporting service unchanged.</param>
/// <param name="skip">Number of events to skip; must be 0 or greater (default: 0).</param>
/// <param name="top">Page size; must be between 1 and 100 inclusive (default: 10).</param>
/// <param name="cancellationToken">Token forwarded to the reporting service.</param>
/// <returns>
/// 200 with the page of events; 400 when <paramref name="skip"/> or <paramref name="top"/> is out of range;
/// 404 when the reporting service throws <see cref="KeyNotFoundException"/>; 499 when the operation is cancelled.
/// </returns>
[HttpGet("lineage/{collectionName}/{recordId}")]
[ProducesResponseType(typeof(PaginatedLineageEvents), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(ErrorResponse), StatusCodes.Status400BadRequest)]
[ProducesResponseType(typeof(ErrorResponse), StatusCodes.Status404NotFound)]
[ProducesResponseType(typeof(ErrorResponse), StatusCodes.Status499ClientClosedRequest)]
public async Task<IActionResult> GetRecordLineageEvents(
    string collectionName,
    string recordId,
    [FromQuery] int skip = 0,
    [FromQuery] int top = 10,
    CancellationToken cancellationToken = default)
{
    logger.LogInformation(
        "Received request to get lineage events for {CollectionName}/{RecordId} with skip={Skip}, top={Top}",
        collectionName,
        recordId,
        skip,
        top);

    // Single place that builds the error payload so every failure is stamped the same way.
    static ErrorResponse Failure(string message) => new()
    {
        Message = message,
        Timestamp = DateTime.UtcNow
    };

    if (skip < 0)
    {
        logger.LogWarning("Invalid skip parameter: {Skip}", skip);
        return BadRequest(Failure("Skip parameter must be greater than or equal to 0."));
    }

    if (top is <= 0 or > 100)
    {
        logger.LogWarning("Invalid top parameter: {Top}", top);
        return BadRequest(Failure("Top parameter must be between 1 and 100."));
    }

    // The route value is handed to the reporting service as-is; no extra decoding is applied here.
    try
    {
        var page = await importReportingService.GetRecordLineageEventsPaginatedAsync(
            collectionName,
            recordId,
            skip,
            top,
            cancellationToken);

        logger.LogInformation(
            "Successfully retrieved {Count} of {Total} lineage events for {CollectionName}/{RecordId}",
            page.Count,
            page.TotalEvents,
            collectionName,
            recordId);

        return Ok(page);
    }
    catch (KeyNotFoundException notFound)
    {
        logger.LogWarning(
            "Lineage not found for {CollectionName}/{RecordId}: {Message}",
            collectionName,
            recordId,
            notFound.Message);

        return NotFound(Failure(notFound.Message));
    }
    catch (OperationCanceledException)
    {
        logger.LogWarning(
            "Get lineage events request was cancelled for {CollectionName}/{RecordId}",
            collectionName,
            recordId);

        // Same value as the literal 499, spelled with the constant used in the attribute above.
        return StatusCode(StatusCodes.Status499ClientClosedRequest, Failure("Request was cancelled."));
    }
}
336+
247337
/// <summary>
248338
/// Deletes a specific MongoDB collection by name.
249339
/// The collection name must be defined in DataSetDefinitions.
@@ -435,4 +525,102 @@ public async Task<IActionResult> DeleteAllReportingCollections(CancellationToken
435525
DeletedAtUtc = result.OperatedAtUtc
436526
});
437527
}
528+
529+
/// <summary>
530+
/// Generates a URL-safe record ID from composite key parts using SHA256 hashing.
531+
/// This endpoint helps clients construct the recordId needed for lineage queries.
532+
/// </summary>
533+
/// <param name="request">Request containing the key parts to hash</param>
534+
/// <param name="cancellationToken">Cancellation token</param>
535+
/// <returns>Generated record ID hash</returns>
536+
[HttpPost("generate-record-id")]
537+
[ProducesResponseType(typeof(GenerateRecordIdResponse), StatusCodes.Status200OK)]
538+
[ProducesResponseType(typeof(ErrorResponse), StatusCodes.Status400BadRequest)]
539+
public IActionResult GenerateRecordId([FromBody] GenerateRecordIdRequest request, CancellationToken cancellationToken = default)
540+
{
541+
logger.LogInformation("Received request to generate record ID from {count} key parts", request.KeyParts?.Length ?? 0);
542+
543+
// Validate request
544+
if (request.KeyParts == null || request.KeyParts.Length == 0)
545+
{
546+
logger.LogWarning("Invalid request: KeyParts is null or empty");
547+
return BadRequest(new ErrorResponse
548+
{
549+
Message = "KeyParts must contain at least one value.",
550+
Timestamp = DateTime.UtcNow
551+
});
552+
}
553+
554+
// Check for null or empty values in key parts
555+
for (int i = 0; i < request.KeyParts.Length; i++)
556+
{
557+
if (string.IsNullOrEmpty(request.KeyParts[i]))
558+
{
559+
logger.LogWarning("Invalid request: KeyPart at index {index} is null or empty", i);
560+
return BadRequest(new ErrorResponse
561+
{
562+
Message = $"KeyPart at index {i} cannot be null or empty.",
563+
Timestamp = DateTime.UtcNow
564+
});
565+
}
566+
}
567+
568+
try
569+
{
570+
var recordId = _recordIdGenerator.GenerateId(request.KeyParts);
571+
572+
logger.LogInformation("Successfully generated record ID: {recordId}", recordId);
573+
574+
return Ok(new GenerateRecordIdResponse
575+
{
576+
RecordId = recordId,
577+
KeyParts = request.KeyParts,
578+
Timestamp = DateTime.UtcNow
579+
});
580+
}
581+
catch (Exception ex)
582+
{
583+
logger.LogError(ex, "Failed to generate record ID");
584+
return BadRequest(new ErrorResponse
585+
{
586+
Message = $"Failed to generate record ID: {ex.Message}",
587+
Timestamp = DateTime.UtcNow
588+
});
589+
}
590+
}
591+
}
592+
593+
/// <summary>
594+
/// Request to generate a record ID from composite key parts.
595+
/// </summary>
596+
public record GenerateRecordIdRequest
597+
{
598+
/// <summary>
599+
/// The individual parts of the composite key (in order).
600+
/// Each part will be joined and hashed to create the record ID.
601+
/// </summary>
602+
/// <example>["NORTH", "F001"]</example>
603+
public required string[] KeyParts { get; init; }
604+
}
605+
606+
/// <summary>
607+
/// Response containing the generated record ID.
608+
/// </summary>
609+
public record GenerateRecordIdResponse
610+
{
611+
/// <summary>
612+
/// The generated URL-safe record ID (SHA256 hash, 43 characters).
613+
/// This ID can be used to query lineage events.
614+
/// </summary>
615+
public required string RecordId { get; init; }
616+
617+
/// <summary>
618+
/// The key parts that were used to generate the record ID.
619+
/// </summary>
620+
public required string[] KeyParts { get; init; }
621+
622+
/// <summary>
623+
/// The timestamp when the ID was generated.
624+
/// </summary>
625+
public DateTime Timestamp { get; init; }
438626
}

src/KeeperData.Core/ETL/Impl/IngestionPipeline.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
using CsvHelper;
2-
using CsvHelper.Configuration;
31
using KeeperData.Core.Database;
42
using KeeperData.Core.ETL.Abstract;
53
using KeeperData.Core.ETL.Utils;
64
using KeeperData.Core.Reporting;
75
using KeeperData.Core.Reporting.Dtos;
86
using KeeperData.Core.Storage;
9-
using KeeperData.Core.Storage.Dtos;
7+
using CsvHelper;
8+
using CsvHelper.Configuration;
109
using Microsoft.Extensions.Logging;
1110
using Microsoft.Extensions.Options;
1211
using MongoDB.Bson;
@@ -36,6 +35,7 @@ public class IngestionPipeline(
3635
private const int LineageEventBatchSize = DefaultInterval;
3736
private readonly IDatabaseConfig _databaseConfig = databaseConfig.Value;
3837
private readonly CsvRowCounter _rowCounter = csvRowCounter;
38+
private readonly RecordIdGenerator _recordIdGenerator = new();
3939

4040
// MongoDB field name constants
4141
private const string FieldId = "_id";
@@ -500,7 +500,6 @@ private async Task<FileIngestionMetrics> ProcessCsvRecordsAsync(Guid importId, I
500500
Debug.WriteLine($"[keepetl] Processed batch of {BatchSize} records from {fileKey} in {batchStopwatch.ElapsedMilliseconds}ms. Total processed: {metrics.RecordsProcessed}, Created: {metrics.RecordsCreated}, Updated: {metrics.RecordsUpdated}, Deleted: {metrics.RecordsDeleted}");
501501
Debug.WriteLine($"[keepetl] -- END BATCH ({batchStopwatch.Elapsed.TotalSeconds}s, {batchStopwatch.ElapsedMilliseconds}ms) --");
502502
Debug.WriteLine($"[keepetl] ");
503-
Debug.WriteLine($"[keepetl] ");
504503

505504
batch.Clear();
506505
}
@@ -724,9 +723,9 @@ private BsonDocument CreateDocumentFromCsvRecord(
724723
var now = DateTime.UtcNow;
725724
var accumulatorSet = new HashSet<string>(definition.Accumulators ?? []);
726725

727-
// Create composite primary key for _id
726+
// Create URL-safe composite primary key for _id using RecordIdGenerator
728727
var compositeKeyParts = headers.PrimaryKeyHeaderNames.Select(pkHeader => csv.GetField(pkHeader) ?? string.Empty);
729-
document[FieldId] = string.Join(EtlConstants.CompositeKeyDelimiter, compositeKeyParts);
728+
document[FieldId] = _recordIdGenerator.GenerateId(compositeKeyParts);
730729

731730
foreach (var header in headers.AllHeaders)
732731
{

src/KeeperData.Core/ETL/Impl/StandardDataSetDefinitionsBuilder.cs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,11 @@ public static class StandardDataSetDefinitionsBuilder
55
public static DataSetDefinitions Build()
66
{
77
var list = new List<DataSetDefinition>();
8-
var samCPHHolding = list.With(new DataSetDefinition("sam_cph_holdings", "LITP_SAMCPHHOLDING_{0}", ["CPH"], ChangeType.HeaderName,
9-
[
10-
"ADDRESS_PK",
11-
"DISEASE_TYPE",
12-
"INTERVAL",
13-
"INTERVAL_UNIT_OF_TIME",
14-
"CPH_RELATIONSHIP_TYPE",
15-
"SECONDARY_CPH",
16-
"ANIMAL_SPECIES_CODE",
17-
"ANIMAL_PRODUCTION_USAGE_CODE",
18-
]));
19-
8+
var samCPHHolding = list.With(new DataSetDefinition("sam_cph_holdings", "LITP_SAMCPHHOLDING_{0}", ["CPH", "FEATURE_NAME", "SECONDARY_CPH", "ANIMAL_SPECIES_CODE"], ChangeType.HeaderName, []));
209
var ctscphHolding = list.With(new DataSetDefinition("cts_cph_holding", "LITP_CTSCPHHOLDING_{0}", ["LID_FULL_IDENTIFIER"], ChangeType.HeaderName, []));
21-
var ctsKeeper = list.With(new DataSetDefinition("cts_keeper", "LITP_CTSKEEPER_{0}", ["PAR_ID"], ChangeType.HeaderName, []));
10+
var ctsKeeper = list.With(new DataSetDefinition("cts_keeper", "LITP_CTSKEEPER_{0}", ["PAR_ID", "LID_FULL_IDENTIFIER"], ChangeType.HeaderName, []));
2211
var samCPHHolder = list.With(new DataSetDefinition("sam_cph_holder", "LITP_SAMCPHHOLDER_{0}", ["PARTY_ID"], ChangeType.HeaderName, []));
23-
var samHerd = list.With(new DataSetDefinition("sam_herd", "LITP_SAMHERD_{0}", ["CPHH", "HERDMARK"], ChangeType.HeaderName, []));
12+
var samHerd = list.With(new DataSetDefinition("sam_herd", "LITP_SAMHERD_{0}", ["CPHH", "HERDMARK", "ANIMAL_PURPOSE_CODE"], ChangeType.HeaderName, []));
2413
var samParty = list.With(new DataSetDefinition("sam_party", "LITP_SAMPARTY_{0}", ["PARTY_ID"], ChangeType.HeaderName, []));
2514

2615
return new DataSetDefinitions
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
using System.Security.Cryptography;
2+
using System.Text;
3+
4+
namespace KeeperData.Core.ETL.Utils;
5+
6+
/// <summary>
/// Turns the parts of a composite key into a single URL-safe identifier.
/// The parts are joined with <c>EtlConstants.CompositeKeyDelimiter</c>, the joined text is hashed
/// with SHA256, and the digest is returned as unpadded Base64URL text.
/// </summary>
/// <remarks>
/// NOTE(review): because the parts are joined before hashing, two different part lists could produce
/// the same joined text if a part can itself contain the delimiter — confirm against the key data.
/// </remarks>
public class RecordIdGenerator
{
    /// <summary>
    /// Builds the record ID for the given key parts.
    /// Identical parts in identical order always produce the identical ID.
    /// </summary>
    /// <param name="keyParts">The composite key parts, in key order.</param>
    /// <returns>The Base64URL-encoded SHA256 digest of the joined parts (43 characters).</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="keyParts"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// Thrown when <paramref name="keyParts"/> has no elements, or when any element is null or empty.
    /// Callers must therefore not substitute an empty string for a missing key value.
    /// </exception>
    public string GenerateId(params string[] keyParts)
    {
        if (keyParts is null)
        {
            throw new ArgumentNullException(nameof(keyParts));
        }

        if (keyParts.Length == 0)
        {
            throw new ArgumentException("At least one key part is required", nameof(keyParts));
        }

        // Index of the first unusable part, or -1 when every part has content.
        var firstInvalid = Array.FindIndex(keyParts, part => string.IsNullOrEmpty(part));
        if (firstInvalid >= 0)
        {
            throw new ArgumentException($"Key part at index {firstInvalid} is null or empty", nameof(keyParts));
        }

        var joined = string.Join(EtlConstants.CompositeKeyDelimiter, keyParts);
        return HashToBase64Url(joined);
    }

    /// <summary>
    /// Builds the record ID for a sequence of key parts.
    /// The sequence is enumerated once into an array and then handled by the array overload.
    /// </summary>
    /// <param name="keyParts">The composite key parts, in key order.</param>
    /// <returns>The Base64URL-encoded SHA256 digest of the joined parts.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="keyParts"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// Thrown when the sequence is empty, or when any element is null or empty.
    /// </exception>
    public string GenerateId(IEnumerable<string> keyParts)
    {
        if (keyParts is null)
        {
            throw new ArgumentNullException(nameof(keyParts));
        }

        string[] materialized = keyParts.ToArray();
        return GenerateId(materialized);
    }

    /// <summary>
    /// Hashes the UTF-8 bytes of <paramref name="value"/> with SHA256 and encodes the digest as Base64URL:
    /// standard Base64 with '+' replaced by '-', '/' replaced by '_', and the trailing '=' padding removed.
    /// </summary>
    /// <param name="value">The text to hash.</param>
    /// <returns>The Base64URL-encoded SHA256 digest (43 characters).</returns>
    private static string HashToBase64Url(string value)
    {
        byte[] digest = SHA256.HashData(Encoding.UTF8.GetBytes(value));
        string standardBase64 = Convert.ToBase64String(digest);

        // Padding only ever appears at the end, so it can be dropped before the character swaps.
        string unpadded = standardBase64.TrimEnd('=');
        return unpadded.Replace('+', '-').Replace('/', '_');
    }
}

0 commit comments

Comments
 (0)