Skip to content

Commit 913bb19

Browse files
authored
Change link-index-updater lambda to react on SQS events (#1169)
* Change link index updater lambda to react on SQS events * Remove temporary workflow for building the binary * Update src/infra/docs-lambda-index-publisher/Program.cs
1 parent a886521 commit 913bb19

File tree

8 files changed

+225
-120
lines changed

8 files changed

+225
-120
lines changed

Directory.Packages.props

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
</ItemGroup>
99
<!-- AWS -->
1010
<ItemGroup>
11-
<PackageVersion Include="Amazon.Lambda.RuntimeSupport" Version="1.13.0"/>
12-
<PackageVersion Include="Amazon.Lambda.Core" Version="2.5.1"/>
13-
<PackageVersion Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.4"/>
11+
<PackageVersion Include="Amazon.Lambda.RuntimeSupport" Version="1.13.0" />
12+
<PackageVersion Include="Amazon.Lambda.Core" Version="2.5.1" />
13+
<PackageVersion Include="Amazon.Lambda.S3Events" Version="3.1.0" />
14+
<PackageVersion Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.4" />
15+
<PackageVersion Include="Amazon.Lambda.SQSEvents" Version="2.2.0" />
16+
<PackageVersion Include="AWSSDK.SQS" Version="3.7.400.135" />
1417
<PackageVersion Include="AWSSDK.S3" Version="3.7.416.16"/>
1518
</ItemGroup>
16-
1719
<!-- Build -->
1820
<ItemGroup>
1921
<PackageVersion Include="Argu" Version="6.2.5" />
@@ -23,42 +25,40 @@
2325
<PackageVersion Include="Fake.IO.Zip" Version="6.1.3" />
2426
<PackageVersion Include="FSharp.Core" Version="9.0.202" />
2527
</ItemGroup>
26-
2728
<ItemGroup>
28-
<PackageVersion Include="ConsoleAppFramework" Version="5.4.1" PrivateAssets="all" IncludeAssets="runtime; build; native; contentfiles; analyzers; buildtransitive"/>
29+
<PackageVersion Include="ConsoleAppFramework" Version="5.4.1" PrivateAssets="all" IncludeAssets="runtime; build; native; contentfiles; analyzers; buildtransitive" />
2930
<PackageVersion Include="ConsoleAppFramework.Abstractions" Version="5.4.1" />
30-
<PackageVersion Include="Crayon" Version="2.0.69"/>
31+
<PackageVersion Include="Crayon" Version="2.0.69" />
3132
<PackageVersion Include="DotNet.Glob" Version="3.1.3" />
3233
<PackageVersion Include="Errata" Version="0.14.0" />
3334
<PackageVersion Include="Github.Actions.Core" Version="9.0.0"/>
35+
<PackageVersion Include="Microsoft.Extensions.Logging" Version="9.0.4" />
36+
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="9.0.4" />
3437
<PackageVersion Include="Markdig" Version="0.41.1" />
35-
<PackageVersion Include="Microsoft.Extensions.Logging" Version="9.0.4"/>
36-
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="9.0.4"/>
3738
<PackageVersion Include="NetEscapades.EnumGenerators" Version="1.0.0-beta12" PrivateAssets="all" ExcludeAssets="runtime" />
3839
<PackageVersion Include="Proc" Version="0.9.1" />
3940
<PackageVersion Include="RazorSlices" Version="0.8.1" />
4041
<PackageVersion Include="Samboy063.Tomlet" Version="6.0.0" />
4142
<PackageVersion Include="Slugify.Core" Version="4.0.1" />
4243
<PackageVersion Include="SoftCircuits.IniFileParser" Version="2.7.0" />
4344
<PackageVersion Include="System.IO.Abstractions" Version="21.0.29" />
44-
<PackageVersion Include="Utf8StreamReader" Version="1.3.2"/>
45-
<PackageVersion Include="Vecc.YamlDotNet.Analyzers.StaticGenerator" Version="16.1.3" PrivateAssets="All"/>
45+
<PackageVersion Include="Utf8StreamReader" Version="1.3.2" />
46+
<PackageVersion Include="Vecc.YamlDotNet.Analyzers.StaticGenerator" Version="16.1.3" PrivateAssets="All" />
4647
<PackageVersion Include="Westwind.AspNetCore.LiveReload" Version="0.5.2" />
4748
<PackageVersion Include="YamlDotNet" Version="16.3.0" />
4849
</ItemGroup>
49-
5050
<!-- Test packages -->
5151
<ItemGroup>
52-
<PackageVersion Include="AngleSharp.Diffing" Version="1.0.0"/>
53-
<PackageVersion Include="DiffPlex" Version="1.7.2"/>
54-
<PackageVersion Include="FluentAssertions" Version="6.12.1"/>
55-
<PackageVersion Include="FsUnit.xUnit" Version="7.0.1"/>
52+
<PackageVersion Include="AngleSharp.Diffing" Version="1.0.0" />
53+
<PackageVersion Include="DiffPlex" Version="1.7.2" />
54+
<PackageVersion Include="FluentAssertions" Version="6.12.1" />
55+
<PackageVersion Include="FsUnit.xUnit" Version="7.0.1" />
5656
<PackageVersion Include="GitHubActionsTestLogger" Version="2.4.1" />
5757
<PackageVersion Include="JetBrains.Annotations" Version="2024.3.0" />
58-
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.13.0"/>
59-
<PackageVersion Include="System.IO.Abstractions.TestingHelpers" Version="21.0.29"/>
60-
<PackageVersion Include="Unquote" Version="7.0.1"/>
61-
<PackageVersion Include="xunit.runner.visualstudio" Version="3.0.2"/>
62-
<PackageVersion Include="xunit.v3" Version="1.1.0"/>
58+
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
59+
<PackageVersion Include="System.IO.Abstractions.TestingHelpers" Version="21.0.29" />
60+
<PackageVersion Include="Unquote" Version="7.0.1" />
61+
<PackageVersion Include="xunit.runner.visualstudio" Version="3.0.2" />
62+
<PackageVersion Include="xunit.v3" Version="1.1.0" />
6363
</ItemGroup>
64-
</Project>
64+
</Project>

src/Elastic.Markdown/Links/CrossLinks/CrossLinkResolver.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ public record LinkIndexEntry
3838
[JsonPropertyName("etag")]
3939
public required string ETag { get; init; }
4040

41+
// TODO can be made required after all doc_sets have published again
42+
[JsonPropertyName("updated_at")]
43+
public DateTime UpdatedAt { get; init; } = DateTime.MinValue;
44+
4145
// TODO can be made required after all doc_sets have published again
4246
[JsonPropertyName("ref")]
4347
public string GitReference { get; init; } = "unknown";
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using Amazon.Lambda.Core;
6+
using Amazon.S3;
7+
using Amazon.S3.Model;
8+
using Elastic.Markdown.Links.CrossLinks;
9+
10+
namespace Elastic.Documentation.Lambda.LinkIndexUploader;
11+
12+
/// <summary>
13+
/// Gets the link index from S3 once.
14+
/// You can then update the link index with <see cref="UpdateLinkIndexEntry(LinkIndexEntry)"/> and save it with <see cref="Save()"/>.
15+
/// If the link index changed in the meantime, <see cref="Save()"/> will throw an exception,
16+
/// thus all the messages from the queue will be sent back to the queue.
17+
/// </summary>
18+
public class LinkIndexProvider(IAmazonS3 s3Client, ILambdaLogger logger, string bucketName, string key)
19+
{
20+
private string? _etag;
21+
private LinkIndex? _linkIndex;
22+
23+
private async Task<LinkIndex> GetLinkIndex()
24+
{
25+
var getObjectRequest = new GetObjectRequest
26+
{
27+
BucketName = bucketName,
28+
Key = key
29+
};
30+
logger.LogInformation("Getting link index from s3://{bucketName}/{key}", bucketName, key);
31+
var getObjectResponse = await s3Client.GetObjectAsync(getObjectRequest);
32+
await using var stream = getObjectResponse.ResponseStream;
33+
_etag = getObjectResponse.ETag;
34+
logger.LogInformation("Successfully got link index from s3://{bucketName}/{key}", bucketName, key);
35+
_linkIndex = LinkIndex.Deserialize(stream);
36+
return _linkIndex;
37+
}
38+
39+
public async Task UpdateLinkIndexEntry(LinkIndexEntry linkIndexEntry)
40+
{
41+
_linkIndex ??= await GetLinkIndex();
42+
if (_linkIndex.Repositories.TryGetValue(linkIndexEntry.Repository, out var existingEntry))
43+
{
44+
var newEntryIsNewer = DateTime.Compare(linkIndexEntry.UpdatedAt, existingEntry[linkIndexEntry.Branch].UpdatedAt) > 0;
45+
if (newEntryIsNewer)
46+
{
47+
existingEntry[linkIndexEntry.Branch] = linkIndexEntry;
48+
logger.LogInformation("Updated existing entry for {repository}@{branch}", linkIndexEntry.Repository, linkIndexEntry.Branch);
49+
}
50+
else
51+
logger.LogInformation("Skipping update for {repository}@{branch} because the existing entry is newer", linkIndexEntry.Repository, linkIndexEntry.Branch);
52+
}
53+
else
54+
{
55+
_linkIndex.Repositories.Add(linkIndexEntry.Repository, new Dictionary<string, LinkIndexEntry>
56+
{
57+
{ linkIndexEntry.Branch, linkIndexEntry }
58+
});
59+
logger.LogInformation("Added new entry for {repository}@{branch}", linkIndexEntry.Repository, linkIndexEntry.Branch);
60+
}
61+
}
62+
63+
public async Task Save()
64+
{
65+
if (_etag == null || _linkIndex == null)
66+
throw new InvalidOperationException("You must call UpdateLinkIndexEntry() before Save()");
67+
var json = LinkIndex.Serialize(_linkIndex);
68+
logger.LogInformation("Saving link index to s3://{bucketName}/{key}", bucketName, key);
69+
var putObjectRequest = new PutObjectRequest
70+
{
71+
BucketName = bucketName,
72+
Key = key,
73+
ContentBody = json,
74+
ContentType = "application/json",
75+
IfMatch = _etag // Only update if the ETag matches. Meaning the object has not been changed in the meantime.
76+
};
77+
_ = await s3Client.PutObjectAsync(putObjectRequest);
78+
logger.LogInformation("Successfully saved link index to s3://{bucketName}/{key}", bucketName, key);
79+
}
80+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using Amazon.Lambda.Core;
6+
using Amazon.S3;
7+
using Amazon.S3.Model;
8+
using Elastic.Markdown.IO.State;
9+
10+
namespace Elastic.Documentation.Lambda.LinkIndexUploader;
11+
12+
/// <summary>
/// Reads link reference documents from a single S3 bucket.
/// </summary>
public class LinkReferenceProvider(IAmazonS3 s3Client, ILambdaLogger logger, string bucketName)
{
    /// <summary>
    /// Downloads the object stored under <paramref name="key"/> and deserializes it into a <see cref="LinkReference"/>.
    /// </summary>
    /// <param name="key">Object key inside the configured bucket.</param>
    /// <param name="ctx">Token used to cancel the S3 request.</param>
    /// <returns>The deserialized link reference.</returns>
    public async Task<LinkReference> GetLinkReference(string key, Cancel ctx)
    {
        var request = new GetObjectRequest();
        request.BucketName = bucketName;
        request.Key = key;

        logger.LogInformation("Getting object {key} from bucket {bucketName}", key, bucketName);
        var response = await s3Client.GetObjectAsync(request, ctx);

        // The response stream is released once deserialization has finished.
        await using var body = response.ResponseStream;
        logger.LogInformation("Successfully got object {key} from bucket {bucketName}", key, bucketName);

        var linkReference = LinkReference.Deserialize(body);
        return linkReference;
    }
}

src/infra/docs-lambda-index-publisher/Program.cs

Lines changed: 74 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -2,123 +2,102 @@
22
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
33
// See the LICENSE file in the project root for more information
44

5-
using System.Diagnostics;
6-
using System.Text;
5+
using System.Collections.Concurrent;
76
using Amazon.Lambda.Core;
87
using Amazon.Lambda.RuntimeSupport;
8+
using Amazon.Lambda.Serialization.SystemTextJson;
9+
using Amazon.Lambda.SQSEvents;
910
using Amazon.S3;
10-
using Amazon.S3.Model;
11+
using Amazon.S3.Util;
12+
using Elastic.Documentation.Lambda.LinkIndexUploader;
1113
using Elastic.Markdown.IO.State;
1214
using Elastic.Markdown.Links.CrossLinks;
1315

1416
// Bucket that both the per-repository link references and the aggregated index are read from.
const string bucketName = "elastic-docs-link-index";
// Object key of the aggregated link index inside the bucket.
const string indexFile = "link-index.json";

// Register Handler as the Lambda entry point for SQS batches and run the runtime loop.
var serializer = new SourceGeneratorLambdaJsonSerializer<SerializerContext>();
var bootstrap = LambdaBootstrapBuilder.Create<SQSEvent, SQSBatchResponse>(Handler, serializer).Build();
await bootstrap.RunAsync();

// End of the top-level statements; only local functions follow.
return;
2224

23-
#pragma warning disable CS8321 // Local function is declared but never used
24-
static async Task<string> Handler(ILambdaContext context)
25-
#pragma warning restore CS8321 // Local function is declared but never used
25+
// The SQS queue is configured to trigger when elastic/*/*/links.json files are created or updated.
//
// Processes one SQS batch: every message is expected to carry an S3 event notification. For each
// S3 record the link reference is downloaded and the matching entry in the link index is updated
// in memory; the index is then written back to S3 once for the whole batch.
//
// Returns the ids of the messages that must be retried:
//  - messages whose processing threw are reported individually;
//  - if persisting the index fails, every message of the batch is reported, because none of
//    the in-memory updates were saved.
static async Task<SQSBatchResponse> Handler(SQSEvent ev, ILambdaContext context)
{
    // The client is only used for this invocation, so release its resources when done.
    using var s3Client = new AmazonS3Client();
    var linkIndexProvider = new LinkIndexProvider(s3Client, context.Logger, bucketName, indexFile);
    var batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>();
    // Number of entries handed to the link index provider without an exception.
    var processedEntries = 0;

    foreach (var message in ev.Records)
    {
        try
        {
            var s3RecordLinkReferenceTuples = await GetS3RecordLinkReferenceTuples(s3Client, message, context);
            foreach (var (s3Record, linkReference) in s3RecordLinkReferenceTuples)
            {
                var newEntry = ConvertToLinkIndexEntry(s3Record, linkReference);
                await linkIndexProvider.UpdateLinkIndexEntry(newEntry);
                processedEntries++;
            }
        }
        catch (Exception e)
        {
            // Add failed message identifier to the batchItemFailures list
            context.Logger.LogWarning(e, "Failed to process message {MessageId}", message.MessageId);
            batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure
            {
                ItemIdentifier = message.MessageId
            });
        }
    }

    // Save() throws when nothing was applied. Calling it for a batch in which no entry was
    // processed would misreport a save failure and requeue messages that had nothing to apply.
    if (processedEntries > 0)
    {
        try
        {
            await linkIndexProvider.Save();
        }
        catch (Exception ex)
        {
            // If we fail to update the link index, we need to return all messages to the queue
            // so that they can be retried later.
            // The exception is passed separately so its message is never interpreted as a format template.
            context.Logger.LogError(ex, "Failed to update {bucketName}/{indexFile}. Returning all {recordCount} messages to the queue.", bucketName, indexFile, ev.Records.Count);
            return new SQSBatchResponse(ev.Records.Select(r => new SQSBatchResponse.BatchItemFailure
            {
                ItemIdentifier = r.MessageId
            }).ToList());
        }
    }

    if (batchItemFailures.Count > 0)
        context.Logger.LogInformation("Failed to process {batchItemFailuresCount} of {allMessagesCount} messages. Returning them to the queue.", batchItemFailures.Count, ev.Records.Count);
    return new SQSBatchResponse(batchItemFailures);
}
10773

108-
static async Task<string> ReadLinkReferenceSha(IAmazonS3 client, S3Object obj)
74+
// Builds the link index entry for a links.json object from its S3 event record and the
// deserialized link reference. Repository and branch are the second and third segments of
// the object key (elastic/<repository>/<branch>/...).
//
// Throws ArgumentException when the key does not contain a repository and a branch segment,
// so a malformed key fails with a descriptive message instead of an IndexOutOfRangeException.
//
// NOTE(review): S3 event notifications are documented to deliver URL-encoded object keys;
// confirm whether the key needs decoding before it is split and stored as Path.
static LinkIndexEntry ConvertToLinkIndexEntry(S3EventNotification.S3EventNotificationRecord record, LinkReference linkReference)
{
    var s3Object = record.S3.Object;
    var keyTokens = s3Object.Key.Split('/');
    if (keyTokens.Length < 3 || string.IsNullOrEmpty(keyTokens[1]) || string.IsNullOrEmpty(keyTokens[2]))
        throw new ArgumentException($"Expected an object key of the form 'elastic/<repository>/<branch>/...' but got '{s3Object.Key}'.", nameof(record));

    var repository = keyTokens[1];
    var branch = keyTokens[2];
    return new LinkIndexEntry
    {
        Repository = repository,
        Branch = branch,
        ETag = s3Object.ETag,
        Path = s3Object.Key,
        // The event time lets the index keep whichever entry was published last.
        UpdatedAt = record.EventTime,
        GitReference = linkReference.Origin.Ref
    };
}
90+
91+
// Parses the S3 event notification carried in the SQS message body and downloads the link
// reference for every S3 record it contains. The downloads run in parallel, so the order of
// the returned pairs is not tied to the order of the records in the notification.
static async Task<IReadOnlyCollection<(S3EventNotification.S3EventNotificationRecord, LinkReference)>> GetS3RecordLinkReferenceTuples(IAmazonS3 s3Client,
    SQSEvent.SQSMessage message, ILambdaContext context)
{
    var notification = S3EventNotification.ParseJson(message.Body);
    // ConcurrentBag because the parallel loop below adds results from several workers.
    var results = new ConcurrentBag<(S3EventNotification.S3EventNotificationRecord, LinkReference)>();
    var provider = new LinkReferenceProvider(s3Client, context.Logger, bucketName);

    await Parallel.ForEachAsync(notification.Records, async (s3Record, cancellationToken) =>
    {
        var objectKey = s3Record.S3.Object.Key;
        var reference = await provider.GetLinkReference(objectKey, cancellationToken);
        results.Add((s3Record, reference));
    });

    return results;
}

0 commit comments

Comments
 (0)