diff --git a/Test/DurableTask.AzureStorage.Tests/QueueClientEncodingStrategyIntegrationTests.cs b/Test/DurableTask.AzureStorage.Tests/QueueClientEncodingStrategyIntegrationTests.cs new file mode 100644 index 000000000..42e0f00a0 --- /dev/null +++ b/Test/DurableTask.AzureStorage.Tests/QueueClientEncodingStrategyIntegrationTests.cs @@ -0,0 +1,316 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.AzureStorage.Tests +{ + using DurableTask.AzureStorage; + using DurableTask.Core; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using System; + using System.Runtime.Serialization; + using System.Threading.Tasks; + + [TestClass] + public class QueueClientMessageEncodingIntegrationTests + { + private const string TestConnectionString = "UseDevelopmentStorage=true"; + + [TestMethod] + // Verifies that messages sent with Base64 encoding can be processed by a worker with UTF8 encoding + public async Task CrossEncodingCompatibility_Base64ToUtf8() + { + string testName = "Base64ToUtf8"; + string input = "世界! 🌍 Test with émojis and spéciål chàracters: ñáéíóúü"; + + // Create service with Base64 encoding to send messages + var base64Settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageAccountClientProvider = new StorageAccountClientProvider(TestConnectionString), + QueueClientMessageEncoding = QueueClientMessageEncoding.Base64, + }; + + var base64Service = new AzureStorageOrchestrationService(base64Settings); + await base64Service.CreateIfNotExistsAsync(); + // DON'T start the service - this prevents the dequeue loop from running + + try + { + // Create orchestration instance with Base64 encoding + var base64Client = new TaskHubClient(base64Service); + var instance = await base64Client.CreateOrchestrationInstanceAsync(typeof(HelloOrchestrator), input); + + // Create worker with UTF8 encoding to process the message + var utf8Settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageAccountClientProvider = new StorageAccountClientProvider(TestConnectionString), + QueueClientMessageEncoding = QueueClientMessageEncoding.UTF8, + }; + + var utf8Service = new AzureStorageOrchestrationService(utf8Settings); + var utf8Client = new TaskHubClient(utf8Service); + var worker = new TaskHubWorker(utf8Service); + worker.AddTaskOrchestrations(typeof(HelloOrchestrator)); + worker.AddTaskActivities(typeof(Hello)); + + await worker.StartAsync(); + + try + { + // Wait for the orchestration to complete using UTF8 worker + var state = await utf8Client.WaitForOrchestrationAsync(instance, TimeSpan.FromSeconds(60)); + + // Verify UTF8 worker successfully processed the Base64 encoded message + Assert.IsNotNull(state); + Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus); + Assert.AreEqual($"\"Hello, {input}!\"", state.Output); + } + finally + { + await worker.StopAsync(); + } + } + finally + { + await base64Service.DeleteAsync(); + } + } + + [TestMethod] + // Verifies that messages sent with UTF8 encoding can be processed by a worker with Base64 encoding + public async Task CrossEncodingCompatibility_Utf8ToBase64() + { + string testName = "Utf8ToBase64test0"; + string input = "世界! 🌍 Test with émojis and spéciål chàracters: ñáéíóúü"; + + // Create service with UTF8 encoding to send messages + var utf8Settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageAccountClientProvider = new StorageAccountClientProvider(TestConnectionString), + QueueClientMessageEncoding = QueueClientMessageEncoding.UTF8, + }; + + var utf8Service = new AzureStorageOrchestrationService(utf8Settings); + await utf8Service.CreateIfNotExistsAsync(); + // DON'T start the service - this prevents the dequeue loop from running + + try + { + // Create orchestration instance with UTF8 encoding + var utf8Client = new TaskHubClient(utf8Service); + var instance = await utf8Client.CreateOrchestrationInstanceAsync(typeof(HelloOrchestrator), input); + + // Create worker with Base64 encoding to process the message + var base64Settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageAccountClientProvider = new StorageAccountClientProvider(TestConnectionString), + QueueClientMessageEncoding = QueueClientMessageEncoding.Base64, + }; + + var base64Service = new AzureStorageOrchestrationService(base64Settings); + var base64Client = new TaskHubClient(base64Service); + var worker = new TaskHubWorker(base64Service); + worker.AddTaskOrchestrations(typeof(HelloOrchestrator)); + worker.AddTaskActivities(typeof(Hello)); + + await worker.StartAsync(); + + try + { + // Wait for the orchestration to complete using Base64 worker + var state = await base64Client.WaitForOrchestrationAsync(instance, TimeSpan.FromSeconds(60)); + + // Verify Base64 worker successfully processed the UTF8 encoded message + Assert.IsNotNull(state); + Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus); + Assert.AreEqual($"\"Hello, {input}!\"", state.Output); + } + finally + { + await worker.StopAsync(); + } + } + finally + { + await utf8Service.DeleteAsync(); + } + } + + [TestMethod] + // Verifies that messages sent with Base64 encoding can be processed by a worker with Base64 encoding + public async Task SameEncodingStrategy_Base64ToBase64() + { + string testName = "Base64ToBase64"; + string input = "世界! 🌍 Test with émojis and spéciål chàracters: ñáéíóúü"; + + var base64Settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageAccountClientProvider = new StorageAccountClientProvider(TestConnectionString), + QueueClientMessageEncoding = QueueClientMessageEncoding.Base64, + }; + + var base64Service = new AzureStorageOrchestrationService(base64Settings); + + try + { + var base64Client = new TaskHubClient(base64Service); + var worker = new TaskHubWorker(base64Service); + worker.AddTaskOrchestrations(typeof(HelloOrchestrator)); + worker.AddTaskActivities(typeof(Hello)); + + await worker.StartAsync(); + + try + { + var instance = await base64Client.CreateOrchestrationInstanceAsync(typeof(HelloOrchestrator), input); + var state = await base64Client.WaitForOrchestrationAsync(instance, TimeSpan.FromSeconds(60)); + + // Verify Base64 worker successfully processed Base64 encoded message + Assert.IsNotNull(state); + Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus); + Assert.AreEqual($"\"Hello, {input}!\"", state.Output); + } + finally + { + await worker.StopAsync(); + } + } + finally + { + await base64Service.DeleteAsync(); + } + } + + [TestMethod] + // Verifies that messages sent with UTF8 encoding can be processed by a worker with UTF8 encoding + public async Task SameEncodingStrategy_Utf8ToUtf8() + { + string testName = "Utf8ToUtf8"; + string input = "世界! 🌍 Test with émojis and spéciål chàracters: ñáéíóúü"; + + var utf8Settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageAccountClientProvider = new StorageAccountClientProvider(TestConnectionString), + QueueClientMessageEncoding = QueueClientMessageEncoding.UTF8, + }; + + var utf8Service = new AzureStorageOrchestrationService(utf8Settings); + + try + { + var utf8Client = new TaskHubClient(utf8Service); + var worker = new TaskHubWorker(utf8Service); + worker.AddTaskOrchestrations(typeof(HelloOrchestrator)); + worker.AddTaskActivities(typeof(Hello)); + + await worker.StartAsync(); + + try + { + var instance = await utf8Client.CreateOrchestrationInstanceAsync(typeof(HelloOrchestrator), input); + var state = await utf8Client.WaitForOrchestrationAsync(instance, TimeSpan.FromSeconds(60)); + + // Verify UTF8 worker successfully processed UTF8 encoded message + Assert.IsNotNull(state); + Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus); + Assert.AreEqual($"\"Hello, {input}!\"", state.Output); + } + finally + { + await worker.StopAsync(); + } + } + finally + { + await utf8Service.DeleteAsync(); + } + } + + [TestMethod] + // Verifies that Base64 encoding can handle non-UTF8 characters like 0xFFFE (Byte Order Mark) + public async Task Base64Encodig_HandlesNonUtf8Characters() + { + string testName = "Base64WithUtf8Chars"; + // Create a string with non-UTF8 characters including 0xFFFE (Byte Order Mark) + string input = "Normal text " + (char)0xFFFE + " with BOM and " + (char)0xFFFF + " other invalid chars"; + + var base64Settings = new AzureStorageOrchestrationServiceSettings + { + TaskHubName = testName, + StorageAccountClientProvider = new StorageAccountClientProvider(TestConnectionString), + QueueClientMessageEncoding = QueueClientMessageEncoding.Base64, + }; + + var base64Service = new AzureStorageOrchestrationService(base64Settings); + + try + { + var base64Client = new TaskHubClient(base64Service); + var worker = new TaskHubWorker(base64Service); + worker.AddTaskOrchestrations(typeof(HelloOrchestrator)); + worker.AddTaskActivities(typeof(Hello)); + + await worker.StartAsync(); + + try + { + var instance = await base64Client.CreateOrchestrationInstanceAsync(typeof(HelloOrchestrator), input); + var state = await base64Client.WaitForOrchestrationAsync(instance, TimeSpan.FromSeconds(60)); + + // Verify Base64 encoding successfully handled non-UTF8 characters + Assert.IsNotNull(state); + Assert.AreEqual(OrchestrationStatus.Completed, state.OrchestrationStatus); + Assert.AreEqual($"\"Hello, {input}!\"", state.Output); + } + finally + { + await worker.StopAsync(); + } + } + finally + { + await base64Service.DeleteAsync(); + } + } + + // Test orchestrator and activity + [KnownType(typeof(Hello))] + internal class HelloOrchestrator : TaskOrchestration + { + public override async Task RunTask(OrchestrationContext context, string input) + { + // Wait for 5 seconds before executing the activity (shorter for faster tests) + // await context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(5), null); + return await context.ScheduleTask(typeof(Hello), input); + } + } + + internal class Hello : TaskActivity + { + protected override string Execute(TaskContext context, string input) + { + if (string.IsNullOrEmpty(input)) + { + throw new ArgumentNullException(nameof(input)); + } + + return $"Hello, {input}!"; + } + } + } +} \ No newline at end of file diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index a5dd568b2..48e653868 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -288,5 +288,11 @@ internal LogHelper Logger /// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true. /// public bool UseSeparateQueueForEntityWorkItems { get; set; } = false; + + /// + /// Gets or sets the encoding strategy used for Azure Storage Queue messages. + /// The default is . + /// + public QueueClientMessageEncoding QueueClientMessageEncoding { get; set; } = QueueClientMessageEncoding.UTF8; } } diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index a1fc796cf..ff48d507f 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -154,18 +154,45 @@ internal static bool TryGetLargeMessageReference(string messagePayload, out Uri public async Task DeserializeQueueMessageAsync(QueueMessage queueMessage, string queueName, CancellationToken cancellationToken = default) { - // TODO: Deserialize with Stream? - byte[] body = queueMessage.Body.ToArray(); - MessageData envelope; + // No matter what the queue client encoding is (Base64 or UTF8), + // we should always successfully get the string here. + string bodyAsString = queueMessage.Body.ToString(); + MessageData envelope = null; + try { - envelope = this.DeserializeMessageData(Encoding.UTF8.GetString(body)); + // Check if the message starts with '{' which indicates it's a JSON message + // If so, deserialize it directly. Otherwise, try Base64 decoding strategy. + if (bodyAsString.StartsWith("{")) + { + envelope = this.DeserializeMessageData(bodyAsString); + } + else + { + // The message we got is not a valid json message (doesn't start with '{'). + // This could happen in the case where our queue client is UTF8 encoding + // while receiving a message sent by a Base64 queue client. + // So we try to re-decode it as Base64. + byte[] decodedBytes = Convert.FromBase64String(bodyAsString); + string decodedMessage = Encoding.UTF8.GetString(decodedBytes); + envelope = this.DeserializeMessageData(decodedMessage); + } } - catch(JsonReaderException) + catch (Exception ex) + { + // Note: For a Base64 queue client, it is supposed to always receive a Base64-encoded queue message. + // Becasue error handling for the case where a Base64 queue client receives a UTF8 message is implemented in AzureStorageClient.cs. + // This exception catch block handles any other unexpected errors during deserialization. + this.azureStorageClient.Settings.Logger.GeneralWarning( + this.azureStorageClient.QueueAccountName, + this.azureStorageClient.Settings.TaskHubName, + $"Failed to process message. MessageId: {queueMessage.MessageId}, Error: {ex.Message}"); + throw; + } + + if (envelope == null) { - // This catch block is a hotfix and better implementation might be needed in future. - // DTFx.AzureStorage 1.x and 2.x use different encoding methods. Adding this line to enable forward compatibility. - envelope = this.DeserializeMessageData(Encoding.UTF8.GetString(Convert.FromBase64String(Encoding.UTF8.GetString(body)))); + throw new InvalidOperationException($"Failed to deserialize message {queueMessage.MessageId}"); } if (!string.IsNullOrEmpty(envelope.CompressedBlobName)) @@ -177,7 +204,7 @@ public async Task DeserializeQueueMessageAsync(QueueMessage queueMe } else { - envelope.TotalMessageSizeBytes = body.Length; + envelope.TotalMessageSizeBytes = Encoding.UTF8.GetByteCount(bodyAsString); } envelope.OriginalQueueMessage = queueMessage; diff --git a/src/DurableTask.AzureStorage/QueueClientEncodingStrategy.cs b/src/DurableTask.AzureStorage/QueueClientEncodingStrategy.cs new file mode 100644 index 000000000..4516ea7c7 --- /dev/null +++ b/src/DurableTask.AzureStorage/QueueClientEncodingStrategy.cs @@ -0,0 +1,36 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +using Azure.Storage.Queues; + +namespace DurableTask.AzureStorage +{ + /// + /// Specifies the encoding strategy used for Azure Storage Queue messages. + /// This enum maps to the Azure Storage SDK's values. + /// + public enum QueueClientMessageEncoding + { + /// + /// Use UTF8 encoding for queue messages. Maps to . + /// + UTF8 = 0, + + /// + /// Use Base64 encoding for queue messages. Maps to . + /// This provides compatibility with older versions and may be required in some scenarios + /// where UTF8 encoding is not supported. + /// + Base64 = 1, + } +} diff --git a/src/DurableTask.AzureStorage/Storage/AzureStorageClient.cs b/src/DurableTask.AzureStorage/Storage/AzureStorageClient.cs index 9e459fcc7..76b8a2678 100644 --- a/src/DurableTask.AzureStorage/Storage/AzureStorageClient.cs +++ b/src/DurableTask.AzureStorage/Storage/AzureStorageClient.cs @@ -14,6 +14,7 @@ namespace DurableTask.AzureStorage.Storage { using System; + using System.Text; using Azure.Core; using Azure.Data.Tables; using Azure.Storage.Blobs; @@ -46,7 +47,7 @@ public AzureStorageClient(AzureStorageOrchestrationServiceSettings settings) var timeoutPolicy = new LeaseTimeoutHttpPipelinePolicy(this.Settings.LeaseRenewInterval); var monitoringPolicy = new MonitoringHttpPipelinePolicy(this.Stats); - this.queueClient = CreateClient(settings.StorageAccountClientProvider.Queue, ConfigureClientPolicies); + this.queueClient = CreateClient(settings.StorageAccountClientProvider.Queue, ConfigureQueueClientPolicies); if (settings.HasTrackingStoreStorageAccount) { this.blobClient = CreateClient(settings.TrackingServiceClientProvider!.Blob, ConfigureClientPolicies); @@ -64,6 +65,72 @@ void ConfigureClientPolicies(TClientOptions options) where TClie options.AddPolicy(timeoutPolicy!, HttpPipelinePosition.PerCall); options.AddPolicy(monitoringPolicy!, HttpPipelinePosition.PerRetry); } + + void ConfigureQueueClientPolicies(QueueClientOptions options) + { + // Configure message encoding based on settings + options.MessageEncoding = this.Settings.QueueClientMessageEncoding switch + { + QueueClientMessageEncoding.UTF8 => QueueMessageEncoding.None, + QueueClientMessageEncoding.Base64 => QueueMessageEncoding.Base64, + _ => throw new ArgumentException($"Unsupported encoding strategy: {this.Settings.QueueClientMessageEncoding}") + }; + + // Base64-encoded clients will fail to decode messages sent in UTF-8 format. + // This handler catches decoding failures and update the message with its the original content, + // so that the client can successfully process it on the next attempt. + if (this.Settings.QueueClientMessageEncoding == QueueClientMessageEncoding.Base64) + { + options.MessageDecodingFailed += async (QueueMessageDecodingFailedEventArgs args) => + { + this.Settings.Logger.GeneralWarning( + this.QueueAccountName, + this.Settings.TaskHubName, + $"Base64-encoded queue client failed to decode message with ID: {args.ReceivedMessage.MessageId}. " + + "The message appears to have been originally sent using UTF-8 encoding. " + + "Will attempt to re-encode the message content as Base64 and update the queue message in-place " + + "so it can be successfully processed on the next attempt." + ); + + if (args.ReceivedMessage != null) + { + var queueMessage = args.ReceivedMessage; + + try + { + // Get the raw message content and update the message. + string originalJson = Encoding.UTF8.GetString(queueMessage.Body.ToArray()); + + // Update the message in the queue with the Base64-encoded body + if (args.IsRunningSynchronously) + { + args.Queue.UpdateMessage( + queueMessage.MessageId, + queueMessage.PopReceipt, + originalJson, + TimeSpan.FromSeconds(0)); + } + else + { + await args.Queue.UpdateMessageAsync( + queueMessage.MessageId, + queueMessage.PopReceipt, + originalJson, + TimeSpan.FromSeconds(0)); + } + } + catch (Exception ex) + { + // If re-encoding or update fails, rethrow the error to be handled upstream + throw new InvalidOperationException( + $"Failed to re-encode and update UTF-8 message as Base64. MessageId: {queueMessage.MessageId}", ex); + } + } + }; + } + + ConfigureClientPolicies(options); + } } public AzureStorageOrchestrationServiceSettings Settings { get; }