diff --git a/src/Arius.Cli.Tests/ArchiveCliCommandTests.cs b/src/Arius.Cli.Tests/ArchiveCliCommandTests.cs index d6b5f133..f862e841 100644 --- a/src/Arius.Cli.Tests/ArchiveCliCommandTests.cs +++ b/src/Arius.Cli.Tests/ArchiveCliCommandTests.cs @@ -4,7 +4,6 @@ using Mediator; using NSubstitute; using Shouldly; -using System.Threading.Tasks; namespace Arius.Cli.Tests; diff --git a/src/Arius.Cli.Tests/RestoreCliCommandTests.cs b/src/Arius.Cli.Tests/RestoreCliCommandTests.cs index 83925a26..d05f98fd 100644 --- a/src/Arius.Cli.Tests/RestoreCliCommandTests.cs +++ b/src/Arius.Cli.Tests/RestoreCliCommandTests.cs @@ -3,7 +3,6 @@ using Mediator; using NSubstitute; using Shouldly; -using System.Threading.Tasks; namespace Arius.Cli.Tests; diff --git a/src/Arius.Cli/Program.cs b/src/Arius.Cli/Program.cs index 48bb1cca..5034f2e8 100644 --- a/src/Arius.Cli/Program.cs +++ b/src/Arius.Cli/Program.cs @@ -6,7 +6,6 @@ using Serilog; using Serilog.Core; using Serilog.Events; -using System.IO; using System.Reflection; namespace Arius.Cli; diff --git a/src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerContextTests.cs b/src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerContextTests.cs index caf05856..b2c0ad88 100644 --- a/src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerContextTests.cs +++ b/src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerContextTests.cs @@ -52,7 +52,7 @@ public async Task CreateAsync_WhenNoRemoteStateExists_ShouldCreateNewStateFile() // Verify a new state file was created (with current timestamp format) var stateFiles = stateCache.GetStateFileEntries().ToArray(); stateFiles.Length.ShouldBe(1); - stateFiles[0].Name.ShouldMatch(@"\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}\.db"); + stateFiles[0].Name.ShouldMatch(@"\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}\-\d{3}\.db"); // Verify no download was attempted since no remote state exists await mockArchiveStorage.DidNotReceive().DownloadStateAsync(Arg.Any(), Arg.Any(), Arg.Any()); @@ -101,7 +101,7 @@ await mockArchiveStorage.Received(1).DownloadStateAsync( var newStateFile = stateFiles.FirstOrDefault(f => f.Name != $"{existingStateName}.db"); newStateFile.ShouldNotBeNull("a new state file should be created"); - newStateFile.Name.ShouldMatch(@"\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}\.db"); + newStateFile.Name.ShouldMatch(@"\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}\-\d{3}\.db"); } [Fact] @@ -139,6 +139,6 @@ public async Task CreateAsync_WhenRemoteStateExistsAndIsPresentLocally_ShouldNot var newStateFile = stateFiles.FirstOrDefault(f => f.Name != $"{existingStateName}.db"); newStateFile.ShouldNotBeNull("a new state file should be created"); - newStateFile.Name.ShouldMatch(@"\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}\.db"); + newStateFile.Name.ShouldMatch(@"\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}\-\d{3}\.db"); } } \ No newline at end of file diff --git a/src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerHandleTests.cs b/src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerHandleTests.cs new file mode 100644 index 00000000..e5173eaa --- /dev/null +++ b/src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerHandleTests.cs @@ -0,0 +1,1083 @@ +using Arius.Core.Features.Commands.Archive; +using Arius.Core.Shared.FileSystem; +using Arius.Core.Shared.Hashing; +using Arius.Core.Shared.StateRepositories; +using Arius.Core.Shared.Storage; +using Arius.Core.Tests.Helpers.Builders; +using Arius.Core.Tests.Helpers.FakeLogger; +using Arius.Core.Tests.Helpers.Fakes; +using 
Arius.Core.Tests.Helpers.Fixtures;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Logging.Testing;
+using NSubstitute;
+using Shouldly;
+using Zio;
+
+namespace Arius.Core.Tests.Features.Commands.Archive;
+
+public class ArchiveCommandHandlerHandleTests
+{
+    private readonly FixtureWithFileSystem fixture;
+
+    public ArchiveCommandHandlerHandleTests()
+    {
+        this.fixture = new();
+    }
+
+    private ArchiveCommandHandler CreateHandler() => new(new FakeLogger<ArchiveCommandHandler>(), NullLoggerFactory.Instance, fixture.AriusConfiguration);
+
+    private const int DefaultSmallFileBoundary = 1024;
+
+    private static string ToRelativePointerPath(UPath binaryPath) => binaryPath.GetPointerFilePath().ToString();
+
+    private static string ToAbsolutePointerPath(FixtureWithFileSystem fixture, UPath binaryPath) => Path.Combine(fixture.TestRunSourceFolder.FullName, binaryPath.GetPointerFilePath().ToString().TrimStart('/'));
+
+    private static (int FileCount, int ExistingPointerCount) GetInitialFileStatistics(HandlerContext handlerContext)
+    {
+        var entries = handlerContext.FileSystem
+            .EnumerateFileEntries(UPath.Root, "*", SearchOption.AllDirectories)
+            .Select(FilePair.FromBinaryFileFileEntry)
+            .ToList();
+
+        var existingPointerFileCount = entries.Count(fp => fp.PointerFile.Exists);
+
+        return (entries.Count, existingPointerFileCount);
+    }
+
+    private async Task<(ArchiveCommand Command, HandlerContext Context, MockArchiveStorageBuilder StorageBuilder, FakeLoggerFactory LoggerFactory)> CreateHandlerContextAsync(Action<ArchiveCommandBuilder>? configureCommand = null,
+        MockArchiveStorageBuilder? storageBuilder = null,
+        FakeLoggerFactory? loggerFactory = null,
+        ISha256Hasher? hasher = null)
+    {
+        storageBuilder ??= new MockArchiveStorageBuilder(fixture);
+        loggerFactory ??= new FakeLoggerFactory();
+
+        var commandBuilder = new ArchiveCommandBuilder(fixture)
+            .WithSmallFileBoundary(DefaultSmallFileBoundary)
+            .WithHashingParallelism(1)
+            .WithUploadParallelism(1);
+
+        configureCommand?.Invoke(commandBuilder);
+
+        var command = commandBuilder.Build();
+        var archiveStorage = storageBuilder.Build();
+
+        var builder = new HandlerContextBuilder(command, loggerFactory)
+            .WithArchiveStorage(archiveStorage);
+
+        if (hasher != null)
+            builder = builder.WithHasher(hasher);
+
+        var handlerContext = await builder.BuildAsync();
+
+        return (command, handlerContext, storageBuilder, loggerFactory);
+    }
+
+    [Fact(Skip = "TODO")]
+    public void UpdatedCreationTimeOrLastWriteTimeShouldBeUpdatedInStateDatabase()
+    {
+    }
+
+
+    // --- SINGLE FILE
+
+    [Fact]
+    public async Task Single_LargeFile_FirstUpload_ShouldUploadBinaryAndCreatePointer()
+    {
+        // Arrange
+        var binaryPath = UPath.Root / "documents" / "presentation.pptx";
+        var largeFile = new FakeFileBuilder(fixture)
+            .WithActualFile(FilePairType.BinaryFileOnly, binaryPath)
+            .WithRandomContent(4096, seed: 10)
+            .Build();
+
+        var (_, handlerContext, storageBuilder, _) = await CreateHandlerContextAsync();
+
+        var (expectedInitialFileCount, expectedExistingPointerFile) = GetInitialFileStatistics(handlerContext);
+
+        // Act
+        var result = await CreateHandler().Handle(handlerContext, CancellationToken.None);
+
+        // Assert
+        result.IsSuccess.ShouldBeTrue();
+        var summary = result.Value;
+
+        summary.TotalLocalFiles.ShouldBe(expectedInitialFileCount);
+        summary.UniqueBinariesUploaded.ShouldBe(1);
+        summary.UniqueChunksUploaded.ShouldBe(1);
+        summary.PointerFilesCreated.ShouldBe(1);
+        summary.PointerFileEntriesDeleted.ShouldBe(0);
summary.ExistingPointerFiles.ShouldBe(expectedExistingPointerFile); + summary.BytesUploadedUncompressed.ShouldBe(largeFile.OriginalContent.LongLength); + summary.NewStateName.ShouldNotBeNull(); + + storageBuilder.StoredChunks.Count.ShouldBe(1); + var chunk = storageBuilder.StoredChunks.Single().Value; + chunk.ContentType.ShouldBe("application/aes256cbc+gzip"); + chunk.Metadata.ShouldContainKey("OriginalContentLength"); + chunk.Metadata["OriginalContentLength"].ShouldBe(largeFile.OriginalContent.Length.ToString()); + + File.Exists(ToAbsolutePointerPath(fixture, binaryPath)).ShouldBeTrue(); + + var pointerEntry = handlerContext.StateRepository + .GetPointerFileEntry(ToRelativePointerPath(binaryPath), includeBinaryProperties: true); + pointerEntry.ShouldNotBeNull(); + pointerEntry!.Hash.ShouldBe(largeFile.OriginalHash); + pointerEntry.BinaryProperties.ShouldNotBeNull(); + pointerEntry.BinaryProperties.OriginalSize.ShouldBe(largeFile.OriginalContent.LongLength); + pointerEntry.BinaryProperties.ArchivedSize.ShouldBe(chunk.ContentLength); + pointerEntry.BinaryProperties.ParentHash.ShouldBeNull(); + + var binaryProperties = handlerContext.StateRepository.GetBinaryProperty(pointerEntry.Hash); + binaryProperties.ShouldNotBeNull(); + binaryProperties!.ArchivedSize.ShouldBe(chunk.ContentLength); + } + + [Fact] + public async Task Single_SmallFile_FirstUpload_ShouldCreateTarParentAndChildBinaryProperties() + { + // Arrange + var binaryPath = UPath.Root / "notes" / "small.txt"; + var smallFile = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, binaryPath) + .WithRandomContent(512, seed: 2) + .Build(); + + var (_, handlerContext, storageBuilder, _) = await CreateHandlerContextAsync(); + + var (expectedInitialFileCount, expectedExistingPointerFile) = GetInitialFileStatistics(handlerContext); + + // Act + var result = await CreateHandler().Handle(handlerContext, CancellationToken.None); + + // Assert + result.IsSuccess.ShouldBeTrue(); + var summary = result.Value; + + summary.TotalLocalFiles.ShouldBe(expectedInitialFileCount); + summary.UniqueBinariesUploaded.ShouldBe(1); + summary.UniqueChunksUploaded.ShouldBe(1); + summary.PointerFilesCreated.ShouldBe(1); + summary.PointerFileEntriesDeleted.ShouldBe(0); + summary.ExistingPointerFiles.ShouldBe(expectedExistingPointerFile); + summary.BytesUploadedUncompressed.ShouldBe(smallFile.OriginalContent.LongLength); + summary.NewStateName.ShouldNotBeNull(); + + storageBuilder.StoredChunks.Count.ShouldBe(1); + var tarChunk = storageBuilder.StoredChunks.Single().Value; + tarChunk.ContentType.ShouldBe("application/aes256cbc+tar+gzip"); + tarChunk.Metadata.ShouldContainKey("OriginalContentLength"); + tarChunk.Metadata.ShouldContainKey("SmallChunkCount"); + tarChunk.Metadata["OriginalContentLength"].ShouldBe(tarChunk.ContentLength.ToString()); + tarChunk.Metadata["SmallChunkCount"].ShouldBe("1"); + + File.Exists(ToAbsolutePointerPath(fixture, binaryPath)).ShouldBeTrue(); + + var pointerEntry = handlerContext.StateRepository + .GetPointerFileEntry(ToRelativePointerPath(binaryPath), includeBinaryProperties: true); + pointerEntry.ShouldNotBeNull(); + pointerEntry!.BinaryProperties.ShouldNotBeNull(); + pointerEntry.BinaryProperties.Hash.ShouldBe(smallFile.OriginalHash); + pointerEntry.BinaryProperties.ParentHash.ShouldNotBeNull(); + pointerEntry.BinaryProperties.OriginalSize.ShouldBe(smallFile.OriginalContent.LongLength); + pointerEntry.BinaryProperties.ArchivedSize.ShouldBeGreaterThan(0L); + + var parentHash = 
pointerEntry.BinaryProperties.ParentHash!; + var parentProperties = handlerContext.StateRepository.GetBinaryProperty(parentHash); + parentProperties.ShouldNotBeNull(); + parentProperties!.OriginalSize.ShouldBe(smallFile.OriginalContent.LongLength); + parentProperties.ArchivedSize.ShouldBe(tarChunk.ContentLength); + tarChunk.Metadata["OriginalContentLength"].ShouldBe(parentProperties.ArchivedSize.ToString()); + + handlerContext.StateRepository.GetBinaryProperty(pointerEntry.Hash).ShouldNotBeNull(); + } + + [Fact] + public async Task Single_EmptyFile_ShouldUploadZeroLengthBinary() + { + // Arrange + var binaryPath = UPath.Root / "empty" / "file.bin"; + var emptyFile = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, binaryPath) + .Build(); + + var (_, handlerContext, storageBuilder, _) = await CreateHandlerContextAsync(); + + var (expectedInitialFileCount, expectedExistingPointerFile) = GetInitialFileStatistics(handlerContext); + + // Act + var result = await CreateHandler().Handle(handlerContext, CancellationToken.None); + + // Assert + result.IsSuccess.ShouldBeTrue(); + var summary = result.Value; + + summary.TotalLocalFiles.ShouldBe(expectedInitialFileCount); + summary.UniqueBinariesUploaded.ShouldBe(1); + summary.UniqueChunksUploaded.ShouldBe(1); + summary.PointerFilesCreated.ShouldBe(1); + summary.PointerFileEntriesDeleted.ShouldBe(0); + summary.ExistingPointerFiles.ShouldBe(expectedExistingPointerFile); + summary.BytesUploadedUncompressed.ShouldBe(0); + summary.NewStateName.ShouldNotBeNull(); + + storageBuilder.StoredChunks.Count.ShouldBe(1); + var tarChunk = storageBuilder.StoredChunks.Single().Value; + tarChunk.ContentType.ShouldBe("application/aes256cbc+tar+gzip"); + tarChunk.Metadata.ShouldContainKey("SmallChunkCount"); + tarChunk.Metadata["SmallChunkCount"].ShouldBe("1"); + + File.Exists(ToAbsolutePointerPath(fixture, binaryPath)).ShouldBeTrue(); + + var pointerEntry = handlerContext.StateRepository + .GetPointerFileEntry(ToRelativePointerPath(binaryPath), includeBinaryProperties: true); + pointerEntry.ShouldNotBeNull(); + pointerEntry!.BinaryProperties.ShouldNotBeNull(); + pointerEntry.BinaryProperties.OriginalSize.ShouldBe(0); + pointerEntry.BinaryProperties.ArchivedSize.ShouldBeGreaterThanOrEqualTo(0L); + + var parentHash = pointerEntry.BinaryProperties.ParentHash!; + handlerContext.StateRepository.GetBinaryProperty(parentHash).ShouldNotBeNull(); + } + + [Fact] + public async Task Single_BinaryWithExistingPointer_ShouldOverwritePointerAndTrackExistingCount() + { + // Arrange + var binaryPath = UPath.Root / "existing" / "document.pdf"; + var binaryWithPointer = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, binaryPath) + .WithRandomContent(2048, seed: 5) + .Build(); + + var staleHash = FakeHashBuilder.GenerateValidHash(42); + staleHash.ShouldNotBe(binaryWithPointer.OriginalHash); + var stalePointer = binaryWithPointer.FilePair.CreatePointerFile(staleHash); + stalePointer.ReadHash().ShouldBe(staleHash); + + var (_, handlerContext, storageBuilder, _) = await CreateHandlerContextAsync(); + + var (expectedInitialFileCount, expectedExistingPointerFile) = GetInitialFileStatistics(handlerContext); + + // Act + var result = await CreateHandler().Handle(handlerContext, CancellationToken.None); + + // Assert + result.IsSuccess.ShouldBeTrue(); + var summary = result.Value; + + summary.TotalLocalFiles.ShouldBe(expectedInitialFileCount); + summary.UniqueBinariesUploaded.ShouldBe(1); + summary.UniqueChunksUploaded.ShouldBe(1); + 
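+        // The pointer file already exists on disk (with a stale hash), so this run rewrites it
+        // in place: it counts toward ExistingPointerFiles below rather than PointerFilesCreated.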
summary.PointerFilesCreated.ShouldBe(0); + summary.ExistingPointerFiles.ShouldBe(expectedExistingPointerFile); + summary.PointerFileEntriesDeleted.ShouldBe(0); + summary.BytesUploadedUncompressed.ShouldBe(binaryWithPointer.OriginalContent.LongLength); + summary.NewStateName.ShouldNotBeNull(); + + storageBuilder.StoredChunks.Count.ShouldBe(1); + + var pointerPath = ToAbsolutePointerPath(fixture, binaryPath); + File.Exists(pointerPath).ShouldBeTrue(); + + var updatedHash = binaryWithPointer.FilePair.PointerFile.ReadHash(); + updatedHash.ShouldBe(binaryWithPointer.OriginalHash); + + var pointerEntry = handlerContext.StateRepository + .GetPointerFileEntry(ToRelativePointerPath(binaryPath), includeBinaryProperties: true); + pointerEntry.ShouldNotBeNull(); + pointerEntry!.Hash.ShouldBe(binaryWithPointer.OriginalHash); + + var binaryProperties = handlerContext.StateRepository.GetBinaryProperty(pointerEntry.Hash); + binaryProperties.ShouldNotBeNull(); + binaryProperties!.ArchivedSize.ShouldBeGreaterThan(0L); + } + + + [Fact] + public async Task Single_LatentPointer_ShouldLogWarning() + { + // Arrange + var latentFile = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.PointerFileOnly, UPath.Root / "latent.txt") + .WithRandomContent(512, seed: 1) + .Build(); + + latentFile.FilePair.BinaryFile.Exists.ShouldBeFalse(); + latentFile.FilePair.PointerFile.Exists.ShouldBeTrue(); + + var (_, handlerContext, storageBuilder, _) = await CreateHandlerContextAsync(); + + // Act + var result = await CreateHandler().Handle(handlerContext, CancellationToken.None); + + // Assert + result.IsSuccess.ShouldBeTrue(); + var summary = result.Value; + + summary.FilesSkipped.ShouldBe(1); + summary.Warnings.ShouldContain("File '/latent.txt' is a pointer file without an associated binary, skipping"); + + handlerContext.StateRepository.HasChanges.ShouldBeFalse(); + handlerContext.StateRepository.StateDatabaseFile.Exists.ShouldBeFalse(); + } + + + // --- MULTIPLE FILES + + [Fact] + public async Task Multiple_AllUnique_MixedSizes_ShouldUploadLargeAndSmallBatches() + { + // Arrange + var smallFile = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "small.txt") + .WithRandomContent(512, seed: 1) + .Build(); + + var largeFile = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "large.bin") + .WithRandomContent(4096, seed: 2) + .Build(); + + var progressUpdates = new List(); + var progressReporter = new Progress(progressUpdates.Add); + + var (command, handlerContext, storageBuilder, _) = await CreateHandlerContextAsync(builder => builder + .WithProgressReporter(progressReporter) + .WithHashingParallelism(1) + .WithUploadParallelism(1) + .WithSmallFileBoundary(DefaultSmallFileBoundary)); + + var (expectedInitialFileCount, _) = GetInitialFileStatistics(handlerContext); + + // Act + var result = await CreateHandler().Handle(handlerContext, CancellationToken.None); + + // Assert + result.IsSuccess.ShouldBeTrue(); + var summary = result.Value; + + summary.TotalLocalFiles.ShouldBe(expectedInitialFileCount); + summary.UniqueBinariesUploaded.ShouldBe(2); + summary.UniqueChunksUploaded.ShouldBe(2); + summary.PointerFilesCreated.ShouldBe(2); + summary.PointerFileEntriesDeleted.ShouldBe(0); + summary.ExistingPointerFiles.ShouldBe(0); + summary.BytesUploadedUncompressed.ShouldBe(smallFile.OriginalContent.Length + largeFile.OriginalContent.Length); + summary.NewStateName.ShouldNotBeNull(); + + storageBuilder.StoredChunks.Count.ShouldBe(2); + 
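+        // The two stored chunks reflect the size-based routing these tests exercise: files at or
+        // below the SmallFileBoundary (1024 here) are batched into a TAR parent chunk, larger files
+        // upload as standalone chunks. A minimal sketch of the assumed mapping (the exact boundary
+        // comparison, < vs <=, is an assumption):
+        static string ExpectedContentType(long size, long smallFileBoundary) =>
+            size <= smallFileBoundary
+                ? "application/aes256cbc+tar+gzip" // small file, packed into a TAR batch
+                : "application/aes256cbc+gzip"; // large file, uploaded as its own chunk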
storageBuilder.UploadedStates.ShouldContain(summary.NewStateName!); + + var tarChunk = storageBuilder.StoredChunks.Values.Single(c => c.ContentType == "application/aes256cbc+tar+gzip"); + tarChunk.Metadata.ShouldContainKey("OriginalContentLength"); + tarChunk.Metadata.ShouldContainKey("SmallChunkCount"); + tarChunk.Metadata["SmallChunkCount"].ShouldBe("1"); + + var largeChunk = storageBuilder.StoredChunks.Values.Single(c => c.ContentType == "application/aes256cbc+gzip"); + largeChunk.Metadata.ShouldContainKey("OriginalContentLength"); + largeChunk.Metadata["OriginalContentLength"].ShouldBe(largeFile.OriginalContent.Length.ToString()); + + var smallPointerPath = Path.Combine(fixture.TestRunSourceFolder.FullName, "small.txt.pointer.arius"); + File.Exists(smallPointerPath).ShouldBeTrue(); + + var largePointerPath = Path.Combine(fixture.TestRunSourceFolder.FullName, "large.bin.pointer.arius"); + File.Exists(largePointerPath).ShouldBeTrue(); + + handlerContext.StateRepository.GetPointerFileEntry("/small.txt.pointer.arius", includeBinaryProperties: true) + .ShouldNotBeNull(); + handlerContext.StateRepository.GetPointerFileEntry("/large.bin.pointer.arius", includeBinaryProperties: true) + .ShouldNotBeNull(); + } + + [Fact] + public async Task Multiple_WithDuplicates_InSameRun_ShouldUploadBinaryOnceAndCreateMultiplePointers() + { + // Arrange + var originalLargeFile = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "shared.bin") + .WithRandomContent(4096, seed: 42) + .Build(); + + _ = new FakeFileBuilder(fixture) + .WithDuplicate(originalLargeFile, UPath.Root / "duplicates" / "shared-copy.bin") + .Build(); + + var originalSmallFile = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "texts" / "note.txt") + .WithRandomContent(320, seed: 7) + .Build(); + + _ = new FakeFileBuilder(fixture) + .WithDuplicate(originalSmallFile, UPath.Root / "texts" / "archive" / "note-copy.txt") + .Build(); + + var (_, handlerContext, storageBuilder, _) = await CreateHandlerContextAsync(); + + var (expectedInitialFileCount, _) = GetInitialFileStatistics(handlerContext); + + // Act + var result = await CreateHandler().Handle(handlerContext, CancellationToken.None); + + // Assert + result.IsSuccess.ShouldBeTrue(); + var summary = result.Value; + + summary.TotalLocalFiles.ShouldBe(expectedInitialFileCount); + summary.UniqueBinariesUploaded.ShouldBe(2); // large owner + small owner + summary.UniqueChunksUploaded.ShouldBe(2); // large chunk + TAR + summary.PointerFilesCreated.ShouldBe(4); + summary.BytesUploadedUncompressed.ShouldBe( + originalLargeFile.OriginalContent.Length + originalSmallFile.OriginalContent.Length); + + storageBuilder.StoredChunks.Count.ShouldBe(2); + storageBuilder.StoredChunks.Values.Count(c => c.ContentType == "application/aes256cbc+gzip").ShouldBe(1); + storageBuilder.StoredChunks.Values.Count(c => c.ContentType == "application/aes256cbc+tar+gzip").ShouldBe(1); + + var largeChunk = storageBuilder.StoredChunks.Values.Single(c => c.ContentType == "application/aes256cbc+gzip"); + largeChunk.Metadata.ShouldContainKey("OriginalContentLength"); + largeChunk.Metadata["OriginalContentLength"].ShouldBe(originalLargeFile.OriginalContent.Length.ToString()); + + var tarChunk = storageBuilder.StoredChunks.Values.Single(c => c.ContentType == "application/aes256cbc+tar+gzip"); + tarChunk.Metadata.ShouldContainKey("SmallChunkCount"); + tarChunk.Metadata["SmallChunkCount"].ShouldBe("1"); + + 
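+        // Pointer files sit next to their binaries as "<binary>.pointer.arius", as the paths
+        // below show. A minimal sketch of that naming convention:
+        static string ToPointerPath(string binaryRelativePath) => binaryRelativePath + ".pointer.arius";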
File.Exists(Path.Combine(fixture.TestRunSourceFolder.FullName, "shared.bin.pointer.arius")).ShouldBeTrue(); + File.Exists(Path.Combine(fixture.TestRunSourceFolder.FullName, "duplicates", "shared-copy.bin.pointer.arius")).ShouldBeTrue(); + File.Exists(Path.Combine(fixture.TestRunSourceFolder.FullName, "texts", "note.txt.pointer.arius")).ShouldBeTrue(); + File.Exists(Path.Combine(fixture.TestRunSourceFolder.FullName, "texts", "archive", "note-copy.txt.pointer.arius")).ShouldBeTrue(); + + handlerContext.StateRepository.GetPointerFileEntry("/shared.bin.pointer.arius", includeBinaryProperties: true) + .ShouldNotBeNull(); + handlerContext.StateRepository.GetPointerFileEntry("/duplicates/shared-copy.bin.pointer.arius", includeBinaryProperties: true) + .ShouldNotBeNull(); + handlerContext.StateRepository.GetPointerFileEntry("/texts/note.txt.pointer.arius", includeBinaryProperties: true) + .ShouldNotBeNull(); + handlerContext.StateRepository.GetPointerFileEntry("/texts/archive/note-copy.txt.pointer.arius", includeBinaryProperties: true) + .ShouldNotBeNull(); + } + + [Fact] + public async Task Multiple_SmallFiles_SingleTarBatch_ShouldUploadSingleParentChunk() + { + // Arrange + var paths = new[] + { + UPath.Root / "tar" / "alpha.txt", + UPath.Root / "tar" / "beta.txt", + UPath.Root / "tar" / "gamma.txt" + }; + + var smallFiles = paths + .Select((path, index) => new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, path) + .WithRandomContent(256 + index * 10, seed: 100 + index) + .Build()) + .ToArray(); + + var (_, handlerContext, storageBuilder, _) = await CreateHandlerContextAsync(); + + var (expectedInitialFileCount, _) = GetInitialFileStatistics(handlerContext); + + // Act + var result = await CreateHandler().Handle(handlerContext, CancellationToken.None); + + // Assert + result.IsSuccess.ShouldBeTrue(); + var summary = result.Value; + + summary.TotalLocalFiles.ShouldBe(expectedInitialFileCount); + summary.UniqueBinariesUploaded.ShouldBe(paths.Length); + summary.UniqueChunksUploaded.ShouldBe(1); + summary.PointerFilesCreated.ShouldBe(paths.Length); + summary.BytesUploadedUncompressed.ShouldBe(smallFiles.Sum(f => f.OriginalContent.Length)); + summary.PointerFileEntriesDeleted.ShouldBe(0); + + storageBuilder.StoredChunks.Count.ShouldBe(1); + var tarChunk = storageBuilder.StoredChunks.Single().Value; + tarChunk.ContentType.ShouldBe("application/aes256cbc+tar+gzip"); + tarChunk.Metadata.ShouldContainKey("SmallChunkCount"); + tarChunk.Metadata["SmallChunkCount"].ShouldBe(paths.Length.ToString()); + + foreach (var path in paths) + { + var pointerPath = ToAbsolutePointerPath(fixture, path); + File.Exists(pointerPath).ShouldBeTrue(); + handlerContext.StateRepository.GetPointerFileEntry(ToRelativePointerPath(path), includeBinaryProperties: true) + .ShouldNotBeNull(); + } + } + + [Fact] + public async Task Multiple_SmallFiles_MultipleTarBatches_ShouldFlushWhenBoundaryExceeded() + { + // We will hit the UploadSmallFileAsync / OWNER path with this test + + // Arrange + var paths = new[] + { + UPath.Root / "tar" / "alpha.bin", + UPath.Root / "tar" / "beta.bin", + UPath.Root / "tar" / "gamma.bin" + }; + + var sizes = new[] { 600, 600, 600 }; // note: esp. 
for small binaries the TAR overhead is substantial + + var smallFiles = paths + .Select((path, index) => new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, path) + .WithRandomContent(sizes[index], seed: index) + .Build()) + .ToArray(); + + var (_, handlerContext, storageBuilder, _) = await CreateHandlerContextAsync(); + + var (expectedInitialFileCount, _) = GetInitialFileStatistics(handlerContext); + + // Act + var result = await CreateHandler().Handle(handlerContext, CancellationToken.None); + + // Assert + result.IsSuccess.ShouldBeTrue(); + var summary = result.Value; + + summary.TotalLocalFiles.ShouldBe(expectedInitialFileCount); + summary.UniqueBinariesUploaded.ShouldBe(paths.Length); + summary.UniqueChunksUploaded.ShouldBe(2); // <-- the Tar Writer flushed when the boundary exceeded + summary.PointerFilesCreated.ShouldBe(paths.Length); + summary.PointerFileEntriesDeleted.ShouldBe(0); + summary.BytesUploadedUncompressed.ShouldBe(smallFiles.Sum(f => f.OriginalContent.Length)); + + var tarChunks = storageBuilder.StoredChunks.Values + .Where(c => c.ContentType == "application/aes256cbc+tar+gzip") + .ToList(); + tarChunks.Count.ShouldBe(2); + tarChunks.Select(c => int.Parse(c.Metadata["SmallChunkCount"])) + .OrderBy(v => v) + .ShouldBe(new[] { 1, 2 }); // we expect a TAR with one small chunk and one with two small chunks + + foreach (var path in paths) + { + File.Exists(ToAbsolutePointerPath(fixture, path)).ShouldBeTrue(); // the local binary still exists + handlerContext.StateRepository.GetPointerFileEntry(ToRelativePointerPath(path), includeBinaryProperties: true) + .ShouldNotBeNull(); // the pointerfileentry has been saved + } + } + + [Fact] + public async Task Multiple_SmallFiles_WithDuplicates_CrossTarBatches_ShouldWriteDeferredPointers() + { + // We will hit the UploadSmallFileAsync / NON-OWNER path with this test + + // Arrange + var f10 = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "tar" / "alpha.txt") + .WithRandomContent(600, seed: 1) + .Build(); + var f11 = new FakeFileBuilder(fixture) + .WithDuplicate(f10, UPath.Root / "tar" / "alpha-duplicate.txt") + .Build(); + + var f20 = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "tar" / "beta.txt") + .WithRandomContent(600, seed: 2) + .Build(); + + var f30 = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "tar" / "omega.txt") + .WithRandomContent(600, seed: 3) + .Build(); + var f31 = new FakeFileBuilder(fixture) + .WithDuplicate(f30, UPath.Root / "tar" / "omega-duplicate.txt") + .Build(); + + var (_, handlerContext, storageBuilder, _) = await CreateHandlerContextAsync(); + + var (expectedInitialFileCount, _) = GetInitialFileStatistics(handlerContext); + + // Act + var result = await CreateHandler().Handle(handlerContext, CancellationToken.None); + + // Assert + result.IsSuccess.ShouldBeTrue(); + var summary = result.Value; + + summary.TotalLocalFiles.ShouldBe(expectedInitialFileCount); + summary.UniqueBinariesUploaded.ShouldBe(3); + summary.UniqueChunksUploaded.ShouldBe(2); + summary.PointerFilesCreated.ShouldBe(5); + summary.PointerFileEntriesDeleted.ShouldBe(0); + + var tarChunks = storageBuilder.StoredChunks.Values + .Where(c => c.ContentType == "application/aes256cbc+tar+gzip") + .ToList(); + tarChunks.Count.ShouldBe(2); + tarChunks.Select(c => int.Parse(c.Metadata["SmallChunkCount"])) + .OrderBy(v => v) + .ShouldBe([1, 2]); + + + f10.FilePair.BinaryFile.Exists.ShouldBeTrue(); + 
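+        // For duplicated content only one FilePair "owns" the upload into a TAR batch; the other
+        // copies take the non-owner path and get their pointers written once the owning batch has
+        // flushed (the deferred pointers this test is named after).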
f11.FilePair.BinaryFile.Exists.ShouldBeTrue(); + f20.FilePair.BinaryFile.Exists.ShouldBeTrue(); + f30.FilePair.BinaryFile.Exists.ShouldBeTrue(); + f31.FilePair.BinaryFile.Exists.ShouldBeTrue(); + + handlerContext.StateRepository.GetPointerFileEntry(ToRelativePointerPath(f10.OriginalPath), includeBinaryProperties: true).ShouldNotBeNull(); + handlerContext.StateRepository.GetPointerFileEntry(ToRelativePointerPath(f11.OriginalPath), includeBinaryProperties: true).ShouldNotBeNull(); + handlerContext.StateRepository.GetPointerFileEntry(ToRelativePointerPath(f20.OriginalPath), includeBinaryProperties: true).ShouldNotBeNull(); + handlerContext.StateRepository.GetPointerFileEntry(ToRelativePointerPath(f30.OriginalPath), includeBinaryProperties: true).ShouldNotBeNull(); + handlerContext.StateRepository.GetPointerFileEntry(ToRelativePointerPath(f31.OriginalPath), includeBinaryProperties: true).ShouldNotBeNull(); + } + + + // --- INCREMENTAL RUNS + + [Fact] + public async Task Incremental_AllFilesAlreadyUploaded_ShouldSkipUploads() + { + // Arrange + var binaryPath = UPath.Root / "incremental" / "presentation.pptx"; + var largeFile = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, binaryPath) + .WithRandomContent(4096, seed: 501) + .Build(); + + var storageBuilder = new MockArchiveStorageBuilder(fixture); + + var (_, initialContext, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder); + var (initialFileCount, _) = GetInitialFileStatistics(initialContext); + + var result1 = await CreateHandler().Handle(initialContext, CancellationToken.None); + + result1.IsSuccess.ShouldBeTrue(); + + storageBuilder.StoredChunks.Count.ShouldBe(1); + storageBuilder.UploadedStates.Count.ShouldBe(1); + + // Corrupt pointer file to ensure it is rewritten on incremental run + var staleHash = FakeHashBuilder.GenerateValidHash(999); + staleHash.ShouldNotBe(largeFile.OriginalHash); + largeFile.FilePair.CreatePointerFile(staleHash); + + var (_, incrementalContext, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder); + var (expectedFileCount, existingPointerCount) = GetInitialFileStatistics(incrementalContext); + + // Act + var result2 = await CreateHandler().Handle(incrementalContext, CancellationToken.None); + + // Assert + result2.IsSuccess.ShouldBeTrue(); + var summary2 = result2.Value; + + summary2.TotalLocalFiles.ShouldBe(expectedFileCount); + summary2.ExistingPointerFiles.ShouldBe(existingPointerCount); + summary2.UniqueBinariesUploaded.ShouldBe(0); // <-- no additional binaries were uploaded + summary2.UniqueChunksUploaded.ShouldBe(0); // <-- etc + summary2.PointerFilesCreated.ShouldBe(0); // <-- etc + summary2.PointerFileEntriesDeleted.ShouldBe(0); // <-- etc + summary2.BytesUploadedUncompressed.ShouldBe(0); // <-- etc + + // No new state was created & uploaded and the (temporary) database file was deleted + summary2.NewStateName.ShouldBeNull(); + incrementalContext.StateRepository.HasChanges.ShouldBeFalse(); + incrementalContext.StateRepository.StateDatabaseFile.Exists.ShouldBeFalse(); + storageBuilder.UploadedStates.Count.ShouldBe(1); + + // Pointer file was corrected + largeFile.FilePair.PointerFile.ReadHash().ShouldBe(largeFile.OriginalHash); + + storageBuilder.StoredChunks.Count.ShouldBe(1); // <-- no additional chunks were uploaded + } + + [Fact] + public async Task Incremental_MixOfNewAndExisting_ShouldUploadOnlyNewFiles() + { + // Arrange + var existingFile = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root 
/ "docs" / "existing.pdf") + .WithRandomContent(4096, seed: 2001) + .Build(); + + var storageBuilder = new MockArchiveStorageBuilder(fixture); + + var (_, initialContext, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder); + var result1 = await CreateHandler().Handle(initialContext, CancellationToken.None); + result1.IsSuccess.ShouldBeTrue(); + + var newSmallFile = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "docs" / "new-note.txt") + .WithRandomContent(512, seed: 2002) + .Build(); + + var (_, incrementalContext, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder); + var (expectedFileCount, existingPointerCount) = GetInitialFileStatistics(incrementalContext); + + // Act + var result2 = await CreateHandler().Handle(incrementalContext, CancellationToken.None); + + // Assert + result2.IsSuccess.ShouldBeTrue(); + var summary = result2.Value; + + summary.TotalLocalFiles.ShouldBe(expectedFileCount); + summary.ExistingPointerFiles.ShouldBe(existingPointerCount); + summary.UniqueBinariesUploaded.ShouldBe(1); + summary.UniqueChunksUploaded.ShouldBe(1); + summary.PointerFilesCreated.ShouldBe(1); + summary.BytesUploadedUncompressed.ShouldBe(newSmallFile.OriginalContent.Length); + summary.PointerFileEntriesDeleted.ShouldBe(0); + + // A new state was created & uploaded + summary.NewStateName.ShouldNotBeNull(); + incrementalContext.StateRepository.HasChanges.ShouldBeTrue(); + + var pointerEntry = incrementalContext.StateRepository.GetPointerFileEntry(ToRelativePointerPath(newSmallFile.OriginalPath), includeBinaryProperties: true); + pointerEntry.ShouldNotBeNull(); + pointerEntry!.BinaryProperties.ShouldNotBeNull(); + pointerEntry.BinaryProperties!.OriginalSize.ShouldBe(newSmallFile.OriginalContent.Length); + + storageBuilder.StoredChunks.Values.Count().ShouldBe(2); + } + + [Fact] + public async Task Incremental_FileAndPointerDeleted_PointerFileEntryDeleted() + { + // Arrange + var deletedFile = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "docs" / "to-delete.txt") + .WithRandomContent(2048, seed: 1) + .Build(); + + var storageBuilder = new MockArchiveStorageBuilder(fixture); + + var (_, context1, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder); + var result1 = await CreateHandler().Handle(context1, CancellationToken.None); + result1.IsSuccess.ShouldBeTrue(); + + // Delete the pointer and the binary + deletedFile.FilePair.BinaryFile.Delete(); + deletedFile.FilePair.PointerFile.Delete(); + + var (_, context2, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder); + var (expectedFileCount, existingPointerCount) = GetInitialFileStatistics(context2); + + // Act + var result2 = await CreateHandler().Handle(context2, CancellationToken.None); + + // Assert + result2.IsSuccess.ShouldBeTrue(); + var summary2 = result2.Value; + + summary2.TotalLocalFiles.ShouldBe(0); + summary2.ExistingPointerFiles.ShouldBe(0); + summary2.UniqueBinariesUploaded.ShouldBe(0); + summary2.PointerFileEntriesDeleted.ShouldBe(1); + + // A new state was uploaded + summary2.NewStateName.ShouldNotBeNull(); + storageBuilder.UploadedStates.Count.ShouldBe(2); + + // The PointerFileEntry should not exist + var pfe = context2.StateRepository.GetPointerFileEntry(deletedFile.FilePair.PointerFile.FullName, includeBinaryProperties: true); + pfe.ShouldBeNull(); + + context2.StateRepository.GetPointerFileEntries("/", false).ShouldBeEmpty(); + + // The deleted chunks should still be 
present
+        storageBuilder.StoredChunks.Count.ShouldBe(1);
+    }
+
+    [Fact]
+    public async Task Incremental_FileDeleted_PointerRemains_ShouldStillExist()
+    {
+        // Arrange
+        var deletedBinary = new FakeFileBuilder(fixture)
+            .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "docs" / "to-delete.txt")
+            .WithRandomContent(2048, seed: 1)
+            .Build();
+
+        var storageBuilder = new MockArchiveStorageBuilder(fixture);
+
+        var (_, context1, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder);
+        var result1 = await CreateHandler().Handle(context1, CancellationToken.None);
+        result1.IsSuccess.ShouldBeTrue();
+
+        // Delete the binary; the pointer remains
+        deletedBinary.FilePair.BinaryFile.Delete();
+
+        var (_, context2, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder);
+        var (expectedFileCount, existingPointerCount) = GetInitialFileStatistics(context2);
+
+        // Act
+        var result2 = await CreateHandler().Handle(context2, CancellationToken.None);
+
+        // Assert
+        result2.IsSuccess.ShouldBeTrue();
+        var summary2 = result2.Value;
+
+        summary2.TotalLocalFiles.ShouldBe(1);
+        summary2.ExistingPointerFiles.ShouldBe(1);
+        summary2.UniqueBinariesUploaded.ShouldBe(0);
+        summary2.PointerFileEntriesDeleted.ShouldBe(0);
+
+        // No new state was uploaded
+        summary2.NewStateName.ShouldBeNull();
+        storageBuilder.UploadedStates.Count.ShouldBe(1);
+    }
+
+
+    [Fact]
+    public async Task Incremental_FileModified_ShouldUploadNewBinaryAndPreserveOldBinary()
+    {
+        // Arrange
+        var filePath = UPath.Root / "docs" / "mutable.bin";
+        var originalFile = new FakeFileBuilder(fixture)
+            .WithActualFile(FilePairType.BinaryFileOnly, filePath)
+            .WithRandomContent(3072, seed: 1)
+            .Build();
+
+        var storageBuilder = new MockArchiveStorageBuilder(fixture);
+
+        var (_, context1, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder);
+        var result1 = await CreateHandler().Handle(context1, CancellationToken.None);
+        result1.IsSuccess.ShouldBeTrue();
+
+        var bp1 = context1.StateRepository.GetBinaryProperty(originalFile.OriginalHash);
+        bp1.ShouldNotBeNull();
+
+        // Overwrite the file with new content
+        var modifiedFile = new FakeFileBuilder(fixture)
+            .WithActualFile(FilePairType.BinaryFileOnly, filePath)
+            .WithRandomContent(4000, seed: 2)
+            .Build();
+
+        var (_, context2, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder);
+        var (expectedFileCount, existingPointerCount) = GetInitialFileStatistics(context2);
+
+        // Act
+        var result2 = await CreateHandler().Handle(context2, CancellationToken.None);
+
+        // Assert
+        result2.IsSuccess.ShouldBeTrue();
+        var summary = result2.Value;
+
+        summary.TotalLocalFiles.ShouldBe(1);
+        summary.ExistingPointerFiles.ShouldBe(1);
+        summary.UniqueBinariesUploaded.ShouldBe(1); // one additional binary uploaded
+        summary.UniqueChunksUploaded.ShouldBe(1);
+        summary.PointerFilesCreated.ShouldBe(0); // pointer already existed
+        summary.NewStateName.ShouldNotBeNull();
+
+        var pfe = context2.StateRepository.GetPointerFileEntry(ToRelativePointerPath(filePath), includeBinaryProperties: true);
+        pfe.ShouldNotBeNull();
+        pfe.Hash.ShouldBe(modifiedFile.OriginalHash);
+        pfe.BinaryProperties.ShouldNotBeNull();
+        pfe.BinaryProperties.OriginalSize.ShouldBe(4000);
+
+        // The BinaryProperties of the originalFile are still present
+        var originalBinaryProperties = context2.StateRepository.GetBinaryProperty(originalFile.OriginalHash);
+        originalBinaryProperties.ShouldNotBeNull();
+
+        // The old Binary is still present
+        storageBuilder.StoredChunks.Count.ShouldBe(2);
+    }
+
+    [Fact]
+    public async Task Incremental_NoChanges_ShouldSkipStateUploadAndDeleteLocalState()
+    {
+        // Arrange
+        var baselineFile = new FakeFileBuilder(fixture)
+            .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "docs" / "baseline.txt")
+            .WithRandomContent(2048, seed: 5001)
+            .Build();
+
+        var storageBuilder = new MockArchiveStorageBuilder(fixture);
+
+        var (_, initialContext, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder);
+        var firstResult = await CreateHandler().Handle(initialContext, CancellationToken.None);
+        firstResult.IsSuccess.ShouldBeTrue();
+
+        var (_, incrementalContext, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder);
+        var (expectedFileCount, existingPointerCount) = GetInitialFileStatistics(incrementalContext);
+
+        // Act
+        var incrementalResult = await CreateHandler().Handle(incrementalContext, CancellationToken.None);
+
+        // Assert
+        incrementalResult.IsSuccess.ShouldBeTrue();
+        var summary = incrementalResult.Value;
+
+        summary.TotalLocalFiles.ShouldBe(expectedFileCount);
+        summary.ExistingPointerFiles.ShouldBe(existingPointerCount);
+        summary.UniqueBinariesUploaded.ShouldBe(0);
+        summary.UniqueChunksUploaded.ShouldBe(0);
+        summary.PointerFilesCreated.ShouldBe(0);
+        summary.PointerFileEntriesDeleted.ShouldBe(0);
+        summary.NewStateName.ShouldBeNull();
+
+        storageBuilder.UploadedStates.Count.ShouldBe(1);
+        File.Exists(incrementalContext.StateRepository.StateDatabaseFile.FullName).ShouldBeFalse();
+    }
+
+    [Fact]
+    public async Task Error_CancellationByUser_ShouldReturnFailureResult()
+    {
+        // Arrange
+        _ = new FakeFileBuilder(fixture)
+            .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "cancel" / "large1.bin")
+            .WithRandomContent(4096, seed: 1)
+            .Build();
+
+        _ = new FakeFileBuilder(fixture)
+            .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "cancel" / "large2.bin")
+            .WithRandomContent(4096, seed: 2)
+            .Build();
+
+        var (_, handlerContext, storageBuilder, _) = await CreateHandlerContextAsync();
+
+        using var cts = new CancellationTokenSource();
+        cts.Cancel();
+
+        // Act
+        var result = await CreateHandler().Handle(handlerContext, cts.Token);
+
+        // Assert
+        result.IsFailed.ShouldBeTrue();
+        result.Errors.ShouldNotBeEmpty();
+        result.Errors.First().Message.ShouldContain("cancelled by user");
+
+        storageBuilder.StoredChunks.Count.ShouldBe(0);
+    }
+
+    [Fact(Skip = "TODO")]
+    public async Task Error_IndexTaskFails_ShouldSkipProblematicFileAndContinue()
+    {
+        // See example Error_HashTaskFails_ShouldSkipProblematicFileAndContinue
+    }
+
+    [Fact]
+    public async Task Error_HashTaskFails_ShouldSkipProblematicFileAndContinue()
+    {
+        // Arrange
+        var failingFile = new FakeFileBuilder(fixture)
+            .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "hash" / "will-fail.bin")
+            .WithRandomContent(1024, seed: 1)
+            .Build();
+
+        // Create a mock hasher that fails for will-fail.bin
+        var mockHasher = Substitute.For<ISha256Hasher>();
+
+        // Configure mock (the FileEntry parameter type is an assumption)
+        mockHasher.GetHashAsync(Arg.Any<FileEntry>())
+            .Returns<Task<Hash>>(_ => throw new ArgumentException("BinaryFile does not exist"));
+
+        var (_, handlerContext, _, _) = await CreateHandlerContextAsync(hasher: mockHasher);
+
+        // Act
+        var result = await CreateHandler().Handle(handlerContext, CancellationToken.None);
+
+        // Assert
+        result.IsSuccess.ShouldBeTrue();
+        var summary = result.Value;
+
+        summary.FilesSkipped.ShouldBe(1);
+        summary.Warnings.First().ShouldContain("Error when hashing file '/hash/will-fail.bin': BinaryFile does not
exist"); + } + + [Fact] + public async Task Error_UploadTaskFails_ShouldReturnFailure() + { + // Arrange + _ = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "uploads" / "large.bin") + .WithRandomContent(4096, seed: 6201) + .Build(); + + var storageBuilder = new MockArchiveStorageBuilder(fixture) + .WithThrowOnWrite(failureCount: 1); + + var (_, handlerContext, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder); + + // Act + var result = await CreateHandler().Handle(handlerContext, CancellationToken.None); + + // Assert + result.IsFailed.ShouldBeTrue(); + result.Errors.First().Message.ShouldStartWith("Archive operation failed: UploadLargeFilesTask failed with "); + } + + [Fact] + public async Task Error_MultipleTasksFail_ShouldReturnAggregateException() + { + // Arrange + _ = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "multi" / "large.bin") + .WithRandomContent(4096, seed: 6301) + .Build(); + + _ = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "multi" / "small.txt") + .WithRandomContent(256, seed: 6302) + .Build(); + + var storageBuilder = new MockArchiveStorageBuilder(fixture) + .WithThrowOnWrite(failureCount: 2); + + var (_, handlerContext, _, _) = await CreateHandlerContextAsync(storageBuilder: storageBuilder); + + // Act + var result = await CreateHandler().Handle(handlerContext, CancellationToken.None); + + // Assert + result.IsFailed.ShouldBeTrue(); + result.Errors.First().Message.ShouldMatch("Archive operation failed: .* tasks failed"); + } + + [Fact] + public async Task StalePointerEntries_ShouldBeRemovedWhenMissingOnDisk() + { + // Arrange + _ = new FakeFileBuilder(fixture) + .WithActualFile(FilePairType.BinaryFileOnly, UPath.Root / "active.txt") + .WithRandomContent(256, seed: 7) + .Build(); + + var (_, handlerContext, storageBuilder, _) = await CreateHandlerContextAsync(); + + var (expectedInitialFileCount, _) = GetInitialFileStatistics(handlerContext); + + var staleHash = FakeHashBuilder.GenerateValidHash(99); + handlerContext.StateRepository.AddBinaryProperties(new BinaryProperties + { + Hash = staleHash, + OriginalSize = 1, + ArchivedSize = 1, + StorageTier = StorageTier.Cool + }); + handlerContext.StateRepository.UpsertPointerFileEntries(new PointerFileEntry + { + Hash = staleHash, + RelativeName = "/stale.bin.pointer.arius", + CreationTimeUtc = DateTime.UtcNow, + LastWriteTimeUtc = DateTime.UtcNow + }); + + // Act + var result = await CreateHandler().Handle(handlerContext, CancellationToken.None); + + // Assert + result.IsSuccess.ShouldBeTrue(); + var summary = result.Value; + + summary.TotalLocalFiles.ShouldBe(expectedInitialFileCount); + summary.PointerFileEntriesDeleted.ShouldBe(1); + handlerContext.StateRepository.GetPointerFileEntry("/stale.bin.pointer.arius") + .ShouldBeNull(); + + storageBuilder.StoredChunks.Count.ShouldBe(1); + storageBuilder.UploadedStates.ShouldNotBeEmpty(); + + File.Exists(Path.Combine(fixture.TestRunSourceFolder.FullName, "active.txt.pointer.arius")).ShouldBeTrue(); + } +} diff --git a/src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerTests.cs b/src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerUploadIfNotExistsTests.cs similarity index 78% rename from src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerTests.cs rename to src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerUploadIfNotExistsTests.cs index f4549891..12c4554b 100644 
--- a/src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerTests.cs +++ b/src/Arius.Core.Tests/Features/Commands/Archive/ArchiveCommandHandlerUploadIfNotExistsTests.cs @@ -9,18 +9,17 @@ using Microsoft.Extensions.Logging.Testing; using Shouldly; using System.IO.Compression; -using System.Runtime.InteropServices; using System.Text; namespace Arius.Core.Tests.Features.Commands.Archive; -public class ArchiveCommandHandlerTests : IClassFixture +public class ArchiveCommandHandlerUploadIfNotExistsTests : IClassFixture { private readonly FixtureWithFileSystem fixture; private readonly FakeLogger logger; private readonly ArchiveCommandHandler handler; - public ArchiveCommandHandlerTests(FixtureWithFileSystem fixture) + public ArchiveCommandHandlerUploadIfNotExistsTests(FixtureWithFileSystem fixture) { this.fixture = fixture; logger = new(); @@ -28,27 +27,6 @@ public ArchiveCommandHandlerTests(FixtureWithFileSystem fixture) } - [Fact] - [Trait("Category", "SkipCI")] - public async Task RunArchiveCommandTEMP() // NOTE TEMP this one is skipped in CI via the SkipCI category - { - var logger = new FakeLogger(); - - // TODO Make this better - var isWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows); - var c = new ArchiveCommandBuilder(fixture) - .WithLocalRoot(isWindows ? - new DirectoryInfo("C:\\Users\\WouterVanRanst\\Downloads\\Photos-001 (1)") : - new DirectoryInfo("/mnt/c/Users/WouterVanRanst/Downloads/Photos-001 (1)")) - .Build(); - await handler.Handle(c, CancellationToken.None); - } - - [Fact(Skip = "TODO")] - public void UpdatedCreationTimeOrLastWriteTimeShouldBeUpdatedInStateDatabase() - { - } - [Fact] public async Task UploadIfNotExistsAsync_WhenChunkDoesNotExist_ShouldUpload() { @@ -61,30 +39,32 @@ public async Task UploadIfNotExistsAsync_WhenChunkDoesNotExist_ShouldUpload() var handlerContext = await CreateHandlerContextAsync(); + // Act var result = await handler.UploadIfNotExistsAsync(handlerContext, hash, sourceStream, compressionLevel, expectedContentType, null, CancellationToken.None); + // Assert result.OriginalSize.ShouldBeGreaterThan(0); result.ArchivedSize.ShouldBeGreaterThan(0); - // Verify the blob was actually created with correct properties and metadata + // Verify the blob was actually created with correct properties and metadata var properties = await handlerContext.ArchiveStorage.GetChunkPropertiesAsync(hash, CancellationToken.None); properties.ShouldNotBeNull(); properties.ContentType.ShouldBe(expectedContentType); - // Verify metadata is read from storage and matches returned values + // Verify metadata is read from storage and matches returned values properties.Metadata.ShouldNotBeNull(); properties.Metadata.ShouldContainKey("OriginalContentLength"); properties.Metadata["OriginalContentLength"].ShouldBe(result.OriginalSize.ToString()); - // Verify correct contentlength + // Verify correct contentlength properties.ContentLength.ShouldBe(result.ArchivedSize); - // Verify Storage Tier + // Verify Storage Tier properties.StorageTier.ShouldBe(StorageTier.Cool); - // Verify the stream was read to the end (ie the binary was uploaded) + // Verify the stream was read to the end (ie the binary was uploaded) sourceStream.Position.ShouldBe(sourceStream.Length); } @@ -100,38 +80,40 @@ public async Task UploadIfNotExistsAsync_WhenValidChunkExists_ShouldNotUploadAga var handlerContext = await CreateHandlerContextAsync(); - // First upload to create the blob + // First upload to create the blob await handler.UploadIfNotExistsAsync(handlerContext, hash, 
sourceStream, compressionLevel, expectedContentType, null, CancellationToken.None); await handlerContext.ArchiveStorage.SetChunkStorageTierPerPolicy(hash, 0, StorageTier.Hot); // Set to Hot tier to check if the correct storage tier was applied afterwards - // Reset stream for second call + // Reset stream for second call sourceStream.Seek(0, SeekOrigin.Begin); + // Act - Second call should detect existing blob var result = await handler.UploadIfNotExistsAsync(handlerContext, hash, sourceStream, compressionLevel, expectedContentType, null, CancellationToken.None); + // Assert result.OriginalSize.ShouldBeGreaterThan(0); result.ArchivedSize.ShouldBeGreaterThan(0); - // Verify properties are still correct and metadata is read from storage + // Verify properties are still correct and metadata is read from storage var properties = await handlerContext.ArchiveStorage.GetChunkPropertiesAsync(hash, CancellationToken.None); properties.ShouldNotBeNull(); properties.ContentType.ShouldBe(expectedContentType); - // Verify metadata is read from storage and matches returned values + // Verify metadata is read from storage and matches returned values properties.Metadata.ShouldNotBeNull(); properties.Metadata.ShouldContainKey("OriginalContentLength"); properties.Metadata["OriginalContentLength"].ShouldBe(result.OriginalSize.ToString()); - // Verify correct contentlength + // Verify correct contentlength properties.ContentLength.ShouldBe(result.ArchivedSize); - // Verify Storage Tier + // Verify Storage Tier properties.StorageTier.ShouldBe(StorageTier.Cool); - // Verify the stream was NOT read (ie the binary was NOT uploaded again) + // Verify the stream was NOT read (ie the binary was NOT uploaded again) sourceStream.Position.ShouldBe(0); } @@ -147,43 +129,45 @@ public async Task UploadIfNotExistsAsync_WhenInvalidChunk_ShouldDeleteAndReUploa var handlerContext = await CreateHandlerContextAsync(); - // Create a blob with wrong content type using BlobClient directly (simulating corruption) + // Create a blob with wrong content type using BlobClient directly (simulating corruption) var blobServiceClient = new BlobServiceClient(new Uri($"https://{fixture.RepositoryOptions.AccountName}.blob.core.windows.net"), new Azure.Storage.StorageSharedKeyCredential(fixture.RepositoryOptions.AccountName, fixture.RepositoryOptions.AccountKey)); var containerClient = blobServiceClient.GetBlobContainerClient(fixture.RepositoryOptions.ContainerName); var blobClient = containerClient.GetBlobClient($"chunks/{hash}"); - // Upload blob without metadata + // Upload blob without metadata var uploadOptions = new BlobUploadOptions { HttpHeaders = new BlobHttpHeaders { ContentType = correctContentType } }; await blobClient.UploadAsync(new MemoryStream("corrupted content"u8.ToArray()), uploadOptions, CancellationToken.None); - // Reset source stream + // Reset source stream sourceStream.Seek(0, SeekOrigin.Begin); + // Act - Should detect wrong content type, delete, and re-upload var result = await handler.UploadIfNotExistsAsync(handlerContext, hash, sourceStream, compressionLevel, correctContentType, null, CancellationToken.None); + // Assert result.OriginalSize.ShouldBeGreaterThan(0); result.ArchivedSize.ShouldBeGreaterThan(0); - // Verify the blob now has correct content type and metadata + // Verify the blob now has correct content type and metadata var properties = await handlerContext.ArchiveStorage.GetChunkPropertiesAsync(hash, CancellationToken.None); properties.ShouldNotBeNull(); 
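// The pre-existing blob is invalid (it was uploaded without the expected metadata), so UploadIfNotExistsAsync deletes it and re-uploads; the properties below therefore describe the fresh upload, not the corrupted blob.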
properties.ContentType.ShouldBe(correctContentType); - // Verify metadata is read from storage and matches returned values + // Verify metadata is read from storage and matches returned values properties.Metadata.ShouldNotBeNull(); properties.Metadata.ShouldContainKey("OriginalContentLength"); properties.Metadata["OriginalContentLength"].ShouldBe(result.OriginalSize.ToString()); - // Verify correct contentlength + // Verify correct contentlength properties.ContentLength.ShouldBe(result.ArchivedSize); - // Verify Storage Tier + // Verify Storage Tier properties.StorageTier.ShouldBe(StorageTier.Cool); - // Verify the stream was read to the end (ie the binary was uploaded again) + // Verify the stream was read to the end (ie the binary was uploaded again) sourceStream.Position.ShouldBe(sourceStream.Length); } @@ -212,12 +196,12 @@ public async Task UploadIfNotExistsAsync_WhenTarArchive_ShouldIncludeSmallChunkC //result.OriginalSize.ShouldBeGreaterThan(0); //result.ArchivedSize.ShouldBeGreaterThan(0); - // Verify the blob was created with correct properties and metadata + // Verify the blob was created with correct properties and metadata var properties = await handlerContext.ArchiveStorage.GetChunkPropertiesAsync(hash, CancellationToken.None); properties.ShouldNotBeNull(); //properties.ContentType.ShouldBe(expectedContentType); - // Verify metadata includes both OriginalContentLength and SmallChunkCount + // Verify metadata includes both OriginalContentLength and SmallChunkCount properties.Metadata.ShouldNotBeNull(); //properties.Metadata.ShouldContainKey("OriginalContentLength"); //properties.Metadata["OriginalContentLength"].ShouldBe(result.OriginalSize.ToString()); @@ -225,10 +209,10 @@ public async Task UploadIfNotExistsAsync_WhenTarArchive_ShouldIncludeSmallChunkC properties.Metadata.ShouldContainKey("SmallChunkCount"); properties.Metadata["SmallChunkCount"].ShouldBe(expectedChunkCount.ToString()); - // Verify correct contentlength + // Verify correct contentlength properties.ContentLength.ShouldBe(result.ArchivedSize); - // Verify Storage Tier + // Verify Storage Tier properties.StorageTier.ShouldBe(StorageTier.Cool); } diff --git a/src/Arius.Core.Tests/Features/Commands/Archive/MockArchiveStorageBuilder.cs b/src/Arius.Core.Tests/Features/Commands/Archive/MockArchiveStorageBuilder.cs new file mode 100644 index 00000000..609b9e90 --- /dev/null +++ b/src/Arius.Core.Tests/Features/Commands/Archive/MockArchiveStorageBuilder.cs @@ -0,0 +1,320 @@ +using Arius.Core.Shared.Hashing; +using Arius.Core.Shared.Storage; +using Arius.Core.Tests.Helpers.Fixtures; +using FluentResults; +using NSubstitute; +using System.Collections.Concurrent; +using System.IO.Compression; +using Zio; + +namespace Arius.Core.Tests.Features.Commands.Archive; + +internal class MockArchiveStorageBuilder +{ + private readonly Fixture fixture; + + // Internal state for mock configuration + private readonly Dictionary chunks = new(); + private readonly Dictionary hydratedChunks = new(); + private readonly Dictionary states = new(StringComparer.OrdinalIgnoreCase); + private bool containerExists = true; + + // Track operations for potential assertions + private readonly ConcurrentDictionary writtenChunks = new(); + private readonly List uploadedStates = new(); + + // Error simulation + private int throwOnWriteFailureCount; + private Func? 
throwOnWritePredicate; + + // Expose internal state for test assertions + public IReadOnlyDictionary StoredChunks => chunks; + public IReadOnlyDictionary StoredStates => states; + public List UploadedStates => uploadedStates; + + public MockArchiveStorageBuilder(Fixture fixture) + { + this.fixture = fixture; + } + + public MockArchiveStorageBuilder WithThrowOnWrite(int failureCount, Func? predicate = null) + { + throwOnWriteFailureCount = failureCount; + throwOnWritePredicate = predicate; + return this; + } + + public MockArchiveStorageBuilder WithContainerExists(bool exists = true) + { + containerExists = exists; + return this; + } + + public MockArchiveStorageBuilder AddChunk(Hash hash, byte[] content, string contentType = "application/octet-stream", StorageTier tier = StorageTier.Cool, CompressionLevel compressionLevel = CompressionLevel.Optimal, IDictionary? metadata = null) + { + chunks[hash] = new FakeArchiveStorageChunk + { + Content = content, + ContentLength = content.Length, + ContentType = contentType, + StorageTier = tier, + CompressionLevel = compressionLevel, + Metadata = metadata != null ? new Dictionary(metadata, StringComparer.OrdinalIgnoreCase) : new Dictionary(StringComparer.OrdinalIgnoreCase) + }; + return this; + } + + public MockArchiveStorageBuilder AddHydratedChunk(Hash hash, byte[] content, string contentType = "application/octet-stream", StorageTier tier = StorageTier.Cool, CompressionLevel compressionLevel = CompressionLevel.Optimal, IDictionary? metadata = null) + { + hydratedChunks[hash] = new FakeArchiveStorageChunk + { + Content = content, + ContentLength = content.Length, + ContentType = contentType, + StorageTier = tier, + CompressionLevel = compressionLevel, + Metadata = metadata != null ? new Dictionary(metadata, StringComparer.OrdinalIgnoreCase) : new Dictionary(StringComparer.OrdinalIgnoreCase) + }; + return this; + } + + public MockArchiveStorageBuilder AddState(string name, byte[] content) + { + states[name] = content; + return this; + } + + public IArchiveStorage Build() + { + var mock = Substitute.For(); + + // Container operations + var containerCreated = containerExists; + mock.CreateContainerIfNotExistsAsync() + .Returns(callInfo => + { + var wasCreated = !containerCreated; + containerCreated = true; + return Task.FromResult(wasCreated); + }); + + mock.ContainerExistsAsync() + .Returns(_ => Task.FromResult(containerCreated)); + + // State operations + mock.GetStates(Arg.Any()) + .Returns(callInfo => + { + var orderedStates = states.Keys + .OrderBy(s => s, StringComparer.OrdinalIgnoreCase) + .ToArray(); + return orderedStates.ToAsyncEnumerable(); + }); + + mock.DownloadStateAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + var stateName = callInfo.Arg(); + var targetFile = callInfo.ArgAt(1); + + if (!states.TryGetValue(stateName, out var content)) + throw new InvalidOperationException($"State '{stateName}' does not exist in fake storage."); + + targetFile.Directory.Create(); + using var stream = targetFile.Open(FileMode.Create, FileAccess.Write, FileShare.None); + stream.Write(content); + return Task.CompletedTask; + }); + + mock.UploadStateAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(async callInfo => + { + var stateName = callInfo.Arg(); + var sourceFile = callInfo.ArgAt(1); + var cancellationToken = callInfo.ArgAt(2); + + await using var sourceStream = sourceFile.Open(FileMode.Open, FileAccess.Read, FileShare.Read); + using var memoryStream = new MemoryStream(); + await sourceStream.CopyToAsync(memoryStream, 
+        // Chunk read operations
+        mock.OpenReadChunkAsync(Arg.Any<Hash>(), Arg.Any<CancellationToken>())
+            .Returns(callInfo =>
+            {
+                var hash = callInfo.Arg<Hash>();
+
+                if (chunks.TryGetValue(hash, out var chunk))
+                {
+                    return Task.FromResult(Result.Ok<Stream>(new MemoryStream(chunk.Content, writable: false)));
+                }
+
+                if (hydratedChunks.TryGetValue(hash, out var hydratedChunk))
+                {
+                    return Task.FromResult(Result.Ok<Stream>(new MemoryStream(hydratedChunk.Content, writable: false)));
+                }
+
+                return Task.FromResult(Result.Fail<Stream>(new BlobNotFoundError(hash.ToString())));
+            });
+
+        mock.OpenReadHydratedChunkAsync(Arg.Any<Hash>(), Arg.Any<CancellationToken>())
+            .Returns(callInfo =>
+            {
+                var hash = callInfo.Arg<Hash>();
+
+                if (hydratedChunks.TryGetValue(hash, out var chunk))
+                {
+                    return Task.FromResult(Result.Ok<Stream>(new MemoryStream(chunk.Content, writable: false)));
+                }
+
+                return Task.FromResult(Result.Fail<Stream>(new BlobNotFoundError(hash.ToString())));
+            });
+
+        // Chunk write operation
+        var remainingFailures = throwOnWriteFailureCount;
+        mock.OpenWriteChunkAsync(Arg.Any<Hash>(), Arg.Any<CompressionLevel>(), Arg.Any<string>(), Arg.Any<IDictionary<string, string>>(), Arg.Any<Action<long>>(), Arg.Any<bool>(), Arg.Any<CancellationToken>())
+            .Returns(callInfo =>
+            {
+                var hash             = callInfo.Arg<Hash>();
+                var compressionLevel = callInfo.ArgAt<CompressionLevel>(1);
+                var contentType      = callInfo.ArgAt<string>(2);
+                var metadata         = callInfo.ArgAt<IDictionary<string, string>>(3);
+                var overwrite        = callInfo.ArgAt<bool>(5);
+
+                // Simulate write failure if configured
+                if ((throwOnWritePredicate is null || throwOnWritePredicate(hash)) && Interlocked.Decrement(ref remainingFailures) >= 0)
+                {
+                    return Task.FromResult(Result.Fail<Stream>(new ExceptionalError(new IOException("Simulated upload failure"))));
+                }
+
+                if (!overwrite && (chunks.ContainsKey(hash) || writtenChunks.ContainsKey(hash)))
+                {
+                    return Task.FromResult(Result.Fail<Stream>(new BlobAlreadyExistsError(hash.ToString())));
+                }
+
+                var chunk = new FakeArchiveStorageChunk
+                {
+                    ContentType = contentType,
+                    CompressionLevel = compressionLevel,
+                    Metadata = metadata != null ? new Dictionary<string, string>(metadata, StringComparer.OrdinalIgnoreCase) : new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase),
+                    StorageTier = StorageTier.Cool
+                };
+
+                var recordingStream = new RecordingMemoryStream(bytes =>
+                {
+                    chunk.Content = bytes;
+                    chunk.ContentLength = bytes.LongLength;
+                    writtenChunks[hash] = chunk;
+                    chunks[hash] = chunk;
+                });
+
+                return Task.FromResult(Result.Ok<Stream>(recordingStream));
+            });
+
+        // Get chunk properties
+        mock.GetChunkPropertiesAsync(Arg.Any<Hash>(), Arg.Any<CancellationToken>())
+            .Returns(callInfo =>
+            {
+                var hash = callInfo.Arg<Hash>();
+
+                if (chunks.TryGetValue(hash, out var chunk))
+                {
+                    return Task.FromResult<StorageProperties?>(new StorageProperties(
+                        hash.ToString(),
+                        chunk.ContentType,
+                        chunk.Metadata.Count == 0 ? null : new Dictionary<string, string>(chunk.Metadata, StringComparer.OrdinalIgnoreCase),
+                        chunk.StorageTier,
+                        chunk.ContentLength));
+                }
+
+                return Task.FromResult<StorageProperties?>(null);
+            });
+
+        // Delete chunk
+        mock.DeleteChunkAsync(Arg.Any<Hash>(), Arg.Any<CancellationToken>())
+            .Returns(callInfo =>
+            {
+                var hash = callInfo.Arg<Hash>();
+                chunks.Remove(hash);
+                hydratedChunks.Remove(hash);
+                writtenChunks.TryRemove(hash, out _);
+                return Task.CompletedTask;
+            });
+
+        // Set chunk metadata
+        mock.SetChunkMetadataAsync(Arg.Any<Hash>(), Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
+            .Returns(callInfo =>
+            {
+                var hash     = callInfo.Arg<Hash>();
+                var metadata = callInfo.ArgAt<IDictionary<string, string>>(1);
+
+                if (chunks.TryGetValue(hash, out var chunk))
+                {
+                    chunk.Metadata = new Dictionary<string, string>(metadata, StringComparer.OrdinalIgnoreCase);
+                }
+
+                return Task.CompletedTask;
+            });
+
+        // Set storage tier
+        mock.SetChunkStorageTierPerPolicy(Arg.Any<Hash>(), Arg.Any<long>(), Arg.Any<StorageTier>())
+            .Returns(callInfo =>
+            {
+                var hash       = callInfo.Arg<Hash>();
+                var targetTier = callInfo.ArgAt<StorageTier>(2);
+
+                if (chunks.TryGetValue(hash, out var chunk))
+                {
+                    chunk.StorageTier = targetTier;
+                }
+
+                return Task.FromResult(targetTier);
+            });
+
+        // Start hydration
+        mock.StartHydrationAsync(Arg.Any<Hash>(), Arg.Any<CancellationToken>())
+            .Returns(callInfo =>
+            {
+                var hash = callInfo.Arg<Hash>();
+
+                if (chunks.TryGetValue(hash, out var chunk))
+                {
+                    hydratedChunks[hash] = chunk;
+                }
+
+                return Task.CompletedTask;
+            });
+
+        return mock;
+    }
+
+    private sealed class RecordingMemoryStream : MemoryStream
+    {
+        private readonly Action<byte[]> onDispose;
+
+        public RecordingMemoryStream(Action<byte[]> onDispose) => this.onDispose = onDispose;
+
+        protected override void Dispose(bool disposing)
+        {
+            if (disposing)
+            {
+                onDispose(ToArray());
+            }
+
+            base.Dispose(disposing);
+        }
+    }
+
+    internal sealed class FakeArchiveStorageChunk
+    {
+        public byte[] Content { get; set; } = [];
+        public long ContentLength { get; set; }
+        public string ContentType { get; set; } = string.Empty;
+        public Dictionary<string, string> Metadata { get; set; } = new(StringComparer.OrdinalIgnoreCase);
+        public StorageTier StorageTier { get; set; } = StorageTier.Cool;
+        public CompressionLevel CompressionLevel { get; set; }
+    }
+}
\ No newline at end of file
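The RecordingMemoryStream above is the key trick: the mock hands the handler an ordinary writable stream and only learns what was uploaded when the handler disposes it. A self-contained sketch of the same dispose-time capture pattern (names are illustrative, not from the diff):

byte[]? payload = null;
using (var s = new CapturingStream(bytes => payload = bytes))
    s.Write("chunk body"u8);
Console.WriteLine(payload!.Length); // the full payload, handed over exactly once at dispose

sealed class CapturingStream : MemoryStream
{
    private readonly Action<byte[]> onDispose;
    public CapturingStream(Action<byte[]> onDispose) => this.onDispose = onDispose;
    protected override void Dispose(bool disposing)
    {
        if (disposing) onDispose(ToArray()); // snapshot the written bytes before the buffer is released
        base.Dispose(disposing);
    }
}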
diff --git a/src/Arius.Core.Tests/Helpers/Builders/MockArchiveStorageBuilder.cs b/src/Arius.Core.Tests/Features/Commands/Restore/MockArchiveStorageBuilder.cs
similarity index 99%
rename from src/Arius.Core.Tests/Helpers/Builders/MockArchiveStorageBuilder.cs
rename to src/Arius.Core.Tests/Features/Commands/Restore/MockArchiveStorageBuilder.cs
index 2eda7ee0..631c10c1 100644
--- a/src/Arius.Core.Tests/Helpers/Builders/MockArchiveStorageBuilder.cs
+++ b/src/Arius.Core.Tests/Features/Commands/Restore/MockArchiveStorageBuilder.cs
@@ -5,7 +5,7 @@
 using NSubstitute;
 using System.Formats.Tar;

-namespace Arius.Core.Tests.Helpers.Builders;
+namespace Arius.Core.Tests.Features.Commands.Restore;

 internal class MockArchiveStorageBuilder
 {
diff --git a/src/Arius.Core.Tests/Features/Queries/PointerFileEntries/PointerFileEntriesQueryHandlerTests.cs b/src/Arius.Core.Tests/Features/Queries/PointerFileEntries/PointerFileEntriesQueryHandlerTests.cs
index e13d270b..ec695cd9 100644
--- a/src/Arius.Core.Tests/Features/Queries/PointerFileEntries/PointerFileEntriesQueryHandlerTests.cs
+++ b/src/Arius.Core.Tests/Features/Queries/PointerFileEntries/PointerFileEntriesQueryHandlerTests.cs
@@ -1,7 +1,7 @@
 using Arius.Core.Features.Queries.PointerFileEntries;
 using Arius.Core.Shared.FileSystem;
 using Arius.Core.Shared.StateRepositories;
-using Arius.Core.Shared.Storage;
+using Arius.Core.Tests.Features.Commands.Restore;
 using Arius.Core.Tests.Helpers.Builders;
 using Arius.Core.Tests.Helpers.FakeLogger;
 using Arius.Core.Tests.Helpers.Fakes;
diff --git a/src/Arius.Core.Tests/Helpers/Fixtures/Fixture.cs b/src/Arius.Core.Tests/Helpers/Fixtures/Fixture.cs
index d86d4fc1..b8df0d89 100644
--- a/src/Arius.Core.Tests/Helpers/Fixtures/Fixture.cs
+++ b/src/Arius.Core.Tests/Helpers/Fixtures/Fixture.cs
@@ -37,9 +37,9 @@ public class FixtureWithFileSystem : Fixture, IDisposable
     public IFileSystem FileSystem { get; }
     public DirectoryInfo TestRunSourceFolder { get; }

-    public FixtureWithFileSystem() : base()
+    public FixtureWithFileSystem()
     {
-        TestRunSourceFolder = Directory.CreateTempSubdirectory($"arius-core-tests-{DateTime.Now:yyyyMMddTHHmmss}_{Guid.CreateVersion7()}");
+        TestRunSourceFolder = new DirectoryInfo(Path.Join(Path.GetTempPath(), "Arius-Core-Tests", $"{DateTime.Now:yyyyMMddTHHmmss}_{Guid.CreateVersion7()}"));
         TestRunSourceFolder.Create();

         var pfs = new PhysicalFileSystem();
diff --git a/src/Arius.Core.Tests/Shared/StateRepositories/StateRepositoryTests.cs b/src/Arius.Core.Tests/Shared/StateRepositories/StateRepositoryTests.cs
index 423d22e5..bfe4a4c3 100644
--- a/src/Arius.Core.Tests/Shared/StateRepositories/StateRepositoryTests.cs
+++ b/src/Arius.Core.Tests/Shared/StateRepositories/StateRepositoryTests.cs
@@ -1,5 +1,4 @@
 using Arius.Core.Shared.FileSystem;
-using Arius.Core.Shared.Hashing;
 using Arius.Core.Shared.StateRepositories;
 using Arius.Core.Shared.Storage;
 using Arius.Core.Tests.Helpers.Builders;
diff --git a/src/Arius.Core.Tests/Utils.cs b/src/Arius.Core.Tests/Utils.cs
index 69548e50..19eb9672 100644
--- a/src/Arius.Core.Tests/Utils.cs
+++ b/src/Arius.Core.Tests/Utils.cs
@@ -27,7 +27,8 @@ public void CleanupLocalTemp()
 {
     var cutoff = DateTime.UtcNow.AddDays(-2);

-    foreach (var dir in Directory.EnumerateDirectories(Path.GetTempPath(), "Arius.Core.Tests*"))
+    foreach (var dir in Directory.EnumerateDirectories(Path.GetTempPath(), "Arius.Core.Tests*").Union(
+                 Directory.EnumerateDirectories(Path.GetTempPath(), "arius-*")))
     {
         var info = new DirectoryInfo(dir);
diff --git a/src/Arius.Core/Features/Commands/Archive/ArchiveCommandHandler.cs b/src/Arius.Core/Features/Commands/Archive/ArchiveCommandHandler.cs
index f68fe48d..94b6564a 100644
--- a/src/Arius.Core/Features/Commands/Archive/ArchiveCommandHandler.cs
+++ b/src/Arius.Core/Features/Commands/Archive/ArchiveCommandHandler.cs
@@ -9,6 +9,7 @@
 using Mediator;
 using Microsoft.Extensions.Logging;
 using Microsoft.Extensions.Options;
+using System.Collections.Concurrent;
 using System.IO.Compression;
 using System.Threading.Channels;
 using Zio;
@@ -23,6 +24,7 @@ internal class ArchiveCommandHandler : ICommandHandler<ArchiveCommand, Result<Ar
 {
     private readonly ILogger<ArchiveCommandHandler> logger;
     private readonly ILoggerFactory loggerFactory;
+    private int used;

     public ArchiveCommandHandler(ILogger<ArchiveCommandHandler> logger, ILoggerFactory loggerFactory, IOptions<AriusConfiguration> config)
     {
@@ -37,14 +39,16 @@ public ArchiveCommandHandler(ILogger<ArchiveCommandHandler> logger, ILoggerFacto
     private readonly InFlightGate uploadGate = new();

     // Statistics tracking
-    private int totalLocalFiles;
-    private int uniqueBinariesUploaded;
-    private int uniqueChunksUploaded;
-    private long bytesUploadedUncompressed;
-    private long bytesUploadedCompressed;
-    private int existingPointerFiles;
-    private int pointerFilesCreated;
-    private int pointerFileEntriesDeleted;
+    private int totalLocalFiles = 0;
+    private int existingPointerFiles = 0;
+    private int uniqueBinariesUploaded = 0;
+    private int uniqueChunksUploaded = 0;
+    private long bytesUploadedUncompressed = 0;
+    private long bytesUploadedCompressed = 0;
+    private int pointerFilesCreated = 0;
+    private int pointerFileEntriesDeleted = 0;
+    private readonly ConcurrentBag<string> warnings = [];
+    private int filesSkipped = 0;

     // Pipeline channels:
     //
@@ -56,11 +60,13 @@ public ArchiveCommandHandler(ILogger<ArchiveCommandHandler> logger, ILoggerFacto
     // - small files: a single consumer batches entries into a TAR; only the "owner" of a hash adds to the TAR.
     //   duplicates (non-owners) are *deferred* — we DO NOT block the reader.
     // - large files: each owner uploads the blob directly; non-owners await the owner (safe here because it's in a parallel consumer).
+    private record FilePairWithHash(FilePair FilePair, Hash Hash);

     private readonly Channel<FilePair>         indexedFilesChannel     = ChannelExtensions.CreateBounded<FilePair>(capacity: 20, singleWriter: true, singleReader: false);
     private readonly Channel<FilePairWithHash> hashedLargeFilesChannel = ChannelExtensions.CreateBounded<FilePairWithHash>(capacity: 10, singleWriter: false, singleReader: false);
     private readonly Channel<FilePairWithHash> hashedSmallFilesChannel = ChannelExtensions.CreateBounded<FilePairWithHash>(capacity: 10, singleWriter: false, singleReader: true);
-    private record FilePairWithHash(FilePair FilePair, Hash Hash);
+
+    // --- HANDLER

     public async ValueTask<Result<ArchiveCommandResult>> Handle(ArchiveCommand request, CancellationToken cancellationToken)
     {
@@ -72,30 +78,27 @@

     internal async ValueTask<Result<ArchiveCommandResult>> Handle(HandlerContext handlerContext, CancellationToken cancellationToken)
     {
-        logger.LogInformation("Starting archive operation for path {LocalRoot} with hashing parallelism {HashingParallelism}, upload parallelism {UploadParallelism}", handlerContext.Request.LocalRoot, handlerContext.Request.HashingParallelism, handlerContext.Request.UploadParallelism);
-
-        // Reset statistics for this operation
-        totalLocalFiles = 0;
-        existingPointerFiles = 0;
+        // Enforce single-use
+        if (Interlocked.Exchange(ref used, 1) != 0)
+            throw new InvalidOperationException($"{nameof(ArchiveCommandHandler)} can only be used once.");

-        uniqueBinariesUploaded = 0;
-        uniqueChunksUploaded = 0;
-        bytesUploadedUncompressed = 0;
-        bytesUploadedCompressed = 0;
-        pointerFilesCreated = 0;
-        pointerFileEntriesDeleted = 0;
+        logger.LogInformation("Starting archive operation for path {LocalRoot} with hashing parallelism {HashingParallelism}, upload parallelism {UploadParallelism}", handlerContext.Request.LocalRoot, handlerContext.Request.HashingParallelism, handlerContext.Request.UploadParallelism);

-        using var errorCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
-        var errorCancellationToken = errorCancellationTokenSource.Token;
+        using var errorCancellationTokenSource  = new CancellationTokenSource();
+        using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, errorCancellationTokenSource.Token);
+        var errorCancellationToken = linkedCancellationTokenSource.Token;

-        var indexTask            = CreateIndexTask(handlerContext, errorCancellationToken, errorCancellationTokenSource);
-        var hashTask             = CreateHashTask(handlerContext, errorCancellationToken, errorCancellationTokenSource);
-        var uploadLargeFilesTask = CreateUploadLargeFilesTask(handlerContext, errorCancellationToken, errorCancellationTokenSource);
-        var uploadSmallFilesTask = CreateUploadSmallFilesTarArchiveTask(handlerContext, errorCancellationToken, errorCancellationTokenSource);
+        var tasks = new Dictionary<string, Task>
+        {
+            ["IndexTask"]            = CreateIndexTask(handlerContext, errorCancellationToken, errorCancellationTokenSource),
+            ["HashTask"]             = CreateHashTask(handlerContext, errorCancellationToken, errorCancellationTokenSource),
+            ["UploadLargeFilesTask"] = CreateUploadLargeFilesTask(handlerContext, errorCancellationToken, errorCancellationTokenSource),
+            ["UploadSmallFilesTask"] = CreateUploadSmallFilesTarArchiveTask(handlerContext, errorCancellationToken, errorCancellationTokenSource)
+        };
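The Interlocked.Exchange guard that now opens Handle is a standard single-use idiom; a minimal stand-alone illustration (the class name is illustrative):

var handler = new OneShot();
handler.Run();                      // ok
try { handler.Run(); }              // always throws, even under concurrent callers
catch (InvalidOperationException e) { Console.WriteLine(e.Message); }

class OneShot
{
    private int used;
    public void Run()
    {
        // 0 -> 1 atomically; any later (or concurrent) caller observes 1 and bails out
        if (Interlocked.Exchange(ref used, 1) != 0)
            throw new InvalidOperationException($"{nameof(OneShot)} can only be used once.");
    }
}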
         try
         {
-            await Task.WhenAll(indexTask, hashTask, uploadLargeFilesTask, uploadSmallFilesTask);
+            await Task.WhenAll(tasks.Values);

             // 6. Remove PointerFileEntries that do not exist on disk
             logger.LogDebug("Cleaning up pointer file entries that no longer exist on disk");
@@ -135,10 +138,12 @@ internal async ValueTask<Result<ArchiveCommandResult>> Handle(HandlerContext han
                 BytesUploadedCompressed   = bytesUploadedCompressed,
                 PointerFilesCreated       = pointerFilesCreated,
                 PointerFileEntriesDeleted = pointerFileEntriesDeleted,
-                NewStateName              = newStateName
+                NewStateName              = newStateName,
+                Warnings                  = warnings.ToArray(),
+                FilesSkipped              = filesSkipped
             });
         }
-        catch (OperationCanceledException) when (!errorCancellationToken.IsCancellationRequested && cancellationToken.IsCancellationRequested)
+        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested && !errorCancellationTokenSource.IsCancellationRequested)
         {
             // User-triggered cancellation
             logger.LogInformation("Archive operation cancelled by user");
@@ -147,58 +152,55 @@ internal async ValueTask<Result<ArchiveCommandResult>> Handle(HandlerContext han
         catch (Exception ex)
         {
             // Either a task failed with an exception or error-triggered cancellation occurred
+
+            var faultedTasks = tasks.Where(kvp => kvp.Value.IsFaulted).Select(kvp => (Name: kvp.Key, Exception: kvp.Value.Exception!.GetBaseException())).ToArray();
+
+            // Trigger error-driven cancellation to signal other tasks to stop gracefully
+            errorCancellationTokenSource.Cancel();
+
             // Wait for all tasks to complete gracefully
-            var allTasks = new[] { indexTask, hashTask, uploadLargeFilesTask, uploadSmallFilesTask };
-            await Task.WhenAll(allTasks.Select(async t =>
+            await Task.WhenAll(tasks.Values.Select(async t =>
             {
                 try { await t; }
                 catch { /* Ignore exceptions during graceful shutdown */ }
             }));

-            // Map tasks to their names for logging
-            var taskNames = new Dictionary<Task, string>
+            // Observe all task exceptions to prevent UnobservedTaskException
+            foreach (var task in tasks.Values.Where(t => t.IsFaulted))
             {
-                { indexTask, nameof(indexTask) },
-                { hashTask, nameof(hashTask) },
-                { uploadLargeFilesTask, nameof(uploadLargeFilesTask) },
-                { uploadSmallFilesTask, nameof(uploadSmallFilesTask) }
-            };
+                _ = task.Exception;
+            }

             // Log cancelled tasks (debug level)
-            var cancelledTasks = allTasks.Where(t => t.IsCanceled).ToArray();
-            if (cancelledTasks.Any())
+            var cancelledTaskNames = tasks.Where(kvp => kvp.Value.IsCanceled).Select(kvp => kvp.Key).ToArray();
+            if (cancelledTaskNames.Any())
             {
-                var cancelledTaskNames = cancelledTasks.Select(t => taskNames[t]).ToArray();
                 logger.LogDebug("Tasks cancelled during graceful shutdown: {TaskNames}", string.Join(", ", cancelledTaskNames));
             }

             // Log and handle failed tasks (error level)
-            var faultedTasks = allTasks.Where(t => t.IsFaulted).ToArray();
-            if (faultedTasks.Any())
+            if (faultedTasks is { Length: 1 } && faultedTasks.Single() is var faultedTask)
             {
-                if (faultedTasks is { Length: 1 } && faultedTasks.Single() is var faultedTask)
-                {
-                    // Single faulted task - return the exception
-                    var baseException = faultedTask.Exception!.GetBaseException();
-                    logger.LogError(baseException, "Task '{TaskName}' failed with exception '{Exception}'", taskNames[faultedTask], baseException.Message);
-                    return Result.Fail($"Archive operation failed: {baseException.Message}").WithError(new ExceptionalError(baseException));
-                }
-                else
-                {
-                    // Multiple faulted tasks - return aggregate exception
-                    var exceptions = faultedTasks.Select(t => t.Exception!.GetBaseException()).ToArray();
-                    var aggregateException = new AggregateException("Multiple tasks failed during archive operation", exceptions);
-                    logger.LogError(aggregateException, "Tasks failed: {TaskNames}", string.Join(", ", faultedTasks.Select(t => taskNames[t])));
-                    return Result.Fail($"Archive operation failed: multiple tasks failed").WithError(new ExceptionalError(aggregateException));
-                }
+                // Single faulted task - return the exception
+                var msg = faultedTask.Exception?.GetBaseException().Message ?? "UNKNOWN";
+                logger.LogError(faultedTask.Exception, "Task '{TaskName}' failed with exception '{Exception}'", faultedTask.Name, msg);
+                return Result.Fail($"Archive operation failed: {faultedTask.Name} failed with {msg}").WithError(new ExceptionalError(faultedTask.Exception));
+            }
+            else
+            {
+                // Multiple faulted tasks - return aggregate exception
+                var exceptions = faultedTasks.Select(ft => ft.Exception).ToArray();
+                var aggregateException = new AggregateException("Multiple tasks failed during archive operation", exceptions);
+                var faultedTaskNames = string.Join(", ", faultedTasks.Select(ft => ft.Name));
+                logger.LogError(aggregateException, "Tasks failed: {FaultedTaskNames}", faultedTaskNames);
+                return Result.Fail($"Archive operation failed: {faultedTaskNames} tasks failed").WithError(new ExceptionalError(aggregateException));
             }
-
-            // Unexpected error path - return generic failure
-            logger.LogError(ex, "Archive operation failed with unexpected error");
-            return Result.Fail($"Archive operation failed: {ex.Message}").WithError(new ExceptionalError(ex));
         }
     }
+
+
+    // --- HIGH LEVEL TASKS
+
     private Task CreateIndexTask(HandlerContext handlerContext, CancellationToken cancellationToken, CancellationTokenSource errorCancellationTokenSource) =>
         Task.Run(async () =>
         {
@@ -225,11 +227,10 @@ private Task CreateIndexTask(HandlerContext handlerContext, CancellationToken ca
                 indexedFilesChannel.Writer.Complete();
                 throw;
             }
-            catch (Exception e)
+            catch (Exception e) // TODO Align with the approach of HashTask, where we skip the file and log a warning instead of failing the entire task; write test Error_IndexTaskFails_ShouldSkipProblematicFileAndContinue
             {
                 logger.LogError(e, "File indexing failed with exception");
                 indexedFilesChannel.Writer.Complete();
-                errorCancellationTokenSource.Cancel();
                 throw;
             }
         }, cancellationToken);
@@ -258,6 +259,10 @@ private Task CreateHashTask(HandlerContext handlerContext, CancellationToken can

                     logger.LogWarning("File {FileName} is a pointer file without an associated binary, skipping", filePair.FullName);
                     handlerContext.Request.ProgressReporter?.Report(new FileProgressUpdate(filePair.FullName, -1, "Error: pointer file without binary"));
+
+                    var warningMessage = $"File '{filePair.FullName}' is a pointer file without an associated binary, skipping";
+                    warnings.Add(warningMessage);
+                    Interlocked.Increment(ref filesSkipped);
                 }
                 else
                 {
@@ -275,20 +280,18 @@ private Task CreateHashTask(HandlerContext handlerContext, CancellationToken can
                         await hashedLargeFilesChannel.Writer.WriteAsync(new(filePair, h), cancellationToken: innerCancellationToken);
                     }
                 }
-                catch (IOException e)
-                {
-                    logger.LogWarning("Error when hashing file {FileName}: {Message}, skipping.", filePair.FullName, e.Message);
-                    handlerContext.Request.ProgressReporter?.Report(new FileProgressUpdate(filePair.FullName, -1, $"Error: {e.Message}"));
-                }
                 catch (OperationCanceledException)
                 {
                     throw;
                 }
                 catch (Exception e)
                 {
-                    logger.LogError(e, "File hashing failed for {FileName}", filePair.FullName);
-                    errorCancellationTokenSource.Cancel();
-                    throw;
+                    logger.LogWarning("Error when hashing file {FileName}: {Message}, skipping.", filePair.FullName, e.Message);
+                    handlerContext.Request.ProgressReporter?.Report(new FileProgressUpdate(filePair.FullName, -1, $"Error: {e.Message}"));
+
+                    var warningMessage = $"Error when hashing file '{filePair.FullName}': {e.Message}, skipping";
+                    warnings.Add(warningMessage);
+                    Interlocked.Increment(ref filesSkipped);
                 }
             });

@@ -315,10 +318,9 @@ private Task CreateUploadLargeFilesTask(HandlerContext handlerContext, Cancellat
                 {
                     throw;
                 }
-                catch (Exception e)
+                catch (Exception e) // TODO Align with the approach of HashTask, where we skip the file and log a warning instead of failing the entire task; update test Error_UploadTaskFails_ShouldReturnFailure
                 {
                     logger.LogError(e, "Large file upload task failed");
-                    errorCancellationTokenSource.Cancel();
                     throw;
                 }
             });

@@ -334,14 +336,16 @@ private Task CreateUploadSmallFilesTarArchiveTask(HandlerContext handlerContext,
             {
                 throw;
             }
-            catch (Exception e)
+            catch (Exception e) // TODO Align with the approach of HashTask, where we skip the file and log a warning instead of failing the entire task; write a test like Error_HashTaskFails_ShouldSkipProblematicFileAndContinue
             {
                 logger.LogError(e, "Small files TAR archive task failed");
-                errorCancellationTokenSource.Cancel();
                 throw;
             }
         }, cancellationToken);

+
+    // --- HELPERS
+
     private const string ChunkContentType    = "application/aes256cbc+gzip";
     private const string TarChunkContentType = "application/aes256cbc+tar+gzip";

@@ -750,5 +754,4 @@ private async Task ProcessTarArchive(HandlerContext handlerContext, InMemoryGzip
         foreach (var entry in tarWriter.TarredEntries)
             handlerContext.Request.ProgressReporter?.Report(new FileProgressUpdate(entry.FilePair.FullName, 100, "Archive complete"));
     }
-
-}
\ No newline at end of file
+}
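The hash task's new catch block implements a warn-and-skip policy: a thread-safe ConcurrentBag<string> collects messages while an Interlocked counter tracks skips, so one bad file no longer sinks the pipeline. A stand-alone sketch of the pattern (the file names and the simulated failure are illustrative):

using System.Collections.Concurrent;

var warnings = new ConcurrentBag<string>();
var filesSkipped = 0;

await Parallel.ForEachAsync(new[] { "/ok.bin", "/locked.bin" }, async (file, ct) =>
{
    try
    {
        await Task.Yield();                                   // stand-in for hashing the file
        if (file == "/locked.bin") throw new IOException("in use");
    }
    catch (OperationCanceledException) { throw; }             // cancellation still propagates
    catch (Exception e)
    {
        warnings.Add($"Error when hashing file '{file}': {e.Message}, skipping");
        Interlocked.Increment(ref filesSkipped);              // the loop keeps draining other files
    }
});

Console.WriteLine($"{filesSkipped} skipped, {warnings.Count} warning(s)");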
diff --git a/src/Arius.Core/Features/Commands/Archive/ArchiveCommandResult.cs b/src/Arius.Core/Features/Commands/Archive/ArchiveCommandResult.cs
index 1f1cc9c8..c907fd78 100644
--- a/src/Arius.Core/Features/Commands/Archive/ArchiveCommandResult.cs
+++ b/src/Arius.Core/Features/Commands/Archive/ArchiveCommandResult.cs
@@ -52,4 +52,8 @@ public sealed record ArchiveCommandResult
     /// The name of the new state file that was uploaded, or null if no state changes occurred
     /// </summary>
     public string? NewStateName { get; init; }
+
+
+    public IReadOnlyList<string> Warnings { get; init; } = [];
+    public int FilesSkipped { get; init; }
 }
diff --git a/src/Arius.Core/Features/Commands/Archive/HandlerContext.cs b/src/Arius.Core/Features/Commands/Archive/HandlerContext.cs
index 8013bda1..b9ebcfd2 100644
--- a/src/Arius.Core/Features/Commands/Archive/HandlerContext.cs
+++ b/src/Arius.Core/Features/Commands/Archive/HandlerContext.cs
@@ -10,6 +10,6 @@ internal record HandlerContext
     public required ArchiveCommand Request { get; init; }
     public required IArchiveStorage ArchiveStorage { get; init; }
     public required StateRepository StateRepository { get; init; }
-    public required Sha256Hasher Hasher { get; init; }
+    public required ISha256Hasher Hasher { get; init; }
     public required FilePairFileSystem FileSystem { get; init; }
 }
\ No newline at end of file
diff --git a/src/Arius.Core/Features/Commands/Archive/HandlerContextBuilder.cs b/src/Arius.Core/Features/Commands/Archive/HandlerContextBuilder.cs
index 651bf8ec..04c911ac 100644
--- a/src/Arius.Core/Features/Commands/Archive/HandlerContextBuilder.cs
+++ b/src/Arius.Core/Features/Commands/Archive/HandlerContextBuilder.cs
@@ -18,6 +18,7 @@ internal class HandlerContextBuilder
     private IArchiveStorage? archiveStorage;
     private StateRepository? stateRepository;
     private IFileSystem? baseFileSystem;
+    private ISha256Hasher? hasher;

     public HandlerContextBuilder(ArchiveCommand request, ILoggerFactory loggerFactory)
     {
@@ -44,6 +45,12 @@ public HandlerContextBuilder WithBaseFileSystem(IFileSystem fileSystem)
         return this;
     }

+    public HandlerContextBuilder WithHasher(ISha256Hasher hasher)
+    {
+        this.hasher = hasher;
+        return this;
+    }
+
     public async Task<HandlerContext> BuildAsync()
     {
         await new ArchiveCommandValidator().ValidateAndThrowAsync(request);
@@ -65,7 +72,7 @@ public async Task<HandlerContext> BuildAsync()
             Request         = request,
             ArchiveStorage  = archiveStorage,
             StateRepository = stateRepository ?? await BuildStateRepositoryAsync(archiveStorage),
-            Hasher          = new Sha256Hasher(request.Passphrase),
+            Hasher          = hasher ?? new Sha256Hasher(request.Passphrase),
             FileSystem      = GetFileSystem()
         };

@@ -75,7 +82,7 @@ async Task<StateRepository> BuildStateRepositoryAsync(IArchiveStorage archiveSto
             var stateCache = new StateCache(request.AccountName, request.ContainerName);

             // Determine the version name for this run
-            var versionName = DateTime.UtcNow.ToString("yyyy-MM-ddTHH-mm-ss");
+            var versionName = DateTime.UtcNow.ToString("yyyy-MM-ddTHH-mm-ss-fff");
             request.ProgressReporter?.Report(new TaskProgressUpdate($"Determining version name '{versionName}'...", 0));

             // Get the latest state from blob storage
diff --git a/src/Arius.Core/Features/Commands/Restore/RestoreCommandHandler.cs b/src/Arius.Core/Features/Commands/Restore/RestoreCommandHandler.cs
index dd19fbff..2c1ae8f4 100644
--- a/src/Arius.Core/Features/Commands/Restore/RestoreCommandHandler.cs
+++ b/src/Arius.Core/Features/Commands/Restore/RestoreCommandHandler.cs
@@ -25,6 +25,7 @@ internal class RestoreCommandHandler : ICommandHandler<RestoreCommand, Result<Re
 {
     private readonly ILogger<RestoreCommandHandler> logger;
     private readonly ILoggerFactory loggerFactory;
+    private int used;

     public RestoreCommandHandler(ILogger<RestoreCommandHandler> logger, ILoggerFactory loggerFactory, IOptions<AriusConfiguration> config)
     {
@@ -33,12 +34,13 @@
     }

     // Statistics tracking
-    private int totalTargetFiles;
-    private int verifiedFilesAlreadyExisting;
-    private int chunksDownloaded;
-    private long bytesDownloaded;
-    private int filesWrittenToDisk;
-    private long bytesWrittenToDisk;
+    private readonly ConcurrentBag<string> warnings = [];
+    private int totalTargetFiles = 0;
+    private int verifiedFilesAlreadyExisting = 0;
+    private int chunksDownloaded = 0;
+    private long bytesDownloaded = 0;
+    private int filesWrittenToDisk = 0;
+    private long bytesWrittenToDisk = 0;

     private readonly Channel<FilePairWithPointerFileEntry> filePairsToRestoreChannel = ChannelExtensions.CreateBounded<FilePairWithPointerFileEntry>(capacity: 25, singleWriter: true, singleReader: false);
     private readonly Channel<FilePairWithPointerFileEntry> filePairsToHashChannel    = ChannelExtensions.CreateBounded<FilePairWithPointerFileEntry>(capacity: 25, singleWriter: true, singleReader: false);
@@ -57,16 +59,11 @@ public async ValueTask<Result<RestoreCommandResult>> Handle(RestoreCommand reque

     internal async ValueTask<Result<RestoreCommandResult>> Handle(HandlerContext handlerContext, CancellationToken cancellationToken)
     {
-        logger.LogInformation("Starting restore operation for {TargetCount} targets with hashing parallelism {HashParallelism}, download parallelism {DownloadParallelism}", handlerContext.Targets.Length, handlerContext.Request.HashParallelism, handlerContext.Request.DownloadParallelism);
-
-        // Reset statistics for this operation
-        totalTargetFiles = 0;
-        verifiedFilesAlreadyExisting = 0;
-        chunksDownloaded = 0;
-        bytesDownloaded = 0;
-        filesWrittenToDisk = 0;
-        bytesWrittenToDisk = 0;
+        // Enforce single-use
+        if (Interlocked.Exchange(ref used, 1) != 0)
+            throw new InvalidOperationException($"{nameof(RestoreCommandHandler)} can only be used once.");

+        logger.LogInformation("Starting restore operation for {TargetCount} targets with hashing parallelism {HashParallelism}, download parallelism {DownloadParallelism}", handlerContext.Targets.Length, handlerContext.Request.HashParallelism, handlerContext.Request.DownloadParallelism);
         using var errorCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
         var errorCancellationToken = errorCancellationTokenSource.Token;
@@ -125,7 +122,8 @@
             BytesDownloaded    = bytesDownloaded,
             BytesWrittenToDisk = bytesWrittenToDisk,
             ChunksDownloaded   = chunksDownloaded,
-            Rehydrating        = rehydratingFiles
+            Rehydrating        = rehydratingFiles,
+            Warnings           = warnings.ToArray()
         });
     }
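The version name gains millisecond precision so that two archive runs within the same second no longer collide on the state blob name; a quick stand-alone check of the format against the regex the updated tests assert:

using System.Text.RegularExpressions;

var versionName = DateTime.UtcNow.ToString("yyyy-MM-ddTHH-mm-ss-fff");
Console.WriteLine(versionName);   // e.g. 2025-10-31T14-53-32-417

// The same pattern the updated tests match state file names against
Console.WriteLine(Regex.IsMatch($"{versionName}.db",
    @"^\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}-\d{3}\.db$"));   // True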
@@ -152,6 +150,9 @@ private Task CreateIndexTask(HandlerContext handlerContext, CancellationToken ca
                 {
                     logger.LogWarning("Target {TargetPath} was specified but no matching PointerFileEntry found", targetPath);
                     handlerContext.Request.ProgressReporter?.Report(new FileProgressUpdate(targetPath.FullName, -1, "Error: no matching entry found"));
+
+                    var warningMessage = $"Target '{targetPath}' was specified but no matching PointerFileEntry found";
+                    warnings.Add(warningMessage);
                 }
                 else
                 {
@@ -245,6 +246,10 @@ private Task CreateHashTask(HandlerContext handlerContext, CancellationToken can
                     // The hash does not match - we need to restore this binaryfile
                     logger.LogWarning("File {FileName} hash mismatch (expected: {ExpectedHash}, actual: {ActualHash}), queued for restore", filePair.BinaryFile.FullName, pointerFileEntry.Hash.ToShortString(), h.ToShortString());
                     handlerContext.Request.ProgressReporter?.Report(new FileProgressUpdate(filePair.BinaryFile.FullName, 50, "Hash mismatch, restoring..."));
+
+                    var warningMessage = $"File '{filePair.BinaryFile.FullName}' hash mismatch (expected: {pointerFileEntry.Hash.ToShortString()}, actual: {h.ToShortString()}), queued for restore";
+                    warnings.Add(warningMessage);
+
                     await filePairsToRestoreChannel.Writer.WriteAsync(filePairWithPointerFileEntry, innerCancellationToken);
                 }
             }
diff --git a/src/Arius.Core/Features/Commands/Restore/RestoreCommandResult.cs b/src/Arius.Core/Features/Commands/Restore/RestoreCommandResult.cs
index 2d07a112..a06ba5c2 100644
--- a/src/Arius.Core/Features/Commands/Restore/RestoreCommandResult.cs
+++ b/src/Arius.Core/Features/Commands/Restore/RestoreCommandResult.cs
@@ -39,6 +39,8 @@ public sealed record RestoreCommandResult
     /// Details about the files that are still hydrating
     /// </summary>
     public IReadOnlyList<RehydrationDetail> Rehydrating { get; init; } = [];
+
+    public IReadOnlyList<string> Warnings { get; init; } = [];
 }

 public sealed record RehydrationDetail
diff --git a/src/Arius.Core/Features/Queries/PointerFileEntries/PointerFileEntriesQueryHandler.cs b/src/Arius.Core/Features/Queries/PointerFileEntries/PointerFileEntriesQueryHandler.cs
index 9b5a8482..f2381bd5 100644
--- a/src/Arius.Core/Features/Queries/PointerFileEntries/PointerFileEntriesQueryHandler.cs
+++ b/src/Arius.Core/Features/Queries/PointerFileEntries/PointerFileEntriesQueryHandler.cs
@@ -1,10 +1,10 @@
+using Arius.Core.Shared.FileSystem;
+using Arius.Core.Shared.Storage;
 using FluentValidation;
 using Mediator;
 using Microsoft.Extensions.Logging;
 using System.Runtime.CompilerServices;
 using System.Threading.Channels;
-using Arius.Core.Shared.FileSystem;
-using Arius.Core.Shared.Storage;
 using Zio;

 namespace Arius.Core.Features.Queries.PointerFileEntries;
diff --git a/src/Arius.Core/Shared/Hashing/Sha256Hasher.cs b/src/Arius.Core/Shared/Hashing/Sha256Hasher.cs
index 6edae13d..a3203283 100644
--- a/src/Arius.Core/Shared/Hashing/Sha256Hasher.cs
+++ b/src/Arius.Core/Shared/Hashing/Sha256Hasher.cs
@@ -5,7 +5,33 @@
 namespace Arius.Core.Shared.Hashing;

-internal sealed class Sha256Hasher
+internal interface ISha256Hasher
+{
+    /// <summary>
+    /// Gets the salted hash of a raw byte array.
+    /// </summary>
+    Task<Hash> GetHashAsync(byte[] data);
+
+    /// <summary>
+    /// Gets the salted hash of a Stream.
+    /// </summary>
+    Task<Hash> GetHashAsync(Stream s);
+
+    /// <summary>
+    /// Gets the salted hash of a FilePair. If it is PointerFileOnly, we simply
+    /// return the hash stored in the pointer file. Otherwise, we hash the BinaryFile.
+    /// </summary>
+    Task<Hash> GetHashAsync(FilePair fp);
+
+    /// <summary>
+    /// Gets the salted hash of a BinaryFile. Throws if the file does not exist.
+    /// </summary>
+    Task<Hash> GetHashAsync(BinaryFile bf);
+}
+
+internal sealed class Sha256Hasher : ISha256Hasher
 {
     private const int BufferSize = 81920; // 80 KB buffer
     private readonly byte[] saltBytes;
@@ -22,10 +48,7 @@ internal sealed class Sha256Hasher
     ///// Raw salt bytes.
     //private Sha256Hasher(byte[] salt) => saltBytes = salt;

-    /// <summary>
-    /// Gets the salted hash of a raw byte array. Returns a Task for consistency.
-    /// </summary>
-    public Task<Hash> GetHashAsync(byte[] data)
+    public Task<Hash> GetHashAsync(byte[] data) // NOTE: returns a Task for consistency
     {
         // No I/O, so no real async needed. We keep a Task-based signature to match the rest of the code.
         var hashValue = ComputeSaltedHash(data);
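With hashing behind ISha256Hasher, tests can inject a fake through the builder's new WithHasher; a hedged sketch (the canned knownHash value and the surrounding command, loggerFactory, and archiveStorage variables are assumed to come from the test's builders):

var hasher = Substitute.For<ISha256Hasher>();
hasher.GetHashAsync(Arg.Any<FilePair>()).Returns(Task.FromResult(knownHash)); // knownHash: a precomputed Hash

var handlerContext = await new HandlerContextBuilder(command, loggerFactory)
    .WithArchiveStorage(archiveStorage)
    .WithHasher(hasher)                 // overrides the default Sha256Hasher(request.Passphrase)
    .BuildAsync();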
diff --git a/src/Arius.Core/Shared/StateRepositories/IStateRepository.cs b/src/Arius.Core/Shared/StateRepositories/IStateRepository.cs
index 1f98e17f..f6025847 100644
--- a/src/Arius.Core/Shared/StateRepositories/IStateRepository.cs
+++ b/src/Arius.Core/Shared/StateRepositories/IStateRepository.cs
@@ -6,16 +6,77 @@ namespace Arius.Core.Shared.StateRepositories;

 internal interface IStateRepository
 {
-    FileEntry StateDatabaseFile { get; }
-    bool HasChanges { get; }
-    void Vacuum();
-    void Delete();
-    BinaryProperties? GetBinaryProperty(Hash h);
-    void SetBinaryPropertyArchiveTier(Hash h, StorageTier tier);
-    void AddBinaryProperties(params BinaryProperties[] bps);
-    void UpsertPointerFileEntries(params PointerFileEntry[] pfes);
+    /// <summary>
+    /// Gets the state repository database file that backs this repository instance.
+    /// </summary>
+    FileEntry StateDatabaseFile { get; }
+
+    /// <summary>
+    /// Gets a value indicating whether the repository has tracked changes that are not yet persisted.
+    /// </summary>
+    bool HasChanges { get; }
+
+    /// <summary>
+    /// Compacts the underlying state database to reclaim unused space.
+    /// </summary>
+    void Vacuum();
+
+    /// <summary>
+    /// Deletes the underlying state database and resets the repository.
+    /// </summary>
+    void Delete();
+
+    /// <summary>
+    /// Retrieves the binary properties for the specified hash, or null when absent.
+    /// </summary>
+    /// <param name="h">The hash that identifies the binary.</param>
+    BinaryProperties? GetBinaryProperty(Hash h);
+
+    /// <summary>
+    /// Updates the storage tier for the binary that matches the given hash.
+    /// </summary>
+    /// <param name="h">The hash that identifies the binary.</param>
+    /// <param name="tier">The new storage tier to apply.</param>
+    void SetBinaryPropertyArchiveTier(Hash h, StorageTier tier);
+
+    /// <summary>
+    /// Adds the provided binary property records to the repository.
+    /// </summary>
+    /// <param name="bps">The binary property entries to persist.</param>
+    void AddBinaryProperties(params BinaryProperties[] bps);
+
+    /// <summary>
+    /// Inserts new pointer file entries or updates existing ones as needed.
+    /// </summary>
+    /// <param name="pfes">The pointer file entries to upsert.</param>
+    void UpsertPointerFileEntries(params PointerFileEntry[] pfes);
+
+    /// <summary>
+    /// Enumerates pointer file directories that match the specified prefix.
+    /// </summary>
+    /// <param name="relativeNamePrefix">A prefix that must start with '/' and is matched against stored entries.</param>
+    /// <param name="topDirectoryOnly">When true, returns only directories one level below the prefix.</param>
     IEnumerable<string> GetPointerFileDirectories(string relativeNamePrefix, bool topDirectoryOnly);
-    IEnumerable<PointerFileEntry> GetPointerFileEntries(string relativeNamePrefix, bool topDirectoryOnly, bool includeBinaryProperties = false);
-    PointerFileEntry? GetPointerFileEntry(string relativeName, bool includeBinaryProperties = false);
-    int DeletePointerFileEntries(Func<PointerFileEntry, bool> shouldBeDeleted);
-}
\ No newline at end of file
+
+    /// <summary>
+    /// Enumerates pointer file entries that match the specified prefix.
+    /// </summary>
+    /// <param name="relativeNamePrefix">A prefix that must start with '/' and is matched against stored entries.</param>
+    /// <param name="topDirectoryOnly">When true, limits results to the first level of the hierarchy.</param>
+    /// <param name="includeBinaryProperties">When true, includes related binary properties in the results.</param>
+    IEnumerable<PointerFileEntry> GetPointerFileEntries(string relativeNamePrefix, bool topDirectoryOnly, bool includeBinaryProperties = false);
+
+    /// <summary>
+    /// Retrieves a single pointer file entry that matches the supplied relative name, or null if none exists.
+    /// </summary>
+    /// <param name="relativeName">The full relative name, starting with '/'.</param>
+    /// <param name="includeBinaryProperties">When true, includes the related binary properties.</param>
+    PointerFileEntry? GetPointerFileEntry(string relativeName, bool includeBinaryProperties = false);
+
+    /// <summary>
+    /// Deletes pointer file entries that satisfy the supplied predicate.
+    /// </summary>
+    /// <param name="shouldBeDeleted">A predicate used to determine which entries to remove.</param>
+    /// <returns>The number of deleted entries.</returns>
+    int DeletePointerFileEntries(Func<PointerFileEntry, bool> shouldBeDeleted);
+}
diff --git a/src/Arius.Core/Shared/StateRepositories/Migrations/20250922100845_InitialCreate.cs b/src/Arius.Core/Shared/StateRepositories/Migrations/20250922100845_InitialCreate.cs
index a427bc9e..8faab9be 100644
--- a/src/Arius.Core/Shared/StateRepositories/Migrations/20250922100845_InitialCreate.cs
+++ b/src/Arius.Core/Shared/StateRepositories/Migrations/20250922100845_InitialCreate.cs
@@ -1,5 +1,4 @@
-using System;
-using Microsoft.EntityFrameworkCore.Migrations;
+using Microsoft.EntityFrameworkCore.Migrations;

 #nullable disable

diff --git a/src/Arius.Core/Shared/StateRepositories/Migrations/20251031145332_PointerFileEntryKeyIsOnlyRelativeName.Designer.cs b/src/Arius.Core/Shared/StateRepositories/Migrations/20251031145332_PointerFileEntryKeyIsOnlyRelativeName.Designer.cs
new file mode 100644
index 00000000..cf5aea15
--- /dev/null
+++ b/src/Arius.Core/Shared/StateRepositories/Migrations/20251031145332_PointerFileEntryKeyIsOnlyRelativeName.Designer.cs
@@ -0,0 +1,91 @@
+// <auto-generated />
+using System;
+using Arius.Core.Shared.StateRepositories;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+
+#nullable disable
+
+namespace Arius.Core.Shared.StateRepositories.Migrations
+{
+    [DbContext(typeof(StateRepositoryDbContext))]
+    [Migration("20251031145332_PointerFileEntryKeyIsOnlyRelativeName")]
+    partial class PointerFileEntryKeyIsOnlyRelativeName
+    {
+        /// <inheritdoc />
+        protected override void BuildTargetModel(ModelBuilder modelBuilder)
+        {
+#pragma warning disable 612, 618
+            modelBuilder.HasAnnotation("ProductVersion", "9.0.10");
+
+            modelBuilder.Entity("Arius.Core.Shared.StateRepositories.BinaryProperties", b =>
+                {
+                    b.Property<byte[]>("Hash")
+                        .HasColumnType("BLOB");
+
+                    b.Property<long>("ArchivedSize")
+                        .HasColumnType("INTEGER");
+
+                    b.Property<long>("OriginalSize")
+                        .HasColumnType("INTEGER");
+
+                    b.Property<byte[]>("ParentHash")
+                        .HasColumnType("BLOB");
+
+                    b.Property<int>("StorageTier")
+                        .HasColumnType("INTEGER");
+
+                    b.HasKey("Hash");
+
+                    b.HasIndex("Hash")
+                        .IsUnique();
+
+                    b.ToTable("BinaryProperties", (string)null);
+                });
+
+            modelBuilder.Entity("Arius.Core.Shared.StateRepositories.PointerFileEntry", b =>
+                {
+                    b.Property<string>("RelativeName")
+                        .HasColumnType("TEXT");
+
+                    b.Property<DateTime?>("CreationTimeUtc")
+                        .HasColumnType("TEXT");
+
+                    b.Property<byte[]>("Hash")
+                        .IsRequired()
+                        .HasColumnType("BLOB");
+
+                    b.Property<DateTime?>("LastWriteTimeUtc")
+                        .HasColumnType("TEXT");
+
+                    b.HasKey("RelativeName");
+
+                    b.HasIndex("Hash");
+
+                    b.HasIndex("RelativeName", "Hash")
+                        .HasDatabaseName("IX_PointerFileEntries_RelativeName_Hash");
+
+                    b.ToTable("PointerFileEntries", (string)null);
+                });
+
+            modelBuilder.Entity("Arius.Core.Shared.StateRepositories.PointerFileEntry", b =>
+                {
+                    b.HasOne("Arius.Core.Shared.StateRepositories.BinaryProperties", "BinaryProperties")
+                        .WithMany("PointerFileEntries")
+                        .HasForeignKey("Hash")
+                        .OnDelete(DeleteBehavior.Cascade)
+                        .IsRequired();
+
+                    b.Navigation("BinaryProperties");
+                });
+
+            modelBuilder.Entity("Arius.Core.Shared.StateRepositories.BinaryProperties", b =>
+                {
+                    b.Navigation("PointerFileEntries");
+                });
+#pragma warning restore 612, 618
+        }
+    }
+}
diff --git a/src/Arius.Core/Shared/StateRepositories/Migrations/20251031145332_PointerFileEntryKeyIsOnlyRelativeName.cs b/src/Arius.Core/Shared/StateRepositories/Migrations/20251031145332_PointerFileEntryKeyIsOnlyRelativeName.cs
new file mode 100644
index 00000000..c5b0b466
--- /dev/null
+++ b/src/Arius.Core/Shared/StateRepositories/Migrations/20251031145332_PointerFileEntryKeyIsOnlyRelativeName.cs
@@ -0,0 +1,45 @@
+using Microsoft.EntityFrameworkCore.Migrations;
+
+#nullable disable
+
+namespace Arius.Core.Shared.StateRepositories.Migrations
+{
+    /// <inheritdoc />
+    internal partial class PointerFileEntryKeyIsOnlyRelativeName : Migration
+    {
+        /// <inheritdoc />
+        protected override void Up(MigrationBuilder migrationBuilder)
+        {
+            migrationBuilder.DropPrimaryKey(
+                name: "PK_PointerFileEntries",
+                table: "PointerFileEntries");
+
+            migrationBuilder.DropIndex(
+                name: "IX_PointerFileEntries_RelativeName",
+                table: "PointerFileEntries");
+
+            migrationBuilder.AddPrimaryKey(
+                name: "PK_PointerFileEntries",
+                table: "PointerFileEntries",
+                column: "RelativeName");
+        }
+
+        /// <inheritdoc />
+        protected override void Down(MigrationBuilder migrationBuilder)
+        {
+            migrationBuilder.DropPrimaryKey(
+                name: "PK_PointerFileEntries",
+                table: "PointerFileEntries");
+
+            migrationBuilder.AddPrimaryKey(
+                name: "PK_PointerFileEntries",
+                table: "PointerFileEntries",
+                columns: new[] { "Hash", "RelativeName" });
+
+            migrationBuilder.CreateIndex(
+                name: "IX_PointerFileEntries_RelativeName",
+                table: "PointerFileEntries",
+                column: "RelativeName");
+        }
+    }
+}
diff --git a/src/Arius.Core/Shared/StateRepositories/Migrations/StateRepositoryDbContextModelSnapshot.cs b/src/Arius.Core/Shared/StateRepositories/Migrations/StateRepositoryDbContextModelSnapshot.cs
index 4e30c956..40117800 100644
--- a/src/Arius.Core/Shared/StateRepositories/Migrations/StateRepositoryDbContextModelSnapshot.cs
+++ b/src/Arius.Core/Shared/StateRepositories/Migrations/StateRepositoryDbContextModelSnapshot.cs
@@ -15,7 +15,7 @@ partial class StateRepositoryDbContextModelSnapshot : ModelSnapshot
     protected override void BuildModel(ModelBuilder modelBuilder)
     {
#pragma warning disable 612, 618
-        modelBuilder.HasAnnotation("ProductVersion", "9.0.9");
+        modelBuilder.HasAnnotation("ProductVersion", "9.0.10");

         modelBuilder.Entity("Arius.Core.Shared.StateRepositories.BinaryProperties", b =>
             {
@@ -44,24 +44,23 @@ protected override void BuildModel(ModelBuilder modelBuilder)
         modelBuilder.Entity("Arius.Core.Shared.StateRepositories.PointerFileEntry", b =>
             {
-                b.Property<byte[]>("Hash")
-                    .HasColumnType("BLOB");
-
                 b.Property<string>("RelativeName")
                     .HasColumnType("TEXT");

                 b.Property<DateTime?>("CreationTimeUtc")
                     .HasColumnType("TEXT");

+                b.Property<byte[]>("Hash")
+                    .IsRequired()
+                    .HasColumnType("BLOB");
+
                 b.Property<DateTime?>("LastWriteTimeUtc")
                     .HasColumnType("TEXT");

-                b.HasKey("Hash", "RelativeName");
+                b.HasKey("RelativeName");

                 b.HasIndex("Hash");

-                b.HasIndex("RelativeName");
-
                 b.HasIndex("RelativeName", "Hash")
                     .HasDatabaseName("IX_PointerFileEntries_RelativeName_Hash");
diff --git a/src/Arius.Core/Shared/StateRepositories/StateRepository.cs b/src/Arius.Core/Shared/StateRepositories/StateRepository.cs
index c0d434f2..8949bc19 100644
--- a/src/Arius.Core/Shared/StateRepositories/StateRepository.cs
+++ b/src/Arius.Core/Shared/StateRepositories/StateRepository.cs
@@ -1,9 +1,9 @@
-using Arius.Core.Shared.Hashing;
+using Arius.Core.Shared.FileSystem;
+using Arius.Core.Shared.Hashing;
 using Arius.Core.Shared.Storage;
 using EFCore.BulkExtensions;
 using Microsoft.EntityFrameworkCore;
 using System.Runtime.CompilerServices;
-using Arius.Core.Shared.FileSystem;
 using WouterVanRanst.Utils.Extensions;
 using Zio;

diff --git a/src/Arius.Core/Shared/StateRepositories/StateRepositoryDbContext.cs b/src/Arius.Core/Shared/StateRepositories/StateRepositoryDbContext.cs
index 05e85e2f..b2971c9d 100644
--- a/src/Arius.Core/Shared/StateRepositories/StateRepositoryDbContext.cs
+++ b/src/Arius.Core/Shared/StateRepositories/StateRepositoryDbContext.cs
@@ -46,10 +46,10 @@ protected override void OnModelCreating(ModelBuilder modelBuilder)
     var pfeb = modelBuilder.Entity<PointerFileEntry>();
     pfeb.ToTable("PointerFileEntries");

-    pfeb.HasKey(pfe => new { pfe.Hash, pfe.RelativeName });
+    pfeb.HasKey(pfe => pfe.RelativeName);

     pfeb.HasIndex(pfe => pfe.Hash); // NOT unique
-    pfeb.HasIndex(pfe => pfe.RelativeName); // to facilitate GetPointerFileEntriesAtVersionAsync
+    //pfeb.HasIndex(pfe => pfe.RelativeName); // to facilitate GetPointerFileEntriesAtVersionAsync

     // Composite index for better query performance on prefix searches with joins (for GetPointerFileEntries / topDirectoryOnly & GetPointerFileDirectories / topDirectoryOnly)
     pfeb.HasIndex(pfe => new { pfe.RelativeName, pfe.Hash })
diff --git a/src/Arius.Core/Shared/StateRepositories/StateRepositoryDbContextFactory.cs b/src/Arius.Core/Shared/StateRepositories/StateRepositoryDbContextFactory.cs
index 30d76eab..962e46fe 100644
--- a/src/Arius.Core/Shared/StateRepositories/StateRepositoryDbContextFactory.cs
+++ b/src/Arius.Core/Shared/StateRepositories/StateRepositoryDbContextFactory.cs
@@ -47,13 +47,19 @@ public StateRepositoryDbContextPool(FileEntry stateDatabaseFile, bool ensureCrea
     {
         lock (ensureCreatedLock)
         {
-            using var context = CreateContext();
+            using var context = CreateContext(skipExistCheck: true);
             context.Database.Migrate();
         }
     }

-    public StateRepositoryDbContext CreateContext() => factory.CreateDbContext();
+    public StateRepositoryDbContext CreateContext(bool skipExistCheck = false)
+    {
+        if (!skipExistCheck && !StateDatabaseFile.Exists)
+            throw new InvalidOperationException($"Database file {StateDatabaseFile} does not exist");
+
+        return factory.CreateDbContext();
+    }

     public void SetHasChanges() => Interlocked.Exchange(ref hasChanges, true);

diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index 549456d3..81c2efcf 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -11,41 +11,41 @@ https://github.com/woutervanranst/Arius
[PackageVersion bump hunk: the <PackageVersion> element contents were lost in extraction; only the -/+ markers survived]
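Finally, the pool's new guard makes a missing state database an explicit error instead of letting SQLite silently create an empty file; only the migration path opts out. A hedged sketch of the intended contract (the pool and missing-file setup are assumed to come from the surrounding test helpers):

// CreateContext() now refuses to run against a database file that does not exist
Should.Throw<InvalidOperationException>(() => pool.CreateContext());

// The migration path opts out, because Migrate() is what creates the file
using (var context = pool.CreateContext(skipExistCheck: true))
    context.Database.Migrate();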