Skip to content

Commit 071bf23

Browse files
committed
Adds compatibility mode for Azure Storage Queue
Introduces a compatibility mode to handle legacy messages and enables support for CorrelationId and Properties. This change introduces a `CompatibilityMode` option to the `AzureStorageQueue` to support different message formats. The `Default` mode uses an envelope to wrap the message, preserving `CorrelationId` and `Properties`. The `Legacy` mode supports older messages serialized using the v11 SDK, ensuring backward compatibility during migration.
1 parent a789d51 commit 071bf23

File tree

5 files changed

+318
-31
lines changed

5 files changed

+318
-31
lines changed

src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public AzureStorageQueue(AzureStorageQueueOptions<T> options) : base(options)
3131
throw new ArgumentException("ConnectionString is required.");
3232

3333
var clientOptions = new QueueClientOptions();
34-
if (options.UseBase64Encoding)
34+
if (options.CompatibilityMode == AzureStorageQueueCompatibilityMode.Legacy)
3535
clientOptions.MessageEncoding = QueueMessageEncoding.Base64;
3636

3737
options.ConfigureRetry?.Invoke(clientOptions.Retry);
@@ -76,10 +76,26 @@ protected override async Task<string> EnqueueImplAsync(T data, QueueEntryOptions
7676

7777
Interlocked.Increment(ref _enqueuedCount);
7878

79-
// Note: CorrelationId and Properties from QueueEntryOptions are not persisted.
80-
// Azure Storage Queue only supports a message body. Wrapping in an envelope would
81-
// support these features but would break backward compatibility with existing messages.
82-
var messageBody = new BinaryData(_serializer.SerializeToBytes(data));
79+
byte[] messageBodyBytes;
80+
81+
if (_options.CompatibilityMode == AzureStorageQueueCompatibilityMode.Default)
82+
{
83+
// Wrap in envelope to preserve CorrelationId and Properties
84+
var envelope = new QueueMessageEnvelope<T>
85+
{
86+
CorrelationId = options.CorrelationId,
87+
Properties = options.Properties,
88+
Data = data
89+
};
90+
messageBodyBytes = _serializer.SerializeToBytes(envelope);
91+
}
92+
else
93+
{
94+
// Legacy mode: serialize data directly for backward compatibility
95+
messageBodyBytes = _serializer.SerializeToBytes(data);
96+
}
97+
98+
var messageBody = new BinaryData(messageBodyBytes);
8399
var response = await _queueClient.Value.SendMessageAsync(
84100
messageBody,
85101
visibilityTimeout: options.DeliveryDelay,
@@ -155,9 +171,25 @@ protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken
155171
message.MessageId, insertedOn, nowUtc, queueTime.TotalMilliseconds, linkedCancellationToken.IsCancellationRequested);
156172
Interlocked.Increment(ref _dequeuedCount);
157173

158-
// Deserialize the message body directly (no envelope wrapper for backward compatibility)
159-
var data = _serializer.Deserialize<T>(message.Body.ToArray());
160-
var entry = new AzureStorageQueueEntry<T>(message, data, this);
174+
T data;
175+
string correlationId = null;
176+
IDictionary<string, string> properties = null;
177+
178+
if (_options.CompatibilityMode == AzureStorageQueueCompatibilityMode.Default)
179+
{
180+
// Unwrap envelope to extract metadata
181+
var envelope = _serializer.Deserialize<QueueMessageEnvelope<T>>(message.Body.ToArray());
182+
data = envelope.Data;
183+
correlationId = envelope.CorrelationId;
184+
properties = envelope.Properties;
185+
}
186+
else
187+
{
188+
// Legacy mode: deserialize data directly (no envelope)
189+
data = _serializer.Deserialize<T>(message.Body.ToArray());
190+
}
191+
192+
var entry = new AzureStorageQueueEntry<T>(message, correlationId, properties, data, this);
161193

162194
await OnDequeuedAsync(entry).AnyContext();
163195
_logger.LogTrace("Dequeued message: {QueueEntryId}", message.MessageId);
@@ -402,3 +434,25 @@ private static AzureStorageQueueEntry<T> ToAzureEntryWithCheck(IQueueEntry<T> qu
402434
return azureQueueEntry;
403435
}
404436
}
437+
438+
/// <summary>
439+
/// Envelope for wrapping queue data with metadata (correlation id, properties).
440+
/// Azure Storage Queue only supports a message body, so we serialize this entire envelope.
441+
/// </summary>
442+
internal record QueueMessageEnvelope<T> where T : class
443+
{
444+
/// <summary>
445+
/// Correlation ID for distributed tracing
446+
/// </summary>
447+
public string CorrelationId { get; init; }
448+
449+
/// <summary>
450+
/// Custom properties/metadata
451+
/// </summary>
452+
public IDictionary<string, string> Properties { get; init; }
453+
454+
/// <summary>
455+
/// The actual message payload
456+
/// </summary>
457+
public T Data { get; init; }
458+
}

src/Foundatio.AzureStorage/Queues/AzureStorageQueueEntry.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Generic;
23
using Azure.Storage.Queues.Models;
34

45
namespace Foundatio.Queues;
@@ -13,10 +14,16 @@ public class AzureStorageQueueEntry<T> : QueueEntry<T> where T : class
1314
/// </summary>
1415
public string PopReceipt { get; internal set; }
1516

