Skip to content

Commit fce768c

Browse files
committed
Refactor
1 parent 7836169 commit fce768c

File tree

2 files changed

+31
-21
lines changed

2 files changed

+31
-21
lines changed

src/infra/docs-lambda-index-publisher/LinkIndexUpdater.cs renamed to src/infra/docs-lambda-index-publisher/LinkIndexProvider.cs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,18 @@
99

1010
namespace Elastic.Documentation.Lambda.LinkIndexUploader;
1111

12-
public class LinkIndexUpdater(IAmazonS3 s3Client, ILambdaLogger logger, string bucketName, string key)
12+
/// <summary>
13+
/// Gets the link index from S3 once.
14+
/// You can then update the link index with <see cref="UpdateLinkIndexEntry(LinkIndexEntry)"/> and save it with <see cref="Save()"/>.
15+
/// If the link index changed in the meantime, <see cref="Save()"/> will throw an exception,
16+
/// thus all the messages from the queue will be sent back to the queue.
17+
/// </summary>
18+
public class LinkIndexProvider(IAmazonS3 s3Client, ILambdaLogger logger, string bucketName, string key)
1319
{
1420
private string? _etag;
15-
public async Task<LinkIndex> GetLinkIndex()
21+
private LinkIndex? _linkIndex;
22+
23+
private async Task<LinkIndex> GetLinkIndex()
1624
{
1725
var getObjectRequest = new GetObjectRequest
1826
{
@@ -24,12 +32,14 @@ public async Task<LinkIndex> GetLinkIndex()
2432
await using var stream = getObjectResponse.ResponseStream;
2533
_etag = getObjectResponse.ETag;
2634
logger.LogInformation("Successfully got link index from s3://{bucketName}/{key}", bucketName, key);
27-
return LinkIndex.Deserialize(stream);
35+
_linkIndex = LinkIndex.Deserialize(stream);
36+
return _linkIndex;
2837
}
2938

30-
public void UpdateLinkIndexEntry(LinkIndex linkIndex, LinkIndexEntry linkIndexEntry)
39+
public async Task UpdateLinkIndexEntry(LinkIndexEntry linkIndexEntry)
3140
{
32-
if (linkIndex.Repositories.TryGetValue(linkIndexEntry.Repository, out var existingEntry))
41+
_linkIndex ??= await GetLinkIndex();
42+
if (_linkIndex.Repositories.TryGetValue(linkIndexEntry.Repository, out var existingEntry))
3343
{
3444
var newEntryIsNewer = DateTime.Compare(linkIndexEntry.UpdatedAt, existingEntry[linkIndexEntry.Branch].UpdatedAt) > 0;
3545
if (newEntryIsNewer)
@@ -42,20 +52,19 @@ public void UpdateLinkIndexEntry(LinkIndex linkIndex, LinkIndexEntry linkIndexEn
4252
}
4353
else
4454
{
45-
linkIndex.Repositories.Add(linkIndexEntry.Repository, new Dictionary<string, LinkIndexEntry>
55+
_linkIndex.Repositories.Add(linkIndexEntry.Repository, new Dictionary<string, LinkIndexEntry>
4656
{
4757
{ linkIndexEntry.Branch, linkIndexEntry }
4858
});
4959
logger.LogInformation("Added new entry for {repository}@{branch}", linkIndexEntry.Repository, linkIndexEntry.Branch);
5060
}
5161
}
5262

53-
public async Task SaveLinkIndex(LinkIndex linkIndex)
63+
public async Task Save()
5464
{
55-
if (_etag == null)
56-
throw new InvalidOperationException("You must call GetLinkIndex() before SaveLinkIndex()");
57-
58-
var json = LinkIndex.Serialize(linkIndex);
65+
if (_etag == null || _linkIndex == null)
66+
throw new InvalidOperationException("You must call UpdateLinkIndexEntry() before Save()");
67+
var json = LinkIndex.Serialize(_linkIndex);
5968
logger.LogInformation("Saving link index to s3://{bucketName}/{key}", bucketName, key);
6069
var putObjectRequest = new PutObjectRequest
6170
{

src/infra/docs-lambda-index-publisher/Program.cs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222

2323
return;
2424

25+
// The SQS queue is configured to trigger when elastic/*/*/links.json files are created or updated.
2526
static async Task<SQSBatchResponse> Handler(SQSEvent ev, ILambdaContext context)
2627
{
2728
var s3Client = new AmazonS3Client();
28-
var linkIndexUpdater = new LinkIndexUpdater(s3Client, context.Logger, bucketName, indexFile);
29+
var linkIndexProvider = new LinkIndexProvider(s3Client, context.Logger, bucketName, indexFile);
2930
var batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>();
30-
var linkIndex = await linkIndexUpdater.GetLinkIndex();
3131
foreach (var message in ev.Records)
3232
{
3333
try
@@ -36,12 +36,12 @@ static async Task<SQSBatchResponse> Handler(SQSEvent ev, ILambdaContext context)
3636
foreach (var (s3Record, linkReference) in s3RecordLinkReferenceTuples)
3737
{
3838
var newEntry = ConvertToLinkIndexEntry(s3Record, linkReference);
39-
linkIndexUpdater.UpdateLinkIndexEntry(linkIndex, newEntry);
39+
await linkIndexProvider.UpdateLinkIndexEntry(newEntry);
4040
}
4141
}
4242
catch (Exception e)
4343
{
44-
//Add failed message identifier to the batchItemFailures list
44+
// Add failed message identifier to the batchItemFailures list
4545
context.Logger.LogWarning(e, "Failed to process message {MessageId}", message.MessageId);
4646
batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure
4747
{
@@ -51,17 +51,18 @@ static async Task<SQSBatchResponse> Handler(SQSEvent ev, ILambdaContext context)
5151
}
5252
try
5353
{
54-
await linkIndexUpdater.SaveLinkIndex(linkIndex);
54+
await linkIndexProvider.Save();
5555
var response = new SQSBatchResponse(batchItemFailures);
5656
if (batchItemFailures.Count > 0)
5757
context.Logger.LogInformation("Failed to process {batchItemFailuresCount} of {allMessagesCount} messages. Returning them to the queue.", batchItemFailures.Count, ev.Records.Count);
5858
return response;
5959
}
6060
catch (Exception ex)
6161
{
62+
// If we fail to update the link index, we need to return all messages to the queue
63+
// so that they can be retried later.
6264
context.Logger.LogError("Failed to update {bucketName}/{indexFile}. Returning all {recordCount} messages to the queue.", bucketName, indexFile, ev.Records.Count);
6365
context.Logger.LogError(ex, ex.Message);
64-
// If we fail to update the link index, we need to return all messages to the queue
6566
var response = new SQSBatchResponse(ev.Records.Select(r => new SQSBatchResponse.BatchItemFailure
6667
{
6768
ItemIdentifier = r.MessageId
@@ -91,12 +92,12 @@ static LinkIndexEntry ConvertToLinkIndexEntry(S3EventNotification.S3EventNotific
9192
SQSEvent.SQSMessage message, ILambdaContext context)
9293
{
9394
var s3Event = S3EventNotification.ParseJson(message.Body);
94-
var linkReferences = new ConcurrentBag<(S3EventNotification.S3EventNotificationRecord, LinkReference)>();
95+
var recordLinkReferenceTuples = new ConcurrentBag<(S3EventNotification.S3EventNotificationRecord, LinkReference)>();
96+
var linkReferenceProvider = new LinkReferenceProvider(s3Client, context.Logger, bucketName);
9597
await Parallel.ForEachAsync(s3Event.Records, async (record, ctx) =>
9698
{
97-
var linkReferenceProvider = new LinkReferenceProvider(s3Client, context.Logger, record.S3.Bucket.Name);
9899
var linkReference = await linkReferenceProvider.GetLinkReference(record.S3.Object.Key, ctx);
99-
linkReferences.Add((record, linkReference));
100+
recordLinkReferenceTuples.Add((record, linkReference));
100101
});
101-
return linkReferences;
102+
return recordLinkReferenceTuples;
102103
}

0 commit comments

Comments
 (0)