diff --git a/src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs b/src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs
deleted file mode 100644
index aa651cd..0000000
--- a/src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs
+++ /dev/null
@@ -1,22 +0,0 @@
-using System;
-using Foundatio.Storage;
-using Microsoft.Azure.Storage.Blob;
-
-namespace Foundatio.Azure.Extensions;
-
-public static class StorageExtensions
-{
- public static FileSpec ToFileInfo(this CloudBlockBlob blob)
- {
- if (blob.Properties.Length == -1)
- return null;
-
- return new FileSpec
- {
- Path = blob.Name,
- Size = blob.Properties.Length,
- Created = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue,
- Modified = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue
- };
- }
-}
diff --git a/src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj b/src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj
index 63f563f..ad487d0 100644
--- a/src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj
+++ b/src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj
@@ -4,8 +4,8 @@
Queue;Messaging;Message;File;Distributed;Storage;Blob;Azure
-
-
+
+
diff --git a/src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs b/src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs
index 4f68a5a..c9050c2 100644
--- a/src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs
+++ b/src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs
@@ -3,11 +3,12 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
+using Azure;
+using Azure.Storage.Queues;
+using Azure.Storage.Queues.Models;
using Foundatio.AsyncEx;
using Foundatio.Extensions;
using Foundatio.Serializer;
-using Microsoft.Azure.Storage;
-using Microsoft.Azure.Storage.Queue;
using Microsoft.Extensions.Logging;
namespace Foundatio.Queues;
@@ -15,8 +16,8 @@ namespace Foundatio.Queues;
public class AzureStorageQueue : QueueBase> where T : class
{
private readonly AsyncLock _lock = new();
- private readonly CloudQueue _queueReference;
- private readonly CloudQueue _deadletterQueueReference;
+ private readonly Lazy _queueClient;
+ private readonly Lazy _deadletterQueueClient;
private long _enqueuedCount;
private long _dequeuedCount;
private long _completedCount;
@@ -29,17 +30,22 @@ public AzureStorageQueue(AzureStorageQueueOptions options) : base(options)
if (String.IsNullOrEmpty(options.ConnectionString))
throw new ArgumentException("ConnectionString is required.");
- var account = CloudStorageAccount.Parse(options.ConnectionString);
- var client = account.CreateCloudQueueClient();
- if (options.RetryPolicy != null)
- client.DefaultRequestOptions.RetryPolicy = options.RetryPolicy;
+ var clientOptions = new QueueClientOptions();
+ if (options.UseBase64Encoding)
+ clientOptions.MessageEncoding = QueueMessageEncoding.Base64;
- _queueReference = client.GetQueueReference(_options.Name);
- _deadletterQueueReference = client.GetQueueReference($"{_options.Name}-poison");
+ options.ConfigureRetry?.Invoke(clientOptions.Retry);
+
+ _queueClient = new Lazy(() =>
+ new QueueClient(options.ConnectionString, _options.Name, clientOptions));
+ _deadletterQueueClient = new Lazy(() =>
+ new QueueClient(options.ConnectionString, $"{_options.Name}-poison", clientOptions));
}
public AzureStorageQueue(Builder, AzureStorageQueueOptions> config)
- : this(config(new AzureStorageQueueOptionsBuilder()).Build()) { }
+ : this(config(new AzureStorageQueueOptionsBuilder()).Build())
+ {
+ }
protected override async Task EnsureQueueCreatedAsync(CancellationToken cancellationToken = default)
{
@@ -53,8 +59,8 @@ protected override async Task EnsureQueueCreatedAsync(CancellationToken cancella
var sw = Stopwatch.StartNew();
await Task.WhenAll(
- _queueReference.CreateIfNotExistsAsync(),
- _deadletterQueueReference.CreateIfNotExistsAsync()
+ _queueClient.Value.CreateIfNotExistsAsync(cancellationToken: cancellationToken),
+ _deadletterQueueClient.Value.CreateIfNotExistsAsync(cancellationToken: cancellationToken)
).AnyContext();
_queueCreated = true;
@@ -69,37 +75,67 @@ protected override async Task EnqueueImplAsync(T data, QueueEntryOptions
return null;
Interlocked.Increment(ref _enqueuedCount);
- var message = new CloudQueueMessage(_serializer.SerializeToBytes(data));
- await _queueReference.AddMessageAsync(message, null, options.DeliveryDelay, null, null).AnyContext();
- var entry = new QueueEntry(message.Id, options.CorrelationId, data, this, _timeProvider.GetLocalNow().UtcDateTime, 0);
+ // Note: CorrelationId and Properties from QueueEntryOptions are not persisted.
+ // Azure Storage Queue only supports a message body. Wrapping in an envelope would
+ // support these features but would break backward compatibility with existing messages.
+ var messageBody = new BinaryData(_serializer.SerializeToBytes(data));
+ var response = await _queueClient.Value.SendMessageAsync(
+ messageBody,
+ visibilityTimeout: options.DeliveryDelay,
+ cancellationToken: CancellationToken.None).AnyContext();
+
+ var entry = new QueueEntry(response.Value.MessageId, null, data, this, _timeProvider.GetLocalNow().UtcDateTime, 0);
await OnEnqueuedAsync(entry).AnyContext();
- return message.Id;
+ _logger.LogTrace("Enqueued message {MessageId}", response.Value.MessageId);
+ return response.Value.MessageId;
}
protected override async Task> DequeueImplAsync(CancellationToken linkedCancellationToken)
{
- var message = await _queueReference.GetMessageAsync(_options.WorkItemTimeout, null, null, !linkedCancellationToken.IsCancellationRequested ? linkedCancellationToken : CancellationToken.None).AnyContext();
+ _logger.LogTrace("Checking for message: IsCancellationRequested={IsCancellationRequested} VisibilityTimeout={VisibilityTimeout}", linkedCancellationToken.IsCancellationRequested, _options.WorkItemTimeout);
+
+ // Try to receive a message immediately
+ var response = await _queueClient.Value.ReceiveMessageAsync(_options.WorkItemTimeout, CancellationToken.None).AnyContext();
+ var message = response?.Value;
+
+ // If we got a message, process it immediately
+ // If no message and cancellation requested, return null immediately (don't wait/poll)
+ if (message == null && linkedCancellationToken.IsCancellationRequested)
+ {
+ _logger.LogTrace("No message available and cancellation requested, returning null");
+ return null;
+ }
+
+ // If no message and not cancelled, poll with fixed interval
+ // Note: Azure Storage Queue doesn't support long-polling, so we must poll
if (message == null)
{
var sw = Stopwatch.StartNew();
- var lastReport = DateTime.Now;
- _logger.LogTrace("No message available to dequeue, waiting...");
+ var lastReport = _timeProvider.GetUtcNow();
+ _logger.LogTrace("No message available to dequeue, polling...");
while (message == null && !linkedCancellationToken.IsCancellationRequested)
{
- if (DateTime.Now.Subtract(lastReport) > TimeSpan.FromSeconds(10))
+ if (_timeProvider.GetUtcNow().Subtract(lastReport) > TimeSpan.FromSeconds(10))
+ {
+ lastReport = _timeProvider.GetUtcNow();
_logger.LogTrace("Still waiting for message to dequeue: {Elapsed:g}", sw.Elapsed);
+ }
try
{
if (!linkedCancellationToken.IsCancellationRequested)
await _timeProvider.Delay(_options.DequeueInterval, linkedCancellationToken).AnyContext();
}
- catch (OperationCanceledException) { }
+ catch (OperationCanceledException)
+ {
+ // Ignore cancellation during delay
+ }
- message = await _queueReference.GetMessageAsync(_options.WorkItemTimeout, null, null, !linkedCancellationToken.IsCancellationRequested ? linkedCancellationToken : CancellationToken.None).AnyContext();
+ response = await _queueClient.Value.ReceiveMessageAsync(_options.WorkItemTimeout, CancellationToken.None).AnyContext();
+ message = response?.Value;
}
sw.Stop();
@@ -112,11 +148,19 @@ protected override async Task> DequeueImplAsync(CancellationToken
return null;
}
- _logger.LogTrace("Dequeued message {QueueEntryId}", message.Id);
+ var nowUtc = _timeProvider.GetUtcNow().UtcDateTime;
+ var insertedOn = message.InsertedOn?.UtcDateTime ?? DateTime.MinValue;
+ var queueTime = nowUtc - insertedOn;
+ _logger.LogTrace("Received message: {QueueEntryId} InsertedOn={InsertedOn} NowUtc={NowUtc} QueueTime={QueueTime}ms IsCancellationRequested={IsCancellationRequested}",
+ message.MessageId, insertedOn, nowUtc, queueTime.TotalMilliseconds, linkedCancellationToken.IsCancellationRequested);
Interlocked.Increment(ref _dequeuedCount);
- var data = _serializer.Deserialize(message.AsBytes);
+
+ // Deserialize the message body directly (no envelope wrapper for backward compatibility)
+ var data = _serializer.Deserialize(message.Body.ToArray());
var entry = new AzureStorageQueueEntry(message, data, this);
+
await OnDequeuedAsync(entry).AnyContext();
+ _logger.LogTrace("Dequeued message: {QueueEntryId}", message.MessageId);
return entry;
}
@@ -124,9 +168,16 @@ public override async Task RenewLockAsync(IQueueEntry entry)
{
_logger.LogDebug("Queue {QueueName} renew lock item: {QueueEntryId}", _options.Name, entry.Id);
var azureQueueEntry = ToAzureEntryWithCheck(entry);
- await _queueReference.UpdateMessageAsync(azureQueueEntry.UnderlyingMessage, _options.WorkItemTimeout, MessageUpdateFields.Visibility).AnyContext();
+ var response = await _queueClient.Value.UpdateMessageAsync(
+ azureQueueEntry.UnderlyingMessage.MessageId,
+ azureQueueEntry.PopReceipt,
+ visibilityTimeout: _options.WorkItemTimeout).AnyContext();
+
+ // Update the pop receipt since it changes after each update
+ azureQueueEntry.PopReceipt = response.Value.PopReceipt;
+
await OnLockRenewedAsync(entry).AnyContext();
- _logger.LogTrace("Renew lock done: {QueueEntryId}", entry.Id);
+ _logger.LogTrace("Renew lock done: {QueueEntryId} MessageId={MessageId} VisibilityTimeout={VisibilityTimeout}", entry.Id, azureQueueEntry.UnderlyingMessage.MessageId, _options.WorkItemTimeout);
}
public override async Task CompleteAsync(IQueueEntry entry)
@@ -136,7 +187,20 @@ public override async Task CompleteAsync(IQueueEntry entry)
throw new InvalidOperationException("Queue entry has already been completed or abandoned.");
var azureQueueEntry = ToAzureEntryWithCheck(entry);
- await _queueReference.DeleteMessageAsync(azureQueueEntry.UnderlyingMessage).AnyContext();
+
+ try
+ {
+ await _queueClient.Value.DeleteMessageAsync(
+ azureQueueEntry.UnderlyingMessage.MessageId,
+ azureQueueEntry.PopReceipt).AnyContext();
+ }
+ catch (RequestFailedException ex) when (ex.Status == 404 || ex.ErrorCode == QueueErrorCode.MessageNotFound.ToString() || ex.ErrorCode == QueueErrorCode.PopReceiptMismatch.ToString())
+ {
+ // Message was already deleted or the pop receipt expired (visibility timeout)
+ // This means the item was auto-abandoned by Azure
+ _logger.LogWarning("Failed to complete message {MessageId}: message not found or pop receipt expired (visibility timeout may have elapsed)", azureQueueEntry.UnderlyingMessage.MessageId);
+ throw new InvalidOperationException("Queue entry visibility timeout has elapsed and the message is no longer locked.", ex);
+ }
Interlocked.Increment(ref _completedCount);
entry.MarkCompleted();
@@ -151,23 +215,37 @@ public override async Task AbandonAsync(IQueueEntry entry)
throw new InvalidOperationException("Queue entry has already been completed or abandoned.");
var azureQueueEntry = ToAzureEntryWithCheck(entry);
+
if (azureQueueEntry.Attempts > _options.Retries)
{
- await Task.WhenAll(
- _queueReference.DeleteMessageAsync(azureQueueEntry.UnderlyingMessage),
- _deadletterQueueReference.AddMessageAsync(azureQueueEntry.UnderlyingMessage)
- ).AnyContext();
+ _logger.LogDebug("Moving message {QueueEntryId} to deadletter after {Attempts} attempts", entry.Id, azureQueueEntry.Attempts);
+
+ // Send to deadletter queue first, then delete from main queue (sequential for data integrity)
+ var messageBody = new BinaryData(_serializer.SerializeToBytes(entry.Value));
+ await _deadletterQueueClient.Value.SendMessageAsync(messageBody).AnyContext();
+ await _queueClient.Value.DeleteMessageAsync(
+ azureQueueEntry.UnderlyingMessage.MessageId,
+ azureQueueEntry.PopReceipt).AnyContext();
}
else
{
- // Make the item visible immediately
- await _queueReference.UpdateMessageAsync(azureQueueEntry.UnderlyingMessage, TimeSpan.Zero, MessageUpdateFields.Visibility).AnyContext();
+ // Calculate visibility timeout based on retry delay
+ var retryDelay = _options.RetryDelay(azureQueueEntry.Attempts);
+ _logger.LogDebug("Making message {QueueEntryId} visible after {RetryDelay}", entry.Id, retryDelay);
+
+ var response = await _queueClient.Value.UpdateMessageAsync(
+ azureQueueEntry.UnderlyingMessage.MessageId,
+ azureQueueEntry.PopReceipt,
+ visibilityTimeout: retryDelay).AnyContext();
+
+ // Update the pop receipt since it changes after each update
+ azureQueueEntry.PopReceipt = response.Value.PopReceipt;
}
Interlocked.Increment(ref _abandonedCount);
entry.MarkAbandoned();
await OnAbandonedAsync(entry).AnyContext();
- _logger.LogTrace("Abandon complete: {QueueEntryId}", entry.Id);
+ _logger.LogTrace("Abandon complete: {QueueEntryId} MessageId={MessageId} Attempts={Attempts}", entry.Id, azureQueueEntry.UnderlyingMessage.MessageId, azureQueueEntry.Attempts);
}
protected override Task> GetDeadletterItemsImplAsync(CancellationToken cancellationToken)
@@ -177,7 +255,9 @@ protected override Task> GetDeadletterItemsImplAsync(Cancellation
protected override async Task GetQueueStatsImplAsync()
{
- if (_queueReference == null || _deadletterQueueReference == null || !_queueCreated)
+ // Note: Azure Storage Queue does not provide Working count (in-flight messages) or Timeouts.
+ // These stats are only available per-process and meaningless in distributed scenarios.
+ if (!_queueCreated)
return new QueueStats
{
Queued = 0,
@@ -192,30 +272,29 @@ protected override async Task GetQueueStatsImplAsync()
};
var sw = Stopwatch.StartNew();
- await Task.WhenAll(
- _queueReference.FetchAttributesAsync(),
- _deadletterQueueReference.FetchAttributesAsync()
- ).AnyContext();
+ var queuePropertiesTask = _queueClient.Value.GetPropertiesAsync();
+ var deadLetterPropertiesTask = _deadletterQueueClient.Value.GetPropertiesAsync();
+ await Task.WhenAll(queuePropertiesTask, deadLetterPropertiesTask).AnyContext();
sw.Stop();
_logger.LogTrace("Fetching stats took {Elapsed:g}", sw.Elapsed);
return new QueueStats
{
- Queued = _queueReference.ApproximateMessageCount.GetValueOrDefault(),
- Working = 0,
- Deadletter = _deadletterQueueReference.ApproximateMessageCount.GetValueOrDefault(),
+ Queued = queuePropertiesTask.Result.Value.ApproximateMessagesCount,
+ Working = 0, // Azure Storage Queue does not expose in-flight message count
+ Deadletter = deadLetterPropertiesTask.Result.Value.ApproximateMessagesCount,
Enqueued = _enqueuedCount,
Dequeued = _dequeuedCount,
Completed = _completedCount,
Abandoned = _abandonedCount,
Errors = _workerErrorCount,
- Timeouts = 0
+ Timeouts = 0 // Azure handles visibility timeout natively; client-side tracking not meaningful
};
}
protected override QueueStats GetMetricsQueueStats()
{
- if (_queueReference == null || _deadletterQueueReference == null || !_queueCreated)
+ if (!_queueCreated)
return new QueueStats
{
Queued = 0,
@@ -230,16 +309,16 @@ protected override QueueStats GetMetricsQueueStats()
};
var sw = Stopwatch.StartNew();
- _queueReference.FetchAttributes();
- _deadletterQueueReference.FetchAttributes();
+ var queueProperties = _queueClient.Value.GetProperties();
+ var deadletterProperties = _deadletterQueueClient.Value.GetProperties();
sw.Stop();
_logger.LogTrace("Fetching stats took {Elapsed:g}", sw.Elapsed);
return new QueueStats
{
- Queued = _queueReference.ApproximateMessageCount.GetValueOrDefault(),
+ Queued = queueProperties.Value.ApproximateMessagesCount,
Working = 0,
- Deadletter = _deadletterQueueReference.ApproximateMessageCount.GetValueOrDefault(),
+ Deadletter = deadletterProperties.Value.ApproximateMessagesCount,
Enqueued = _enqueuedCount,
Dequeued = _dequeuedCount,
Completed = _completedCount,
@@ -253,8 +332,8 @@ public override async Task DeleteQueueAsync()
{
var sw = Stopwatch.StartNew();
await Task.WhenAll(
- _queueReference.DeleteIfExistsAsync(),
- _deadletterQueueReference.DeleteIfExistsAsync()
+ _queueClient.Value.DeleteIfExistsAsync(),
+ _deadletterQueueClient.Value.DeleteIfExistsAsync()
).AnyContext();
_queueCreated = false;
@@ -270,8 +349,7 @@ await Task.WhenAll(
protected override void StartWorkingImpl(Func, CancellationToken, Task> handler, bool autoComplete, CancellationToken cancellationToken)
{
- if (handler == null)
- throw new ArgumentNullException(nameof(handler));
+ ArgumentNullException.ThrowIfNull(handler);
var linkedCancellationToken = GetLinkedDisposableCancellationTokenSource(cancellationToken);
@@ -309,13 +387,13 @@ protected override void StartWorkingImpl(Func, CancellationToken,
}
}
- _logger.LogTrace("Worker exiting: {QueueName} Cancel Requested: {IsCancellationRequested}", _queueReference.Name, linkedCancellationToken.IsCancellationRequested);
+ _logger.LogTrace("Worker exiting: {QueueName} Cancel Requested: {IsCancellationRequested}", _options.Name, linkedCancellationToken.IsCancellationRequested);
}, linkedCancellationToken.Token).ContinueWith(t => linkedCancellationToken.Dispose());
}
private static AzureStorageQueueEntry ToAzureEntryWithCheck(IQueueEntry queueEntry)
{
- if (!(queueEntry is AzureStorageQueueEntry azureQueueEntry))
+ if (queueEntry is not AzureStorageQueueEntry azureQueueEntry)
throw new ArgumentException($"Unknown entry type. Can only process entries of type '{nameof(AzureStorageQueueEntry)}'");
return azureQueueEntry;
diff --git a/src/Foundatio.AzureStorage/Queues/AzureStorageQueueEntry.cs b/src/Foundatio.AzureStorage/Queues/AzureStorageQueueEntry.cs
index 3f2b4d0..0316b59 100644
--- a/src/Foundatio.AzureStorage/Queues/AzureStorageQueueEntry.cs
+++ b/src/Foundatio.AzureStorage/Queues/AzureStorageQueueEntry.cs
@@ -1,15 +1,22 @@
-using Microsoft.Azure.Storage.Queue;
+using System;
+using Azure.Storage.Queues.Models;
namespace Foundatio.Queues;
public class AzureStorageQueueEntry : QueueEntry where T : class
{
- public CloudQueueMessage UnderlyingMessage { get; }
+ public QueueMessage UnderlyingMessage { get; }
- public AzureStorageQueueEntry(CloudQueueMessage message, T value, IQueue queue)
- : base(message.Id, null, value, queue, message.InsertionTime.GetValueOrDefault().UtcDateTime, message.DequeueCount)
- {
+ ///
+ /// The current pop receipt for this message. This gets updated after each
+ /// UpdateMessageAsync call (renew lock, abandon with retry delay).
+ ///
+ public string PopReceipt { get; internal set; }
+ public AzureStorageQueueEntry(QueueMessage message, T data, IQueue queue)
+ : base(message.MessageId, null, data, queue, message.InsertedOn?.UtcDateTime ?? DateTime.MinValue, (int)message.DequeueCount)
+ {
UnderlyingMessage = message;
+ PopReceipt = message.PopReceipt;
}
}
diff --git a/src/Foundatio.AzureStorage/Queues/AzureStorageQueueOptions.cs b/src/Foundatio.AzureStorage/Queues/AzureStorageQueueOptions.cs
index 03a2399..4655c5f 100644
--- a/src/Foundatio.AzureStorage/Queues/AzureStorageQueueOptions.cs
+++ b/src/Foundatio.AzureStorage/Queues/AzureStorageQueueOptions.cs
@@ -1,32 +1,120 @@
using System;
-using Microsoft.Azure.Storage.RetryPolicies;
+using Azure.Core;
namespace Foundatio.Queues;
public class AzureStorageQueueOptions : SharedQueueOptions where T : class
{
public string ConnectionString { get; set; }
- public IRetryPolicy RetryPolicy { get; set; }
+
+ ///
+ /// The interval to wait between polling for new messages when the queue is empty.
+ ///
public TimeSpan DequeueInterval { get; set; } = TimeSpan.FromSeconds(2);
+
+ ///
+ /// A function that returns the delay before retrying a failed message based on the attempt number.
+ /// Default is exponential backoff with jitter: 2^attempt seconds + random 0-100ms.
+ ///
+ public Func RetryDelay { get; set; } = attempt =>
+ TimeSpan.FromSeconds(Math.Pow(2, attempt)) + TimeSpan.FromMilliseconds(Random.Shared.Next(0, 100));
+
+ ///
+ /// When true, messages are Base64-encoded for backward compatibility with the legacy
+ /// Microsoft.Azure.Storage.Queue SDK (v11). The v11 SDK encoded all messages as Base64
+ /// by default, while the v12 Azure.Storage.Queues SDK does not.
+ ///
+ /// Default: false (v12 behavior - no encoding)
+ ///
+ /// When to enable: Only enable this if you have existing messages in your
+ /// queue that were written using the v11 SDK and need to be read during migration.
+ ///
+ /// Migration path:
+ ///
+ /// - Enable UseBase64Encoding=true to read existing v11 messages
+ /// - Process all existing messages from the queue
+ /// - Once queue is empty, disable UseBase64Encoding (set to false)
+ /// - All new messages will use raw encoding (v12 default)
+ ///
+ ///
+ /// Deprecation notice: This option exists solely for migration purposes
+ /// and may be removed in a future major version. Plan to migrate away from Base64
+ /// encoding as soon as practical.
+ ///
+ public bool UseBase64Encoding { get; set; }
+
+ ///
+ /// Optional action to configure Azure SDK retry options.
+ /// Default Azure SDK retry settings: MaxRetries=3, Delay=0.8s, MaxDelay=1min, Mode=Exponential.
+ ///
+ ///
+ ///
+ /// options.ConfigureRetry = retry =>
+ /// {
+ /// retry.MaxRetries = 5;
+ /// retry.Delay = TimeSpan.FromSeconds(1);
+ /// retry.Mode = RetryMode.Exponential;
+ /// };
+ ///
+ ///
+ public Action ConfigureRetry { get; set; }
}
public class AzureStorageQueueOptionsBuilder : SharedQueueOptionsBuilder, AzureStorageQueueOptionsBuilder> where T : class
{
public AzureStorageQueueOptionsBuilder ConnectionString(string connectionString)
{
- Target.ConnectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
+ ArgumentException.ThrowIfNullOrEmpty(connectionString);
+
+ Target.ConnectionString = connectionString;
return this;
}
- public AzureStorageQueueOptionsBuilder RetryPolicy(IRetryPolicy retryPolicy)
+ public AzureStorageQueueOptionsBuilder DequeueInterval(TimeSpan dequeueInterval)
{
- Target.RetryPolicy = retryPolicy ?? throw new ArgumentNullException(nameof(retryPolicy));
+ Target.DequeueInterval = dequeueInterval;
return this;
}
- public AzureStorageQueueOptionsBuilder DequeueInterval(TimeSpan dequeueInterval)
+ ///
+ /// Sets a custom retry delay function for failed messages.
+ ///
+ /// A function that takes the attempt number and returns the delay before the next retry.
+ public AzureStorageQueueOptionsBuilder RetryDelay(Func retryDelay)
{
- Target.DequeueInterval = dequeueInterval;
+ ArgumentNullException.ThrowIfNull(retryDelay);
+
+ Target.RetryDelay = retryDelay;
+ return this;
+ }
+
+ ///
+ /// Enables Base64 message encoding for backward compatibility with the legacy v11 SDK.
+ /// See for migration guidance.
+ ///
+ public AzureStorageQueueOptionsBuilder UseBase64Encoding(bool useBase64Encoding = true)
+ {
+ Target.UseBase64Encoding = useBase64Encoding;
+ return this;
+ }
+
+ ///
+ /// Configures Azure SDK retry options for transient failure handling.
+ ///
+ /// Action to configure retry options.
+ ///
+ ///
+ /// .ConfigureRetry(retry =>
+ /// {
+ /// retry.MaxRetries = 5;
+ /// retry.Delay = TimeSpan.FromSeconds(1);
+ /// })
+ ///
+ ///
+ public AzureStorageQueueOptionsBuilder ConfigureRetry(Action configure)
+ {
+ ArgumentNullException.ThrowIfNull(configure);
+ Target.ConfigureRetry = configure;
return this;
}
}
diff --git a/src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs b/src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs
index fbe828b..00e09c6 100644
--- a/src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs
+++ b/src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs
@@ -5,40 +5,42 @@
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
-using Foundatio.Azure.Extensions;
+using Azure;
+using Azure.Storage.Blobs;
+using Azure.Storage.Blobs.Models;
using Foundatio.Extensions;
using Foundatio.Serializer;
-using Microsoft.Azure.Storage;
-using Microsoft.Azure.Storage.Blob;
+using Foundatio.Utility;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Foundatio.Storage;
-public class AzureFileStorage : IFileStorage
+public class AzureFileStorage : IFileStorage, IHaveLogger, IHaveLoggerFactory, IHaveTimeProvider
{
- private readonly CloudBlobContainer _container;
+ private readonly BlobContainerClient _container;
private readonly ISerializer _serializer;
+ private readonly ILoggerFactory _loggerFactory;
protected readonly ILogger _logger;
protected readonly TimeProvider _timeProvider;
public AzureFileStorage(AzureFileStorageOptions options)
{
- if (options == null)
- throw new ArgumentNullException(nameof(options));
+ ArgumentNullException.ThrowIfNull(options);
_timeProvider = options.TimeProvider ?? TimeProvider.System;
_serializer = options.Serializer ?? DefaultSerializer.Instance;
- _logger = options.LoggerFactory?.CreateLogger(GetType()) ?? NullLogger.Instance;
+ _loggerFactory = options.LoggerFactory ?? NullLoggerFactory.Instance;
+ _logger = _loggerFactory.CreateLogger(GetType());
- var account = CloudStorageAccount.Parse(options.ConnectionString);
- var client = account.CreateCloudBlobClient();
+ var clientOptions = new BlobClientOptions();
+ options.ConfigureRetry?.Invoke(clientOptions.Retry);
- _container = client.GetContainerReference(options.ContainerName);
+ _container = new BlobContainerClient(options.ConnectionString, options.ContainerName, clientOptions);
_logger.LogTrace("Checking if {Container} container exists", _container.Name);
- bool created = _container.CreateIfNotExistsAsync().GetAwaiter().GetResult();
- if (created)
+ var response = _container.CreateIfNotExists();
+ if (response != null)
_logger.LogInformation("Created {Container}", _container.Name);
}
@@ -46,7 +48,10 @@ public AzureFileStorage(Builder _serializer;
- public CloudBlobContainer Container => _container;
+ ILogger IHaveLogger.Logger => _logger;
+ ILoggerFactory IHaveLoggerFactory.LoggerFactory => _loggerFactory;
+ TimeProvider IHaveTimeProvider.TimeProvider => _timeProvider;
+ public BlobContainerClient Container => _container;
[Obsolete($"Use {nameof(GetFileStreamAsync)} with {nameof(StreamMode)} instead to define read or write behavior of stream")]
public Task GetFileStreamAsync(string path, CancellationToken cancellationToken = default)
@@ -54,140 +59,195 @@ public Task GetFileStreamAsync(string path, CancellationToken cancellati
public async Task GetFileStreamAsync(string path, StreamMode streamMode, CancellationToken cancellationToken = default)
{
- if (String.IsNullOrEmpty(path))
- throw new ArgumentNullException(nameof(path));
+ ArgumentException.ThrowIfNullOrEmpty(path);
string normalizedPath = NormalizePath(path);
_logger.LogTrace("Getting file stream for {Path}", normalizedPath);
- var blockBlob = _container.GetBlockBlobReference(normalizedPath);
+ var blobClient = _container.GetBlobClient(normalizedPath);
try
{
return streamMode switch
{
- StreamMode.Read => await blockBlob.OpenReadAsync(null, null, null, cancellationToken).AnyContext(),
- StreamMode.Write => await blockBlob.OpenWriteAsync(null, null, null, cancellationToken).AnyContext(),
+ StreamMode.Read => await blobClient.OpenReadAsync(cancellationToken: cancellationToken).AnyContext(),
+ StreamMode.Write => await blobClient.OpenWriteAsync(overwrite: true, cancellationToken: cancellationToken).AnyContext(),
_ => throw new NotSupportedException($"Stream mode {streamMode} is not supported.")
};
}
- catch (Microsoft.Azure.Storage.StorageException ex) when (ex is { RequestInformation.HttpStatusCode: 404 })
+ catch (RequestFailedException ex) when (ex.Status == 404)
{
- _logger.LogDebug(ex, "Unable to get file stream for {Path}: File Not Found", normalizedPath);
+ _logger.LogDebug(ex, "[{Status}] Unable to get file stream for {Path}: File Not Found", ex.Status, normalizedPath);
return null;
}
+ catch (RequestFailedException ex)
+ {
+ _logger.LogError(ex, "[{Status}] Unable to get file stream for {Path}: {Message}", ex.Status, normalizedPath, ex.Message);
+ throw;
+ }
}
public async Task GetFileInfoAsync(string path)
{
- if (String.IsNullOrEmpty(path))
- throw new ArgumentNullException(nameof(path));
+ ArgumentException.ThrowIfNullOrEmpty(path);
string normalizedPath = NormalizePath(path);
_logger.LogTrace("Getting file info for {Path}", normalizedPath);
- var blob = _container.GetBlockBlobReference(normalizedPath);
+ var blobClient = _container.GetBlobClient(normalizedPath);
try
{
- await blob.FetchAttributesAsync().AnyContext();
- return blob.ToFileInfo();
+ var properties = await blobClient.GetPropertiesAsync().AnyContext();
+ return ToFileInfo(normalizedPath, properties.Value);
+ }
+ catch (RequestFailedException ex) when (ex.Status == 404)
+ {
+ _logger.LogDebug(ex, "[{Status}] Unable to get file info for {Path}: File Not Found", ex.Status, normalizedPath);
+ return null;
+ }
+ catch (RequestFailedException ex)
+ {
+ _logger.LogError(ex, "[{Status}] Unable to get file info for {Path}: {Message}", ex.Status, normalizedPath, ex.Message);
+ return null;
}
catch (Exception ex)
{
_logger.LogError(ex, "Unable to get file info for {Path}: {Message}", normalizedPath, ex.Message);
+ return null;
}
-
- return null;
}
- public Task ExistsAsync(string path)
+ public async Task ExistsAsync(string path)
{
- if (String.IsNullOrEmpty(path))
- throw new ArgumentNullException(nameof(path));
+ ArgumentException.ThrowIfNullOrEmpty(path);
string normalizedPath = NormalizePath(path);
_logger.LogTrace("Checking if {Path} exists", normalizedPath);
- var blockBlob = _container.GetBlockBlobReference(normalizedPath);
- return blockBlob.ExistsAsync();
+ var blobClient = _container.GetBlobClient(normalizedPath);
+ var response = await blobClient.ExistsAsync().AnyContext();
+ return response.Value;
}
public async Task SaveFileAsync(string path, Stream stream, CancellationToken cancellationToken = default)
{
- if (String.IsNullOrEmpty(path))
- throw new ArgumentNullException(nameof(path));
- if (stream == null)
- throw new ArgumentNullException(nameof(stream));
+ ArgumentException.ThrowIfNullOrEmpty(path);
+ ArgumentNullException.ThrowIfNull(stream);
string normalizedPath = NormalizePath(path);
_logger.LogTrace("Saving {Path}", normalizedPath);
- var blockBlob = _container.GetBlockBlobReference(normalizedPath);
- await blockBlob.UploadFromStreamAsync(stream, null, null, null, cancellationToken).AnyContext();
-
- return true;
+ try
+ {
+ var blobClient = _container.GetBlobClient(normalizedPath);
+ await blobClient.UploadAsync(stream, overwrite: true, cancellationToken: cancellationToken).AnyContext();
+ return true;
+ }
+ catch (RequestFailedException ex)
+ {
+ _logger.LogError(ex, "[{Status}] Error saving {Path}: {Message}", ex.Status, normalizedPath, ex.Message);
+ return false;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error saving {Path}: {Message}", normalizedPath, ex.Message);
+ return false;
+ }
}
public async Task RenameFileAsync(string path, string newPath, CancellationToken cancellationToken = default)
{
- if (String.IsNullOrEmpty(path))
- throw new ArgumentNullException(nameof(path));
- if (String.IsNullOrEmpty(newPath))
- throw new ArgumentNullException(nameof(newPath));
+ ArgumentException.ThrowIfNullOrEmpty(path);
+ ArgumentException.ThrowIfNullOrEmpty(newPath);
string normalizedPath = NormalizePath(path);
string normalizedNewPath = NormalizePath(newPath);
_logger.LogInformation("Renaming {Path} to {NewPath}", normalizedPath, normalizedNewPath);
- var oldBlob = _container.GetBlockBlobReference(normalizedPath);
- if (!(await CopyFileAsync(normalizedPath, normalizedNewPath, cancellationToken).AnyContext()))
+ try
{
- _logger.LogError("Unable to rename {Path} to {NewPath}", normalizedPath, normalizedNewPath);
- return false;
- }
+ if (!await CopyFileAsync(normalizedPath, normalizedNewPath, cancellationToken).AnyContext())
+ {
+ _logger.LogError("Unable to rename {Path} to {NewPath}", normalizedPath, normalizedNewPath);
+ return false;
+ }
- _logger.LogDebug("Deleting renamed {Path}", normalizedPath);
- bool deleted = await oldBlob.DeleteIfExistsAsync(DeleteSnapshotsOption.None, null, null, null, cancellationToken).AnyContext();
- if (!deleted)
+ var oldBlob = _container.GetBlobClient(normalizedPath);
+ _logger.LogDebug("Deleting renamed {Path}", normalizedPath);
+ var deleteResponse = await oldBlob.DeleteIfExistsAsync(cancellationToken: cancellationToken).AnyContext();
+ if (!deleteResponse.Value)
+ {
+ _logger.LogDebug("Unable to delete renamed {Path}", normalizedPath);
+ return false;
+ }
+
+ return true;
+ }
+ catch (RequestFailedException ex)
{
- _logger.LogDebug("Unable to delete renamed {Path}", normalizedPath);
+ _logger.LogError(ex, "[{Status}] Unable to rename {Path} to {NewPath}: {Message}", ex.Status, normalizedPath, normalizedNewPath, ex.Message);
return false;
}
-
- return true;
}
public async Task CopyFileAsync(string path, string targetPath, CancellationToken cancellationToken = default)
{
- if (String.IsNullOrEmpty(path))
- throw new ArgumentNullException(nameof(path));
- if (String.IsNullOrEmpty(targetPath))
- throw new ArgumentNullException(nameof(targetPath));
+ ArgumentException.ThrowIfNullOrEmpty(path);
+ ArgumentException.ThrowIfNullOrEmpty(targetPath);
string normalizedPath = NormalizePath(path);
string normalizedTargetPath = NormalizePath(targetPath);
_logger.LogInformation("Copying {Path} to {TargetPath}", normalizedPath, normalizedTargetPath);
- var oldBlob = _container.GetBlockBlobReference(normalizedPath);
- var newBlob = _container.GetBlockBlobReference(normalizedTargetPath);
+ try
+ {
+ var sourceBlob = _container.GetBlobClient(normalizedPath);
+ var targetBlob = _container.GetBlobClient(normalizedTargetPath);
- await newBlob.StartCopyAsync(oldBlob, cancellationToken).AnyContext();
- while (newBlob.CopyState.Status == CopyStatus.Pending)
- await _timeProvider.Delay(TimeSpan.FromMilliseconds(50), cancellationToken).AnyContext();
+ var copyOperation = await targetBlob.StartCopyFromUriAsync(sourceBlob.Uri, cancellationToken: cancellationToken).AnyContext();
+ // TODO: Instead of true should we be checking copyOperation.HasCompleted? and copyOperation.UpdateStatusAsync
+ // Wait for copy to complete
+ while (true)
+ {
+ var properties = await targetBlob.GetPropertiesAsync(cancellationToken: cancellationToken).AnyContext();
+ if (properties.Value.CopyStatus == CopyStatus.Success)
+ return true;
+ if (properties.Value.CopyStatus == CopyStatus.Failed || properties.Value.CopyStatus == CopyStatus.Aborted)
+ {
+ _logger.LogError("Copy operation failed for {Path} to {TargetPath}: {CopyStatus}", normalizedPath, normalizedTargetPath, properties.Value.CopyStatus);
+ return false;
+ }
- return newBlob.CopyState.Status == CopyStatus.Success;
+ await _timeProvider.Delay(TimeSpan.FromMilliseconds(50), cancellationToken).AnyContext();
+ }
+ }
+ catch (RequestFailedException ex)
+ {
+ _logger.LogError(ex, "[{Status}] Unable to copy {Path} to {TargetPath}: {Message}", ex.Status, normalizedPath, normalizedTargetPath, ex.Message);
+ return false;
+ }
}
- public Task DeleteFileAsync(string path, CancellationToken cancellationToken = default)
+ public async Task DeleteFileAsync(string path, CancellationToken cancellationToken = default)
{
- if (String.IsNullOrEmpty(path))
- throw new ArgumentNullException(nameof(path));
+ ArgumentException.ThrowIfNullOrEmpty(path);
string normalizedPath = NormalizePath(path);
_logger.LogTrace("Deleting {Path}", normalizedPath);
- var blockBlob = _container.GetBlockBlobReference(normalizedPath);
- return blockBlob.DeleteIfExistsAsync(DeleteSnapshotsOption.None, null, null, null, cancellationToken);
+ try
+ {
+ var blobClient = _container.GetBlobClient(normalizedPath);
+ var response = await blobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).AnyContext();
+ if (!response.Value)
+ _logger.LogDebug("Unable to delete {Path}: File not found", normalizedPath);
+ return response.Value;
+ }
+ catch (RequestFailedException ex)
+ {
+ _logger.LogError(ex, "[{Status}] Unable to delete {Path}: {Message}", ex.Status, normalizedPath, ex.Message);
+ return false;
+ }
}
public async Task DeleteFilesAsync(string searchPattern = null, CancellationToken cancellationToken = default)
@@ -252,25 +312,23 @@ private async Task> GetFileListAsync(string searchPattern = null,
? skip.GetValueOrDefault() + limit.Value
: Int32.MaxValue;
- BlobContinuationToken continuationToken = null;
- var blobs = new List();
- do
- {
- var listingResult = await _container.ListBlobsSegmentedAsync(criteria.Prefix, true, BlobListingDetails.Metadata, limit, continuationToken, null, null, cancellationToken).AnyContext();
- continuationToken = listingResult.ContinuationToken;
+ _logger.LogTrace("Getting file list: Prefix={Prefix} Pattern={Pattern} Limit={Limit}", criteria.Prefix, criteria.Pattern, totalLimit);
- foreach (var blob in listingResult.Results.OfType())
+ var blobs = new List();
+ await foreach (var blobItem in _container.GetBlobsAsync(prefix: criteria.Prefix, cancellationToken: cancellationToken))
+ {
+ // TODO: Verify if it's possible to create empty folders in storage. If so, don't return them.
+ if (criteria.Pattern != null && !criteria.Pattern.IsMatch(blobItem.Name))
{
- // TODO: Verify if it's possible to create empty folders in storage. If so, don't return them.
- if (criteria.Pattern != null && !criteria.Pattern.IsMatch(blob.Name))
- {
- _logger.LogTrace("Skipping {Path}: Doesn't match pattern", blob.Name);
- continue;
- }
-
- blobs.Add(blob);
+ _logger.LogTrace("Skipping {Path}: Doesn't match pattern", blobItem.Name);
+ continue;
}
- } while (continuationToken != null && blobs.Count < totalLimit);
+
+ blobs.Add(ToFileInfo(blobItem));
+
+ if (blobs.Count >= totalLimit)
+ break;
+ }
if (skip.HasValue)
blobs = blobs.Skip(skip.Value).ToList();
@@ -278,7 +336,31 @@ private async Task> GetFileListAsync(string searchPattern = null,
if (limit.HasValue)
blobs = blobs.Take(limit.Value).ToList();
- return blobs.Select(blob => blob.ToFileInfo()).ToList();
+ return blobs;
+ }
+
+ private static FileSpec ToFileInfo(BlobItem blob)
+ {
+ // TODO: We used to check if Properties.Length was -1 in old storage extension and return null. I'm not sure why this was ever needed. But if we do we need to be sure we filter nulls out.
+ return new FileSpec
+ {
+ Path = blob.Name,
+ Size = blob.Properties.ContentLength ?? -1,
+ Created = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue,
+ Modified = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue
+ };
+ }
+
+ private static FileSpec ToFileInfo(string path, BlobProperties properties)
+ {
+ // TODO: We used to check if Properties.Length was -1 in old storage extension and return null. I'm not sure why this was ever needed. But if we do we need to be sure we filter nulls out.
+ return new FileSpec
+ {
+ Path = path,
+ Size = properties.ContentLength,
+ Created = properties.LastModified.UtcDateTime,
+ Modified = properties.LastModified.UtcDateTime
+ };
}
private string NormalizePath(string path)
@@ -318,5 +400,7 @@ private SearchCriteria GetRequestCriteria(string searchPattern)
};
}
- public void Dispose() { }
+ public void Dispose()
+ {
+ }
}
diff --git a/src/Foundatio.AzureStorage/Storage/AzureFileStorageOptions.cs b/src/Foundatio.AzureStorage/Storage/AzureFileStorageOptions.cs
index a11f1b7..d5e9dc3 100644
--- a/src/Foundatio.AzureStorage/Storage/AzureFileStorageOptions.cs
+++ b/src/Foundatio.AzureStorage/Storage/AzureFileStorageOptions.cs
@@ -1,4 +1,5 @@
using System;
+using Azure.Core;
namespace Foundatio.Storage;
@@ -6,19 +7,60 @@ public class AzureFileStorageOptions : SharedOptions
{
public string ConnectionString { get; set; }
public string ContainerName { get; set; } = "storage";
+
+ ///
+ /// Optional action to configure Azure SDK retry options.
+ /// Default Azure SDK retry settings: MaxRetries=3, Delay=0.8s, MaxDelay=1min, Mode=Exponential.
+ ///
+ ///
+ ///
+ /// options.ConfigureRetry = retry =>
+ /// {
+ /// retry.MaxRetries = 5;
+ /// retry.Delay = TimeSpan.FromSeconds(1);
+ /// retry.Mode = RetryMode.Exponential;
+ /// };
+ ///
+ ///
+ public Action ConfigureRetry { get; set; }
}
public class AzureFileStorageOptionsBuilder : SharedOptionsBuilder
{
public AzureFileStorageOptionsBuilder ConnectionString(string connectionString)
{
- Target.ConnectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
+ ArgumentException.ThrowIfNullOrEmpty(connectionString);
+
+ Target.ConnectionString = connectionString;
return this;
}
public AzureFileStorageOptionsBuilder ContainerName(string containerName)
{
- Target.ContainerName = containerName ?? throw new ArgumentNullException(nameof(containerName));
+ ArgumentException.ThrowIfNullOrEmpty(containerName);
+
+ Target.ContainerName = containerName;
+ return this;
+ }
+
+ ///
+ /// Configures Azure SDK retry options for transient failure handling.
+ ///
+ /// Action to configure retry options.
+ ///
+ ///
+ /// .ConfigureRetry(retry =>
+ /// {
+ /// retry.MaxRetries = 5;
+ /// retry.Delay = TimeSpan.FromSeconds(1);
+ /// })
+ ///
+ ///
+ public AzureFileStorageOptionsBuilder ConfigureRetry(Action configure)
+ {
+ ArgumentNullException.ThrowIfNull(configure);
+
+ Target.ConfigureRetry = configure;
return this;
}
}
diff --git a/tests/Foundatio.AzureStorage.Tests/Queues/AzureStorageQueueTests.cs b/tests/Foundatio.AzureStorage.Tests/Queues/AzureStorageQueueTests.cs
index 4450613..a925e61 100644
--- a/tests/Foundatio.AzureStorage.Tests/Queues/AzureStorageQueueTests.cs
+++ b/tests/Foundatio.AzureStorage.Tests/Queues/AzureStorageQueueTests.cs
@@ -1,9 +1,9 @@
using System;
using System.Threading.Tasks;
+using Azure.Core;
using Foundatio.Queues;
using Foundatio.Tests.Queue;
using Foundatio.Tests.Utility;
-using Microsoft.Azure.Storage.RetryPolicies;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;
@@ -14,7 +14,9 @@ public class AzureStorageQueueTests : QueueTestBase
{
private readonly string _queueName = "foundatio-" + Guid.NewGuid().ToString("N").Substring(10);
- public AzureStorageQueueTests(ITestOutputHelper output) : base(output) { }
+ public AzureStorageQueueTests(ITestOutputHelper output) : base(output)
+ {
+ }
protected override IQueue GetQueue(int retries = 1, TimeSpan? workItemTimeout = null, TimeSpan? retryDelay = null, int[] retryMultipliers = null, int deadLetterMaxItems = 100, bool runQueueMaintenance = true, TimeProvider timeProvider = null)
{
@@ -22,12 +24,15 @@ protected override IQueue GetQueue(int retries = 1, TimeSpan? wo
if (String.IsNullOrEmpty(connectionString))
return null;
+ // TODO: We could use ExponentialRetry here if we wanted to test that as well. Could it use the same as options (into a shared helper) public Func RetryDelay { get; set; } = attempt =>
+ // TimeSpan.FromSeconds(Math.Pow(2, attempt)) + TimeSpan.FromMilliseconds(Random.Shared.Next(0, 100));
+
_logger.LogDebug("Queue Id: {Name}", _queueName);
return new AzureStorageQueue(o => o
.ConnectionString(connectionString)
.Name(_queueName)
.Retries(retries)
- .RetryPolicy(retries <= 0 ? new NoRetry() : new ExponentialRetry(retryDelay.GetValueOrDefault(TimeSpan.FromMinutes(1)), retries))
+ .RetryDelay(_ => retries <= 0 ? TimeSpan.Zero : retryDelay.GetValueOrDefault(TimeSpan.FromMinutes(1)))
.WorkItemTimeout(workItemTimeout.GetValueOrDefault(TimeSpan.FromMinutes(5)))
.DequeueInterval(TimeSpan.FromSeconds(1))
.MetricsPollingInterval(TimeSpan.Zero)
@@ -41,6 +46,7 @@ protected override Task CleanupQueueAsync(IQueue queue)
queue?.Dispose();
return Task.CompletedTask;
}
+
[Fact]
public override Task CanQueueAndDequeueWorkItemAsync()
{
@@ -53,7 +59,7 @@ public override Task CanQueueAndDequeueWorkItemWithDelayAsync()
return base.CanQueueAndDequeueWorkItemWithDelayAsync();
}
- [Fact(Skip = "Storage Queues don't support the round tripping of user headers for values like correlation id")]
+ [Fact(Skip = "Azure Storage Queue does not support CorrelationId or Properties - only message body is persisted")]
public override Task CanUseQueueOptionsAsync()
{
return base.CanUseQueueOptionsAsync();
@@ -119,7 +125,7 @@ public override Task CanHandleErrorInWorkerAsync()
return base.CanHandleErrorInWorkerAsync();
}
- [Fact(Skip = "CompleteAsync after timeout will not throw")]
+ [Fact(Skip = "Azure Storage Queue handles visibility timeout natively; no client-side auto-abandon")]
public override Task WorkItemsWillTimeoutAsync()
{
return base.WorkItemsWillTimeoutAsync();
@@ -143,7 +149,7 @@ public override Task CanHaveMultipleQueueInstancesAsync()
return base.CanHaveMultipleQueueInstancesAsync();
}
- [Fact(Skip = "TODO: Retry delays are currently not applied to abandoned items")]
+ [Fact]
public override Task CanDelayRetryAsync()
{
return base.CanDelayRetryAsync();
@@ -203,7 +209,7 @@ public override Task VerifyDelayedRetryAttemptsAsync()
return base.VerifyDelayedRetryAttemptsAsync();
}
- [Fact(Skip = "Storage Queues has no queue stats for abandoned, it just increments the queued count and decrements the working count. Only the entry attribute ApproximateNumberOfMessages is available.")]
+ [Fact(Skip = "Azure Storage Queue handles visibility timeout natively; no client-side auto-abandon")]
public override Task CanHandleAutoAbandonInWorker()
{
return base.CanHandleAutoAbandonInWorker();
diff --git a/tests/Foundatio.AzureStorage.Tests/Storage/AzureStorageTests.cs b/tests/Foundatio.AzureStorage.Tests/Storage/AzureStorageTests.cs
index f08cc8e..077d937 100644
--- a/tests/Foundatio.AzureStorage.Tests/Storage/AzureStorageTests.cs
+++ b/tests/Foundatio.AzureStorage.Tests/Storage/AzureStorageTests.cs
@@ -170,8 +170,8 @@ public virtual async Task WillNotReturnDirectoryInGetPagedFileListAsync()
var container = storage is AzureFileStorage azureFileStorage ? azureFileStorage.Container : null;
Assert.NotNull(container);
- var blockBlob = container.GetBlockBlobReference("EmptyFolder/");
- await blockBlob.UploadFromStreamAsync(new MemoryStream(), null, null, null);
+ var blobClient = container.GetBlobClient("EmptyFolder/");
+ await blobClient.UploadAsync(new MemoryStream());
result = await storage.GetPagedFileListAsync();
Assert.False(result.HasMore);