Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@

namespace Foundatio.Azure.Extensions {
public static class StorageExtensions {
internal const string UncompressedLength = "XUncompressedLength";
public static FileSpec ToFileInfo(this CloudBlockBlob blob) {
if (blob.Properties.Length == -1)
if (!blob.Metadata.TryGetValue(UncompressedLength, out string lengthStr) ||
!Int64.TryParse(lengthStr, out long length))
{
length = blob.Properties.Length;
}

if (length == -1)
return null;

return new FileSpec {
Path = blob.Name,
Size = blob.Properties.Length,
Size = length,
Created = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue,
Modified = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue
};
Expand Down
5 changes: 5 additions & 0 deletions src/Foundatio.AzureStorage/Extensions/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public static ConfiguredTaskAwaitable AnyContext(this Task task) {
return task.ConfigureAwait(continueOnCapturedContext: false);
}

[DebuggerStepThrough]
public static ConfiguredValueTaskAwaitable AnyContext(this ValueTask task) {
return task.ConfigureAwait(continueOnCapturedContext: false);
}

[DebuggerStepThrough]
public static ConfiguredTaskAwaitable<TResult> AnyContext<TResult>(this AwaitableDisposable<TResult> task) where TResult : IDisposable {
return task.ConfigureAwait(continueOnCapturedContext: false);
Expand Down
1 change: 1 addition & 0 deletions src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<PackageReference Include="Foundatio" Version="10.0.2" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.0" />
<PackageReference Include="Microsoft.Azure.Storage.Queue" Version="11.0" />
<PackageReference Include="System.IO.Pipelines" Version="5.0.1" />

<ProjectReference Include="..\..\..\Foundatio\src\Foundatio\Foundatio.csproj" Condition="'$(ReferenceFoundatioSource)' == 'true'" />
</ItemGroup>
Expand Down
159 changes: 149 additions & 10 deletions src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.IO.Pipelines;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
Expand All @@ -14,6 +16,10 @@

namespace Foundatio.Storage {
public class AzureFileStorage : IFileStorage {
private const int cacheSize = 64;
private static readonly Queue<Pipe> s_pipeCache = new Queue<Pipe>(cacheSize);

private readonly Task _containerCreation;
private readonly CloudBlobContainer _container;
private readonly ISerializer _serializer;

Expand All @@ -24,7 +30,7 @@ public AzureFileStorage(AzureFileStorageOptions options) {
var account = CloudStorageAccount.Parse(options.ConnectionString);
var client = account.CreateCloudBlobClient();
_container = client.GetContainerReference(options.ContainerName);
_container.CreateIfNotExistsAsync().GetAwaiter().GetResult();
_containerCreation = _container.CreateIfNotExistsAsync();
_serializer = options.Serializer ?? DefaultSerializer.Instance;
}

Expand All @@ -37,9 +43,22 @@ public async Task<Stream> GetFileStreamAsync(string path, CancellationToken canc
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

await EnsureContainerCreated().AnyContext();

var blockBlob = _container.GetBlockBlobReference(path);
try {
return await blockBlob.OpenReadAsync(null, null, null, cancellationToken).AnyContext();
await blockBlob.FetchAttributesAsync().AnyContext();
var option = new BlobRequestOptions() {
// As we are transmitting over TLS we don't need the MD5 validation
DisableContentMD5Validation = false
};
var blobStream = await blockBlob.OpenReadAsync(null, option, null, cancellationToken).AnyContext();
if (blockBlob.Metadata.TryGetValue(StorageExtensions.UncompressedLength, out _)) {
// If compressed return decompressing Stream
return new GZipStream(blobStream, CompressionMode.Decompress);
}
// Otherwise return original Stream
return blobStream;
} catch (StorageException ex) {
if (ex.RequestInformation.HttpStatusCode == 404)
return null;
Expand All @@ -52,6 +71,8 @@ public async Task<FileSpec> GetFileInfoAsync(string path) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

await EnsureContainerCreated().AnyContext();

var blob = _container.GetBlockBlobReference(path);
try {
await blob.FetchAttributesAsync().AnyContext();
Expand All @@ -61,12 +82,14 @@ public async Task<FileSpec> GetFileInfoAsync(string path) {
return null;
}

public Task<bool> ExistsAsync(string path) {
public async Task<bool> ExistsAsync(string path) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

await EnsureContainerCreated().AnyContext();

var blockBlob = _container.GetBlockBlobReference(path);
return blockBlob.ExistsAsync();
return await blockBlob.ExistsAsync().AnyContext();
}

public async Task<bool> SaveFileAsync(string path, Stream stream, CancellationToken cancellationToken = default) {
Expand All @@ -75,9 +98,50 @@ public async Task<bool> SaveFileAsync(string path, Stream stream, CancellationTo

if (stream == null)
throw new ArgumentNullException(nameof(stream));


await EnsureContainerCreated().AnyContext();

var blockBlob = _container.GetBlockBlobReference(path);
await blockBlob.UploadFromStreamAsync(stream, null, null, null, cancellationToken).AnyContext();

// GZipStream doesn't allow it to be the source of compressed data, and needs to go via an intermediate like MemoryStream.
// Rather than buffering everything in memory this way, we use Pipelines to invert the Stream, so the compression can be
// streamed on the fly.
var pipe = RentPipe();
var gzipStream = new GZipStream(pipe.Writer.AsStream(), CompressionMode.Compress);
var uploadTask = Task.CompletedTask;
try {
if (!stream.CanSeek) {
// Need to record the uncompressed size in the metadata
stream = new CountingStream(stream);
}

var copyTask = stream.CopyToAsync(gzipStream);

var option = new BlobRequestOptions() {
// As we are transmitting over TLS we don't need the MD5 validation
DisableContentMD5Validation = false
};
uploadTask = blockBlob.UploadFromStreamAsync(pipe.Reader.AsStream(), null, option, null, cancellationToken);

await copyTask.AnyContext();
await gzipStream.FlushAsync().AnyContext();
} finally {
gzipStream.Dispose();
await pipe.Writer.CompleteAsync().AnyContext();
await uploadTask.AnyContext();
await pipe.Reader.CompleteAsync().AnyContext();
}

ReturnPipe(pipe);

// Set headers
if (path.EndsWith(".json")) {
blockBlob.Properties.ContentType = "application/json";
}
blockBlob.Properties.ContentEncoding = "gzip";
await blockBlob.SetPropertiesAsync().AnyContext();
blockBlob.Metadata.Add(StorageExtensions.UncompressedLength, stream.Length.ToString((IFormatProvider)null));
await blockBlob.SetMetadataAsync().AnyContext();

return true;
}
Expand All @@ -88,6 +152,8 @@ public async Task<bool> RenameFileAsync(string path, string newPath, Cancellatio
if (String.IsNullOrEmpty(newPath))
throw new ArgumentNullException(nameof(newPath));

await EnsureContainerCreated().AnyContext();

var oldBlob = _container.GetBlockBlobReference(path);
if (!(await CopyFileAsync(path, newPath, cancellationToken).AnyContext()))
return false;
Expand All @@ -101,6 +167,8 @@ public async Task<bool> CopyFileAsync(string path, string targetPath, Cancellati
if (String.IsNullOrEmpty(targetPath))
throw new ArgumentNullException(nameof(targetPath));

await EnsureContainerCreated().AnyContext();

var oldBlob = _container.GetBlockBlobReference(path);
var newBlob = _container.GetBlockBlobReference(targetPath);

Expand All @@ -111,12 +179,14 @@ public async Task<bool> CopyFileAsync(string path, string targetPath, Cancellati
return newBlob.CopyState.Status == CopyStatus.Success;
}

public Task<bool> DeleteFileAsync(string path, CancellationToken cancellationToken = default) {
public async Task<bool> DeleteFileAsync(string path, CancellationToken cancellationToken = default) {
if (String.IsNullOrEmpty(path))
throw new ArgumentNullException(nameof(path));

await EnsureContainerCreated().AnyContext();

var blockBlob = _container.GetBlockBlobReference(path);
return blockBlob.DeleteIfExistsAsync(DeleteSnapshotsOption.None, null, null, null, cancellationToken);
return await blockBlob.DeleteIfExistsAsync(DeleteSnapshotsOption.None, null, null, null, cancellationToken).AnyContext();
}

public async Task<int> DeleteFilesAsync(string searchPattern = null, CancellationToken cancellationToken = default) {
Expand Down Expand Up @@ -145,7 +215,7 @@ private async Task<NextPageResult> GetFiles(string searchPattern, int page, int
int pagingLimit = pageSize;
int skip = (page - 1) * pagingLimit;
if (pagingLimit < Int32.MaxValue)
pagingLimit = pagingLimit + 1;
pagingLimit++;

var list = (await GetFileListAsync(searchPattern, pagingLimit, skip, cancellationToken).AnyContext()).ToList();
bool hasMore = false;
Expand Down Expand Up @@ -175,7 +245,9 @@ public async Task<IEnumerable<FileSpec>> GetFileListAsync(string searchPattern =
int slashPos = searchPattern.LastIndexOf('/');
prefix = slashPos >= 0 ? searchPattern.Substring(0, slashPos) : String.Empty;
}
prefix = prefix ?? String.Empty;
prefix ??= String.Empty;

await EnsureContainerCreated().AnyContext();

BlobContinuationToken continuationToken = null;
var blobs = new List<CloudBlockBlob>();
Expand All @@ -193,7 +265,74 @@ public async Task<IEnumerable<FileSpec>> GetFileListAsync(string searchPattern =
return blobs.Select(blob => blob.ToFileInfo());
}

private Task EnsureContainerCreated() => _containerCreation;

private static Pipe RentPipe() {
var cache = s_pipeCache;
lock (cache) {
if (cache.Count > 0)
return cache.Dequeue();
}

return new Pipe();
}

private static void ReturnPipe(Pipe pipe) {
pipe.Reset();
var cache = s_pipeCache;
lock (cache) {
if (cache.Count < cacheSize)
cache.Enqueue(pipe);
}
}

public void Dispose() {}

// Used to get the uncompressed length from a Stream that doesn't support querying it upfront
private class CountingStream : Stream {
private readonly Stream stream;
private long readLength = 0;

public CountingStream(Stream stream) {
this.stream = stream;
}

public override long Length => readLength;

public override long Position { get => readLength; set => throw new NotSupportedException(); }

public override int Read(byte[] buffer, int offset, int count) {
int amount = stream.Read(buffer, offset, count);
readLength += amount;
return amount;
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
int amount = await stream.ReadAsync(buffer, offset, count, cancellationToken);
readLength += amount;
return amount;
}

public override int EndRead(IAsyncResult asyncResult) {
int amount = stream.EndRead(asyncResult);
readLength += amount;
return amount;
}

public override bool CanRead => stream.CanRead;

public override bool CanSeek => false;

public override bool CanWrite => false;

public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

public override void SetLength(long value) => throw new NotSupportedException();

public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

public override void Flush() => throw new NotSupportedException();
}
}

internal static class BlobListExtensions {
Expand Down