diff --git a/Foundatio.AzureStorage.slnx b/Foundatio.AzureStorage.slnx index 5106bfc..5d0fc41 100644 --- a/Foundatio.AzureStorage.slnx +++ b/Foundatio.AzureStorage.slnx @@ -5,9 +5,14 @@ + + + + + diff --git a/build/common.props b/build/common.props index ed9fa09..3b26720 100644 --- a/build/common.props +++ b/build/common.props @@ -39,7 +39,7 @@ - + diff --git a/docker-compose.yml b/docker-compose.yml index 14c3ff5..0fcb0f7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,7 @@ services: azurite: image: mcr.microsoft.com/azure-storage/azurite:3.35.0 + command: azurite --skipApiVersionCheck --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0 ports: - 10000:10000 - 10001:10001 diff --git a/samples/Directory.Build.props b/samples/Directory.Build.props new file mode 100644 index 0000000..a3f573a --- /dev/null +++ b/samples/Directory.Build.props @@ -0,0 +1,8 @@ + + + + net8.0 + Exe + False + + diff --git a/samples/Foundatio.AzureStorage.Dequeue/Foundatio.AzureStorage.Dequeue.csproj b/samples/Foundatio.AzureStorage.Dequeue/Foundatio.AzureStorage.Dequeue.csproj new file mode 100644 index 0000000..5be4e46 --- /dev/null +++ b/samples/Foundatio.AzureStorage.Dequeue/Foundatio.AzureStorage.Dequeue.csproj @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/samples/Foundatio.AzureStorage.Dequeue/Program.cs b/samples/Foundatio.AzureStorage.Dequeue/Program.cs new file mode 100644 index 0000000..d5c3dc1 --- /dev/null +++ b/samples/Foundatio.AzureStorage.Dequeue/Program.cs @@ -0,0 +1,156 @@ +using System; +using System.CommandLine; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Foundatio.AzureStorage.Samples; +using Foundatio.Queues; +using Microsoft.Extensions.Logging; + +// Azure Storage Emulator connection string +const string EmulatorConnectionString = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;QueueEndpoint=http://localhost:10001/devstoreaccount1;"; + +// Define options +var connectionStringOption = new Option("--connection-string", "-c") +{ + Description = "Azure Storage connection string (defaults to emulator)" +}; + +var queueOption = new Option("--queue", "-q") +{ + Description = "Queue name", + DefaultValueFactory = _ => "sample-queue" +}; + +var modeOption = new Option("--mode") +{ + Description = "Compatibility mode (Default or Legacy)", + DefaultValueFactory = _ => AzureStorageQueueCompatibilityMode.Default +}; + +var countOption = new Option("--count") +{ + Description = "Number of messages to process (0 = infinite)", + DefaultValueFactory = _ => 1 +}; + +// Create root command +var rootCommand = new RootCommand("Azure Storage Queue Dequeue Sample"); +rootCommand.Options.Add(connectionStringOption); +rootCommand.Options.Add(queueOption); +rootCommand.Options.Add(modeOption); +rootCommand.Options.Add(countOption); + +// Set handler +rootCommand.SetAction(async parseResult => +{ + var connectionString = parseResult.GetValue(connectionStringOption) ?? + Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING") ?? + EmulatorConnectionString; + + var queueName = parseResult.GetValue(queueOption); + var mode = parseResult.GetValue(modeOption); + var count = parseResult.GetValue(countOption); + + Console.WriteLine($"Using connection: {(connectionString == EmulatorConnectionString ? "Azure Storage Emulator" : "Custom connection string")}"); + Console.WriteLine($"Mode: {mode}"); + Console.WriteLine($"Queue: {queueName}"); + Console.WriteLine($"To process: {(count == 0 ? "infinite messages" : $"{count} message(s)")}"); + Console.WriteLine(); + Console.WriteLine("Press Ctrl+C to stop..."); + Console.WriteLine(); + + await DequeueMessages(connectionString, queueName, mode, count); + return 0; +}); + +// Parse and invoke +return await rootCommand.Parse(args).InvokeAsync(); + +static async Task DequeueMessages(string connectionString, string queueName, AzureStorageQueueCompatibilityMode mode, int count) +{ + using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information)); + var logger = loggerFactory.CreateLogger("Dequeue"); + using var cts = new CancellationTokenSource(); + + Console.CancelKeyPress += (s, e) => + { + e.Cancel = true; + try + { + cts.Cancel(); + } + catch + { + // ignored + } + + logger.LogInformation("Cancellation requested..."); + }; + + logger.LogInformation("Creating queue with mode: {Mode}", mode); + + using var queue = new AzureStorageQueue(options => options + .ConnectionString(connectionString) + .Name(queueName) + .CompatibilityMode(mode) + .LoggerFactory(loggerFactory)); + + int processed = 0; + bool infinite = count == 0; + + logger.LogInformation("Waiting for messages... (Press Ctrl+C to stop)"); + + try + { + while (!cts.Token.IsCancellationRequested && (infinite || processed < count)) + { + var entry = await queue.DequeueAsync(cts.Token); + + if (entry == null) + { + if (!infinite && processed >= count) + break; + + continue; + } + + try + { + processed++; + + logger.LogInformation("Dequeued message {MessageId}: '{Message}' from '{Source}' at {Timestamp}", + entry.Id, entry.Value.Message, entry.Value.Source, entry.Value.Timestamp); + + logger.LogInformation(" CorrelationId: '{CorrelationId}'", entry.CorrelationId ?? ""); + + if (entry.Properties != null && entry.Properties.Count > 0) + { + logger.LogInformation(" Properties: [{Properties}]", + string.Join(", ", entry.Properties.Select(p => $"{p.Key}={p.Value}"))); + } + else + { + logger.LogInformation(" Properties: "); + } + + // Simulate processing time + await Task.Delay(100, cts.Token); + + await entry.CompleteAsync(); + logger.LogInformation(" Completed message {MessageId}", entry.Id); + } + catch (Exception ex) + { + logger.LogError(ex, "Error processing message {MessageId}", entry.Id); + await entry.AbandonAsync(); + } + } + } + catch (OperationCanceledException ex) + { + logger.LogInformation(ex, "Operation was cancelled"); + } + + logger.LogInformation("Processed {ProcessedCount} message(s)", processed); +} diff --git a/samples/Foundatio.AzureStorage.Enqueue/Foundatio.AzureStorage.Enqueue.csproj b/samples/Foundatio.AzureStorage.Enqueue/Foundatio.AzureStorage.Enqueue.csproj new file mode 100644 index 0000000..4cecd22 --- /dev/null +++ b/samples/Foundatio.AzureStorage.Enqueue/Foundatio.AzureStorage.Enqueue.csproj @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/samples/Foundatio.AzureStorage.Enqueue/Program.cs b/samples/Foundatio.AzureStorage.Enqueue/Program.cs new file mode 100644 index 0000000..b9a2dc8 --- /dev/null +++ b/samples/Foundatio.AzureStorage.Enqueue/Program.cs @@ -0,0 +1,137 @@ +using System; +using System.Collections.Generic; +using System.CommandLine; +using System.Linq; +using System.Threading.Tasks; +using Foundatio.AzureStorage.Samples; +using Foundatio.Queues; +using Microsoft.Extensions.Logging; + +// Azure Storage Emulator connection string +const string EmulatorConnectionString = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;QueueEndpoint=http://localhost:10001/devstoreaccount1;"; + +// Define options +var connectionStringOption = new Option("--connection-string", "-c") +{ + Description = "Azure Storage connection string (defaults to emulator)" +}; + +var queueOption = new Option("--queue", "-q") +{ + Description = "Queue name", + DefaultValueFactory = _ => "sample-queue" +}; + +var messageOption = new Option("--message", "-m") +{ + Description = "Message to send", + DefaultValueFactory = _ => "Hello World" +}; + +var correlationIdOption = new Option("--correlation-id") +{ + Description = "Correlation ID for the message" +}; + +var propertiesOption = new Option("--property") +{ + Description = "Custom properties in key=value format", + DefaultValueFactory = _ => Array.Empty() +}; + +var modeOption = new Option("--mode") +{ + Description = "Compatibility mode (Default or Legacy)", + DefaultValueFactory = _ => AzureStorageQueueCompatibilityMode.Default +}; + +var countOption = new Option("--count") +{ + Description = "Number of messages to send", + DefaultValueFactory = _ => 1 +}; + +// Create root command +var rootCommand = new RootCommand("Azure Storage Queue Enqueue Sample"); +rootCommand.Options.Add(connectionStringOption); +rootCommand.Options.Add(queueOption); +rootCommand.Options.Add(messageOption); +rootCommand.Options.Add(correlationIdOption); +rootCommand.Options.Add(propertiesOption); +rootCommand.Options.Add(modeOption); +rootCommand.Options.Add(countOption); + +// Set handler +rootCommand.SetAction(async parseResult => +{ + var connectionString = parseResult.GetValue(connectionStringOption) ?? + Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING") ?? + EmulatorConnectionString; + + var queueName = parseResult.GetValue(queueOption); + var message = parseResult.GetValue(messageOption); + var correlationId = parseResult.GetValue(correlationIdOption); + var properties = parseResult.GetValue(propertiesOption); + var mode = parseResult.GetValue(modeOption); + var count = parseResult.GetValue(countOption); + + Console.WriteLine($"Using connection: {(connectionString == EmulatorConnectionString ? "Azure Storage Emulator" : "Custom connection string")}"); + Console.WriteLine($"Mode: {mode}"); + Console.WriteLine(); + + await EnqueueMessages(connectionString, queueName, message, correlationId, properties, mode, count); + return 0; +}); + +// Parse and invoke +return await rootCommand.Parse(args).InvokeAsync(); + +static async Task EnqueueMessages(string connectionString, string queueName, string message, string correlationId, string[] properties, AzureStorageQueueCompatibilityMode mode, int count) +{ + using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information)); + var logger = loggerFactory.CreateLogger("Enqueue"); + + logger.LogInformation("Creating queue with mode: {Mode}", mode); + + using var queue = new AzureStorageQueue(options => options + .ConnectionString(connectionString) + .Name(queueName) + .CompatibilityMode(mode) + .LoggerFactory(loggerFactory)); + + var queueProperties = new Dictionary(); + if (properties != null) + { + foreach (var prop in properties) + { + var parts = prop.Split('=', 2); + if (parts.Length == 2) + { + queueProperties[parts[0]] = parts[1]; + } + } + } + + for (int i = 0; i < count; i++) + { + var sampleMessage = new SampleMessage + { + Message = count > 1 ? $"{message} #{i + 1}" : message, + Source = "Enqueue Sample" + }; + + var entryOptions = new QueueEntryOptions + { + CorrelationId = correlationId, + Properties = queueProperties.Count > 0 ? queueProperties : null + }; + + var messageId = await queue.EnqueueAsync(sampleMessage, entryOptions); + + logger.LogInformation("Enqueued message {MessageId}: '{Message}' with CorrelationId: '{CorrelationId}' Properties: [{Properties}]", + messageId, sampleMessage.Message, correlationId ?? "", + string.Join(", ", queueProperties.Select(p => $"{p.Key}={p.Value}"))); + } + + logger.LogInformation("Successfully enqueued {Count} message(s)", count); +} diff --git a/samples/Foundatio.AzureStorage.Enqueue/SampleMessage.cs b/samples/Foundatio.AzureStorage.Enqueue/SampleMessage.cs new file mode 100644 index 0000000..f36d9a5 --- /dev/null +++ b/samples/Foundatio.AzureStorage.Enqueue/SampleMessage.cs @@ -0,0 +1,10 @@ +using System; + +namespace Foundatio.AzureStorage.Samples; + +public record SampleMessage +{ + public string Message { get; init; } = string.Empty; + public DateTime Timestamp { get; init; } = DateTime.UtcNow; + public string Source { get; init; } = string.Empty; +} diff --git a/samples/README.md b/samples/README.md new file mode 100644 index 0000000..2a6573e --- /dev/null +++ b/samples/README.md @@ -0,0 +1,140 @@ +# Azure Storage Queue Sample Applications + +This folder contains sample console applications demonstrating the Azure Storage Queue implementation with both Default (envelope) and Legacy (v11 SDK compatible) modes. + +## Sample Applications + +### Foundatio.AzureStorage.Enqueue + +Enqueues messages to an Azure Storage Queue with support for correlation IDs and custom properties. + +**Usage:** +```bash +dotnet run --project samples/Foundatio.AzureStorage.Enqueue --framework net8.0 -- [options] +``` + +**Options:** +- `-c, --connection-string` - Azure Storage connection string (defaults to emulator) +- `-q, --queue` - Queue name (default: sample-queue) +- `-m, --message` - Message to send (default: Hello World) +- `--correlation-id` - Correlation ID for the message +- `--property` - Custom properties in key=value format (can be used multiple times) +- `--mode` - Compatibility mode: Default or Legacy (default: Default) +- `--count` - Number of messages to send (default: 1) + +**Examples:** +```bash +# Send a simple message using emulator +dotnet run --project samples/Foundatio.AzureStorage.Enqueue --framework net8.0 + +# Send message with metadata (Default mode) +dotnet run --project samples/Foundatio.AzureStorage.Enqueue --framework net8.0 -- \ + --message "Test Message" \ + --correlation-id "12345" \ + --property "Source=Sample" \ + --property "Priority=High" + +# Send message in Legacy mode (v11 SDK compatibility) +dotnet run --project samples/Foundatio.AzureStorage.Enqueue --framework net8.0 -- \ + --mode Legacy \ + --message "Legacy Message" +``` + +### Foundatio.AzureStorage.Dequeue + +Dequeues and processes messages from an Azure Storage Queue, displaying all metadata. + +**Usage:** +```bash +dotnet run --project samples/Foundatio.AzureStorage.Dequeue --framework net8.0 -- [options] +``` + +**Options:** +- `-c, --connection-string` - Azure Storage connection string (defaults to emulator) +- `-q, --queue` - Queue name (default: sample-queue) +- `--mode` - Compatibility mode: Default or Legacy (default: Default) +- `--count` - Number of messages to process, 0 = infinite (default: 1) + +**Examples:** +```bash +# Process one message using emulator +dotnet run --project samples/Foundatio.AzureStorage.Dequeue --framework net8.0 + +# Process messages continuously (press Ctrl+C to stop) +dotnet run --project samples/Foundatio.AzureStorage.Dequeue --framework net8.0 -- \ + --count 0 + +# Process messages in Legacy mode +dotnet run --project samples/Foundatio.AzureStorage.Dequeue --framework net8.0 -- \ + --mode Legacy +``` + +## Azure Storage Emulator / Azurite + +The sample applications default to the Azure Storage Emulator connection string for local development: + +``` +DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;QueueEndpoint=http://localhost:10001/devstoreaccount1; +``` + +### Running Azurite + +To start Azurite (the Azure Storage Emulator): + +```bash +# Install Azurite globally +npm install -g azurite + +# Start Azurite +azurite --silent --location c:\azurite --debug c:\azurite\debug.log +``` + +Or using Docker: +```bash +docker run -p 10000:10000 -p 10001:10001 -p 10002:10002 mcr.microsoft.com/azure-storage/azurite +``` + +### Connection String Priority + +The applications check for connection strings in this order: +1. `--connection-string` command line argument +2. `AZURE_STORAGE_CONNECTION_STRING` environment variable +3. Azure Storage Emulator default connection string + +## Compatibility Modes + +### Default Mode +- Uses message envelope format with full metadata support +- Supports `CorrelationId` and `Properties` for distributed tracing +- Recommended for new applications + +### Legacy Mode +- Compatible with v11 Microsoft.Azure.Storage.Queue SDK +- Uses Base64 encoding and raw message body +- No metadata support (CorrelationId/Properties are lost) +- Use for backward compatibility with existing queues + +## Demo Workflow + +1. Start Azurite: + ```bash + azurite --silent --location c:\azurite + ``` + +2. Send messages with metadata: + ```bash + dotnet run --project samples/Foundatio.AzureStorage.Enqueue --framework net8.0 -- \ + --message "Hello from Foundatio" \ + --correlation-id "demo-123" \ + --property "Source=Demo" \ + --property "Environment=Development" \ + --count 3 + ``` + +3. Process the messages: + ```bash + dotnet run --project samples/Foundatio.AzureStorage.Dequeue --framework net8.0 -- \ + --count 0 + ``` + +The dequeue application will display the full message content along with all metadata preserved by the envelope format. diff --git a/src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs b/src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs deleted file mode 100644 index aa651cd..0000000 --- a/src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; -using Foundatio.Storage; -using Microsoft.Azure.Storage.Blob; - -namespace Foundatio.Azure.Extensions; - -public static class StorageExtensions -{ - public static FileSpec ToFileInfo(this CloudBlockBlob blob) - { - if (blob.Properties.Length == -1) - return null; - - return new FileSpec - { - Path = blob.Name, - Size = blob.Properties.Length, - Created = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue, - Modified = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue - }; - } -} diff --git a/src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj b/src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj index e46221c..39cae2c 100644 --- a/src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj +++ b/src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj @@ -4,8 +4,8 @@ Queue;Messaging;Message;File;Distributed;Storage;Blob;Azure - - + + diff --git a/src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs b/src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs index 4f68a5a..55f2e54 100644 --- a/src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs +++ b/src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs @@ -3,11 +3,12 @@ using System.Diagnostics; using System.Threading; using System.Threading.Tasks; +using Azure; +using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; using Foundatio.AsyncEx; using Foundatio.Extensions; using Foundatio.Serializer; -using Microsoft.Azure.Storage; -using Microsoft.Azure.Storage.Queue; using Microsoft.Extensions.Logging; namespace Foundatio.Queues; @@ -15,8 +16,8 @@ namespace Foundatio.Queues; public class AzureStorageQueue : QueueBase> where T : class { private readonly AsyncLock _lock = new(); - private readonly CloudQueue _queueReference; - private readonly CloudQueue _deadletterQueueReference; + private readonly Lazy _queueClient; + private readonly Lazy _deadletterQueueClient; private long _enqueuedCount; private long _dequeuedCount; private long _completedCount; @@ -29,17 +30,22 @@ public AzureStorageQueue(AzureStorageQueueOptions options) : base(options) if (String.IsNullOrEmpty(options.ConnectionString)) throw new ArgumentException("ConnectionString is required."); - var account = CloudStorageAccount.Parse(options.ConnectionString); - var client = account.CreateCloudQueueClient(); - if (options.RetryPolicy != null) - client.DefaultRequestOptions.RetryPolicy = options.RetryPolicy; + var clientOptions = new QueueClientOptions(); + if (options.CompatibilityMode == AzureStorageQueueCompatibilityMode.Legacy) + clientOptions.MessageEncoding = QueueMessageEncoding.Base64; - _queueReference = client.GetQueueReference(_options.Name); - _deadletterQueueReference = client.GetQueueReference($"{_options.Name}-poison"); + options.ConfigureRetry?.Invoke(clientOptions.Retry); + + _queueClient = new Lazy(() => + new QueueClient(options.ConnectionString, _options.Name, clientOptions)); + _deadletterQueueClient = new Lazy(() => + new QueueClient(options.ConnectionString, $"{_options.Name}-poison", clientOptions)); } public AzureStorageQueue(Builder, AzureStorageQueueOptions> config) - : this(config(new AzureStorageQueueOptionsBuilder()).Build()) { } + : this(config(new AzureStorageQueueOptionsBuilder()).Build()) + { + } protected override async Task EnsureQueueCreatedAsync(CancellationToken cancellationToken = default) { @@ -53,8 +59,8 @@ protected override async Task EnsureQueueCreatedAsync(CancellationToken cancella var sw = Stopwatch.StartNew(); await Task.WhenAll( - _queueReference.CreateIfNotExistsAsync(), - _deadletterQueueReference.CreateIfNotExistsAsync() + _queueClient.Value.CreateIfNotExistsAsync(cancellationToken: cancellationToken), + _deadletterQueueClient.Value.CreateIfNotExistsAsync(cancellationToken: cancellationToken) ).AnyContext(); _queueCreated = true; @@ -69,37 +75,87 @@ protected override async Task EnqueueImplAsync(T data, QueueEntryOptions return null; Interlocked.Increment(ref _enqueuedCount); - var message = new CloudQueueMessage(_serializer.SerializeToBytes(data)); - await _queueReference.AddMessageAsync(message, null, options.DeliveryDelay, null, null).AnyContext(); - var entry = new QueueEntry(message.Id, options.CorrelationId, data, this, _timeProvider.GetLocalNow().UtcDateTime, 0); + byte[] messageBodyBytes; + + if (_options.CompatibilityMode == AzureStorageQueueCompatibilityMode.Default) + { + // Wrap in envelope to preserve CorrelationId and Properties + var envelope = new QueueMessageEnvelope + { + CorrelationId = options.CorrelationId, + Properties = options.Properties, + Data = data + }; + messageBodyBytes = _serializer.SerializeToBytes(envelope); + } + else + { + // Legacy mode: serialize data directly for backward compatibility + messageBodyBytes = _serializer.SerializeToBytes(data); + } + + var messageBody = new BinaryData(messageBodyBytes); + var response = await _queueClient.Value.SendMessageAsync( + messageBody, + visibilityTimeout: options.DeliveryDelay, + cancellationToken: CancellationToken.None).AnyContext(); + + var entry = new QueueEntry(response.Value.MessageId, null, data, this, _timeProvider.GetLocalNow().UtcDateTime, 0); await OnEnqueuedAsync(entry).AnyContext(); - return message.Id; + _logger.LogTrace("Enqueued message {MessageId}", response.Value.MessageId); + return response.Value.MessageId; } protected override async Task> DequeueImplAsync(CancellationToken linkedCancellationToken) { - var message = await _queueReference.GetMessageAsync(_options.WorkItemTimeout, null, null, !linkedCancellationToken.IsCancellationRequested ? linkedCancellationToken : CancellationToken.None).AnyContext(); + _logger.LogTrace("Checking for message: IsCancellationRequested={IsCancellationRequested} VisibilityTimeout={VisibilityTimeout}", linkedCancellationToken.IsCancellationRequested, _options.WorkItemTimeout); + + // Try to receive a message immediately + // Note: We use CancellationToken.None here because Azure SDK throws TaskCanceledException when the token + // is canceled, but we want to return null gracefully. Cancellation is checked before/after the call. + var response = await _queueClient.Value.ReceiveMessageAsync(_options.WorkItemTimeout, CancellationToken.None).AnyContext(); + var message = response?.Value; + + // If no message and cancellation requested, return null immediately (don't wait/poll) + if (message == null && linkedCancellationToken.IsCancellationRequested) + { + _logger.LogTrace("No message available and cancellation requested, returning null"); + return null; + } + + // If no message and not canceled, poll with fixed interval + // Note: Azure Storage Queue doesn't support long-polling, so we must poll if (message == null) { var sw = Stopwatch.StartNew(); - var lastReport = DateTime.Now; - _logger.LogTrace("No message available to dequeue, waiting..."); + var lastReport = _timeProvider.GetUtcNow(); + _logger.LogTrace("No message available to dequeue, polling..."); while (message == null && !linkedCancellationToken.IsCancellationRequested) { - if (DateTime.Now.Subtract(lastReport) > TimeSpan.FromSeconds(10)) + if (_timeProvider.GetUtcNow().Subtract(lastReport) > TimeSpan.FromSeconds(10)) + { + lastReport = _timeProvider.GetUtcNow(); _logger.LogTrace("Still waiting for message to dequeue: {Elapsed:g}", sw.Elapsed); + } try { if (!linkedCancellationToken.IsCancellationRequested) await _timeProvider.Delay(_options.DequeueInterval, linkedCancellationToken).AnyContext(); } - catch (OperationCanceledException) { } + catch (OperationCanceledException) + { + // Ignore cancellation during delay + } - message = await _queueReference.GetMessageAsync(_options.WorkItemTimeout, null, null, !linkedCancellationToken.IsCancellationRequested ? linkedCancellationToken : CancellationToken.None).AnyContext(); + if (linkedCancellationToken.IsCancellationRequested) + break; + + response = await _queueClient.Value.ReceiveMessageAsync(_options.WorkItemTimeout, CancellationToken.None).AnyContext(); + message = response?.Value; } sw.Stop(); @@ -112,11 +168,35 @@ protected override async Task> DequeueImplAsync(CancellationToken return null; } - _logger.LogTrace("Dequeued message {QueueEntryId}", message.Id); + var nowUtc = _timeProvider.GetUtcNow().UtcDateTime; + var insertedOn = message.InsertedOn?.UtcDateTime ?? DateTime.MinValue; + var queueTime = nowUtc - insertedOn; + _logger.LogTrace("Received message: {QueueEntryId} InsertedOn={InsertedOn} NowUtc={NowUtc} QueueTime={QueueTime}ms IsCancellationRequested={IsCancellationRequested}", + message.MessageId, insertedOn, nowUtc, queueTime.TotalMilliseconds, linkedCancellationToken.IsCancellationRequested); Interlocked.Increment(ref _dequeuedCount); - var data = _serializer.Deserialize(message.AsBytes); - var entry = new AzureStorageQueueEntry(message, data, this); + + T data; + string correlationId = null; + IDictionary properties = null; + + if (_options.CompatibilityMode == AzureStorageQueueCompatibilityMode.Default) + { + // Unwrap envelope to extract metadata + var envelope = _serializer.Deserialize>(message.Body.ToArray()); + data = envelope.Data; + correlationId = envelope.CorrelationId; + properties = envelope.Properties; + } + else + { + // Legacy mode: deserialize data directly (no envelope) + data = _serializer.Deserialize(message.Body.ToArray()); + } + + var entry = new AzureStorageQueueEntry(message, correlationId, properties, data, this); + await OnDequeuedAsync(entry).AnyContext(); + _logger.LogTrace("Dequeued message: {QueueEntryId}", message.MessageId); return entry; } @@ -124,9 +204,16 @@ public override async Task RenewLockAsync(IQueueEntry entry) { _logger.LogDebug("Queue {QueueName} renew lock item: {QueueEntryId}", _options.Name, entry.Id); var azureQueueEntry = ToAzureEntryWithCheck(entry); - await _queueReference.UpdateMessageAsync(azureQueueEntry.UnderlyingMessage, _options.WorkItemTimeout, MessageUpdateFields.Visibility).AnyContext(); + var response = await _queueClient.Value.UpdateMessageAsync( + azureQueueEntry.UnderlyingMessage.MessageId, + azureQueueEntry.PopReceipt, + visibilityTimeout: _options.WorkItemTimeout).AnyContext(); + + // Update the pop receipt since it changes after each update + azureQueueEntry.PopReceipt = response.Value.PopReceipt; + await OnLockRenewedAsync(entry).AnyContext(); - _logger.LogTrace("Renew lock done: {QueueEntryId}", entry.Id); + _logger.LogTrace("Renew lock done: {QueueEntryId} MessageId={MessageId} VisibilityTimeout={VisibilityTimeout}", entry.Id, azureQueueEntry.UnderlyingMessage.MessageId, _options.WorkItemTimeout); } public override async Task CompleteAsync(IQueueEntry entry) @@ -136,7 +223,20 @@ public override async Task CompleteAsync(IQueueEntry entry) throw new InvalidOperationException("Queue entry has already been completed or abandoned."); var azureQueueEntry = ToAzureEntryWithCheck(entry); - await _queueReference.DeleteMessageAsync(azureQueueEntry.UnderlyingMessage).AnyContext(); + + try + { + await _queueClient.Value.DeleteMessageAsync( + azureQueueEntry.UnderlyingMessage.MessageId, + azureQueueEntry.PopReceipt).AnyContext(); + } + catch (RequestFailedException ex) when (ex.Status == 404 || ex.ErrorCode == QueueErrorCode.MessageNotFound || ex.ErrorCode == QueueErrorCode.PopReceiptMismatch) + { + // Message was already deleted or the pop receipt expired (visibility timeout) + // This means the item was auto-abandoned by Azure + _logger.LogWarning("Failed to complete message {MessageId}: message not found or pop receipt expired (visibility timeout may have elapsed)", azureQueueEntry.UnderlyingMessage.MessageId); + throw new InvalidOperationException("Queue entry visibility timeout has elapsed and the message is no longer locked.", ex); + } Interlocked.Increment(ref _completedCount); entry.MarkCompleted(); @@ -153,21 +253,51 @@ public override async Task AbandonAsync(IQueueEntry entry) var azureQueueEntry = ToAzureEntryWithCheck(entry); if (azureQueueEntry.Attempts > _options.Retries) { - await Task.WhenAll( - _queueReference.DeleteMessageAsync(azureQueueEntry.UnderlyingMessage), - _deadletterQueueReference.AddMessageAsync(azureQueueEntry.UnderlyingMessage) - ).AnyContext(); + _logger.LogDebug("Moving message {QueueEntryId} to deadletter after {Attempts} attempts", entry.Id, azureQueueEntry.Attempts); + + // Send to deadletter queue first, then delete from main queue (sequential for data integrity) + byte[] deadletterBytes; + if (_options.CompatibilityMode == AzureStorageQueueCompatibilityMode.Default) + { + // Preserve envelope with metadata for Default mode + var envelope = new QueueMessageEnvelope + { + CorrelationId = entry.CorrelationId, + Properties = entry.Properties, + Data = entry.Value + }; + deadletterBytes = _serializer.SerializeToBytes(envelope); + } + else + { + deadletterBytes = _serializer.SerializeToBytes(entry.Value); + } + + var messageBody = new BinaryData(deadletterBytes); + await _deadletterQueueClient.Value.SendMessageAsync(messageBody).AnyContext(); + await _queueClient.Value.DeleteMessageAsync( + azureQueueEntry.UnderlyingMessage.MessageId, + azureQueueEntry.PopReceipt).AnyContext(); } else { - // Make the item visible immediately - await _queueReference.UpdateMessageAsync(azureQueueEntry.UnderlyingMessage, TimeSpan.Zero, MessageUpdateFields.Visibility).AnyContext(); + // Calculate visibility timeout based on retry delay + var retryDelay = _options.RetryDelay(azureQueueEntry.Attempts); + _logger.LogDebug("Making message {QueueEntryId} visible after {RetryDelay}", entry.Id, retryDelay); + + var response = await _queueClient.Value.UpdateMessageAsync( + azureQueueEntry.UnderlyingMessage.MessageId, + azureQueueEntry.PopReceipt, + visibilityTimeout: retryDelay).AnyContext(); + + // Update the pop receipt since it changes after each update + azureQueueEntry.PopReceipt = response.Value.PopReceipt; } Interlocked.Increment(ref _abandonedCount); entry.MarkAbandoned(); await OnAbandonedAsync(entry).AnyContext(); - _logger.LogTrace("Abandon complete: {QueueEntryId}", entry.Id); + _logger.LogTrace("Abandon complete: {QueueEntryId} MessageId={MessageId} Attempts={Attempts}", entry.Id, azureQueueEntry.UnderlyingMessage.MessageId, azureQueueEntry.Attempts); } protected override Task> GetDeadletterItemsImplAsync(CancellationToken cancellationToken) @@ -177,7 +307,9 @@ protected override Task> GetDeadletterItemsImplAsync(Cancellation protected override async Task GetQueueStatsImplAsync() { - if (_queueReference == null || _deadletterQueueReference == null || !_queueCreated) + // Note: Azure Storage Queue does not provide Working count (in-flight messages) or Timeouts. + // These stats are only available per-process and meaningless in distributed scenarios. + if (!_queueCreated) return new QueueStats { Queued = 0, @@ -192,30 +324,29 @@ protected override async Task GetQueueStatsImplAsync() }; var sw = Stopwatch.StartNew(); - await Task.WhenAll( - _queueReference.FetchAttributesAsync(), - _deadletterQueueReference.FetchAttributesAsync() - ).AnyContext(); + var queuePropertiesTask = _queueClient.Value.GetPropertiesAsync(); + var deadLetterPropertiesTask = _deadletterQueueClient.Value.GetPropertiesAsync(); + await Task.WhenAll(queuePropertiesTask, deadLetterPropertiesTask).AnyContext(); sw.Stop(); _logger.LogTrace("Fetching stats took {Elapsed:g}", sw.Elapsed); return new QueueStats { - Queued = _queueReference.ApproximateMessageCount.GetValueOrDefault(), - Working = 0, - Deadletter = _deadletterQueueReference.ApproximateMessageCount.GetValueOrDefault(), + Queued = queuePropertiesTask.Result.Value.ApproximateMessagesCount, + Working = 0, // Azure Storage Queue does not expose in-flight message count + Deadletter = deadLetterPropertiesTask.Result.Value.ApproximateMessagesCount, Enqueued = _enqueuedCount, Dequeued = _dequeuedCount, Completed = _completedCount, Abandoned = _abandonedCount, Errors = _workerErrorCount, - Timeouts = 0 + Timeouts = 0 // Azure handles visibility timeout natively; client-side tracking not meaningful }; } protected override QueueStats GetMetricsQueueStats() { - if (_queueReference == null || _deadletterQueueReference == null || !_queueCreated) + if (!_queueCreated) return new QueueStats { Queued = 0, @@ -230,16 +361,16 @@ protected override QueueStats GetMetricsQueueStats() }; var sw = Stopwatch.StartNew(); - _queueReference.FetchAttributes(); - _deadletterQueueReference.FetchAttributes(); + var queueProperties = _queueClient.Value.GetProperties(); + var deadletterProperties = _deadletterQueueClient.Value.GetProperties(); sw.Stop(); _logger.LogTrace("Fetching stats took {Elapsed:g}", sw.Elapsed); return new QueueStats { - Queued = _queueReference.ApproximateMessageCount.GetValueOrDefault(), + Queued = queueProperties.Value.ApproximateMessagesCount, Working = 0, - Deadletter = _deadletterQueueReference.ApproximateMessageCount.GetValueOrDefault(), + Deadletter = deadletterProperties.Value.ApproximateMessagesCount, Enqueued = _enqueuedCount, Dequeued = _dequeuedCount, Completed = _completedCount, @@ -253,8 +384,8 @@ public override async Task DeleteQueueAsync() { var sw = Stopwatch.StartNew(); await Task.WhenAll( - _queueReference.DeleteIfExistsAsync(), - _deadletterQueueReference.DeleteIfExistsAsync() + _queueClient.Value.DeleteIfExistsAsync(), + _deadletterQueueClient.Value.DeleteIfExistsAsync() ).AnyContext(); _queueCreated = false; @@ -270,8 +401,7 @@ await Task.WhenAll( protected override void StartWorkingImpl(Func, CancellationToken, Task> handler, bool autoComplete, CancellationToken cancellationToken) { - if (handler == null) - throw new ArgumentNullException(nameof(handler)); + ArgumentNullException.ThrowIfNull(handler); var linkedCancellationToken = GetLinkedDisposableCancellationTokenSource(cancellationToken); @@ -288,7 +418,10 @@ protected override void StartWorkingImpl(Func, CancellationToken, { queueEntry = await DequeueImplAsync(linkedCancellationToken.Token).AnyContext(); } - catch (OperationCanceledException) { } + catch (OperationCanceledException) + { + // Ignore cancellation + } if (linkedCancellationToken.IsCancellationRequested || queueEntry == null) continue; @@ -309,15 +442,37 @@ protected override void StartWorkingImpl(Func, CancellationToken, } } - _logger.LogTrace("Worker exiting: {QueueName} Cancel Requested: {IsCancellationRequested}", _queueReference.Name, linkedCancellationToken.IsCancellationRequested); + _logger.LogTrace("Worker exiting: {QueueName} Cancel Requested: {IsCancellationRequested}", _options.Name, linkedCancellationToken.IsCancellationRequested); }, linkedCancellationToken.Token).ContinueWith(t => linkedCancellationToken.Dispose()); } private static AzureStorageQueueEntry ToAzureEntryWithCheck(IQueueEntry queueEntry) { - if (!(queueEntry is AzureStorageQueueEntry azureQueueEntry)) + if (queueEntry is not AzureStorageQueueEntry azureQueueEntry) throw new ArgumentException($"Unknown entry type. Can only process entries of type '{nameof(AzureStorageQueueEntry)}'"); return azureQueueEntry; } } + +/// +/// Envelope for wrapping queue data with metadata (correlation id, properties). +/// Azure Storage Queue only supports a message body, so we serialize this entire envelope. +/// +internal record QueueMessageEnvelope where T : class +{ + /// + /// Correlation ID for distributed tracing + /// + public string CorrelationId { get; init; } + + /// + /// Custom properties/metadata + /// + public IDictionary Properties { get; init; } + + /// + /// The actual message payload + /// + public T Data { get; init; } +} diff --git a/src/Foundatio.AzureStorage/Queues/AzureStorageQueueEntry.cs b/src/Foundatio.AzureStorage/Queues/AzureStorageQueueEntry.cs index 3f2b4d0..b32a0c2 100644 --- a/src/Foundatio.AzureStorage/Queues/AzureStorageQueueEntry.cs +++ b/src/Foundatio.AzureStorage/Queues/AzureStorageQueueEntry.cs @@ -1,15 +1,29 @@ -using Microsoft.Azure.Storage.Queue; +using System; +using System.Collections.Generic; +using Azure.Storage.Queues.Models; namespace Foundatio.Queues; public class AzureStorageQueueEntry : QueueEntry where T : class { - public CloudQueueMessage UnderlyingMessage { get; } + public QueueMessage UnderlyingMessage { get; } - public AzureStorageQueueEntry(CloudQueueMessage message, T value, IQueue queue) - : base(message.Id, null, value, queue, message.InsertionTime.GetValueOrDefault().UtcDateTime, message.DequeueCount) - { + /// + /// The current pop receipt for this message. This gets updated after each + /// UpdateMessageAsync call (renew lock, abandon with retry delay). + /// + public string PopReceipt { get; internal set; } + public AzureStorageQueueEntry(QueueMessage message, string correlationId, IDictionary properties, T data, IQueue queue) + : base(message.MessageId, correlationId, data, queue, message.InsertedOn?.UtcDateTime ?? DateTime.MinValue, (int)message.DequeueCount) + { UnderlyingMessage = message; + PopReceipt = message.PopReceipt; + + if (properties != null) + { + foreach (var property in properties) + Properties[property.Key] = property.Value; + } } } diff --git a/src/Foundatio.AzureStorage/Queues/AzureStorageQueueOptions.cs b/src/Foundatio.AzureStorage/Queues/AzureStorageQueueOptions.cs index 03a2399..e6b66de 100644 --- a/src/Foundatio.AzureStorage/Queues/AzureStorageQueueOptions.cs +++ b/src/Foundatio.AzureStorage/Queues/AzureStorageQueueOptions.cs @@ -1,32 +1,134 @@ -using System; -using Microsoft.Azure.Storage.RetryPolicies; +using System; +using Azure.Core; namespace Foundatio.Queues; +/// +/// Compatibility mode for Azure Storage Queue message format. +/// +public enum AzureStorageQueueCompatibilityMode +{ + /// + /// Default mode: Uses message envelope for full metadata support. + /// Supports CorrelationId and Properties. New installations should use this mode. + /// + Default = 0, + + /// + /// Legacy mode: Raw message body with Base64 encoding for v11 SDK compatibility. + /// Use this for backward compatibility with existing queues that have messages + /// written with the v11 Microsoft.Azure.Storage.Queue SDK. + /// + Legacy = 1 +} + public class AzureStorageQueueOptions : SharedQueueOptions where T : class { public string ConnectionString { get; set; } - public IRetryPolicy RetryPolicy { get; set; } + + /// + /// The interval to wait between polling for new messages when the queue is empty. + /// public TimeSpan DequeueInterval { get; set; } = TimeSpan.FromSeconds(2); + + /// + /// A function that returns the delay before retrying a failed message based on the attempt number. + /// Default is exponential backoff with jitter: 2^attempt seconds + random 0-100ms. + /// + public Func RetryDelay { get; set; } = attempt => + TimeSpan.FromSeconds(Math.Pow(2, attempt)) + TimeSpan.FromMilliseconds(Random.Shared.Next(0, 100)); + + /// + /// Controls message format compatibility. + /// Default mode uses an envelope wrapper that supports CorrelationId and Properties. + /// Legacy mode is provided for backward compatibility with existing queues that have messages + /// written with the v11 Microsoft.Azure.Storage.Queue SDK (uses Base64 encoding and raw payload). + /// + /// Default: AzureStorageQueueCompatibilityMode.Default (envelope with metadata) + /// + /// Migration from v11 SDK: + /// + /// Set CompatibilityMode = Legacy to read existing v11 messages + /// Process all existing messages from the queue + /// Once queue is empty, switch to CompatibilityMode = Default + /// All new messages will use envelope format with metadata support + /// + /// + public AzureStorageQueueCompatibilityMode CompatibilityMode { get; set; } = AzureStorageQueueCompatibilityMode.Default; + + /// + /// Optional action to configure Azure SDK retry options. + /// Default Azure SDK retry settings: MaxRetries=3, Delay=0.8s, MaxDelay=1min, Mode=Exponential. + /// + /// + /// + /// options.ConfigureRetry = retry => + /// { + /// retry.MaxRetries = 5; + /// retry.Delay = TimeSpan.FromSeconds(1); + /// retry.Mode = RetryMode.Exponential; + /// }; + /// + /// + public Action ConfigureRetry { get; set; } } public class AzureStorageQueueOptionsBuilder : SharedQueueOptionsBuilder, AzureStorageQueueOptionsBuilder> where T : class { public AzureStorageQueueOptionsBuilder ConnectionString(string connectionString) { - Target.ConnectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString)); + ArgumentException.ThrowIfNullOrEmpty(connectionString); + + Target.ConnectionString = connectionString; return this; } - public AzureStorageQueueOptionsBuilder RetryPolicy(IRetryPolicy retryPolicy) + public AzureStorageQueueOptionsBuilder DequeueInterval(TimeSpan dequeueInterval) { - Target.RetryPolicy = retryPolicy ?? throw new ArgumentNullException(nameof(retryPolicy)); + Target.DequeueInterval = dequeueInterval; return this; } - public AzureStorageQueueOptionsBuilder DequeueInterval(TimeSpan dequeueInterval) + /// + /// Sets a custom retry delay function for failed messages. + /// + /// A function that takes the attempt number and returns the delay before the next retry. + public AzureStorageQueueOptionsBuilder RetryDelay(Func retryDelay) { - Target.DequeueInterval = dequeueInterval; + ArgumentNullException.ThrowIfNull(retryDelay); + + Target.RetryDelay = retryDelay; + return this; + } + + /// + /// Sets the message format compatibility mode. + /// See for migration guidance. + /// + public AzureStorageQueueOptionsBuilder CompatibilityMode(AzureStorageQueueCompatibilityMode compatibilityMode) + { + Target.CompatibilityMode = compatibilityMode; + return this; + } + + /// + /// Configures Azure SDK retry options for transient failure handling. + /// + /// Action to configure retry options. + /// + /// + /// .ConfigureRetry(retry => + /// { + /// retry.MaxRetries = 5; + /// retry.Delay = TimeSpan.FromSeconds(1); + /// }) + /// + /// + public AzureStorageQueueOptionsBuilder ConfigureRetry(Action configure) + { + ArgumentNullException.ThrowIfNull(configure); + + Target.ConfigureRetry = configure; return this; } } diff --git a/src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs b/src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs index fbe828b..23c1e11 100644 --- a/src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs +++ b/src/Foundatio.AzureStorage/Storage/AzureFileStorage.cs @@ -1,52 +1,61 @@ -using System; +using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; -using Foundatio.Azure.Extensions; +using Azure; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; using Foundatio.Extensions; using Foundatio.Serializer; -using Microsoft.Azure.Storage; -using Microsoft.Azure.Storage.Blob; +using Foundatio.Utility; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; namespace Foundatio.Storage; -public class AzureFileStorage : IFileStorage +public class AzureFileStorage : IFileStorage, IHaveLogger, IHaveLoggerFactory, IHaveTimeProvider { - private readonly CloudBlobContainer _container; + private readonly BlobContainerClient _container; private readonly ISerializer _serializer; - protected readonly ILogger _logger; - protected readonly TimeProvider _timeProvider; + private readonly ILoggerFactory _loggerFactory; + private readonly ILogger _logger; + private readonly TimeProvider _timeProvider; public AzureFileStorage(AzureFileStorageOptions options) { - if (options == null) - throw new ArgumentNullException(nameof(options)); + ArgumentNullException.ThrowIfNull(options); _timeProvider = options.TimeProvider ?? TimeProvider.System; _serializer = options.Serializer ?? DefaultSerializer.Instance; - _logger = options.LoggerFactory?.CreateLogger(GetType()) ?? NullLogger.Instance; + _loggerFactory = options.LoggerFactory ?? NullLoggerFactory.Instance; + _logger = _loggerFactory.CreateLogger(GetType()); - var account = CloudStorageAccount.Parse(options.ConnectionString); - var client = account.CreateCloudBlobClient(); + var clientOptions = new BlobClientOptions(); + options.ConfigureRetry?.Invoke(clientOptions.Retry); - _container = client.GetContainerReference(options.ContainerName); + _container = new BlobContainerClient(options.ConnectionString, options.ContainerName, clientOptions); _logger.LogTrace("Checking if {Container} container exists", _container.Name); - bool created = _container.CreateIfNotExistsAsync().GetAwaiter().GetResult(); - if (created) - _logger.LogInformation("Created {Container}", _container.Name); + var response = _container.CreateIfNotExists(); + if (response != null) + _logger.LogDebug("Created {Container}", _container.Name); + else + _logger.LogDebug("{Container} already exists", _container.Name); } public AzureFileStorage(Builder config) - : this(config(new AzureFileStorageOptionsBuilder()).Build()) { } + : this(config(new AzureFileStorageOptionsBuilder()).Build()) + { + } ISerializer IHaveSerializer.Serializer => _serializer; - public CloudBlobContainer Container => _container; + ILogger IHaveLogger.Logger => _logger; + ILoggerFactory IHaveLoggerFactory.LoggerFactory => _loggerFactory; + TimeProvider IHaveTimeProvider.TimeProvider => _timeProvider; + public BlobContainerClient Container => _container; [Obsolete($"Use {nameof(GetFileStreamAsync)} with {nameof(StreamMode)} instead to define read or write behavior of stream")] public Task GetFileStreamAsync(string path, CancellationToken cancellationToken = default) @@ -54,140 +63,196 @@ public Task GetFileStreamAsync(string path, CancellationToken cancellati public async Task GetFileStreamAsync(string path, StreamMode streamMode, CancellationToken cancellationToken = default) { - if (String.IsNullOrEmpty(path)) - throw new ArgumentNullException(nameof(path)); + ArgumentException.ThrowIfNullOrEmpty(path); string normalizedPath = NormalizePath(path); _logger.LogTrace("Getting file stream for {Path}", normalizedPath); - var blockBlob = _container.GetBlockBlobReference(normalizedPath); + var blobClient = _container.GetBlobClient(normalizedPath); try { return streamMode switch { - StreamMode.Read => await blockBlob.OpenReadAsync(null, null, null, cancellationToken).AnyContext(), - StreamMode.Write => await blockBlob.OpenWriteAsync(null, null, null, cancellationToken).AnyContext(), + StreamMode.Read => await blobClient.OpenReadAsync(cancellationToken: cancellationToken).AnyContext(), + StreamMode.Write => await blobClient.OpenWriteAsync(overwrite: true, cancellationToken: cancellationToken).AnyContext(), _ => throw new NotSupportedException($"Stream mode {streamMode} is not supported.") }; } - catch (Microsoft.Azure.Storage.StorageException ex) when (ex is { RequestInformation.HttpStatusCode: 404 }) + catch (RequestFailedException ex) when (ex.Status is 404) { - _logger.LogDebug(ex, "Unable to get file stream for {Path}: File Not Found", normalizedPath); + _logger.LogDebug(ex, "[{Status}] Unable to get file stream for {Path}: File Not Found", ex.Status, normalizedPath); return null; } + catch (RequestFailedException ex) + { + _logger.LogError(ex, "[{Status}] Unable to get file stream for {Path}: {Message}", ex.Status, normalizedPath, ex.Message); + throw new StorageException("Unable to get file stream.", ex); + } } public async Task GetFileInfoAsync(string path) { - if (String.IsNullOrEmpty(path)) - throw new ArgumentNullException(nameof(path)); + ArgumentException.ThrowIfNullOrEmpty(path); string normalizedPath = NormalizePath(path); _logger.LogTrace("Getting file info for {Path}", normalizedPath); - var blob = _container.GetBlockBlobReference(normalizedPath); + var blobClient = _container.GetBlobClient(normalizedPath); try { - await blob.FetchAttributesAsync().AnyContext(); - return blob.ToFileInfo(); + var properties = await blobClient.GetPropertiesAsync().AnyContext(); + return ToFileInfo(normalizedPath, properties.Value); + } + catch (RequestFailedException ex) when (ex.Status is 404) + { + _logger.LogDebug(ex, "[{Status}] Unable to get file info for {Path}: File Not Found", ex.Status, normalizedPath); + return null; + } + catch (RequestFailedException ex) + { + _logger.LogError(ex, "[{Status}] Unable to get file info for {Path}: {Message}", ex.Status, normalizedPath, ex.Message); + return null; } catch (Exception ex) { _logger.LogError(ex, "Unable to get file info for {Path}: {Message}", normalizedPath, ex.Message); + return null; } - - return null; } - public Task ExistsAsync(string path) + public async Task ExistsAsync(string path) { - if (String.IsNullOrEmpty(path)) - throw new ArgumentNullException(nameof(path)); + ArgumentException.ThrowIfNullOrEmpty(path); string normalizedPath = NormalizePath(path); _logger.LogTrace("Checking if {Path} exists", normalizedPath); - var blockBlob = _container.GetBlockBlobReference(normalizedPath); - return blockBlob.ExistsAsync(); + var blobClient = _container.GetBlobClient(normalizedPath); + var response = await blobClient.ExistsAsync().AnyContext(); + return response.Value; } public async Task SaveFileAsync(string path, Stream stream, CancellationToken cancellationToken = default) { - if (String.IsNullOrEmpty(path)) - throw new ArgumentNullException(nameof(path)); - if (stream == null) - throw new ArgumentNullException(nameof(stream)); + ArgumentException.ThrowIfNullOrEmpty(path); + ArgumentNullException.ThrowIfNull(stream); string normalizedPath = NormalizePath(path); _logger.LogTrace("Saving {Path}", normalizedPath); - var blockBlob = _container.GetBlockBlobReference(normalizedPath); - await blockBlob.UploadFromStreamAsync(stream, null, null, null, cancellationToken).AnyContext(); - - return true; + try + { + var blobClient = _container.GetBlobClient(normalizedPath); + await blobClient.UploadAsync(stream, overwrite: true, cancellationToken: cancellationToken).AnyContext(); + return true; + } + catch (RequestFailedException ex) + { + _logger.LogError(ex, "[{Status}] Error saving {Path}: {Message}", ex.Status, normalizedPath, ex.Message); + return false; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error saving {Path}: {Message}", normalizedPath, ex.Message); + return false; + } } public async Task RenameFileAsync(string path, string newPath, CancellationToken cancellationToken = default) { - if (String.IsNullOrEmpty(path)) - throw new ArgumentNullException(nameof(path)); - if (String.IsNullOrEmpty(newPath)) - throw new ArgumentNullException(nameof(newPath)); + ArgumentException.ThrowIfNullOrEmpty(path); + ArgumentException.ThrowIfNullOrEmpty(newPath); string normalizedPath = NormalizePath(path); string normalizedNewPath = NormalizePath(newPath); - _logger.LogInformation("Renaming {Path} to {NewPath}", normalizedPath, normalizedNewPath); + _logger.LogDebug("Renaming {Path} to {NewPath}", normalizedPath, normalizedNewPath); - var oldBlob = _container.GetBlockBlobReference(normalizedPath); - if (!(await CopyFileAsync(normalizedPath, normalizedNewPath, cancellationToken).AnyContext())) + try { - _logger.LogError("Unable to rename {Path} to {NewPath}", normalizedPath, normalizedNewPath); - return false; - } + if (!await CopyFileAsync(normalizedPath, normalizedNewPath, cancellationToken).AnyContext()) + { + _logger.LogError("Unable to rename {Path} to {NewPath}", normalizedPath, normalizedNewPath); + return false; + } + + var oldBlob = _container.GetBlobClient(normalizedPath); + _logger.LogDebug("Deleting renamed {Path}", normalizedPath); + var deleteResponse = await oldBlob.DeleteIfExistsAsync(cancellationToken: cancellationToken).AnyContext(); + if (!deleteResponse.Value) + { + _logger.LogDebug("Unable to delete renamed {Path}", normalizedPath); + return false; + } - _logger.LogDebug("Deleting renamed {Path}", normalizedPath); - bool deleted = await oldBlob.DeleteIfExistsAsync(DeleteSnapshotsOption.None, null, null, null, cancellationToken).AnyContext(); - if (!deleted) + return true; + } + catch (RequestFailedException ex) { - _logger.LogDebug("Unable to delete renamed {Path}", normalizedPath); + _logger.LogError(ex, "[{Status}] Unable to rename {Path} to {NewPath}: {Message}", ex.Status, normalizedPath, normalizedNewPath, ex.Message); return false; } - - return true; } public async Task CopyFileAsync(string path, string targetPath, CancellationToken cancellationToken = default) { - if (String.IsNullOrEmpty(path)) - throw new ArgumentNullException(nameof(path)); - if (String.IsNullOrEmpty(targetPath)) - throw new ArgumentNullException(nameof(targetPath)); + ArgumentException.ThrowIfNullOrEmpty(path); + ArgumentException.ThrowIfNullOrEmpty(targetPath); string normalizedPath = NormalizePath(path); string normalizedTargetPath = NormalizePath(targetPath); - _logger.LogInformation("Copying {Path} to {TargetPath}", normalizedPath, normalizedTargetPath); + _logger.LogDebug("Copying {Path} to {TargetPath}", normalizedPath, normalizedTargetPath); - var oldBlob = _container.GetBlockBlobReference(normalizedPath); - var newBlob = _container.GetBlockBlobReference(normalizedTargetPath); + try + { + var sourceBlob = _container.GetBlobClient(normalizedPath); + var targetBlob = _container.GetBlobClient(normalizedTargetPath); + + var copyOperation = await targetBlob.StartCopyFromUriAsync(sourceBlob.Uri, cancellationToken: cancellationToken).AnyContext(); + + // Wait for copy operation to complete + await copyOperation.WaitForCompletionAsync(cancellationToken).ConfigureAwait(false); + if (!copyOperation.HasCompleted) + { + _logger.LogError("Copy operation did not complete for {Path} to {TargetPath}", normalizedPath, normalizedTargetPath); + return false; + } - await newBlob.StartCopyAsync(oldBlob, cancellationToken).AnyContext(); - while (newBlob.CopyState.Status == CopyStatus.Pending) - await _timeProvider.Delay(TimeSpan.FromMilliseconds(50), cancellationToken).AnyContext(); + // Check final status + var properties = await targetBlob.GetPropertiesAsync(cancellationToken: cancellationToken).AnyContext(); + if (properties.Value.CopyStatus == CopyStatus.Success) + return true; - return newBlob.CopyState.Status == CopyStatus.Success; + _logger.LogError("Copy operation failed for {Path} to {TargetPath}: {CopyStatus}", normalizedPath, normalizedTargetPath, properties.Value.CopyStatus); + return false; + } + catch (RequestFailedException ex) + { + _logger.LogError(ex, "[{Status}] Unable to copy {Path} to {TargetPath}: {Message}", ex.Status, normalizedPath, normalizedTargetPath, ex.Message); + return false; + } } - public Task DeleteFileAsync(string path, CancellationToken cancellationToken = default) + public async Task DeleteFileAsync(string path, CancellationToken cancellationToken = default) { - if (String.IsNullOrEmpty(path)) - throw new ArgumentNullException(nameof(path)); + ArgumentException.ThrowIfNullOrEmpty(path); string normalizedPath = NormalizePath(path); _logger.LogTrace("Deleting {Path}", normalizedPath); - var blockBlob = _container.GetBlockBlobReference(normalizedPath); - return blockBlob.DeleteIfExistsAsync(DeleteSnapshotsOption.None, null, null, null, cancellationToken); + try + { + var blobClient = _container.GetBlobClient(normalizedPath); + var response = await blobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).AnyContext(); + if (!response.Value) + _logger.LogDebug("Unable to delete {Path}: File not found", normalizedPath); + return response.Value; + } + catch (RequestFailedException ex) + { + _logger.LogError(ex, "[{Status}] Unable to delete {Path}: {Message}", ex.Status, normalizedPath, ex.Message); + return false; + } } public async Task DeleteFilesAsync(string searchPattern = null, CancellationToken cancellationToken = default) @@ -196,7 +261,8 @@ public async Task DeleteFilesAsync(string searchPattern = null, Cancellatio int count = 0; // TODO: We could batch this, but we should ensure the batch isn't thousands of files. - _logger.LogInformation("Deleting {FileCount} files matching {SearchPattern}", files.Count, searchPattern); + _logger.LogDebug("Deleting {FileCount} files matching {SearchPattern}", files.Count, searchPattern); + foreach (var file in files) { await DeleteFileAsync(file.Path, cancellationToken).AnyContext(); @@ -249,28 +315,25 @@ private async Task> GetFileListAsync(string searchPattern = null, var criteria = GetRequestCriteria(searchPattern); int totalLimit = limit.GetValueOrDefault(Int32.MaxValue) < Int32.MaxValue - ? skip.GetValueOrDefault() + limit.Value + ? skip.GetValueOrDefault() + limit.GetValueOrDefault() : Int32.MaxValue; - BlobContinuationToken continuationToken = null; - var blobs = new List(); - do - { - var listingResult = await _container.ListBlobsSegmentedAsync(criteria.Prefix, true, BlobListingDetails.Metadata, limit, continuationToken, null, null, cancellationToken).AnyContext(); - continuationToken = listingResult.ContinuationToken; + _logger.LogTrace("Getting file list: Prefix={Prefix} Pattern={Pattern} Limit={Limit}", criteria.Prefix, criteria.Pattern, totalLimit); - foreach (var blob in listingResult.Results.OfType()) + var blobs = new List(); + await foreach (var blobItem in _container.GetBlobsAsync(new GetBlobsOptions { Prefix = criteria.Prefix }, cancellationToken: cancellationToken)) + { + // TODO: Verify if it's possible to create empty folders in storage. If so, don't return them. + if (criteria.Pattern != null && !criteria.Pattern.IsMatch(blobItem.Name)) { - // TODO: Verify if it's possible to create empty folders in storage. If so, don't return them. - if (criteria.Pattern != null && !criteria.Pattern.IsMatch(blob.Name)) - { - _logger.LogTrace("Skipping {Path}: Doesn't match pattern", blob.Name); - continue; - } - - blobs.Add(blob); + _logger.LogTrace("Skipping {Path}: Doesn't match pattern", blobItem.Name); + continue; } - } while (continuationToken != null && blobs.Count < totalLimit); + + blobs.Add(ToFileInfo(blobItem)); + if (blobs.Count >= totalLimit) + break; + } if (skip.HasValue) blobs = blobs.Skip(skip.Value).ToList(); @@ -278,15 +341,37 @@ private async Task> GetFileListAsync(string searchPattern = null, if (limit.HasValue) blobs = blobs.Take(limit.Value).ToList(); - return blobs.Select(blob => blob.ToFileInfo()).ToList(); + return blobs; + } + + private static FileSpec ToFileInfo(BlobItem blob) + { + return new FileSpec + { + Path = blob.Name, + Size = blob.Properties.ContentLength ?? -1, + Created = blob.Properties.CreatedOn?.UtcDateTime ?? DateTime.MinValue, + Modified = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue + }; + } + + private static FileSpec ToFileInfo(string path, BlobProperties properties) + { + return new FileSpec + { + Path = path, + Size = properties.ContentLength, + Created = properties.CreatedOn.UtcDateTime, + Modified = properties.LastModified.UtcDateTime + }; } - private string NormalizePath(string path) + private static string NormalizePath(string path) { return path?.Replace('\\', '/'); } - private class SearchCriteria + private record SearchCriteria { public string Prefix { get; set; } public Regex Pattern { get; set; } @@ -308,7 +393,7 @@ private SearchCriteria GetRequestCriteria(string searchPattern) { patternRegex = new Regex($"^{Regex.Escape(normalizedSearchPattern).Replace("\\*", ".*?")}$"); int slashPos = normalizedSearchPattern.LastIndexOf('/'); - prefix = slashPos >= 0 ? normalizedSearchPattern.Substring(0, slashPos) : String.Empty; + prefix = slashPos >= 0 ? normalizedSearchPattern[..slashPos] : String.Empty; } return new SearchCriteria @@ -318,5 +403,7 @@ private SearchCriteria GetRequestCriteria(string searchPattern) }; } - public void Dispose() { } + public void Dispose() + { + } } diff --git a/src/Foundatio.AzureStorage/Storage/AzureFileStorageOptions.cs b/src/Foundatio.AzureStorage/Storage/AzureFileStorageOptions.cs index a11f1b7..d5e9dc3 100644 --- a/src/Foundatio.AzureStorage/Storage/AzureFileStorageOptions.cs +++ b/src/Foundatio.AzureStorage/Storage/AzureFileStorageOptions.cs @@ -1,4 +1,5 @@ using System; +using Azure.Core; namespace Foundatio.Storage; @@ -6,19 +7,60 @@ public class AzureFileStorageOptions : SharedOptions { public string ConnectionString { get; set; } public string ContainerName { get; set; } = "storage"; + + /// + /// Optional action to configure Azure SDK retry options. + /// Default Azure SDK retry settings: MaxRetries=3, Delay=0.8s, MaxDelay=1min, Mode=Exponential. + /// + /// + /// + /// options.ConfigureRetry = retry => + /// { + /// retry.MaxRetries = 5; + /// retry.Delay = TimeSpan.FromSeconds(1); + /// retry.Mode = RetryMode.Exponential; + /// }; + /// + /// + public Action ConfigureRetry { get; set; } } public class AzureFileStorageOptionsBuilder : SharedOptionsBuilder { public AzureFileStorageOptionsBuilder ConnectionString(string connectionString) { - Target.ConnectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString)); + ArgumentException.ThrowIfNullOrEmpty(connectionString); + + Target.ConnectionString = connectionString; return this; } public AzureFileStorageOptionsBuilder ContainerName(string containerName) { - Target.ContainerName = containerName ?? throw new ArgumentNullException(nameof(containerName)); + ArgumentException.ThrowIfNullOrEmpty(containerName); + + Target.ContainerName = containerName; + return this; + } + + /// + /// Configures Azure SDK retry options for transient failure handling. + /// + /// Action to configure retry options. + /// + /// + /// .ConfigureRetry(retry => + /// { + /// retry.MaxRetries = 5; + /// retry.Delay = TimeSpan.FromSeconds(1); + /// }) + /// + /// + public AzureFileStorageOptionsBuilder ConfigureRetry(Action configure) + { + ArgumentNullException.ThrowIfNull(configure); + + Target.ConfigureRetry = configure; return this; } } diff --git a/tests/Foundatio.AzureStorage.Tests/Queues/AzureStorageQueueTests.cs b/tests/Foundatio.AzureStorage.Tests/Queues/AzureStorageQueueTests.cs index 7a6d58c..d9d323c 100644 --- a/tests/Foundatio.AzureStorage.Tests/Queues/AzureStorageQueueTests.cs +++ b/tests/Foundatio.AzureStorage.Tests/Queues/AzureStorageQueueTests.cs @@ -3,7 +3,6 @@ using Foundatio.Queues; using Foundatio.Tests.Queue; using Foundatio.Tests.Utility; -using Microsoft.Azure.Storage.RetryPolicies; using Microsoft.Extensions.Logging; using Xunit; @@ -13,7 +12,9 @@ public class AzureStorageQueueTests : QueueTestBase { private readonly string _queueName = "foundatio-" + Guid.NewGuid().ToString("N").Substring(10); - public AzureStorageQueueTests(ITestOutputHelper output) : base(output) { } + public AzureStorageQueueTests(ITestOutputHelper output) : base(output) + { + } protected override IQueue GetQueue(int retries = 1, TimeSpan? workItemTimeout = null, TimeSpan? retryDelay = null, int[] retryMultipliers = null, int deadLetterMaxItems = 100, bool runQueueMaintenance = true, TimeProvider timeProvider = null) { @@ -26,7 +27,6 @@ protected override IQueue GetQueue(int retries = 1, TimeSpan? wo .ConnectionString(connectionString) .Name(_queueName) .Retries(retries) - .RetryPolicy(retries <= 0 ? new NoRetry() : new ExponentialRetry(retryDelay.GetValueOrDefault(TimeSpan.FromMinutes(1)), retries)) .WorkItemTimeout(workItemTimeout.GetValueOrDefault(TimeSpan.FromMinutes(5))) .DequeueInterval(TimeSpan.FromSeconds(1)) .MetricsPollingInterval(TimeSpan.Zero) @@ -40,6 +40,7 @@ protected override Task CleanupQueueAsync(IQueue queue) queue?.Dispose(); return Task.CompletedTask; } + [Fact] public override Task CanQueueAndDequeueWorkItemAsync() { @@ -52,7 +53,7 @@ public override Task CanQueueAndDequeueWorkItemWithDelayAsync() return base.CanQueueAndDequeueWorkItemWithDelayAsync(); } - [Fact(Skip = "Storage Queues don't support the round tripping of user headers for values like correlation id")] + [Fact] public override Task CanUseQueueOptionsAsync() { return base.CanUseQueueOptionsAsync(); @@ -118,7 +119,7 @@ public override Task CanHandleErrorInWorkerAsync() return base.CanHandleErrorInWorkerAsync(); } - [Fact(Skip = "CompleteAsync after timeout will not throw")] + [Fact(Skip = "Azure Storage Queue handles visibility timeout natively; no client-side auto-abandon")] public override Task WorkItemsWillTimeoutAsync() { return base.WorkItemsWillTimeoutAsync(); @@ -142,7 +143,7 @@ public override Task CanHaveMultipleQueueInstancesAsync() return base.CanHaveMultipleQueueInstancesAsync(); } - [Fact(Skip = "TODO: Retry delays are currently not applied to abandoned items")] + [Fact] public override Task CanDelayRetryAsync() { return base.CanDelayRetryAsync(); @@ -202,7 +203,7 @@ public override Task VerifyDelayedRetryAttemptsAsync() return base.VerifyDelayedRetryAttemptsAsync(); } - [Fact(Skip = "Storage Queues has no queue stats for abandoned, it just increments the queued count and decrements the working count. Only the entry attribute ApproximateNumberOfMessages is available.")] + [Fact(Skip = "Azure Storage Queue handles visibility timeout natively; no client-side auto-abandon")] public override Task CanHandleAutoAbandonInWorker() { return base.CanHandleAutoAbandonInWorker(); diff --git a/tests/Foundatio.AzureStorage.Tests/Queues/LegacyAzureStorageQueueTests.cs b/tests/Foundatio.AzureStorage.Tests/Queues/LegacyAzureStorageQueueTests.cs new file mode 100644 index 0000000..a505eb5 --- /dev/null +++ b/tests/Foundatio.AzureStorage.Tests/Queues/LegacyAzureStorageQueueTests.cs @@ -0,0 +1,212 @@ +using System; +using System.Threading.Tasks; +using Foundatio.Queues; +using Foundatio.Tests.Queue; +using Foundatio.Tests.Utility; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace Foundatio.Azure.Tests.Queue; + +public class LegacyAzureStorageQueueTests : QueueTestBase +{ + private readonly string _queueName = "foundatio-legacy-" + Guid.NewGuid().ToString("N").Substring(10); + + public LegacyAzureStorageQueueTests(ITestOutputHelper output) : base(output) + { + } + + protected override IQueue GetQueue(int retries = 1, TimeSpan? workItemTimeout = null, TimeSpan? retryDelay = null, int[] retryMultipliers = null, int deadLetterMaxItems = 100, bool runQueueMaintenance = true, TimeProvider timeProvider = null) + { + string connectionString = Configuration.GetConnectionString("AzureStorageConnectionString"); + if (String.IsNullOrEmpty(connectionString)) + return null; + + _logger.LogDebug("Queue Id: {Name}", _queueName); + return new AzureStorageQueue(o => o + .ConnectionString(connectionString) + .Name(_queueName) + .CompatibilityMode(AzureStorageQueueCompatibilityMode.Legacy) + .Retries(retries) + .WorkItemTimeout(workItemTimeout.GetValueOrDefault(TimeSpan.FromMinutes(5))) + .DequeueInterval(TimeSpan.FromSeconds(1)) + .MetricsPollingInterval(TimeSpan.Zero) + .TimeProvider(timeProvider) + .LoggerFactory(Log)); + } + + protected override Task CleanupQueueAsync(IQueue queue) + { + // Don't delete the queue, it's super expensive and will be cleaned up later. + queue?.Dispose(); + return Task.CompletedTask; + } + + [Fact] + public override Task CanQueueAndDequeueWorkItemAsync() + { + return base.CanQueueAndDequeueWorkItemAsync(); + } + + [Fact] + public override Task CanQueueAndDequeueWorkItemWithDelayAsync() + { + return base.CanQueueAndDequeueWorkItemWithDelayAsync(); + } + + [Fact(Skip = "Legacy mode does not support CorrelationId or Properties - only message body is persisted")] + public override Task CanUseQueueOptionsAsync() + { + return base.CanUseQueueOptionsAsync(); + } + + [Fact] + public override Task CanDiscardDuplicateQueueEntriesAsync() + { + return base.CanDiscardDuplicateQueueEntriesAsync(); + } + + [Fact] + public override Task CanDequeueWithCancelledTokenAsync() + { + return base.CanDequeueWithCancelledTokenAsync(); + } + + [Fact(Skip = "Dequeue Time takes forever")] + public override Task CanDequeueEfficientlyAsync() + { + return base.CanDequeueEfficientlyAsync(); + } + + [Fact(Skip = "Dequeue Time takes forever")] + public override Task CanResumeDequeueEfficientlyAsync() + { + return base.CanResumeDequeueEfficientlyAsync(); + } + + [Fact] + public override Task CanQueueAndDequeueMultipleWorkItemsAsync() + { + return base.CanQueueAndDequeueMultipleWorkItemsAsync(); + } + + [Fact] + public override Task WillNotWaitForItemAsync() + { + return base.WillNotWaitForItemAsync(); + } + + [Fact] + public override Task WillWaitForItemAsync() + { + return base.WillWaitForItemAsync(); + } + + [Fact] + public override Task DequeueWaitWillGetSignaledAsync() + { + return base.DequeueWaitWillGetSignaledAsync(); + } + + [Fact] + public override Task CanUseQueueWorkerAsync() + { + return base.CanUseQueueWorkerAsync(); + } + + [Fact] + public override Task CanHandleErrorInWorkerAsync() + { + return base.CanHandleErrorInWorkerAsync(); + } + + [Fact(Skip = "Azure Storage Queue handles visibility timeout natively; no client-side auto-abandon")] + public override Task WorkItemsWillTimeoutAsync() + { + return base.WorkItemsWillTimeoutAsync(); + } + + [Fact] + public override Task WorkItemsWillGetMovedToDeadletterAsync() + { + return base.WorkItemsWillGetMovedToDeadletterAsync(); + } + + [Fact] + public override Task CanAutoCompleteWorkerAsync() + { + return base.CanAutoCompleteWorkerAsync(); + } + + [Fact] + public override Task CanHaveMultipleQueueInstancesAsync() + { + return base.CanHaveMultipleQueueInstancesAsync(); + } + + [Fact] + public override Task CanDelayRetryAsync() + { + return base.CanDelayRetryAsync(); + } + + [Fact] + public override Task CanRunWorkItemWithMetricsAsync() + { + return base.CanRunWorkItemWithMetricsAsync(); + } + + [Fact] + public override Task CanRenewLockAsync() + { + return base.CanRenewLockAsync(); + } + + [Fact] + public override Task CanAbandonQueueEntryOnceAsync() + { + return base.CanAbandonQueueEntryOnceAsync(); + } + + [Fact] + public override Task CanCompleteQueueEntryOnceAsync() + { + return base.CanCompleteQueueEntryOnceAsync(); + } + + [Fact] + public override Task CanDequeueWithLockingAsync() + { + return base.CanDequeueWithLockingAsync(); + } + + [Fact] + public override Task CanHaveMultipleQueueInstancesWithLockingAsync() + { + return base.CanHaveMultipleQueueInstancesWithLockingAsync(); + } + + [Fact] + public override Task MaintainJobNotAbandon_NotWorkTimeOutEntry() + { + return base.MaintainJobNotAbandon_NotWorkTimeOutEntry(); + } + + [Fact] + public override Task VerifyRetryAttemptsAsync() + { + return base.VerifyRetryAttemptsAsync(); + } + + [Fact] + public override Task VerifyDelayedRetryAttemptsAsync() + { + return base.VerifyDelayedRetryAttemptsAsync(); + } + + [Fact(Skip = "Azure Storage Queue handles visibility timeout natively; no client-side auto-abandon")] + public override Task CanHandleAutoAbandonInWorker() + { + return base.CanHandleAutoAbandonInWorker(); + } +} diff --git a/tests/Foundatio.AzureStorage.Tests/Storage/AzureStorageTests.cs b/tests/Foundatio.AzureStorage.Tests/Storage/AzureStorageTests.cs index 956f63e..1728cb9 100644 --- a/tests/Foundatio.AzureStorage.Tests/Storage/AzureStorageTests.cs +++ b/tests/Foundatio.AzureStorage.Tests/Storage/AzureStorageTests.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.IO; using System.Threading.Tasks; @@ -11,7 +11,9 @@ namespace Foundatio.Azure.Tests.Storage; public class AzureStorageTests : FileStorageTestsBase { - public AzureStorageTests(ITestOutputHelper output) : base(output) { } + public AzureStorageTests(ITestOutputHelper output) : base(output) + { + } protected override IFileStorage GetStorage() { @@ -166,11 +168,14 @@ public virtual async Task WillNotReturnDirectoryInGetPagedFileListAsync() Assert.False(result.HasMore); Assert.Empty(result.Files); - var container = storage is AzureFileStorage azureFileStorage ? azureFileStorage.Container : null; + var azureFileStorage = Assert.IsType(storage); + var container = azureFileStorage.Container; Assert.NotNull(container); - var blockBlob = container.GetBlockBlobReference("EmptyFolder/"); - await blockBlob.UploadFromStreamAsync(new MemoryStream(), null, null, null, TestCancellationToken); + var blobClient = container.GetBlobClient("EmptyFolder/"); + Assert.NotNull(blobClient); + using var ms = new MemoryStream(); + await blobClient.UploadAsync(ms, TestCancellationToken); result = await storage.GetPagedFileListAsync(cancellationToken: TestCancellationToken); Assert.False(result.HasMore);