16-
public AzureStorageQueueEntry(QueueMessage message, T data, IQueue<T> queue)
17-
: base(message.MessageId, null, data, queue, message.InsertedOn?.UtcDateTime ?? DateTime.MinValue, (int)message.DequeueCount)
17+
public AzureStorageQueueEntry(QueueMessage message, string correlationId, IDictionary<string, string> properties, T data, IQueue<T> queue)
18+
: base(message.MessageId, correlationId, data, queue, message.InsertedOn?.UtcDateTime ?? DateTime.MinValue, (int)message.DequeueCount)
1819
{
1920
UnderlyingMessage = message;
2021
PopReceipt = message.PopReceipt;
22+
23+
if (properties != null)
24+
{
25+
foreach (var property in properties)
26+
Properties[property.Key] = property.Value;
27+
}
2128
}
2229
}

src/Foundatio.AzureStorage/Queues/AzureStorageQueueOptions.cs

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,25 @@
33

44
namespace Foundatio.Queues;
55

6+
/// <summary>
7+
/// Compatibility mode for Azure Storage Queue message format.
8+
/// </summary>
9+
public enum AzureStorageQueueCompatibilityMode
10+
{
11+
/// <summary>
12+
/// Default mode: Uses message envelope for full metadata support.
13+
/// Supports CorrelationId and Properties. New installations should use this mode.
14+
/// </summary>
15+
Default = 0,
16+
17+
/// <summary>
18+
/// Legacy mode: Raw message body with Base64 encoding for v11 SDK compatibility.
19+
/// Use this for backward compatibility with existing queues that have messages
20+
/// written with the v11 Microsoft.Azure.Storage.Queue SDK.
21+
/// </summary>
22+
Legacy = 1
23+
}
24+
625
public class AzureStorageQueueOptions<T> : SharedQueueOptions<T> where T : class
726
{
827
public string ConnectionString { get; set; }
@@ -20,28 +39,22 @@ public class AzureStorageQueueOptions<T> : SharedQueueOptions<T> where T : class
2039
TimeSpan.FromSeconds(Math.Pow(2, attempt)) + TimeSpan.FromMilliseconds(Random.Shared.Next(0, 100));
2140

2241
/// <summary>
23-
/// When true, messages are Base64-encoded for backward compatibility with the legacy
24-
/// Microsoft.Azure.Storage.Queue SDK (v11). The v11 SDK encoded all messages as Base64
25-
/// by default, while the v12 Azure.Storage.Queues SDK does not.
42+
/// Controls message format compatibility.
43+
/// Default mode uses an envelope wrapper that supports CorrelationId and Properties.
44+
/// Legacy mode is provided for backward compatibility with existing queues that have messages
45+
/// written with the v11 Microsoft.Azure.Storage.Queue SDK (uses Base64 encoding and raw payload).
2646
///
27-
/// <para><b>Default:</b> false (v12 behavior - no encoding)</para>
47+
/// <para><b>Default:</b> AzureStorageQueueCompatibilityMode.Default (envelope with metadata)</para>
2848
///
29-
/// <para><b>When to enable:</b> Only enable this if you have existing messages in your
30-
/// queue that were written using the v11 SDK and need to be read during migration.</para>
31-
///
32-
/// <para><b>Migration path:</b></para>
49+
/// <para><b>Migration from v11 SDK:</b></para>
3350
/// <list type="number">
34-
/// <item><description>Enable UseBase64Encoding=true to read existing v11 messages</description></item>
51+
/// <item><description>Set CompatibilityMode = Legacy to read existing v11 messages</description></item>
3552
/// <item><description>Process all existing messages from the queue</description></item>
36-
/// <item><description>Once queue is empty, disable UseBase64Encoding (set to false)</description></item>
37-
/// <item><description>All new messages will use raw encoding (v12 default)</description></item>
53+
/// <item><description>Once queue is empty, switch to CompatibilityMode = Default</description></item>
54+
/// <item><description>All new messages will use envelope format with metadata support</description></item>
3855
/// </list>
39-
///
40-
/// <para><b>Deprecation notice:</b> This option exists solely for migration purposes
41-
/// and may be removed in a future major version. Plan to migrate away from Base64
42-
/// encoding as soon as practical.</para>
4356
/// </summary>
44-
public bool UseBase64Encoding { get; set; }
57+
public AzureStorageQueueCompatibilityMode CompatibilityMode { get; set; } = AzureStorageQueueCompatibilityMode.Default;
4558

4659
/// <summary>
4760
/// Optional action to configure Azure SDK retry options.
@@ -89,12 +102,12 @@ public AzureStorageQueueOptionsBuilder<T> RetryDelay(Func<int, TimeSpan> retryDe
89102
}
90103

91104
/// <summary>
92-
/// Enables Base64 message encoding for backward compatibility with the legacy v11 SDK.
93-
/// See <see cref="AzureStorageQueueOptions{T}.UseBase64Encoding"/> for migration guidance.
105+
/// Sets the message format compatibility mode.
106+
/// See <see cref="AzureStorageQueueOptions{T}.CompatibilityMode"/> for migration guidance.
94107
/// </summary>
95-
public AzureStorageQueueOptionsBuilder<T> UseBase64Encoding(bool useBase64Encoding = true)
108+
public AzureStorageQueueOptionsBuilder<T> CompatibilityMode(AzureStorageQueueCompatibilityMode compatibilityMode)
96109
{
97-
Target.UseBase64Encoding = useBase64Encoding;
110+
Target.CompatibilityMode = compatibilityMode;
98111
return this;
99112
}
100113

tests/Foundatio.AzureStorage.Tests/Queues/AzureStorageQueueTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public override Task CanQueueAndDequeueWorkItemWithDelayAsync()
5454
return base.CanQueueAndDequeueWorkItemWithDelayAsync();
5555
}
5656

57-
[Fact(Skip = "Azure Storage Queue does not support CorrelationId or Properties - only message body is persisted")]
57+
[Fact]
5858
public override Task CanUseQueueOptionsAsync()
5959
{
6060
return base.CanUseQueueOptionsAsync();

0 commit comments

Comments
 (0)