Skip to content

Commit a2f80e1

Browse files
feat: fix concurrency issue in GetCachedTarAsync
1 parent 15c325d commit a2f80e1

File tree

1 file changed

+47
-18
lines changed

1 file changed

+47
-18
lines changed

src/Arius.Core/Features/Commands/Restore/RestoreCommandHandler.cs

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using Arius.Core.Shared.Extensions;
1+
using Arius.Core.Shared.Concurrency;
2+
using Arius.Core.Shared.Extensions;
23
using Arius.Core.Shared.FileSystem;
34
using Arius.Core.Shared.Hashing;
45
using Arius.Core.Shared.StateRepositories;
@@ -38,6 +39,8 @@ public RestoreCommandHandler(
3839
private readonly Channel<FilePairWithPointerFileEntry> filePairsToRestoreChannel = ChannelExtensions.CreateBounded<FilePairWithPointerFileEntry>(capacity: 25, singleWriter: true, singleReader: false);
3940
private readonly Channel<FilePairWithPointerFileEntry> filePairsToHashChannel = ChannelExtensions.CreateBounded<FilePairWithPointerFileEntry>(capacity: 25, singleWriter: true, singleReader: false);
4041

42+
private readonly InFlightGate<Hash, FileEntry?> _tarCacheGate = new();
43+
4144
private record FilePairWithPointerFileEntry(FilePair FilePair, PointerFileEntry PointerFileEntry);
4245

4346
public async ValueTask<RestoreCommandResult> Handle(RestoreCommand request, CancellationToken cancellationToken)
@@ -369,32 +372,58 @@ private async Task DownloadSmallFileAsync(HandlerContext handlerContext, FilePai
369372

370373
async Task<FileEntry?> GetCachedTarAsync()
371374
{
372-
var cachedBinary = handlerContext.BinaryCache.GetFileEntry(parentHash.ToString());
373-
if (!cachedBinary.Exists)
375+
var (isOwner, waitTask) = _tarCacheGate.Enter(parentHash);
376+
if (isOwner)
374377
{
375-
logger.LogDebug("TAR archive not cached, downloading from blob storage (parent hash: {ParentHash})", parentHash.ToShortString());
376-
377-
// The TAR was not yet downloaded from blob storage
378-
await using var ss = await GetChunkStreamAsync(handlerContext, pointerFileEntry, cancellationToken);
378+
try
379+
{
380+
var cachedBinary = handlerContext.BinaryCache.GetFileEntry(parentHash.ToString());
381+
if (!cachedBinary.Exists)
382+
{
383+
// The TAR was not yet downloaded from blob storage
384+
logger.LogDebug("TAR archive not cached, downloading from blob storage (parent hash: {ParentHash})", parentHash.ToShortString());
379385

380-
if (ss is null) // Chunk is not available (either archived or rehydrating)
386+
await using var ss = await GetChunkStreamAsync(handlerContext, pointerFileEntry, cancellationToken);
387+
if (ss is null)
388+
{
389+
// Chunk is not available (either archived or rehydrating)
390+
_tarCacheGate.Complete(parentHash, null);
391+
return null;
392+
}
393+
394+
try
395+
{
396+
await using var ts = cachedBinary.Open(FileMode.CreateNew, FileAccess.Write, FileShare.None);
397+
await ss.CopyToAsync(ts, cancellationToken);
398+
await ts.FlushAsync(cancellationToken); // Explicitly flush
399+
400+
logger.LogDebug("TAR archive cached successfully for {ParentHash}", parentHash.ToShortString());
401+
}
402+
catch (IOException)
403+
{
404+
// Another writer raced in; file now exists — OK to continue
405+
}
406+
}
407+
408+
_tarCacheGate.Complete(parentHash, cachedBinary);
409+
return cachedBinary;
410+
}
411+
catch (OperationCanceledException)
381412
{
382-
logger.LogDebug("TAR archive chunk stream not available for {ParentHash}", parentHash.ToShortString());
383-
return null;
413+
_tarCacheGate.Cancel(parentHash, cancellationToken);
414+
throw;
415+
}
416+
catch (Exception ex)
417+
{
418+
_tarCacheGate.Fault(parentHash, ex);
419+
throw;
384420
}
385-
386-
await using var ts = cachedBinary.Open(FileMode.CreateNew, FileAccess.Write, FileShare.None);
387-
await ss.CopyToAsync(ts, cancellationToken);
388-
await ts.FlushAsync(cancellationToken); // Explicitly flush
389-
390-
logger.LogDebug("TAR archive cached successfully for {ParentHash}", parentHash.ToShortString());
391421
}
392422
else
393423
{
394424
logger.LogDebug("Using cached TAR archive for {ParentHash}", parentHash.ToShortString());
425+
return await waitTask;
395426
}
396-
397-
return cachedBinary;
398427
}
399428

400429
async Task<TarEntry?> GetTarEntryAsync(Hash hash)

0 commit comments

Comments
 (0)