Skip to content
17 changes: 10 additions & 7 deletions src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
using System;
using Foundatio.Storage;
using Microsoft.Azure.Storage.Blob;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs;

namespace Foundatio.Azure.Extensions {
public static class StorageExtensions {
public static FileSpec ToFileInfo(this CloudBlockBlob blob) {
if (blob.Properties.Length == -1)
public static FileSpec ToFileInfo(this BlobProperties blob, string name) {
if (blob.ContentLength == -1)
return null;

return new FileSpec {
Path = blob.Name,
Size = blob.Properties.Length,
Created = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue,
Modified = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue
Path = name,
Size = blob.ContentLength,
Created = blob.CreatedOn.UtcDateTime,
Modified = blob.LastModified.UtcDateTime
};
}


}
}
4 changes: 2 additions & 2 deletions src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
<PackageTags>Queue;Messaging;Message;File;Distributed;Storage;Blob;Azure</PackageTags>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Storage.Blobs" Version="12.7.0" />
<PackageReference Include="Azure.Storage.Queues" Version="12.5.0" />
<PackageReference Include="Foundatio" Version="10.0.0" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.0" />
<PackageReference Include="Microsoft.Azure.Storage.Queue" Version="11.0" />

<ProjectReference Include="..\..\..\Foundatio\src\Foundatio\Foundatio.csproj" Condition="'$(ReferenceFoundatioSource)' == 'true'" />
</ItemGroup>
Expand Down
84 changes: 45 additions & 39 deletions src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
using Foundatio.Serializer;
using Foundatio.Utility;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using LogLevel = Microsoft.Extensions.Logging.LogLevel;

namespace Foundatio.Queues {
public class AzureStorageQueue<T> : QueueBase<T, AzureStorageQueueOptions<T>> where T : class {
private readonly AsyncLock _lock = new AsyncLock();
private readonly CloudQueue _queueReference;
private readonly CloudQueue _deadletterQueueReference;
private readonly QueueClient _queueReference;
private readonly QueueClient _deadletterQueueReference;
private long _enqueuedCount;
private long _dequeuedCount;
private long _completedCount;
Expand All @@ -27,14 +27,20 @@ public class AzureStorageQueue<T> : QueueBase<T, AzureStorageQueueOptions<T>> wh
public AzureStorageQueue(AzureStorageQueueOptions<T> options) : base(options) {
if (String.IsNullOrEmpty(options.ConnectionString))
throw new ArgumentException("ConnectionString is required.");

var account = CloudStorageAccount.Parse(options.ConnectionString);
var client = account.CreateCloudQueueClient();
if (options.RetryPolicy != null)
client.DefaultRequestOptions.RetryPolicy = options.RetryPolicy;

_queueReference = client.GetQueueReference(_options.Name);
_deadletterQueueReference = client.GetQueueReference($"{_options.Name}-poison");
// properties going in the queueservice client ( should match with IRetryPolicy of v11)
// with exponential mode ( RetryOptions.Delay vs retryDelay , RetryOptons.MaxRetris vs retries of v11)
var queueClientOptions = new QueueClientOptions {
Retry = {
MaxRetries = options.Retries, //The maximum number of retry attempts before giving up
Delay = options.Delay, //The delay between retry attempts for a fixed approach or the delay on which to base
Mode = options.RetryMode
}
};
var queueServiceClient = new QueueServiceClient(options.ConnectionString, queueClientOptions);

_queueReference = queueServiceClient.GetQueueClient(_options.Name);
_deadletterQueueReference = queueServiceClient.GetQueueClient($"{_options.Name}-poison");
}

public AzureStorageQueue(Builder<AzureStorageQueueOptionsBuilder<T>, AzureStorageQueueOptions<T>> config)
Expand All @@ -49,9 +55,11 @@ protected override async Task EnsureQueueCreatedAsync(CancellationToken cancella
return;

var sw = Stopwatch.StartNew();
var qTask = _queueReference.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
var dTask = _deadletterQueueReference.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
await Task.WhenAll(
_queueReference.CreateIfNotExistsAsync(),
_deadletterQueueReference.CreateIfNotExistsAsync()
qTask,
dTask
).AnyContext();
_queueCreated = true;

Expand All @@ -65,25 +73,26 @@ protected override async Task<string> EnqueueImplAsync(T data, QueueEntryOptions
return null;

Interlocked.Increment(ref _enqueuedCount);
var message = new CloudQueueMessage(_serializer.SerializeToBytes(data));
await _queueReference.AddMessageAsync(message).AnyContext();
var body = _serializer.SerializeToBytes(data);
var binaryData = new BinaryData(body);
SendReceipt result = await _queueReference.SendMessageAsync(binaryData).AnyContext();

var entry = new QueueEntry<T>(message.Id, null, data, this, SystemClock.UtcNow, 0);
var entry = new QueueEntry<T>(result.MessageId, null, data, this, SystemClock.UtcNow, 0);
await OnEnqueuedAsync(entry).AnyContext();

return message.Id;
return result.MessageId;
}

protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken linkedCancellationToken) {
var message = await _queueReference.GetMessageAsync(_options.WorkItemTimeout, null, null, !linkedCancellationToken.IsCancellationRequested ? linkedCancellationToken : CancellationToken.None).AnyContext();
QueueMessage[] receivedMessage = await _queueReference.ReceiveMessagesAsync(null, _options.WorkItemTimeout, !linkedCancellationToken.IsCancellationRequested ? linkedCancellationToken : CancellationToken.None).AnyContext();
bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace);

if (message == null) {
if (receivedMessage != null && receivedMessage.Length == 0) {
var sw = Stopwatch.StartNew();
var lastReport = DateTime.Now;
if (isTraceLogLevelEnabled) _logger.LogTrace("No message available to dequeue, waiting...");

while (message == null && !linkedCancellationToken.IsCancellationRequested) {
while (receivedMessage != null && receivedMessage.Length == 0 && !linkedCancellationToken.IsCancellationRequested) {
if (isTraceLogLevelEnabled && DateTime.Now.Subtract(lastReport) > TimeSpan.FromSeconds(10))
_logger.LogTrace("Still waiting for message to dequeue: {Elapsed:g}", sw.Elapsed);

Expand All @@ -92,30 +101,31 @@ protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken
await SystemClock.SleepAsync(_options.DequeueInterval, linkedCancellationToken).AnyContext();
} catch (OperationCanceledException) { }

message = await _queueReference.GetMessageAsync(_options.WorkItemTimeout, null, null, !linkedCancellationToken.IsCancellationRequested ? linkedCancellationToken : CancellationToken.None).AnyContext();
receivedMessage = await _queueReference.ReceiveMessagesAsync(null, _options.WorkItemTimeout, !linkedCancellationToken.IsCancellationRequested ? linkedCancellationToken : CancellationToken.None).AnyContext();
}

sw.Stop();
if (isTraceLogLevelEnabled) _logger.LogTrace("Waited to dequeue message: {Elapsed:g}", sw.Elapsed);
}

if (message == null) {
if (receivedMessage != null && receivedMessage.Length == 0) {
if (isTraceLogLevelEnabled) _logger.LogTrace("No message was dequeued.");
return null;
}

if (isTraceLogLevelEnabled) _logger.LogTrace("Dequeued message {Id}", message.Id);
if (isTraceLogLevelEnabled) _logger.LogTrace("Dequeued message {Id}", receivedMessage[0].MessageId);
Interlocked.Increment(ref _dequeuedCount);
var data = _serializer.Deserialize<T>(message.AsBytes);
var entry = new AzureStorageQueueEntry<T>(message, data, this);
var data = _serializer.Deserialize<T>(receivedMessage[0].Body.ToArray());
var entry = new AzureStorageQueueEntry<T>(receivedMessage[0], data, this);
await OnDequeuedAsync(entry).AnyContext();
return entry;
}

public override async Task RenewLockAsync(IQueueEntry<T> entry) {
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Queue {Name} renew lock item: {EntryId}", _options.Name, entry.Id);
var azureQueueEntry = ToAzureEntryWithCheck(entry);
await _queueReference.UpdateMessageAsync(azureQueueEntry.UnderlyingMessage, _options.WorkItemTimeout, MessageUpdateFields.Visibility).AnyContext();
await _queueReference.UpdateMessageAsync(azureQueueEntry.UnderlyingMessage.MessageId, azureQueueEntry.UnderlyingMessage.PopReceipt, azureQueueEntry.UnderlyingMessage.Body, TimeSpan.Zero).AnyContext();

await OnLockRenewedAsync(entry).AnyContext();
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Renew lock done: {EntryId}", entry.Id);
}
Expand All @@ -126,7 +136,7 @@ public override async Task CompleteAsync(IQueueEntry<T> entry) {
throw new InvalidOperationException("Queue entry has already been completed or abandoned.");

var azureQueueEntry = ToAzureEntryWithCheck(entry);
await _queueReference.DeleteMessageAsync(azureQueueEntry.UnderlyingMessage).AnyContext();
await _queueReference.DeleteMessageAsync(azureQueueEntry.UnderlyingMessage.MessageId, azureQueueEntry.UnderlyingMessage.PopReceipt).AnyContext();

Interlocked.Increment(ref _completedCount);
entry.MarkCompleted();
Expand All @@ -142,12 +152,12 @@ public override async Task AbandonAsync(IQueueEntry<T> entry) {
var azureQueueEntry = ToAzureEntryWithCheck(entry);
if (azureQueueEntry.Attempts > _options.Retries) {
await Task.WhenAll(
_queueReference.DeleteMessageAsync(azureQueueEntry.UnderlyingMessage),
_deadletterQueueReference.AddMessageAsync(azureQueueEntry.UnderlyingMessage)
_queueReference.DeleteMessageAsync(azureQueueEntry.UnderlyingMessage.MessageId, azureQueueEntry.UnderlyingMessage.PopReceipt),
_deadletterQueueReference.SendMessageAsync(azureQueueEntry.UnderlyingMessage.Body)
).AnyContext();
} else {
// Make the item visible immediately
await _queueReference.UpdateMessageAsync(azureQueueEntry.UnderlyingMessage, TimeSpan.Zero, MessageUpdateFields.Visibility).AnyContext();
await _queueReference.UpdateMessageAsync(azureQueueEntry.UnderlyingMessage.MessageId, azureQueueEntry.UnderlyingMessage.PopReceipt, azureQueueEntry.UnderlyingMessage.Body, TimeSpan.Zero).AnyContext();
}

Interlocked.Increment(ref _abandonedCount);
Expand All @@ -162,18 +172,14 @@ protected override Task<IEnumerable<T>> GetDeadletterItemsImplAsync(Cancellation

protected override async Task<QueueStats> GetQueueStatsImplAsync() {
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Fetching stats.");
var sw = Stopwatch.StartNew();
await Task.WhenAll(
_queueReference.FetchAttributesAsync(),
_deadletterQueueReference.FetchAttributesAsync()
).AnyContext();
sw.Stop();
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Fetching stats took {Elapsed:g}.", sw.Elapsed);

QueueProperties queuedMessageCount = await _queueReference.GetPropertiesAsync();
QueueProperties deadLetterQueueMessageCount = await _deadletterQueueReference.GetPropertiesAsync();

return new QueueStats {
Queued = _queueReference.ApproximateMessageCount.GetValueOrDefault(),
Queued = queuedMessageCount.ApproximateMessagesCount,
Working = 0,
Deadletter = _deadletterQueueReference.ApproximateMessageCount.GetValueOrDefault(),
Deadletter = deadLetterQueueMessageCount.ApproximateMessagesCount,
Enqueued = _enqueuedCount,
Dequeued = _dequeuedCount,
Completed = _completedCount,
Expand Down
8 changes: 4 additions & 4 deletions src/Foundatio.AzureStorage/Queues/AzureStorageQueueEntry.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using System;
using Microsoft.Azure.Storage.Queue;
using Azure.Storage.Queues.Models;

namespace Foundatio.Queues {
public class AzureStorageQueueEntry<T> : QueueEntry<T> where T : class {
public CloudQueueMessage UnderlyingMessage { get; }
public QueueMessage UnderlyingMessage { get; }

public AzureStorageQueueEntry(CloudQueueMessage message, T value, IQueue<T> queue)
: base(message.Id, null, value, queue, message.InsertionTime.GetValueOrDefault().UtcDateTime, message.DequeueCount) {
public AzureStorageQueueEntry(QueueMessage message, T value, IQueue<T> queue)
: base(message.MessageId, null, value, queue, message.InsertedOn.GetValueOrDefault().UtcDateTime, (int)message.DequeueCount) {

UnderlyingMessage = message;
}
Expand Down
21 changes: 15 additions & 6 deletions src/Foundatio.AzureStorage/Queues/AzureStorageQueueOptions.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
using System;
using Microsoft.Azure.Storage.RetryPolicies;
using Azure.Core;

namespace Foundatio.Queues {
public class AzureStorageQueueOptions<T> : SharedQueueOptions<T> where T : class {
public string ConnectionString { get; set; }
public IRetryPolicy RetryPolicy { get; set; }
public TimeSpan DequeueInterval { get; set; } = TimeSpan.FromSeconds(2);
public RetryMode RetryMode { get; set; }

// The delay between retry attempts for a fixed approach or the delay on which to base calculations for a backoff-based approach.
public TimeSpan Delay { get; set; }
}

public class AzureStorageQueueOptionsBuilder<T> : SharedQueueOptionsBuilder<T, AzureStorageQueueOptions<T>, AzureStorageQueueOptionsBuilder<T>> where T: class {
Expand All @@ -14,14 +17,20 @@ public AzureStorageQueueOptionsBuilder<T> ConnectionString(string connectionStri
return this;
}

public AzureStorageQueueOptionsBuilder<T> RetryPolicy(IRetryPolicy retryPolicy) {
Target.RetryPolicy = retryPolicy ?? throw new ArgumentNullException(nameof(retryPolicy));
public AzureStorageQueueOptionsBuilder<T> DequeueInterval(TimeSpan dequeueInterval) {
Target.DequeueInterval = dequeueInterval;
return this;
}

public AzureStorageQueueOptionsBuilder<T> DequeueInterval(TimeSpan dequeueInterval) {
Target.DequeueInterval = dequeueInterval;
public AzureStorageQueueOptionsBuilder<T> RetryMode(RetryMode retryMode) {
Target.RetryMode = retryMode;
return this;
}

public AzureStorageQueueOptionsBuilder<T> RetryDelay(TimeSpan retryDelay) {
Target.Delay = retryDelay;
return this;
}

}
}
Loading