diff --git a/src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs b/src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs index d846690..fa1f2b3 100644 --- a/src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs +++ b/src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs @@ -4,13 +4,20 @@ namespace Foundatio.Azure.Extensions { public static class StorageExtensions { + internal const string UncompressedLength = "XUncompressedLength"; public static FileSpec ToFileInfo(this CloudBlockBlob blob) { - if (blob.Properties.Length == -1) + if (!blob.Metadata.TryGetValue(UncompressedLength, out string lengthStr) || + !Int64.TryParse(lengthStr, out long length)) + { + length = blob.Properties.Length; + } + + if (length == -1) return null; return new FileSpec { Path = blob.Name, - Size = blob.Properties.Length, + Size = length, Created = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue, Modified = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue }; diff --git a/src/Foundatio.AzureStorage/Extensions/TaskExtensions.cs b/src/Foundatio.AzureStorage/Extensions/TaskExtensions.cs index 42ec656..fd317ae 100644 --- a/src/Foundatio.AzureStorage/Extensions/TaskExtensions.cs +++ b/src/Foundatio.AzureStorage/Extensions/TaskExtensions.cs @@ -16,6 +16,11 @@ public static ConfiguredTaskAwaitable AnyContext(this Task task) { return task.ConfigureAwait(continueOnCapturedContext: false); } + [DebuggerStepThrough] + public static ConfiguredValueTaskAwaitable AnyContext(this ValueTask task) { + return task.ConfigureAwait(continueOnCapturedContext: false); + } + [DebuggerStepThrough] public static ConfiguredTaskAwaitable AnyContext(this AwaitableDisposable task) where TResult : IDisposable { return task.ConfigureAwait(continueOnCapturedContext: false); diff --git a/src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj b/src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj index 50a8c65..a6cffa9 100644 --- a/src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj +++ b/src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj @@ -8,6 +8,7 @@ + diff --git a/src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs b/src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs index d03d2f0..75d8200 100644 --- a/src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs +++ b/src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; using System.IO; +using System.IO.Compression; +using System.IO.Pipelines; using System.Linq; using System.Text.RegularExpressions; using System.Threading; @@ -14,6 +16,10 @@ namespace Foundatio.Storage { public class AzureFileStorage : IFileStorage { + private const int cacheSize = 64; + private static readonly Queue s_pipeCache = new Queue(cacheSize); + + private readonly Task _containerCreation; private readonly CloudBlobContainer _container; private readonly ISerializer _serializer; @@ -24,7 +30,7 @@ public AzureFileStorage(AzureFileStorageOptions options) { var account = CloudStorageAccount.Parse(options.ConnectionString); var client = account.CreateCloudBlobClient(); _container = client.GetContainerReference(options.ContainerName); - _container.CreateIfNotExistsAsync().GetAwaiter().GetResult(); + _containerCreation = _container.CreateIfNotExistsAsync(); _serializer = options.Serializer ?? DefaultSerializer.Instance; } @@ -37,9 +43,22 @@ public async Task GetFileStreamAsync(string path, CancellationToken canc if (String.IsNullOrEmpty(path)) throw new ArgumentNullException(nameof(path)); + await EnsureContainerCreated().AnyContext(); + var blockBlob = _container.GetBlockBlobReference(path); try { - return await blockBlob.OpenReadAsync(null, null, null, cancellationToken).AnyContext(); + await blockBlob.FetchAttributesAsync().AnyContext(); + var option = new BlobRequestOptions() { + // As we are transmitting over TLS we don't need the MD5 validation + DisableContentMD5Validation = false + }; + var blobStream = await blockBlob.OpenReadAsync(null, option, null, cancellationToken).AnyContext(); + if (blockBlob.Metadata.TryGetValue(StorageExtensions.UncompressedLength, out _)) { + // If compressed return decompressing Stream + return new GZipStream(blobStream, CompressionMode.Decompress); + } + // Otherwise return original Stream + return blobStream; } catch (StorageException ex) { if (ex.RequestInformation.HttpStatusCode == 404) return null; @@ -52,6 +71,8 @@ public async Task GetFileInfoAsync(string path) { if (String.IsNullOrEmpty(path)) throw new ArgumentNullException(nameof(path)); + await EnsureContainerCreated().AnyContext(); + var blob = _container.GetBlockBlobReference(path); try { await blob.FetchAttributesAsync().AnyContext(); @@ -61,12 +82,14 @@ public async Task GetFileInfoAsync(string path) { return null; } - public Task ExistsAsync(string path) { + public async Task ExistsAsync(string path) { if (String.IsNullOrEmpty(path)) throw new ArgumentNullException(nameof(path)); + await EnsureContainerCreated().AnyContext(); + var blockBlob = _container.GetBlockBlobReference(path); - return blockBlob.ExistsAsync(); + return await blockBlob.ExistsAsync().AnyContext(); } public async Task SaveFileAsync(string path, Stream stream, CancellationToken cancellationToken = default) { @@ -75,9 +98,50 @@ public async Task SaveFileAsync(string path, Stream stream, CancellationTo if (stream == null) throw new ArgumentNullException(nameof(stream)); - + + await EnsureContainerCreated().AnyContext(); + var blockBlob = _container.GetBlockBlobReference(path); - await blockBlob.UploadFromStreamAsync(stream, null, null, null, cancellationToken).AnyContext(); + + // GZipStream doesn't allow it to be the source of compressed data, and needs to go via an intermediate like MemoryStream. + // Rather than buffering everything in memory this way, we use Pipelines to invert the Stream, so the compression can be + // streamed on the fly. + var pipe = RentPipe(); + var gzipStream = new GZipStream(pipe.Writer.AsStream(), CompressionMode.Compress); + var uploadTask = Task.CompletedTask; + try { + if (!stream.CanSeek) { + // Need to record the uncompressed size in the metadata + stream = new CountingStream(stream); + } + + var copyTask = stream.CopyToAsync(gzipStream); + + var option = new BlobRequestOptions() { + // As we are transmitting over TLS we don't need the MD5 validation + DisableContentMD5Validation = false + }; + uploadTask = blockBlob.UploadFromStreamAsync(pipe.Reader.AsStream(), null, option, null, cancellationToken); + + await copyTask.AnyContext(); + await gzipStream.FlushAsync().AnyContext(); + } finally { + gzipStream.Dispose(); + await pipe.Writer.CompleteAsync().AnyContext(); + await uploadTask.AnyContext(); + await pipe.Reader.CompleteAsync().AnyContext(); + } + + ReturnPipe(pipe); + + // Set headers + if (path.EndsWith(".json")) { + blockBlob.Properties.ContentType = "application/json"; + } + blockBlob.Properties.ContentEncoding = "gzip"; + await blockBlob.SetPropertiesAsync().AnyContext(); + blockBlob.Metadata.Add(StorageExtensions.UncompressedLength, stream.Length.ToString((IFormatProvider)null)); + await blockBlob.SetMetadataAsync().AnyContext(); return true; } @@ -88,6 +152,8 @@ public async Task RenameFileAsync(string path, string newPath, Cancellatio if (String.IsNullOrEmpty(newPath)) throw new ArgumentNullException(nameof(newPath)); + await EnsureContainerCreated().AnyContext(); + var oldBlob = _container.GetBlockBlobReference(path); if (!(await CopyFileAsync(path, newPath, cancellationToken).AnyContext())) return false; @@ -101,6 +167,8 @@ public async Task CopyFileAsync(string path, string targetPath, Cancellati if (String.IsNullOrEmpty(targetPath)) throw new ArgumentNullException(nameof(targetPath)); + await EnsureContainerCreated().AnyContext(); + var oldBlob = _container.GetBlockBlobReference(path); var newBlob = _container.GetBlockBlobReference(targetPath); @@ -111,12 +179,14 @@ public async Task CopyFileAsync(string path, string targetPath, Cancellati return newBlob.CopyState.Status == CopyStatus.Success; } - public Task DeleteFileAsync(string path, CancellationToken cancellationToken = default) { + public async Task DeleteFileAsync(string path, CancellationToken cancellationToken = default) { if (String.IsNullOrEmpty(path)) throw new ArgumentNullException(nameof(path)); + await EnsureContainerCreated().AnyContext(); + var blockBlob = _container.GetBlockBlobReference(path); - return blockBlob.DeleteIfExistsAsync(DeleteSnapshotsOption.None, null, null, null, cancellationToken); + return await blockBlob.DeleteIfExistsAsync(DeleteSnapshotsOption.None, null, null, null, cancellationToken).AnyContext(); } public async Task DeleteFilesAsync(string searchPattern = null, CancellationToken cancellationToken = default) { @@ -145,7 +215,7 @@ private async Task GetFiles(string searchPattern, int page, int int pagingLimit = pageSize; int skip = (page - 1) * pagingLimit; if (pagingLimit < Int32.MaxValue) - pagingLimit = pagingLimit + 1; + pagingLimit++; var list = (await GetFileListAsync(searchPattern, pagingLimit, skip, cancellationToken).AnyContext()).ToList(); bool hasMore = false; @@ -175,7 +245,9 @@ public async Task> GetFileListAsync(string searchPattern = int slashPos = searchPattern.LastIndexOf('/'); prefix = slashPos >= 0 ? searchPattern.Substring(0, slashPos) : String.Empty; } - prefix = prefix ?? String.Empty; + prefix ??= String.Empty; + + await EnsureContainerCreated().AnyContext(); BlobContinuationToken continuationToken = null; var blobs = new List(); @@ -193,7 +265,74 @@ public async Task> GetFileListAsync(string searchPattern = return blobs.Select(blob => blob.ToFileInfo()); } + private Task EnsureContainerCreated() => _containerCreation; + + private static Pipe RentPipe() { + var cache = s_pipeCache; + lock (cache) { + if (cache.Count > 0) + return cache.Dequeue(); + } + + return new Pipe(); + } + + private static void ReturnPipe(Pipe pipe) { + pipe.Reset(); + var cache = s_pipeCache; + lock (cache) { + if (cache.Count < cacheSize) + cache.Enqueue(pipe); + } + } + public void Dispose() {} + + // Used to get the uncompressed length from a Stream that doesn't support querying it upfront + private class CountingStream : Stream { + private readonly Stream stream; + private long readLength = 0; + + public CountingStream(Stream stream) { + this.stream = stream; + } + + public override long Length => readLength; + + public override long Position { get => readLength; set => throw new NotSupportedException(); } + + public override int Read(byte[] buffer, int offset, int count) { + int amount = stream.Read(buffer, offset, count); + readLength += amount; + return amount; + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { + int amount = await stream.ReadAsync(buffer, offset, count, cancellationToken); + readLength += amount; + return amount; + } + + public override int EndRead(IAsyncResult asyncResult) { + int amount = stream.EndRead(asyncResult); + readLength += amount; + return amount; + } + + public override bool CanRead => stream.CanRead; + + public override bool CanSeek => false; + + public override bool CanWrite => false; + + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + + public override void SetLength(long value) => throw new NotSupportedException(); + + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + + public override void Flush() => throw new NotSupportedException(); + } } internal static class BlobListExtensions {