3 changes: 2 additions & 1 deletion Directory.Packages.props
@@ -16,6 +16,7 @@
<PackageVersion Include="AWSSDK.Core" Version="4.0.0.2" />
<PackageVersion Include="AWSSDK.SQS" Version="4.0.0.1" />
<PackageVersion Include="AWSSDK.S3" Version="4.0.0.1" />
<PackageVersion Include="Moq" Version="4.20.72" />
</ItemGroup>
<!-- Build -->
<ItemGroup>
@@ -63,4 +64,4 @@
<PackageVersion Include="xunit.runner.visualstudio" Version="3.0.2" />
<PackageVersion Include="xunit.v3" Version="1.1.0" />
</ItemGroup>
</Project>
</Project>
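Moq is pinned here under central package management, presumably to back tests for the new S3 sync strategies introduced below. A minimal sketch of how a test could fake the bucket listing with Moq (the test project and the names used here are illustrative, not part of this diff):

using Amazon.S3;
using Amazon.S3.Model;
using Moq;

var s3 = new Mock<IAmazonS3>();
s3.Setup(c => c.ListObjectsV2Async(It.IsAny<ListObjectsV2Request>(), It.IsAny<CancellationToken>()))
	.ReturnsAsync(new ListObjectsV2Response
	{
		// One pre-existing remote object; planning against this fake bucket
		// would classify every other local file as an AddRequest.
		S3Objects = [new S3Object { Key = "index.html", ETag = "\"0123456789abcdef0123456789abcdef\"" }],
		IsTruncated = false
	});
IAmazonS3 client = s3.Object;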
98 changes: 98 additions & 0 deletions src/tooling/docs-assembler/Cli/DeployCommands.cs
@@ -0,0 +1,98 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Diagnostics.CodeAnalysis;
using System.IO.Abstractions;
using System.Text.Json;
using Actions.Core.Services;
using Amazon.S3;
using Amazon.S3.Transfer;
using ConsoleAppFramework;
using Documentation.Assembler.Deploying;
using Elastic.Documentation.Tooling.Diagnostics.Console;
using Microsoft.Extensions.Logging;

namespace Documentation.Assembler.Cli;

internal sealed class DeployCommands(ILoggerFactory logger, ICoreService githubActionsService)
{
	[SuppressMessage("Usage", "CA2254:Template should be a static expression")]
	private void AssignOutputLogger()
	{
		var log = logger.CreateLogger<Program>();
		ConsoleApp.Log = msg => log.LogInformation(msg);
		ConsoleApp.LogError = msg => log.LogError(msg);
	}

	/// <summary>Creates a sync plan</summary>
	/// <param name="environment">The environment to deploy</param>
	/// <param name="s3BucketName">The S3 bucket name to deploy to</param>
	/// <param name="out">The file to write the plan to</param>
	/// <param name="ctx"></param>
	public async Task<int> Plan(
		string environment, string s3BucketName, string @out = "", Cancel ctx = default)
	{
		AssignOutputLogger();
		await using var collector = new ConsoleDiagnosticsCollector(logger, githubActionsService)
		{
			NoHints = true
		}.StartAsync(ctx);
		var assembleContext = new AssembleContext(environment, collector, new FileSystem(), new FileSystem(), null, null);
		var s3Client = new AmazonS3Client();
		IDocsSyncPlanStrategy planner = new AwsS3SyncPlanStrategy(s3Client, s3BucketName, assembleContext, logger);
		var plan = await planner.Plan(ctx);
		ConsoleApp.Log("Total files to sync: " + plan.Count);
		ConsoleApp.Log("Total files to delete: " + plan.DeleteRequests.Count);
		ConsoleApp.Log("Total files to add: " + plan.AddRequests.Count);
		ConsoleApp.Log("Total files to update: " + plan.UpdateRequests.Count);
		ConsoleApp.Log("Total files to skip: " + plan.SkipRequests.Count);
		if (!string.IsNullOrEmpty(@out))
		{
			var output = SyncPlan.Serialize(plan);
			await using var fileStream = new FileStream(@out, FileMode.Create, FileAccess.Write);
			await using var writer = new StreamWriter(fileStream);
			await writer.WriteAsync(output);
			ConsoleApp.Log("Plan written to " + @out);
		}
		await collector.StopAsync(ctx);
		return collector.Errors;
	}

	/// <summary>Applies a sync plan</summary>
	/// <param name="environment">The environment to deploy</param>
	/// <param name="s3BucketName">The S3 bucket name to deploy to</param>
	/// <param name="planFile">The path to the plan file to apply</param>
	/// <param name="ctx"></param>
	public async Task<int> Apply(
		string environment,
		string s3BucketName,
		string planFile,
		Cancel ctx = default)
	{
		AssignOutputLogger();
		await using var collector = new ConsoleDiagnosticsCollector(logger, githubActionsService)
		{
			NoHints = true
		}.StartAsync(ctx);
		var assembleContext = new AssembleContext(environment, collector, new FileSystem(), new FileSystem(), null, null);
		var s3Client = new AmazonS3Client();
		var transferUtility = new TransferUtility(s3Client, new TransferUtilityConfig
		{
			ConcurrentServiceRequests = Environment.ProcessorCount * 2,
			MinSizeBeforePartUpload = AwsS3SyncPlanStrategy.PartSize
		});
		IDocsSyncApplyStrategy applier = new AwsS3SyncApplyStrategy(s3Client, transferUtility, s3BucketName, assembleContext, logger, collector);
		if (!File.Exists(planFile))
		{
			collector.EmitError(planFile, "Plan file does not exist.");
			await collector.StopAsync(ctx);
			return collector.Errors;
		}
		var planJson = await File.ReadAllTextAsync(planFile, ctx);
		var plan = SyncPlan.Deserialize(planJson);
		await applier.Apply(plan, ctx);
		await collector.StopAsync(ctx);
		return collector.Errors;
	}
}
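DeployCommands is a ConsoleAppFramework command class, so Plan and Apply surface as CLI subcommands. A sketch of how it might be registered in Program.cs, assuming ConsoleAppFramework v5's builder API and that ILoggerFactory and ICoreService are resolvable from DI (the actual Program.cs is not part of this diff, so this wiring is illustrative):

using ConsoleAppFramework;

var app = ConsoleApp.Create();
// Registers the public methods as subcommands: "deploy plan", "deploy apply"
app.Add<DeployCommands>("deploy");
// e.g. docs-assembler deploy plan --environment prod --s3-bucket-name my-bucket --out plan.json
await app.RunAsync(args);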
117 changes: 117 additions & 0 deletions src/tooling/docs-assembler/Deploying/AwsS3SyncApplyStrategy.cs
@@ -0,0 +1,117 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using Amazon.S3;
using Amazon.S3.Model;
using Amazon.S3.Transfer;
using Elastic.Documentation.Diagnostics;
using Microsoft.Extensions.Logging;

namespace Documentation.Assembler.Deploying;

public class AwsS3SyncApplyStrategy(
	IAmazonS3 s3Client,
	ITransferUtility transferUtility,
	string bucketName,
	AssembleContext context,
	ILoggerFactory loggerFactory,
	DiagnosticsCollector collector) : IDocsSyncApplyStrategy
{
	private readonly ILogger<AwsS3SyncApplyStrategy> _logger = loggerFactory.CreateLogger<AwsS3SyncApplyStrategy>();

	private void DisplayProgress(object? sender, UploadDirectoryProgressArgs args) => LogProgress(_logger, args, null);

	private static readonly Action<ILogger, UploadDirectoryProgressArgs, Exception?> LogProgress = LoggerMessage.Define<UploadDirectoryProgressArgs>(
		LogLevel.Information,
		new EventId(2, nameof(LogProgress)),
		"{Args}");

	public async Task Apply(SyncPlan plan, Cancel ctx = default)
	{
		var deleteCount = 0;
		_logger.LogInformation("Processor count: {ProcessorCount}", Environment.ProcessorCount);

		// Process uploads using directory upload
		var uploadRequests = plan.AddRequests.Cast<UploadRequest>().Concat(plan.UpdateRequests).ToList();
		if (uploadRequests.Count > 0)
		{
			_logger.LogInformation("Starting to process {Count} uploads using directory upload", uploadRequests.Count);

			// Create a temporary directory for uploads
			var tempDir = Path.Combine(context.WriteFileSystem.Path.GetTempPath(), context.WriteFileSystem.Path.GetRandomFileName());
			_ = context.WriteFileSystem.Directory.CreateDirectory(tempDir);

			try
			{
				_logger.LogInformation("Copying {Count} files to temp directory", uploadRequests.Count);

				// Copy files to the temp directory, maintaining structure
				foreach (var upload in uploadRequests)
				{
					var destPath = context.WriteFileSystem.Path.Combine(tempDir, upload.DestinationPath);
					var destDirPath = context.WriteFileSystem.Path.GetDirectoryName(destPath)!;
					_ = context.WriteFileSystem.Directory.CreateDirectory(destDirPath);
					context.WriteFileSystem.File.Copy(upload.LocalPath, destPath);
				}

				// Configure directory upload
				var directoryRequest = new TransferUtilityUploadDirectoryRequest
				{
					BucketName = bucketName,
					Directory = tempDir,
					SearchPattern = "*",
					SearchOption = SearchOption.AllDirectories,
					UploadFilesConcurrently = true
				};

				// Track upload progress
				directoryRequest.UploadDirectoryProgressEvent += DisplayProgress;

				_logger.LogInformation("Uploading {Count} files to S3", uploadRequests.Count);
				_logger.LogDebug("Starting directory upload from {TempDir}", tempDir);

				// Start the upload
				await transferUtility.UploadDirectoryAsync(directoryRequest, ctx);

				_logger.LogDebug("Directory upload completed");
			}
			finally
			{
				// Clean up the temp directory
				if (context.WriteFileSystem.Directory.Exists(tempDir))
					context.WriteFileSystem.Directory.Delete(tempDir, true);
			}
		}

		var deleteRequests = plan.DeleteRequests.ToList();
		if (deleteRequests.Count > 0)
		{
			// Process deletes in batches of 1000 (the AWS S3 DeleteObjects limit)
			foreach (var batch in deleteRequests.Chunk(1000))
			{
				var deleteObjectsRequest = new DeleteObjectsRequest
				{
					BucketName = bucketName,
					Objects = batch.Select(d => new KeyVersion
					{
						Key = d.DestinationPath
					}).ToList()
				};

				var response = await s3Client.DeleteObjectsAsync(deleteObjectsRequest, ctx);
				if (response.HttpStatusCode != System.Net.HttpStatusCode.OK)
				{
					foreach (var error in response.DeleteErrors)
						collector.EmitError(error.Key, $"Failed to delete: {error.Message}");
				}
				else
				{
					var newCount = Interlocked.Add(ref deleteCount, batch.Length);
					_logger.LogInformation("Deleted {Count} objects ({DeleteCount}/{TotalDeleteCount})",
						batch.Length, newCount, deleteRequests.Count);
				}
			}
		}
	}
}
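The 1,000-object cap on DeleteObjects comes from S3 itself, which is why the plan's delete requests are chunked before each call. A small standalone illustration of the batching arithmetic, using a hypothetical key list and bucket name:

using Amazon.S3.Model;

// 2,500 keys must go out as three DeleteObjects calls: 1000 + 1000 + 500
var keys = Enumerable.Range(0, 2500).Select(i => $"docs/page-{i}.html").ToList();
var requests = keys.Chunk(1000)
	.Select(batch => new DeleteObjectsRequest
	{
		BucketName = "example-docs-bucket", // hypothetical
		Objects = batch.Select(k => new KeyVersion { Key = k }).ToList()
	})
	.ToList();
Console.WriteLine(requests.Count); // 3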
158 changes: 158 additions & 0 deletions src/tooling/docs-assembler/Deploying/AwsS3SyncPlanStrategy.cs
@@ -0,0 +1,158 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.Security.Cryptography;
using Amazon.S3;
using Amazon.S3.Model;
using Microsoft.Extensions.Logging;

namespace Documentation.Assembler.Deploying;

public class AwsS3SyncPlanStrategy(IAmazonS3 s3Client, string bucketName, AssembleContext context, ILoggerFactory loggerFactory) : IDocsSyncPlanStrategy
{
	internal const long PartSize = 5 * 1024 * 1024; // 5MB
	private readonly ILogger<AwsS3SyncPlanStrategy> _logger = loggerFactory.CreateLogger<AwsS3SyncPlanStrategy>();
	private static readonly ConcurrentDictionary<string, string> EtagCache = new();

	public async Task<SyncPlan> Plan(Cancel ctx = default)
	{
		var remoteObjects = await ListObjects(ctx);
		var localObjects = context.OutputDirectory.GetFiles("*", SearchOption.AllDirectories);
		var deleteRequests = new ConcurrentBag<DeleteRequest>();
		var addRequests = new ConcurrentBag<AddRequest>();
		var updateRequests = new ConcurrentBag<UpdateRequest>();
		var skipRequests = new ConcurrentBag<SkipRequest>();

		await Parallel.ForEachAsync(localObjects, ctx, async (localFile, token) =>
		{
			var relativePath = Path.GetRelativePath(context.OutputDirectory.FullName, localFile.FullName);

			_logger.LogInformation("Checking {Path}", relativePath);

			if (remoteObjects.TryGetValue(relativePath, out var remoteObject))
			{
				// Check if the ETag differs for updates
				var localETag = await CalculateS3ETag(localFile.FullName, token);
				var remoteETag = remoteObject.ETag.Trim('"'); // Remove quotes from remote ETag
				if (localETag == remoteETag)
				{
					var skipRequest = new SkipRequest
					{
						LocalPath = localFile.FullName,
						DestinationPath = remoteObject.Key
					};
					skipRequests.Add(skipRequest);
				}
				else
				{
					var updateRequest = new UpdateRequest
					{
						LocalPath = localFile.FullName,
						DestinationPath = remoteObject.Key
					};
					updateRequests.Add(updateRequest);
				}
			}
			else
			{
				var addRequest = new AddRequest
				{
					LocalPath = localFile.FullName,
					DestinationPath = relativePath
				};
				addRequests.Add(addRequest);
			}
		});

		// Find deletions (files in S3 but not locally)
		foreach (var remoteObject in remoteObjects)
		{
			var localPath = Path.Combine(context.OutputDirectory.FullName, remoteObject.Key);
			if (context.ReadFileSystem.File.Exists(localPath))
				continue;
			var deleteRequest = new DeleteRequest
			{
				DestinationPath = remoteObject.Key
			};
			deleteRequests.Add(deleteRequest);
		}

		return new SyncPlan
		{
			DeleteRequests = deleteRequests.ToList(),
			AddRequests = addRequests.ToList(),
			UpdateRequests = updateRequests.ToList(),
			SkipRequests = skipRequests.ToList(),
			Count = deleteRequests.Count + addRequests.Count + updateRequests.Count + skipRequests.Count
		};
	}

	private async Task<Dictionary<string, S3Object>> ListObjects(Cancel ctx = default)
	{
		var listBucketRequest = new ListObjectsV2Request
		{
			BucketName = bucketName,
			MaxKeys = 1000,
		};
		var objects = new List<S3Object>();
		ListObjectsV2Response response;
		do
		{
			response = await s3Client.ListObjectsV2Async(listBucketRequest, ctx);
			objects.AddRange(response.S3Objects);
			listBucketRequest.ContinuationToken = response.NextContinuationToken;
		} while (response.IsTruncated == true);

		return objects.ToDictionary(o => o.Key);
	}

	[SuppressMessage("Security", "CA5351:Do Not Use Broken Cryptographic Algorithms")]
	private async Task<string> CalculateS3ETag(string filePath, Cancel ctx = default)
	{
		if (EtagCache.TryGetValue(filePath, out var cachedEtag))
		{
			_logger.LogDebug("Using cached ETag for {Path}", filePath);
			return cachedEtag;
		}

		var fileInfo = context.ReadFileSystem.FileInfo.New(filePath);
		var fileSize = fileInfo.Length;

		// For files of 5MB or less, use a single MD5 hash (matching TransferUtility behavior)
		if (fileSize <= PartSize)
		{
			await using var stream = context.ReadFileSystem.FileStream.New(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
			var smallBuffer = new byte[fileSize];
			// ReadExactlyAsync guards against short reads that would otherwise skew the hash
			await stream.ReadExactlyAsync(smallBuffer, ctx);
			var hash = MD5.HashData(smallBuffer);
			var etag = Convert.ToHexStringLower(hash);
			EtagCache[filePath] = etag;
			return etag;
		}

		// For files over 5MB, use the multipart format with 5MB parts (matching TransferUtility)
		var parts = (int)Math.Ceiling((double)fileSize / PartSize);

		await using var fileStream = context.ReadFileSystem.FileStream.New(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
		var partBuffer = new byte[PartSize];
		var partHashes = new List<byte[]>();

		for (var i = 0; i < parts; i++)
		{
			// The final part may be shorter than PartSize
			var partLength = (int)Math.Min(PartSize, fileSize - i * PartSize);
			await fileStream.ReadExactlyAsync(partBuffer.AsMemory(0, partLength), ctx);
			var partHash = MD5.HashData(partBuffer.AsSpan(0, partLength));
			partHashes.Add(partHash);
		}

		// The multipart ETag is the MD5 of the concatenated part hashes, suffixed with the part count
		var concatenatedHashes = partHashes.SelectMany(h => h).ToArray();
		var finalHash = MD5.HashData(concatenatedHashes);

		var multipartEtag = $"{Convert.ToHexStringLower(finalHash)}-{parts}";
		EtagCache[filePath] = multipartEtag;
		return multipartEtag;
	}
}
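Because TransferUtility switches to multipart uploads above PartSize, the local ETag has to reproduce S3's multipart format: the MD5 of the concatenated per-part MD5s, suffixed with the part count. A self-contained check of that shape for an in-memory 12 MiB payload (illustrative only):

using System.Security.Cryptography;

const int partSize = 5 * 1024 * 1024; // must match AwsS3SyncPlanStrategy.PartSize
var data = new byte[12 * 1024 * 1024]; // 12 MiB => parts of 5 + 5 + 2 MiB
var partHashes = new List<byte[]>();
for (var offset = 0; offset < data.Length; offset += partSize)
{
	var length = Math.Min(partSize, data.Length - offset);
	partHashes.Add(MD5.HashData(data.AsSpan(offset, length)));
}
var etag = $"{Convert.ToHexStringLower(MD5.HashData(partHashes.SelectMany(h => h).ToArray()))}-{partHashes.Count}";
Console.WriteLine(etag); // ends in "-3"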