44using Arius . Core . Shared . Hashing ;
55using Arius . Core . Shared . StateRepositories ;
66using Arius . Core . Shared . Storage ;
7- using AsyncKeyedLock ;
87using Humanizer ;
98using Mediator ;
109using Microsoft . Extensions . Logging ;
@@ -36,7 +35,6 @@ public RestoreCommandHandler(ILogger<RestoreCommandHandler> logger, ILoggerFacto
3635 private readonly Channel < FilePairWithPointerFileEntry > filePairsToHashChannel = ChannelExtensions . CreateBounded < FilePairWithPointerFileEntry > ( capacity : 25 , singleWriter : true , singleReader : false ) ;
3736
3837 private readonly InFlightGate < Hash , FileEntry ? > tarCacheGate = new ( ) ;
39- //private readonly AsyncKeyedLocker<Hash> tarReaderLocker = new();
4038
4139 private record FilePairWithPointerFileEntry ( FilePair FilePair , PointerFileEntry PointerFileEntry ) ;
4240
@@ -324,7 +322,7 @@ private async Task DownloadSmallFileAsync(HandlerContext handlerContext, FilePai
324322 {
325323 var ( filePair , pointerFileEntry ) = filePairWithPointerFileEntry ;
326324 var fileSizeFormatted = pointerFileEntry . BinaryProperties . OriginalSize . Bytes ( ) . Humanize ( ) ;
327- var parentHash = pointerFileEntry . BinaryProperties . ParentHash ! ;
325+ var parentHash = pointerFileEntry . BinaryProperties . ParentHash ! ;
328326
329327 logger . LogDebug ( "Starting small file download for {FileName} from TAR (size: {FileSize}, parent hash: {ParentHash})" , filePair . BinaryFile . FullName , fileSizeFormatted , parentHash . ToShortString ( ) ) ;
330328 handlerContext . Request . ProgressReporter ? . Report ( new FileProgressUpdate ( filePair . BinaryFile . FullName , 30 , "Getting TAR archive..." ) ) ;
@@ -340,56 +338,31 @@ private async Task DownloadSmallFileAsync(HandlerContext handlerContext, FilePai
340338
341339 handlerContext . Request . ProgressReporter ? . Report ( new FileProgressUpdate ( filePair . BinaryFile . FullName , 50 , "Extracting from TAR..." ) ) ;
342340
343- //// Acquire exclusive lock for reading this TAR to prevent race conditions
344- //using (await tarReaderLocker.LockAsync(parentHash, cancellationToken))
345- //{
346- logger . LogDebug ( "Acquired TAR reader lock for {ParentHash}" , parentHash . ToShortString ( ) ) ;
341+ logger . LogDebug ( "Acquired TAR reader lock for {ParentHash}" , parentHash . ToShortString ( ) ) ;
347342
348- await using var tarStream = tar . Open ( FileMode . Open , FileAccess . Read , FileShare . Read ) ;
349- await using var tarReader = new TarReader ( tarStream ) ;
350- var tarEntry = await GetTarEntryAsync ( pointerFileEntry . BinaryProperties . Hash ) ;
351- if ( tarEntry is null )
352- {
353- logger . LogError ( "TAR entry not found for file {FileName} (hash: {Hash}) in TAR archive {ParentHash}" , filePair . BinaryFile . FullName , pointerFileEntry . BinaryProperties . Hash . ToShortString ( ) , parentHash . ToShortString ( ) ) ;
354- throw new InvalidOperationException ( $ "TAR entry not found for file { filePair . BinaryFile . FullName } ") ; // TODO handle more graceful?
355- }
343+ await using var tarStream = tar . Open ( FileMode . Open , FileAccess . Read , FileShare . Read ) ;
344+ await using var tarReader = new TarReader ( tarStream ) ;
345+ var tarEntry = await GetTarEntryAsync ( pointerFileEntry . BinaryProperties . Hash ) ;
346+ if ( tarEntry is null )
347+ {
348+ logger . LogError ( "TAR entry not found for file {FileName} (hash: {Hash}) in TAR archive {ParentHash}" , filePair . BinaryFile . FullName , pointerFileEntry . BinaryProperties . Hash . ToShortString ( ) , parentHash . ToShortString ( ) ) ;
349+ throw new InvalidOperationException ( $ "TAR entry not found for file { filePair . BinaryFile . FullName } ") ; // TODO handle more graceful?
350+ }
356351
357- handlerContext . Request . ProgressReporter ? . Report ( new FileProgressUpdate ( filePair . BinaryFile . FullName , 70 , "Writing file..." ) ) ;
352+ handlerContext . Request . ProgressReporter ? . Report ( new FileProgressUpdate ( filePair . BinaryFile . FullName , 70 , "Writing file..." ) ) ;
358353
359- // 2. Write to the target file
360- filePair . BinaryFile . Directory . Create ( ) ;
361- await using ( var ts = filePair . BinaryFile . OpenWrite ( pointerFileEntry . BinaryProperties . OriginalSize ) )
354+ // 2. Write to the target file
355+ filePair . BinaryFile . Directory . Create ( ) ;
356+ await using ( var ts = filePair . BinaryFile . OpenWrite ( pointerFileEntry . BinaryProperties . OriginalSize ) )
357+ {
358+ if ( tarEntry . DataStream is not null )
362359 {
363- if ( tarEntry . DataStream is not null )
364- {
365- // NOTE: an empty file (0 byte) has a DataStream null.
366- await tarEntry . DataStream . CopyToAsync ( ts , cancellationToken ) ;
367- }
368-
369- await ts . FlushAsync ( cancellationToken ) ; // Explicitly flush
360+ // NOTE: an empty file (0 byte) has a DataStream null.
361+ await tarEntry . DataStream . CopyToAsync ( ts , cancellationToken ) ;
370362 }
371363
372- logger . LogDebug ( "Released TAR reader lock for {ParentHash}" , parentHash . ToShortString ( ) ) ;
373-
374-
375- async Task < TarEntry ? > GetTarEntryAsync ( Hash hash )
376- {
377- logger . LogDebug ( "Searching for TAR entry with hash {Hash}" , hash . ToShortString ( ) ) ;
378-
379- TarEntry ? entry ;
380- while ( ( entry = await tarReader . GetNextEntryAsync ( copyData : true , cancellationToken ) ) != null )
381- {
382- if ( entry . Name == hash )
383- {
384- logger . LogDebug ( "Found TAR entry for hash {Hash}" , hash . ToShortString ( ) ) ;
385- return entry ;
386- }
387- }
388-
389- logger . LogError ( "TAR entry not found for hash {Hash}" , hash . ToShortString ( ) ) ;
390- return null ;
391- }
392- //}
364+ await ts . FlushAsync ( cancellationToken ) ; // Explicitly flush
365+ }
393366
394367 filePair . BinaryFile . CreationTimeUtc = pointerFileEntry . CreationTimeUtc ! . Value ;
395368 filePair . BinaryFile . LastWriteTimeUtc = pointerFileEntry . LastWriteTimeUtc ! . Value ;
@@ -413,7 +386,7 @@ private async Task DownloadSmallFileAsync(HandlerContext handlerContext, FilePai
413386 logger . LogDebug ( "TAR archive not cached, downloading from blob storage (parent hash: {ParentHash})" , parentHash . ToShortString ( ) ) ;
414387
415388 await using var ss = await GetChunkStreamAsync ( handlerContext , pointerFileEntry , cancellationToken ) ;
416- if ( ss is null )
389+ if ( ss is null )
417390 {
418391 // Chunk is not available (either archived or rehydrating)
419392 tarCacheGate . Complete ( parentHash , null ) ;
@@ -458,6 +431,23 @@ private async Task DownloadSmallFileAsync(HandlerContext handlerContext, FilePai
458431 }
459432 }
460433
434+ async Task < TarEntry ? > GetTarEntryAsync ( Hash hash )
435+ {
436+ logger . LogDebug ( "Searching for TAR entry with hash {Hash}" , hash . ToShortString ( ) ) ;
437+
438+ TarEntry ? entry ;
439+ while ( ( entry = await tarReader . GetNextEntryAsync ( copyData : false , cancellationToken ) ) != null )
440+ {
441+ if ( entry . Name == hash )
442+ {
443+ logger . LogDebug ( "Found TAR entry for hash {Hash}" , hash . ToShortString ( ) ) ;
444+ return entry ;
445+ }
446+ }
447+
448+ logger . LogError ( "TAR entry not found for hash {Hash}" , hash . ToShortString ( ) ) ;
449+ return null ;
450+ }
461451 }
462452
463453 private readonly ConcurrentBag < PointerFileEntry > toRehydrateList = new ( ) ;
0 commit comments