diff --git a/examples/201-dotnet-serverless-custom-handler/Program.cs b/examples/201-dotnet-serverless-custom-handler/Program.cs index 743ecac55..1ca961bab 100644 --- a/examples/201-dotnet-serverless-custom-handler/Program.cs +++ b/examples/201-dotnet-serverless-custom-handler/Program.cs @@ -47,7 +47,7 @@ public MyHandler( public string StepName { get; } /// - public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync( + public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync( DataPipeline pipeline, CancellationToken cancellationToken = default) { /* ... your custom ... @@ -64,6 +64,6 @@ public MyHandler( // Remove this - here only to avoid build errors await Task.Delay(0, cancellationToken).ConfigureAwait(false); - return (true, pipeline); + return (ReturnType.Success, pipeline); } } diff --git a/examples/202-dotnet-custom-handler-as-a-service/MyHandler.cs b/examples/202-dotnet-custom-handler-as-a-service/MyHandler.cs index 5ebfbe948..8f0dff26d 100644 --- a/examples/202-dotnet-custom-handler-as-a-service/MyHandler.cs +++ b/examples/202-dotnet-custom-handler-as-a-service/MyHandler.cs @@ -38,7 +38,7 @@ public Task StopAsync(CancellationToken cancellationToken = default) } /// - public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken = default) + public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken = default) { /* ... your custom ... * ... handler ... @@ -49,6 +49,6 @@ public Task StopAsync(CancellationToken cancellationToken = default) // Remove this - here only to avoid build errors await Task.Delay(0, cancellationToken).ConfigureAwait(false); - return (true, pipeline); + return (ReturnType.Success, pipeline); } } diff --git a/extensions/Anthropic/Client/RawAnthropicClient.cs b/extensions/Anthropic/Client/RawAnthropicClient.cs index 7e23e3783..d24945e8c 100644 --- a/extensions/Anthropic/Client/RawAnthropicClient.cs +++ b/extensions/Anthropic/Client/RawAnthropicClient.cs @@ -64,7 +64,8 @@ internal async IAsyncEnumerable CallClaudeStreamingAsy if (!response.IsSuccessStatusCode) { var responseError = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false); - throw new KernelMemoryException($"Failed to send request: {response.StatusCode} - {responseError}"); + throw new KernelMemoryException($"Failed to send request: {response.StatusCode} - {responseError}", + isTransient: response.StatusCode.IsTransientError()); } var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); diff --git a/extensions/AzureAIDocIntel/AzureAIDocIntelEngine.cs b/extensions/AzureAIDocIntel/AzureAIDocIntelEngine.cs index 49f3c2a7f..98b6df6cf 100644 --- a/extensions/AzureAIDocIntel/AzureAIDocIntelEngine.cs +++ b/extensions/AzureAIDocIntel/AzureAIDocIntelEngine.cs @@ -58,12 +58,19 @@ public AzureAIDocIntelEngine( /// public async Task ExtractTextFromImageAsync(Stream imageContent, CancellationToken cancellationToken = default) { - // Start the OCR operation - var operation = await this._recognizerClient.AnalyzeDocumentAsync(WaitUntil.Completed, "prebuilt-read", imageContent, cancellationToken: cancellationToken).ConfigureAwait(false); + try + { + // Start the OCR operation + var operation = await this._recognizerClient.AnalyzeDocumentAsync(WaitUntil.Completed, "prebuilt-read", imageContent, cancellationToken: cancellationToken).ConfigureAwait(false); - // Wait for the result - Response operationResponse = await operation.WaitForCompletionAsync(cancellationToken).ConfigureAwait(false); + // Wait for the result + Response operationResponse = await operation.WaitForCompletionAsync(cancellationToken).ConfigureAwait(false); - return operationResponse.Value.Content; + return operationResponse.Value.Content; + } + catch (RequestFailedException e) + { + throw new AzureAIDocIntelException(e.Message, e, isTransient: HttpErrors.IsTransientError(e.Status)); + } } } diff --git a/extensions/AzureAIDocIntel/AzureAIDocIntelException.cs b/extensions/AzureAIDocIntel/AzureAIDocIntelException.cs new file mode 100644 index 000000000..0df68ee16 --- /dev/null +++ b/extensions/AzureAIDocIntel/AzureAIDocIntelException.cs @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; + +namespace Microsoft.KernelMemory.DataFormats.AzureAIDocIntel; + +public class AzureAIDocIntelException : KernelMemoryException +{ + /// + public AzureAIDocIntelException(bool? isTransient = null) + { + this.IsTransient = isTransient; + } + + /// + public AzureAIDocIntelException(string message, bool? isTransient = null) : base(message) + { + this.IsTransient = isTransient; + } + + /// + public AzureAIDocIntelException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException) + { + this.IsTransient = isTransient; + } +} diff --git a/extensions/AzureAISearch/AzureAISearch/AzureAISearchMemoryException.cs b/extensions/AzureAISearch/AzureAISearch/AzureAISearchMemoryException.cs index f63ccc594..d91955c1b 100644 --- a/extensions/AzureAISearch/AzureAISearch/AzureAISearchMemoryException.cs +++ b/extensions/AzureAISearch/AzureAISearch/AzureAISearchMemoryException.cs @@ -7,17 +7,21 @@ namespace Microsoft.KernelMemory.MemoryDb.AzureAISearch; public class AzureAISearchMemoryException : KernelMemoryException { /// - public AzureAISearchMemoryException() + public AzureAISearchMemoryException(bool? isTransient = null) { + this.IsTransient = isTransient; } /// - public AzureAISearchMemoryException(string? message) : base(message) + public AzureAISearchMemoryException(string message, bool? isTransient = null) : base(message) { + this.IsTransient = isTransient; } /// - public AzureAISearchMemoryException(string? message, Exception? innerException) : base(message, innerException) + public AzureAISearchMemoryException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException) { + this.IsTransient = isTransient; } } + diff --git a/extensions/AzureAISearch/AzureAISearch/Internals/MemoryDbSchema.cs b/extensions/AzureAISearch/AzureAISearch/Internals/MemoryDbSchema.cs index a5f8522bc..f06245dfe 100644 --- a/extensions/AzureAISearch/AzureAISearch/Internals/MemoryDbSchema.cs +++ b/extensions/AzureAISearch/AzureAISearch/Internals/MemoryDbSchema.cs @@ -13,41 +13,41 @@ public void Validate(bool vectorSizeRequired = false) { if (this.Fields.Count == 0) { - throw new KernelMemoryException("The schema is empty"); + throw new AzureAISearchMemoryException("The schema is empty", isTransient: false); } if (this.Fields.All(x => x.Type != MemoryDbField.FieldType.Vector)) { - throw new KernelMemoryException("The schema doesn't contain a vector field"); + throw new AzureAISearchMemoryException("The schema doesn't contain a vector field", isTransient: false); } int keys = this.Fields.Count(x => x.IsKey); switch (keys) { case 0: - throw new KernelMemoryException("The schema doesn't contain a key field"); + throw new AzureAISearchMemoryException("The schema doesn't contain a key field", isTransient: false); case > 1: - throw new KernelMemoryException("The schema cannot contain more than one key"); + throw new AzureAISearchMemoryException("The schema cannot contain more than one key", isTransient: false); } if (vectorSizeRequired && this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.Vector, VectorSize: 0 })) { - throw new KernelMemoryException("Vector fields must have a size greater than zero defined"); + throw new AzureAISearchMemoryException("Vector fields must have a size greater than zero defined", isTransient: false); } if (this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.Bool, IsKey: true })) { - throw new KernelMemoryException("Boolean fields cannot be used as unique keys"); + throw new AzureAISearchMemoryException("Boolean fields cannot be used as unique keys", isTransient: false); } if (this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.ListOfStrings, IsKey: true })) { - throw new KernelMemoryException("Collection fields cannot be used as unique keys"); + throw new AzureAISearchMemoryException("Collection fields cannot be used as unique keys", isTransient: false); } if (this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.Vector, IsKey: true })) { - throw new KernelMemoryException("Vector fields cannot be used as unique keys"); + throw new AzureAISearchMemoryException("Vector fields cannot be used as unique keys", isTransient: false); } } } diff --git a/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAIException.cs b/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAIException.cs new file mode 100644 index 000000000..1f67ebc01 --- /dev/null +++ b/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAIException.cs @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; + +namespace Microsoft.KernelMemory.AI.AzureOpenAI; + +public class AzureOpenAIException : KernelMemoryException +{ + /// + public AzureOpenAIException(bool? isTransient = null) + { + this.IsTransient = isTransient; + } + + /// + public AzureOpenAIException(string message, bool? isTransient = null) : base(message) + { + this.IsTransient = isTransient; + } + + /// + public AzureOpenAIException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException) + { + this.IsTransient = isTransient; + } +} diff --git a/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextEmbeddingGenerator.cs b/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextEmbeddingGenerator.cs index 4a096c4b7..7aa954d36 100644 --- a/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextEmbeddingGenerator.cs +++ b/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextEmbeddingGenerator.cs @@ -12,6 +12,7 @@ using Microsoft.KernelMemory.AI.AzureOpenAI.Internals; using Microsoft.KernelMemory.AI.OpenAI; using Microsoft.KernelMemory.Diagnostics; +using Microsoft.SemanticKernel; using Microsoft.SemanticKernel.AI.Embeddings; using Microsoft.SemanticKernel.Connectors.AzureOpenAI; @@ -121,7 +122,14 @@ public IReadOnlyList GetTokens(string text) public Task GenerateEmbeddingAsync(string text, CancellationToken cancellationToken = default) { this._log.LogTrace("Generating embedding"); - return this._client.GenerateEmbeddingAsync(text, cancellationToken); + try + { + return this._client.GenerateEmbeddingAsync(text, cancellationToken); + } + catch (HttpOperationException e) + { + throw new AzureOpenAIException(e.Message, e, isTransient: e.StatusCode.IsTransientError()); + } } /// @@ -129,7 +137,14 @@ public async Task GenerateEmbeddingBatchAsync(IEnumerable t { var list = textList.ToList(); this._log.LogTrace("Generating embeddings, batch size '{0}'", list.Count); - IList> embeddings = await this._client.GenerateEmbeddingsAsync(list, cancellationToken: cancellationToken).ConfigureAwait(false); - return embeddings.Select(e => new Embedding(e)).ToArray(); + try + { + IList> embeddings = await this._client.GenerateEmbeddingsAsync(list, cancellationToken: cancellationToken).ConfigureAwait(false); + return embeddings.Select(e => new Embedding(e)).ToArray(); + } + catch (HttpOperationException e) + { + throw new AzureOpenAIException(e.Message, e, isTransient: e.StatusCode.IsTransientError()); + } } } diff --git a/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextGenerator.cs b/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextGenerator.cs index cb6126901..0661bed3e 100644 --- a/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextGenerator.cs +++ b/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextGenerator.cs @@ -5,6 +5,7 @@ using System.Net.Http; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Tasks; using Azure.AI.OpenAI; using Microsoft.Extensions.Logging; using Microsoft.KernelMemory.AI.AzureOpenAI.Internals; @@ -140,8 +141,17 @@ public async IAsyncEnumerable GenerateTextAsync( } this._log.LogTrace("Sending chat message generation request"); - IAsyncEnumerable result = this._client.GetStreamingTextContentsAsync(prompt, skOptions, cancellationToken: cancellationToken); - await foreach (StreamingTextContent x in result) + IAsyncEnumerable result; + try + { + result = this._client.GetStreamingTextContentsAsync(prompt, skOptions, cancellationToken: cancellationToken); + } + catch (HttpOperationException e) + { + throw new AzureOpenAIException(e.Message, e, isTransient: e.StatusCode.IsTransientError()); + } + + await foreach (StreamingTextContent x in result.WithCancellation(cancellationToken)) { if (x.Text == null) { continue; } diff --git a/extensions/AzureQueues/AzureQueuesPipeline.cs b/extensions/AzureQueues/AzureQueuesPipeline.cs index f04771b55..2ce06ebd8 100644 --- a/extensions/AzureQueues/AzureQueuesPipeline.cs +++ b/extensions/AzureQueues/AzureQueuesPipeline.cs @@ -14,6 +14,7 @@ using Microsoft.Extensions.Logging; using Microsoft.KernelMemory.Diagnostics; using Microsoft.KernelMemory.DocumentStorage; +using Microsoft.KernelMemory.Pipeline; using Microsoft.KernelMemory.Pipeline.Queue; using Timer = System.Timers.Timer; @@ -180,7 +181,7 @@ public async Task EnqueueAsync(string message, CancellationToken cancellationTok } /// - public void OnDequeue(Func> processMessageAction) + public void OnDequeue(Func> processMessageAction) { this.Received += async (object sender, MessageEventArgs args) => { @@ -191,20 +192,30 @@ public void OnDequeue(Func> processMessageAction) try { + ReturnType returnType = await processMessageAction.Invoke(message.MessageText).ConfigureAwait(false); if (message.DequeueCount <= this._config.MaxRetriesBeforePoisonQueue) { - bool success = await processMessageAction.Invoke(message.MessageText).ConfigureAwait(false); - if (success) + switch (returnType) { - this._log.LogTrace("Message '{0}' successfully processed, deleting message", message.MessageId); - await this.DeleteMessageAsync(message, cancellationToken: default).ConfigureAwait(false); - } - else - { - var backoffDelay = TimeSpan.FromSeconds(1 * message.DequeueCount); - this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue with a delay of {1} msecs", - message.MessageId, backoffDelay.TotalMilliseconds); - await this.UnlockMessageAsync(message, backoffDelay, cancellationToken: default).ConfigureAwait(false); + case ReturnType.Success: + this._log.LogTrace("Message '{0}' successfully processed, deleting message", message.MessageId); + await this.DeleteMessageAsync(message, cancellationToken: default).ConfigureAwait(false); + break; + + case ReturnType.TransientError: + var backoffDelay = TimeSpan.FromSeconds(1 * message.DequeueCount); + this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue with a delay of {1} msecs", + message.MessageId, backoffDelay.TotalMilliseconds); + await this.UnlockMessageAsync(message, backoffDelay, cancellationToken: default).ConfigureAwait(false); + break; + + case ReturnType.FatalError: + this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.MessageId); + await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false); + break; + + default: + throw new ArgumentOutOfRangeException($"Unknown {returnType:G} result"); } } else @@ -213,6 +224,11 @@ public void OnDequeue(Func> processMessageAction) await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false); } } + catch (KernelMemoryException e) when (e.IsTransient.HasValue && !e.IsTransient.Value) + { + this._log.LogError(e, "Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.MessageId); + await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false); + } #pragma warning disable CA1031 // Must catch all to handle queue properly catch (Exception e) { diff --git a/extensions/OpenAI/OpenAI/OpenAIException.cs b/extensions/OpenAI/OpenAI/OpenAIException.cs new file mode 100644 index 000000000..47451d0ce --- /dev/null +++ b/extensions/OpenAI/OpenAI/OpenAIException.cs @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; + +namespace Microsoft.KernelMemory.AI.OpenAI; + +public class OpenAIException : KernelMemoryException +{ + /// + public OpenAIException(bool? isTransient = null) + { + this.IsTransient = isTransient; + } + + /// + public OpenAIException(string message, bool? isTransient = null) : base(message) + { + this.IsTransient = isTransient; + } + + /// + public OpenAIException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException) + { + this.IsTransient = isTransient; + } +} diff --git a/extensions/OpenAI/OpenAI/OpenAITextEmbeddingGenerator.cs b/extensions/OpenAI/OpenAI/OpenAITextEmbeddingGenerator.cs index 9efe22f42..d9582d70c 100644 --- a/extensions/OpenAI/OpenAI/OpenAITextEmbeddingGenerator.cs +++ b/extensions/OpenAI/OpenAI/OpenAITextEmbeddingGenerator.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.Logging; using Microsoft.KernelMemory.AI.OpenAI.Internals; using Microsoft.KernelMemory.Diagnostics; +using Microsoft.SemanticKernel; using Microsoft.SemanticKernel.AI.Embeddings; using Microsoft.SemanticKernel.Embeddings; using OpenAI; @@ -122,7 +123,14 @@ public IReadOnlyList GetTokens(string text) public Task GenerateEmbeddingAsync(string text, CancellationToken cancellationToken = default) { this._log.LogTrace("Generating embedding"); - return this._client.GenerateEmbeddingAsync(text, cancellationToken); + try + { + return this._client.GenerateEmbeddingAsync(text, cancellationToken); + } + catch (HttpOperationException e) + { + throw new OpenAIException(e.Message, e, isTransient: e.StatusCode.IsTransientError()); + } } /// @@ -130,7 +138,14 @@ public async Task GenerateEmbeddingBatchAsync(IEnumerable t { var list = textList.ToList(); this._log.LogTrace("Generating embeddings, batch size '{0}'", list.Count); - var embeddings = await this._client.GenerateEmbeddingsAsync(list, cancellationToken: cancellationToken).ConfigureAwait(false); - return embeddings.Select(e => new Embedding(e)).ToArray(); + try + { + var embeddings = await this._client.GenerateEmbeddingsAsync(list, cancellationToken: cancellationToken).ConfigureAwait(false); + return embeddings.Select(e => new Embedding(e)).ToArray(); + } + catch (HttpOperationException e) + { + throw new OpenAIException(e.Message, e, isTransient: e.StatusCode.IsTransientError()); + } } } diff --git a/extensions/OpenAI/OpenAI/OpenAITextGenerator.cs b/extensions/OpenAI/OpenAI/OpenAITextGenerator.cs index 1e9a96c06..8faa3e947 100644 --- a/extensions/OpenAI/OpenAI/OpenAITextGenerator.cs +++ b/extensions/OpenAI/OpenAI/OpenAITextGenerator.cs @@ -5,6 +5,7 @@ using System.Net.Http; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.KernelMemory.AI.OpenAI.Internals; using Microsoft.KernelMemory.Diagnostics; @@ -139,8 +140,18 @@ public async IAsyncEnumerable GenerateTextAsync( } this._log.LogTrace("Sending chat message generation request"); - IAsyncEnumerable result = this._client.GetStreamingTextContentsAsync(prompt, skOptions, cancellationToken: cancellationToken); - await foreach (StreamingTextContent x in result) + + IAsyncEnumerable result; + try + { + result = this._client.GetStreamingTextContentsAsync(prompt, skOptions, cancellationToken: cancellationToken); + } + catch (HttpOperationException e) + { + throw new OpenAIException(e.Message, e, isTransient: e.StatusCode.IsTransientError()); + } + + await foreach (StreamingTextContent x in result.WithCancellation(cancellationToken)) { // TODO: try catch // if (x.Metadata?["Usage"] is not null) diff --git a/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs b/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs index 10ece1b13..ac6556a67 100644 --- a/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs +++ b/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs @@ -4,6 +4,7 @@ using Microsoft.KernelMemory; using Microsoft.KernelMemory.Diagnostics; using Microsoft.KernelMemory.Orchestration.RabbitMQ; +using Microsoft.KernelMemory.Pipeline; using Microsoft.KernelMemory.Pipeline.Queue; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -38,7 +39,7 @@ public static async Task Main() { Console.WriteLine($"{++counter} Received message: {msg}"); await Task.Delay(0); - return false; + return ReturnType.TransientError; }); await pipeline.ConnectToQueueAsync(QueueName, QueueOptions.PubSub); diff --git a/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs b/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs index 6fcc816bb..acc08c337 100644 --- a/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs +++ b/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.KernelMemory.Diagnostics; +using Microsoft.KernelMemory.Pipeline; using Microsoft.KernelMemory.Pipeline.Queue; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -68,7 +69,7 @@ public RabbitMQPipeline(RabbitMQConfig config, ILoggerFactory? loggerFactory = n } /// - /// About dead letters, see https://www.rabbitmq.com/docs/dlx + /// About poison queue and dead letters, see https://www.rabbitmq.com/docs/dlx public Task ConnectToQueueAsync(string queueName, QueueOptions options = default, CancellationToken cancellationToken = default) { ArgumentNullExceptionEx.ThrowIfNullOrWhiteSpace(queueName, nameof(queueName), "The queue name is empty"); @@ -82,7 +83,7 @@ public Task ConnectToQueueAsync(string queueName, QueueOptions options = ArgumentExceptionEx.ThrowIf((Encoding.UTF8.GetByteCount(poisonExchangeName) > 255), nameof(poisonExchangeName), $"The exchange name '{poisonExchangeName}' is too long, max 255 UTF8 bytes allowed, try using a shorter queue name"); ArgumentExceptionEx.ThrowIf((Encoding.UTF8.GetByteCount(poisonQueueName) > 255), nameof(poisonQueueName), - $"The dead letter queue name '{poisonQueueName}' is too long, max 255 UTF8 bytes allowed, try using a shorter queue name"); + $"The poison queue name '{poisonQueueName}' is too long, max 255 UTF8 bytes allowed, try using a shorter queue name"); if (!string.IsNullOrEmpty(this._queueName)) { @@ -173,7 +174,7 @@ public Task EnqueueAsync(string message, CancellationToken cancellationToken = d } /// - public void OnDequeue(Func> processMessageAction) + public void OnDequeue(Func> processMessageAction) { this._consumer.Received += async (object sender, BasicDeliverEventArgs args) => { @@ -192,33 +193,47 @@ public void OnDequeue(Func> processMessageAction) byte[] body = args.Body.ToArray(); string message = Encoding.UTF8.GetString(body); - bool success = await processMessageAction.Invoke(message).ConfigureAwait(false); - if (success) + var returnType = await processMessageAction.Invoke(message).ConfigureAwait(false); + switch (returnType) { - this._log.LogTrace("Message '{0}' successfully processed, deleting message", args.BasicProperties?.MessageId); - this._channel.BasicAck(args.DeliveryTag, multiple: false); - } - else - { - if (attemptNumber < this._maxAttempts) - { - this._log.LogWarning("Message '{0}' failed to process (attempt {1} of {2}), putting message back in the queue", - args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts); - if (this._delayBeforeRetryingMsecs > 0) + case ReturnType.Success: + this._log.LogTrace("Message '{0}' successfully processed, deleting message", args.BasicProperties?.MessageId); + this._channel.BasicAck(args.DeliveryTag, multiple: false); + break; + + case ReturnType.TransientError: + if (attemptNumber < this._maxAttempts) { - await Task.Delay(TimeSpan.FromMilliseconds(this._delayBeforeRetryingMsecs)).ConfigureAwait(false); + this._log.LogWarning("Message '{0}' failed to process (attempt {1} of {2}), putting message back in the queue", + args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts); + if (this._delayBeforeRetryingMsecs > 0) + { + await Task.Delay(TimeSpan.FromMilliseconds(this._delayBeforeRetryingMsecs)).ConfigureAwait(false); + } } - } - else - { - this._log.LogError("Message '{0}' failed to process (attempt {1} of {2}), moving message to dead letter queue", - args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts); - } + else + { + this._log.LogError("Message '{0}' failed to process (attempt {1} of {2}), moving message to poison queue", + args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts); + } + + this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true); + break; - // Note: if "requeue == false" the message would be moved to the dead letter exchange - this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true); + case ReturnType.FatalError: + this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", args.BasicProperties?.MessageId); + this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: false); + break; + + default: + throw new ArgumentOutOfRangeException($"Unknown {returnType:G} result"); } } + catch (KernelMemoryException e) when (e.IsTransient.HasValue && !e.IsTransient.Value) + { + this._log.LogError(e, "Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", args.BasicProperties?.MessageId); + this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: false); + } #pragma warning disable CA1031 // Must catch all to handle queue properly catch (Exception e) { @@ -243,7 +258,6 @@ public void OnDequeue(Func> processMessageAction) } // TODO: verify and document what happens if this fails. RabbitMQ should automatically unlock messages. - // Note: if "requeue == false" the message would be moved to the dead letter exchange this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true); } #pragma warning restore CA1031 diff --git a/service/Abstractions/Diagnostics/HttpErrors.cs b/service/Abstractions/Diagnostics/HttpErrors.cs new file mode 100644 index 000000000..62a188e10 --- /dev/null +++ b/service/Abstractions/Diagnostics/HttpErrors.cs @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.Net; + +namespace Microsoft.KernelMemory.Diagnostics; + +public static class HttpErrors +{ + // Errors that might disappear by retrying + private static readonly HashSet s_transientErrors = + [ + (int)HttpStatusCode.RequestTimeout, // 408 + (int)HttpStatusCode.PreconditionFailed, // 412 + (int)HttpStatusCode.Locked, // 423 + (int)HttpStatusCode.TooManyRequests, // 429 + (int)HttpStatusCode.InternalServerError, // 500 + (int)HttpStatusCode.BadGateway, // 502 + (int)HttpStatusCode.ServiceUnavailable, // 503 + (int)HttpStatusCode.GatewayTimeout, // 504 + (int)HttpStatusCode.InsufficientStorage // 507 + ]; + + public static bool IsTransientError(this HttpStatusCode statusCode) + { + return s_transientErrors.Contains((int)statusCode); + } + + public static bool IsTransientError(this HttpStatusCode? statusCode) + { + return statusCode.HasValue && s_transientErrors.Contains((int)statusCode.Value); + } + + public static bool IsTransientError(int statusCode) + { + return s_transientErrors.Contains(statusCode); + } + + public static bool IsFatalError(this HttpStatusCode statusCode) + { + return IsError(statusCode) && !IsTransientError(statusCode); + } + + public static bool IsFatalError(this HttpStatusCode? statusCode) + { + return statusCode.HasValue && IsError(statusCode) && !IsTransientError(statusCode); + } + + public static bool IsFatalError(int statusCode) + { + return IsError(statusCode) && !IsTransientError(statusCode); + } + + private static bool IsError(this HttpStatusCode? statusCode) + { + return statusCode.HasValue && (int)statusCode.Value >= 400; + } + + private static bool IsError(int statusCode) + { + return statusCode >= 400; + } +} diff --git a/service/Abstractions/KernelMemoryException.cs b/service/Abstractions/KernelMemoryException.cs index 387106d13..3afcdfe97 100644 --- a/service/Abstractions/KernelMemoryException.cs +++ b/service/Abstractions/KernelMemoryException.cs @@ -9,19 +9,25 @@ namespace Microsoft.KernelMemory; /// public class KernelMemoryException : Exception { + public bool? IsTransient { get; protected init; } = null; + /// /// Initializes a new instance of the class with a default message. /// - public KernelMemoryException() + /// Optional parameter to indicate if the error is temporary and might disappear by retrying. + public KernelMemoryException(bool? isTransient = null) { + this.IsTransient = isTransient; } /// /// Initializes a new instance of the class with its message set to . /// /// A string that describes the error. - public KernelMemoryException(string? message) : base(message) + /// Optional parameter to indicate if the error is temporary and might disappear by retrying. + public KernelMemoryException(string? message, bool? isTransient = null) : base(message) { + this.IsTransient = isTransient; } /// @@ -29,7 +35,9 @@ public KernelMemoryException(string? message) : base(message) /// /// A string that describes the error. /// The exception that is the cause of the current exception. - public KernelMemoryException(string? message, Exception? innerException) : base(message, innerException) + /// Optional parameter to indicate if the error is temporary and might disappear by retrying. + public KernelMemoryException(string? message, Exception? innerException, bool? isTransient = null) : base(message, innerException) { + this.IsTransient = isTransient; } } diff --git a/service/Abstractions/Pipeline/IPipelineStepHandler.cs b/service/Abstractions/Pipeline/IPipelineStepHandler.cs index ce5296321..32930c9b9 100644 --- a/service/Abstractions/Pipeline/IPipelineStepHandler.cs +++ b/service/Abstractions/Pipeline/IPipelineStepHandler.cs @@ -20,5 +20,5 @@ public interface IPipelineStepHandler /// Pipeline status /// Async task cancellation token /// Whether the pipeline step has been processed successfully, and the new pipeline status to use moving forward - Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken = default); + Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken = default); } diff --git a/service/Abstractions/Pipeline/MimeTypeException.cs b/service/Abstractions/Pipeline/MimeTypeException.cs new file mode 100644 index 000000000..3a305790d --- /dev/null +++ b/service/Abstractions/Pipeline/MimeTypeException.cs @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; + +namespace Microsoft.KernelMemory.Pipeline; + +public class MimeTypeException : KernelMemoryException +{ + /// + public MimeTypeException(bool? isTransient = null) + { + this.IsTransient = isTransient; + } + + /// + public MimeTypeException(string message, bool? isTransient = null) : base(message) + { + this.IsTransient = isTransient; + } + + /// + public MimeTypeException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException) + { + this.IsTransient = isTransient; + } +} diff --git a/service/Abstractions/Pipeline/MimeTypes.cs b/service/Abstractions/Pipeline/MimeTypes.cs index 6efdc4aa8..956eeae39 100644 --- a/service/Abstractions/Pipeline/MimeTypes.cs +++ b/service/Abstractions/Pipeline/MimeTypes.cs @@ -221,7 +221,7 @@ public string GetFileType(string filename) return mimeType; } - throw new NotSupportedException($"File type not supported: {filename}"); + throw new MimeTypeException($"File type not supported: {filename}", isTransient: false); } public bool TryGetFileType(string filename, out string? mimeType) diff --git a/service/Abstractions/Pipeline/OrchestrationException.cs b/service/Abstractions/Pipeline/OrchestrationException.cs index 6d2e3246c..6670e3cc4 100644 --- a/service/Abstractions/Pipeline/OrchestrationException.cs +++ b/service/Abstractions/Pipeline/OrchestrationException.cs @@ -7,11 +7,20 @@ namespace Microsoft.KernelMemory.Pipeline; public class OrchestrationException : KernelMemoryException { /// - public OrchestrationException() { } + public OrchestrationException(bool? isTransient = null) + { + this.IsTransient = isTransient; + } /// - public OrchestrationException(string message) : base(message) { } + public OrchestrationException(string message, bool? isTransient = null) : base(message) + { + this.IsTransient = isTransient; + } /// - public OrchestrationException(string message, Exception? innerException) : base(message, innerException) { } + public OrchestrationException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException) + { + this.IsTransient = isTransient; + } } diff --git a/service/Abstractions/Pipeline/Queue/IQueue.cs b/service/Abstractions/Pipeline/Queue/IQueue.cs index b4bc7c671..0a3ccad96 100644 --- a/service/Abstractions/Pipeline/Queue/IQueue.cs +++ b/service/Abstractions/Pipeline/Queue/IQueue.cs @@ -28,5 +28,5 @@ public interface IQueue : IDisposable /// Define the logic to execute when a new message is in the queue. /// /// Async action to execute - void OnDequeue(Func> processMessageAction); + void OnDequeue(Func> processMessageAction); } diff --git a/service/Abstractions/Pipeline/ReturnType.cs b/service/Abstractions/Pipeline/ReturnType.cs new file mode 100644 index 000000000..d751070f5 --- /dev/null +++ b/service/Abstractions/Pipeline/ReturnType.cs @@ -0,0 +1,10 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.KernelMemory.Pipeline; + +public enum ReturnType +{ + Success = 0, + TransientError = 1, + FatalError = 2, +} diff --git a/service/Core/Handlers/DeleteDocumentHandler.cs b/service/Core/Handlers/DeleteDocumentHandler.cs index 6324ed447..d869ef981 100644 --- a/service/Core/Handlers/DeleteDocumentHandler.cs +++ b/service/Core/Handlers/DeleteDocumentHandler.cs @@ -34,7 +34,7 @@ public DeleteDocumentHandler( } /// - public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync( + public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync( DataPipeline pipeline, CancellationToken cancellationToken = default) { this._log.LogDebug("Deleting document, pipeline '{0}/{1}'", pipeline.Index, pipeline.DocumentId); @@ -60,6 +60,6 @@ await this._documentStorage.EmptyDocumentDirectoryAsync( documentId: pipeline.DocumentId, cancellationToken).ConfigureAwait(false); - return (true, pipeline); + return (ReturnType.Success, pipeline); } } diff --git a/service/Core/Handlers/DeleteGeneratedFilesHandler.cs b/service/Core/Handlers/DeleteGeneratedFilesHandler.cs index 006eb58df..24c5763fd 100644 --- a/service/Core/Handlers/DeleteGeneratedFilesHandler.cs +++ b/service/Core/Handlers/DeleteGeneratedFilesHandler.cs @@ -29,7 +29,7 @@ public DeleteGeneratedFilesHandler( } /// - public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync( + public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync( DataPipeline pipeline, CancellationToken cancellationToken = default) { this._log.LogDebug("Deleting generated files, pipeline '{0}/{1}'", pipeline.Index, pipeline.DocumentId); @@ -40,6 +40,6 @@ await this._documentStorage.EmptyDocumentDirectoryAsync( documentId: pipeline.DocumentId, cancellationToken).ConfigureAwait(false); - return (true, pipeline); + return (ReturnType.Success, pipeline); } } diff --git a/service/Core/Handlers/DeleteIndexHandler.cs b/service/Core/Handlers/DeleteIndexHandler.cs index a2897d489..4deccf31a 100644 --- a/service/Core/Handlers/DeleteIndexHandler.cs +++ b/service/Core/Handlers/DeleteIndexHandler.cs @@ -34,7 +34,7 @@ public DeleteIndexHandler( } /// - public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync( + public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync( DataPipeline pipeline, CancellationToken cancellationToken = default) { this._log.LogDebug("Deleting index, pipeline '{0}/{1}'", pipeline.Index, pipeline.DocumentId); @@ -50,6 +50,6 @@ await this._documentStorage.DeleteIndexDirectoryAsync( index: pipeline.Index, cancellationToken).ConfigureAwait(false); - return (true, pipeline); + return (ReturnType.Success, pipeline); } } diff --git a/service/Core/Handlers/GenerateEmbeddingsHandler.cs b/service/Core/Handlers/GenerateEmbeddingsHandler.cs index 30f725b5b..9e41cbdc3 100644 --- a/service/Core/Handlers/GenerateEmbeddingsHandler.cs +++ b/service/Core/Handlers/GenerateEmbeddingsHandler.cs @@ -58,13 +58,13 @@ public GenerateEmbeddingsHandler( } /// - public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync( + public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync( DataPipeline pipeline, CancellationToken cancellationToken = default) { if (!this._embeddingGenerationEnabled) { this._log.LogTrace("Embedding generation is disabled, skipping - pipeline '{0}/{1}'", pipeline.Index, pipeline.DocumentId); - return (true, pipeline); + return (ReturnType.Success, pipeline); } foreach (ITextEmbeddingGenerator generator in this._embeddingGenerators) @@ -83,7 +83,7 @@ public GenerateEmbeddingsHandler( } } - return (true, pipeline); + return (ReturnType.Success, pipeline); } protected override IPipelineStepHandler ActualInstance => this; diff --git a/service/Core/Handlers/GenerateEmbeddingsParallelHandler.cs b/service/Core/Handlers/GenerateEmbeddingsParallelHandler.cs index 360c82874..2483f414b 100644 --- a/service/Core/Handlers/GenerateEmbeddingsParallelHandler.cs +++ b/service/Core/Handlers/GenerateEmbeddingsParallelHandler.cs @@ -58,13 +58,13 @@ public GenerateEmbeddingsParallelHandler( } /// - public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync( + public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync( DataPipeline pipeline, CancellationToken cancellationToken = default) { if (!this._embeddingGenerationEnabled) { this._log.LogTrace("Embedding generation is disabled, skipping - pipeline '{0}/{1}'", pipeline.Index, pipeline.DocumentId); - return (true, pipeline); + return (ReturnType.Success, pipeline); } foreach (ITextEmbeddingGenerator generator in this._embeddingGenerators) @@ -83,7 +83,7 @@ public GenerateEmbeddingsParallelHandler( } } - return (true, pipeline); + return (ReturnType.Success, pipeline); } protected override IPipelineStepHandler ActualInstance => this; diff --git a/service/Core/Handlers/SaveRecordsHandler.cs b/service/Core/Handlers/SaveRecordsHandler.cs index 50baf535d..cceeee66d 100644 --- a/service/Core/Handlers/SaveRecordsHandler.cs +++ b/service/Core/Handlers/SaveRecordsHandler.cs @@ -103,7 +103,7 @@ public SaveRecordsHandler( } /// - public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync( + public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync( DataPipeline pipeline, CancellationToken cancellationToken = default) { this._log.LogDebug("Saving memory records, pipeline '{0}/{1}'", pipeline.Index, pipeline.DocumentId); @@ -241,7 +241,7 @@ record = PrepareRecord( this._log.LogWarning("Pipeline '{0}/{1}': step {2}: no records found, cannot save, moving to next pipeline step.", pipeline.Index, pipeline.DocumentId, this.StepName); } - return (true, pipeline); + return (ReturnType.Success, pipeline); } private static IEnumerable GetListOfEmbeddingFiles(DataPipeline pipeline) diff --git a/service/Core/Handlers/SummarizationHandler.cs b/service/Core/Handlers/SummarizationHandler.cs index 81a08c65d..a4ee36f9e 100644 --- a/service/Core/Handlers/SummarizationHandler.cs +++ b/service/Core/Handlers/SummarizationHandler.cs @@ -54,7 +54,7 @@ public SummarizationHandler( } /// - public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync( + public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync( DataPipeline pipeline, CancellationToken cancellationToken = default) { this._log.LogDebug("Generating summary, pipeline '{0}/{1}'", pipeline.Index, pipeline.DocumentId); @@ -125,7 +125,7 @@ public SummarizationHandler( } } - return (true, pipeline); + return (ReturnType.Success, pipeline); } private async Task<(string summary, bool skip)> SummarizeAsync(string content, IContext context) diff --git a/service/Core/Handlers/SummarizationParallelHandler.cs b/service/Core/Handlers/SummarizationParallelHandler.cs index 19a685e3d..44e309270 100644 --- a/service/Core/Handlers/SummarizationParallelHandler.cs +++ b/service/Core/Handlers/SummarizationParallelHandler.cs @@ -53,7 +53,7 @@ public SummarizationParallelHandler( } /// - public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync( + public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync( DataPipeline pipeline, CancellationToken cancellationToken = default) { this._log.LogDebug("Generating summary, pipeline '{0}/{1}'", pipeline.Index, pipeline.DocumentId); @@ -133,7 +133,7 @@ await Parallel.ForEachAsync(uploadedFile.GeneratedFiles, options, async (generat } } - return (true, pipeline); + return (ReturnType.Success, pipeline); } private async Task<(string summary, bool skip)> SummarizeAsync(string content) diff --git a/service/Core/Handlers/TextExtractionHandler.cs b/service/Core/Handlers/TextExtractionHandler.cs index c68d4f32d..05279ea2b 100644 --- a/service/Core/Handlers/TextExtractionHandler.cs +++ b/service/Core/Handlers/TextExtractionHandler.cs @@ -54,7 +54,7 @@ public TextExtractionHandler( } /// - public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync( + public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync( DataPipeline pipeline, CancellationToken cancellationToken = default) { this._log.LogDebug("Extracting text, pipeline '{0}/{1}'", pipeline.Index, pipeline.DocumentId); @@ -135,7 +135,7 @@ public TextExtractionHandler( uploadedFile.MarkProcessedBy(this); } - return (true, pipeline); + return (ReturnType.Success, pipeline); } public void Dispose() diff --git a/service/Core/Handlers/TextPartitioningHandler.cs b/service/Core/Handlers/TextPartitioningHandler.cs index 9ac57da4a..960c4b47b 100644 --- a/service/Core/Handlers/TextPartitioningHandler.cs +++ b/service/Core/Handlers/TextPartitioningHandler.cs @@ -67,7 +67,7 @@ public TextPartitioningHandler( } /// - public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync( + public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync( DataPipeline pipeline, CancellationToken cancellationToken = default) { this._log.LogDebug("Partitioning text, pipeline '{0}/{1}'", pipeline.Index, pipeline.DocumentId); @@ -75,7 +75,7 @@ public TextPartitioningHandler( if (pipeline.Files.Count == 0) { this._log.LogWarning("Pipeline '{0}/{1}': there are no files to process, moving to next pipeline step.", pipeline.Index, pipeline.DocumentId); - return (true, pipeline); + return (ReturnType.Success, pipeline); } var context = pipeline.GetContext(); @@ -197,7 +197,7 @@ public TextPartitioningHandler( } } - return (true, pipeline); + return (ReturnType.Success, pipeline); } #pragma warning disable CA2254 // the msg is always used diff --git a/service/Core/MemoryStorage/DevTools/SimpleVectorDb.cs b/service/Core/MemoryStorage/DevTools/SimpleVectorDb.cs index 0d421dfcb..38ee33e06 100644 --- a/service/Core/MemoryStorage/DevTools/SimpleVectorDb.cs +++ b/service/Core/MemoryStorage/DevTools/SimpleVectorDb.cs @@ -113,6 +113,8 @@ public async Task UpsertAsync(string index, MemoryRecord record, Cancell records[r.Id] = r; } + this._log.LogDebug("{VectorCount} vectors loaded for similarity check", records.Count); + // Calculate all the distances from the given vector // Note: this is a brute force search, very slow, not meant for production use cases var similarity = new Dictionary(); diff --git a/service/Core/Pipeline/BaseOrchestrator.cs b/service/Core/Pipeline/BaseOrchestrator.cs index 84df7ec36..c613139b6 100644 --- a/service/Core/Pipeline/BaseOrchestrator.cs +++ b/service/Core/Pipeline/BaseOrchestrator.cs @@ -123,7 +123,7 @@ public async Task ImportDocumentAsync( } catch (Exception e) { - this.Log.LogError(e, "Pipeline start failed"); + this.Log.LogError(e, "Pipeline start failed."); throw; } } @@ -327,7 +327,7 @@ protected async Task CleanUpAfterCompletionAsync(DataPipeline pipeline, Cancella } catch (Exception e) { - this.Log.LogError(e, "Error while trying to delete the document directory"); + this.Log.LogError(e, "Error while trying to delete the document directory."); } } @@ -339,7 +339,7 @@ protected async Task CleanUpAfterCompletionAsync(DataPipeline pipeline, Cancella } catch (Exception e) { - this.Log.LogError(e, "Error while trying to delete the index directory"); + this.Log.LogError(e, "Error while trying to delete the index directory."); } } #pragma warning restore CA1031 @@ -477,7 +477,7 @@ private async Task UploadFormFilesAsync(DataPipeline pipeline, CancellationToken { mimeType = this._mimeTypeDetection.GetFileType(file.FileName); } - catch (NotSupportedException) + catch (MimeTypeException) { this.Log.LogWarning("File type not supported, the ingestion pipeline might skip it"); } diff --git a/service/Core/Pipeline/DistributedPipelineOrchestrator.cs b/service/Core/Pipeline/DistributedPipelineOrchestrator.cs index 8b35d6d58..2d2e462ff 100644 --- a/service/Core/Pipeline/DistributedPipelineOrchestrator.cs +++ b/service/Core/Pipeline/DistributedPipelineOrchestrator.cs @@ -83,12 +83,6 @@ public override async Task AddHandlerAsync( throw new ArgumentException($"There is already a handler for step '{handler.StepName}'"); } - // When returning False a message is put back in the queue and processed again - const bool Retry = false; - - // When returning True a message is removed from the queue and deleted - const bool Complete = true; - // Create a new queue client and start listening for messages this._queues[handler.StepName] = this._queueClientFactory.Build(); this._queues[handler.StepName].OnDequeue(async msg => @@ -99,7 +93,7 @@ public override async Task AddHandlerAsync( if (pipelinePointer == null) { this.Log.LogError("Pipeline pointer deserialization failed, queue `{0}`. Message discarded.", handler.StepName); - return Complete; + return ReturnType.FatalError; } DataPipeline? pipeline; @@ -127,18 +121,18 @@ public override async Task AddHandlerAsync( } this.Log.LogError("Pipeline `{0}/{1}` not found, cancelling step `{2}`", pipelinePointer.Index, pipelinePointer.DocumentId, handler.StepName); - return Complete; + return ReturnType.FatalError; } catch (InvalidPipelineDataException) { this.Log.LogError("Pipeline `{0}/{1}` state load failed, invalid state, queue `{2}`", pipelinePointer.Index, pipelinePointer.DocumentId, handler.StepName); - return Retry; + return ReturnType.TransientError; } if (pipeline == null) { this.Log.LogError("Pipeline `{0}/{1}` state load failed, the state is null, queue `{2}`", pipelinePointer.Index, pipelinePointer.DocumentId, handler.StepName); - return Retry; + return ReturnType.TransientError; } if (pipelinePointer.ExecutionId != pipeline.ExecutionId) @@ -147,7 +141,7 @@ public override async Task AddHandlerAsync( "Document `{0}/{1}` has been updated without waiting for the previous pipeline execution `{2}` to complete (current execution: `{3}`). " + "Step `{4}` and any consecutive steps from the previous execution have been cancelled.", pipelinePointer.Index, pipelinePointer.DocumentId, pipelinePointer.ExecutionId, pipeline.ExecutionId, handler.StepName); - return Complete; + return ReturnType.Success; } var currentStepName = pipeline.RemainingSteps.First(); @@ -207,7 +201,7 @@ public override async Task RunPipelineAsync(DataPipeline pipeline, CancellationT #region private - private async Task RunPipelineStepAsync( + private async Task RunPipelineStepAsync( DataPipeline pipeline, IPipelineStepHandler handler, CancellationToken cancellationToken) @@ -216,31 +210,37 @@ private async Task RunPipelineStepAsync( if (pipeline.Complete) { this.Log.LogInformation("Pipeline '{0}/{1}' complete", pipeline.Index, pipeline.DocumentId); - // Note: returning True, the message is removed from the queue - return true; + return ReturnType.Success; } string currentStepName = pipeline.RemainingSteps.First(); // Execute the business logic - exceptions are automatically handled by IQueue - (bool success, DataPipeline updatedPipeline) = await handler.InvokeAsync(pipeline, cancellationToken).ConfigureAwait(false); - if (success) + (ReturnType returnType, DataPipeline updatedPipeline) = await handler.InvokeAsync(pipeline, cancellationToken).ConfigureAwait(false); + switch (returnType) { - pipeline = updatedPipeline; - pipeline.LastUpdate = DateTimeOffset.UtcNow; + case ReturnType.Success: + pipeline = updatedPipeline; + pipeline.LastUpdate = DateTimeOffset.UtcNow; - this.Log.LogInformation("Handler {0} processed pipeline {1} successfully", currentStepName, pipeline.DocumentId); - pipeline.MoveToNextStep(); - await this.MoveForwardAsync(pipeline, cancellationToken).ConfigureAwait(false); - } - else - { - this.Log.LogError("Handler {0} failed to process pipeline {1}", currentStepName, pipeline.DocumentId); + this.Log.LogInformation("Handler {0} processed pipeline {1} successfully", currentStepName, pipeline.DocumentId); + pipeline.MoveToNextStep(); + await this.MoveForwardAsync(pipeline, cancellationToken).ConfigureAwait(false); + break; + + case ReturnType.TransientError: + this.Log.LogError("Handler {0} failed to process pipeline {1}", currentStepName, pipeline.DocumentId); + break; + + case ReturnType.FatalError: + this.Log.LogError("Handler {0} failed to process pipeline {1} due to an unrecoverable error", currentStepName, pipeline.DocumentId); + break; + + default: + throw new ArgumentOutOfRangeException($"Unknown {returnType:G} return type"); } - // Note: returning True, the message is removed from the queue - // Note: returning False, the message is put back in the queue and processed again - return success; + return returnType; } private async Task MoveForwardAsync(DataPipeline pipeline, CancellationToken cancellationToken = default) diff --git a/service/Core/Pipeline/InProcessPipelineOrchestrator.cs b/service/Core/Pipeline/InProcessPipelineOrchestrator.cs index 66ba8c473..07fd18371 100644 --- a/service/Core/Pipeline/InProcessPipelineOrchestrator.cs +++ b/service/Core/Pipeline/InProcessPipelineOrchestrator.cs @@ -171,21 +171,30 @@ public override async Task RunPipelineAsync(DataPipeline pipeline, CancellationT } // Run handler - (bool success, DataPipeline updatedPipeline) = await stepHandler + (ReturnType returnType, DataPipeline updatedPipeline) = await stepHandler .InvokeAsync(pipeline, this.CancellationTokenSource.Token) .ConfigureAwait(false); - if (success) - { - pipeline = updatedPipeline; - pipeline.LastUpdate = DateTimeOffset.UtcNow; - this.Log.LogInformation("Handler '{0}' processed pipeline '{1}/{2}' successfully", currentStepName, pipeline.Index, pipeline.DocumentId); - pipeline.MoveToNextStep(); - await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false); - } - else + + switch (returnType) { - this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId); - throw new OrchestrationException($"Pipeline error, step {currentStepName} failed"); + case ReturnType.Success: + pipeline = updatedPipeline; + pipeline.LastUpdate = DateTimeOffset.UtcNow; + this.Log.LogInformation("Handler '{0}' processed pipeline '{1}/{2}' successfully", currentStepName, pipeline.Index, pipeline.DocumentId); + pipeline.MoveToNextStep(); + await this.UpdatePipelineStatusAsync(pipeline, cancellationToken).ConfigureAwait(false); + break; + + case ReturnType.TransientError: + this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId); + throw new OrchestrationException($"Pipeline error, step {currentStepName} failed", isTransient: true); + + case ReturnType.FatalError: + this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}' due to an unrecoverable error", currentStepName, pipeline.Index, pipeline.DocumentId); + throw new OrchestrationException($"Unrecoverable pipeline error, step {currentStepName} failed and cannot be retried", isTransient: false); + + default: + throw new ArgumentOutOfRangeException($"Unknown {returnType:G} return type"); } } diff --git a/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs b/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs index 97c5aeeae..9ce65508d 100644 --- a/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs +++ b/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs @@ -177,7 +177,7 @@ await this.StoreMessageAsync( /// /// about the logic handling dequeued messages. - public void OnDequeue(Func> processMessageAction) + public void OnDequeue(Func> processMessageAction) { this._log.LogInformation("Queue {0}: subscribing...", this._queueName); this.Received += async (sender, args) => @@ -193,39 +193,56 @@ public void OnDequeue(Func> processMessageAction) this._log.LogInformation("Queue {0}: message {0} received", this._queueName, message.Id); // Process message with the logic provided by the orchestrator - bool success = await processMessageAction.Invoke(message.Content).ConfigureAwait(false); - if (success) + var returnType = await processMessageAction.Invoke(message.Content).ConfigureAwait(false); + switch (returnType) { - this._log.LogTrace("Message '{0}' successfully processed, deleting message", message.Id); - await this.DeleteMessageAsync(message.Id, this._cancellation.Token).ConfigureAwait(false); - } - else - { - message.LastError = "Message handler returned false"; - if (message.DequeueCount == this._maxAttempts) - { - this._log.LogError("Message '{0}' processing failed to process, max attempts reached, moving to dead letter queue. Message content: {1}", message.Id, message.Content); + case ReturnType.Success: + this._log.LogTrace("Message '{0}' successfully processed, deleting message", message.Id); + await this.DeleteMessageAsync(message.Id, this._cancellation.Token).ConfigureAwait(false); + break; + + case ReturnType.TransientError: + message.LastError = "Message handler returned false"; + if (message.DequeueCount == this._maxAttempts) + { + this._log.LogError("Message '{0}' processing failed to process, max attempts reached, moving to poison queue. Message content: {1}", message.Id, message.Content); + poison = true; + } + else + { + this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue. Message content: {1}", message.Id, message.Content); + retry = true; + } + + break; + + case ReturnType.FatalError: + this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.Id); poison = true; - } - else - { - this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue. Message content: {1}", message.Id, message.Content); - retry = true; - } + break; + + default: + throw new ArgumentOutOfRangeException($"Unknown {returnType:G} result"); } } + catch (KernelMemoryException e) when (e.IsTransient.HasValue && !e.IsTransient.Value) + { + message.LastError = $"{e.GetType().FullName} [{e.InnerException?.GetType().FullName}]: {e.Message}"; + this._log.LogError(e, "Message '{0}' failed to process due to a non-recoverable error, moving to poison queue.", message.Id); + poison = true; + } // Note: must catch all also because using a void event handler catch (Exception e) { message.LastError = $"{e.GetType().FullName}: {e.Message}"; if (message.DequeueCount == this._maxAttempts) { - this._log.LogError(e, "Message '{0}' processing failed with exception, max attempts reached, moving to dead letter queue. Message content: {1}", message.Id, message.Content); + this._log.LogError(e, "Message '{0}' processing failed with exception, max attempts reached, moving to poison queue. Message content: {1}.", message.Id, message.Content); poison = true; } else { - this._log.LogWarning(e, "Message '{0}' processing failed with exception, putting message back in the queue. Message content: {1}", message.Id, message.Content); + this._log.LogWarning(e, "Message '{0}' processing failed with exception, putting message back in the queue. Message content: {1}.", message.Id, message.Content); retry = true; } } @@ -260,8 +277,9 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs) await s_lock.WaitAsync(this._cancellation.Token).ConfigureAwait(false); // Loop through all messages on storage - this._log.LogTrace("Queue {0}: polling...", this._queueName); var messagesOnStorage = (await this._fileSystem.GetAllFileNamesAsync(this._queueName, "", this._cancellation.Token).ConfigureAwait(false)).ToList(); + if (messagesOnStorage.Count == 0) { return; } + this._log.LogTrace("Queue {0}: {1} messages on storage, {2} ready to dispatch, max batch size {3}", this._queueName, messagesOnStorage.Count, this._queue.Count, this._config.FetchBatchSize); @@ -313,12 +331,12 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs) } catch (DirectoryNotFoundException e) { - this._log.LogError(e, "Directory missing, recreating"); + this._log.LogError(e, "Directory missing, recreating."); await this.CreateDirectoriesAsync(this._cancellation.Token).ConfigureAwait(false); } catch (Exception e) { - this._log.LogError(e, "Queue {0}: Unexpected error while polling", this._queueName); + this._log.LogError(e, "Queue {0}: Unexpected error while polling.", this._queueName); } finally { diff --git a/service/Service.AspNetCore/WebAPIEndpoints.cs b/service/Service.AspNetCore/WebAPIEndpoints.cs index 49211441a..cef2388b8 100644 --- a/service/Service.AspNetCore/WebAPIEndpoints.cs +++ b/service/Service.AspNetCore/WebAPIEndpoints.cs @@ -322,7 +322,8 @@ async Task ( .Produces(StatusCodes.Status400BadRequest) .Produces(StatusCodes.Status401Unauthorized) .Produces(StatusCodes.Status403Forbidden) - .Produces(StatusCodes.Status404NotFound); + .Produces(StatusCodes.Status404NotFound) + .Produces(StatusCodes.Status413PayloadTooLarge); return route; } diff --git a/service/Service/Program.cs b/service/Service/Program.cs index 08a1db6e2..f0db0bed2 100644 --- a/service/Service/Program.cs +++ b/service/Service/Program.cs @@ -2,6 +2,8 @@ using System; using System.Collections.Generic; +using System.Globalization; +using System.IO; using System.Linq; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; @@ -182,8 +184,15 @@ public static void Main(string[] args) Console.WriteLine("* Memory type : " + memoryType); Console.WriteLine("* Pipeline handlers : " + $"{syncHandlersCount} synchronous / {asyncHandlersCount} asynchronous"); Console.WriteLine("* Web service : " + (config.Service.RunWebService ? "Enabled" : "Disabled")); - Console.WriteLine("* Web service auth : " + (config.ServiceAuthorization.Enabled ? "Enabled" : "Disabled")); - Console.WriteLine("* OpenAPI swagger : " + (config.Service.OpenApiEnabled ? "Enabled" : "Disabled")); + + if (config.Service.RunWebService) + { + const double AspnetDefaultMaxUploadSize = 30000000d / 1024 / 1024; + Console.WriteLine("* Web service auth : " + (config.ServiceAuthorization.Enabled ? "Enabled" : "Disabled")); + Console.WriteLine("* Max HTTP req size : " + (config.Service.MaxUploadSizeMb ?? AspnetDefaultMaxUploadSize).ToString("0.#", CultureInfo.CurrentCulture) + " Mb"); + Console.WriteLine("* OpenAPI swagger : " + (config.Service.OpenApiEnabled ? "Enabled" : "Disabled")); + } + Console.WriteLine("* Memory Db : " + app.Services.GetService()?.GetType().FullName); Console.WriteLine("* Document storage : " + app.Services.GetService()?.GetType().FullName); Console.WriteLine("* Embedding generation: " + app.Services.GetService()?.GetType().FullName); @@ -201,7 +210,15 @@ public static void Main(string[] args) config.Service.RunHandlers); // Start web service and handler services - app.Run(); + try + { + app.Run(); + } + catch (IOException e) + { + Console.WriteLine($"I/O error: {e.Message}"); + Environment.Exit(-1); + } } /// diff --git a/service/tests/Abstractions.UnitTests/Diagnostics/HttpErrorsTests.cs b/service/tests/Abstractions.UnitTests/Diagnostics/HttpErrorsTests.cs new file mode 100644 index 000000000..269330274 --- /dev/null +++ b/service/tests/Abstractions.UnitTests/Diagnostics/HttpErrorsTests.cs @@ -0,0 +1,182 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Net; +using Microsoft.KernelMemory.Diagnostics; + +namespace Microsoft.KM.Abstractions.UnitTests.Diagnostics; + +public sealed class HttpErrorsTests +{ + [Fact] + [Trait("Category", "UnitTest")] + public void ItRecognizesErrorsFromNulls() + { + HttpStatusCode? statusCode = null; + + Assert.False(statusCode.IsTransientError()); + Assert.False(statusCode.IsFatalError()); + } + + [Theory] + [Trait("Category", "UnitTest")] + [InlineData(HttpStatusCode.Continue)] // 100 + [InlineData(HttpStatusCode.SwitchingProtocols)] // 101 + [InlineData(HttpStatusCode.Processing)] // 102 + [InlineData(HttpStatusCode.EarlyHints)] // 103 + [InlineData(HttpStatusCode.OK)] // 200 + [InlineData(HttpStatusCode.Created)] // 201 + [InlineData(HttpStatusCode.Accepted)] // 202 + [InlineData(HttpStatusCode.NonAuthoritativeInformation)] // 203 + [InlineData(HttpStatusCode.NoContent)] // 204 + [InlineData(HttpStatusCode.ResetContent)] // 205 + [InlineData(HttpStatusCode.Ambiguous)] // 300 + [InlineData(HttpStatusCode.Moved)] // 301 + [InlineData(HttpStatusCode.Found)] // 302 + [InlineData(HttpStatusCode.RedirectMethod)] // 303 + [InlineData(HttpStatusCode.NotModified)] // 304 + [InlineData(HttpStatusCode.UseProxy)] // 305 + [InlineData(HttpStatusCode.Unused)] // 306 + [InlineData(HttpStatusCode.RedirectKeepVerb)] // 307 + [InlineData(HttpStatusCode.PermanentRedirect)] // 308 + public void ItRecognizesErrors(HttpStatusCode statusCode) + { + Assert.False(statusCode.IsTransientError()); + Assert.False(HttpErrors.IsTransientError((int)statusCode)); + + Assert.False(statusCode.IsFatalError()); + Assert.False(HttpErrors.IsFatalError((int)statusCode)); + } + + [Theory] + [Trait("Category", "UnitTest")] + [InlineData(HttpStatusCode.Continue)] // 100 + [InlineData(HttpStatusCode.SwitchingProtocols)] // 101 + [InlineData(HttpStatusCode.Processing)] // 102 + [InlineData(HttpStatusCode.EarlyHints)] // 103 + [InlineData(HttpStatusCode.OK)] // 200 + [InlineData(HttpStatusCode.Created)] // 201 + [InlineData(HttpStatusCode.Accepted)] // 202 + [InlineData(HttpStatusCode.NonAuthoritativeInformation)] // 203 + [InlineData(HttpStatusCode.NoContent)] // 204 + [InlineData(HttpStatusCode.ResetContent)] // 205 + [InlineData(HttpStatusCode.Ambiguous)] // 300 + [InlineData(HttpStatusCode.Moved)] // 301 + [InlineData(HttpStatusCode.Found)] // 302 + [InlineData(HttpStatusCode.RedirectMethod)] // 303 + [InlineData(HttpStatusCode.NotModified)] // 304 + [InlineData(HttpStatusCode.UseProxy)] // 305 + [InlineData(HttpStatusCode.Unused)] // 306 + [InlineData(HttpStatusCode.RedirectKeepVerb)] // 307 + [InlineData(HttpStatusCode.PermanentRedirect)] // 308 + public void ItRecognizesErrors(HttpStatusCode? statusCode) + { + Assert.False(statusCode.IsTransientError()); + Assert.False(statusCode.IsFatalError()); + } + + [Theory] + [Trait("Category", "UnitTest")] + [InlineData(HttpStatusCode.RequestTimeout)] // 408 + [InlineData(HttpStatusCode.PreconditionFailed)] // 412 + [InlineData(HttpStatusCode.Locked)] // 423 + [InlineData(HttpStatusCode.TooManyRequests)] // 429 + [InlineData(HttpStatusCode.InternalServerError)] // 500 + [InlineData(HttpStatusCode.BadGateway)] // 502 + [InlineData(HttpStatusCode.ServiceUnavailable)] // 503 + [InlineData(HttpStatusCode.GatewayTimeout)] // 504 + [InlineData(HttpStatusCode.InsufficientStorage)] // 507 + public void ItRecognizesTransientErrors(HttpStatusCode statusCode) + { + Assert.True(statusCode.IsTransientError()); + Assert.True(HttpErrors.IsTransientError((int)statusCode)); + + Assert.False(statusCode.IsFatalError()); + Assert.False(HttpErrors.IsFatalError((int)statusCode)); + } + + [Theory] + [Trait("Category", "UnitTest")] + [InlineData(HttpStatusCode.RequestTimeout)] // 408 + [InlineData(HttpStatusCode.PreconditionFailed)] // 412 + [InlineData(HttpStatusCode.Locked)] // 423 + [InlineData(HttpStatusCode.TooManyRequests)] // 429 + [InlineData(HttpStatusCode.InternalServerError)] // 500 + [InlineData(HttpStatusCode.BadGateway)] // 502 + [InlineData(HttpStatusCode.ServiceUnavailable)] // 503 + [InlineData(HttpStatusCode.GatewayTimeout)] // 504 + [InlineData(HttpStatusCode.InsufficientStorage)] // 507 + public void ItRecognizesTransientErrors(HttpStatusCode? statusCode) + { + Assert.True(statusCode.IsTransientError()); + Assert.False(statusCode.IsFatalError()); + } + + [Theory] + [Trait("Category", "UnitTest")] + [InlineData(HttpStatusCode.BadRequest)] // 400 + [InlineData(HttpStatusCode.Unauthorized)] // 401 + [InlineData(HttpStatusCode.PaymentRequired)] // 402 + [InlineData(HttpStatusCode.Forbidden)] // 403 + [InlineData(HttpStatusCode.NotFound)] // 404 + [InlineData(HttpStatusCode.MethodNotAllowed)] // 405 + [InlineData(HttpStatusCode.NotAcceptable)] // 406 + [InlineData(HttpStatusCode.ProxyAuthenticationRequired)] // 407 + [InlineData(HttpStatusCode.Conflict)] // 409 + [InlineData(HttpStatusCode.Gone)] // 410 + [InlineData(HttpStatusCode.LengthRequired)] // 411 + [InlineData(HttpStatusCode.RequestEntityTooLarge)] // 413 + [InlineData(HttpStatusCode.RequestUriTooLong)] // 414 + [InlineData(HttpStatusCode.UnsupportedMediaType)] // 415 + [InlineData(HttpStatusCode.RequestedRangeNotSatisfiable)] // 416 + [InlineData(HttpStatusCode.ExpectationFailed)] // 417 + [InlineData(HttpStatusCode.UnprocessableContent)] // 422 + [InlineData(HttpStatusCode.UpgradeRequired)] // 426 + [InlineData(HttpStatusCode.RequestHeaderFieldsTooLarge)] // 431 + [InlineData(HttpStatusCode.UnavailableForLegalReasons)] // 451 + [InlineData(HttpStatusCode.NotImplemented)] // 501 + [InlineData(HttpStatusCode.HttpVersionNotSupported)] // 505 + [InlineData(HttpStatusCode.LoopDetected)] // 508 + [InlineData(HttpStatusCode.NotExtended)] // 510 + [InlineData(HttpStatusCode.NetworkAuthenticationRequired)] // 511 + public void ItRecognizesFatalErrors(HttpStatusCode statusCode) + { + Assert.False(statusCode.IsTransientError()); + Assert.False(HttpErrors.IsTransientError((int)statusCode)); + + Assert.True(statusCode.IsFatalError()); + Assert.True(HttpErrors.IsFatalError((int)statusCode)); + } + + [Theory] + [Trait("Category", "UnitTest")] + [InlineData(HttpStatusCode.BadRequest)] // 400 + [InlineData(HttpStatusCode.Unauthorized)] // 401 + [InlineData(HttpStatusCode.PaymentRequired)] // 402 + [InlineData(HttpStatusCode.Forbidden)] // 403 + [InlineData(HttpStatusCode.NotFound)] // 404 + [InlineData(HttpStatusCode.MethodNotAllowed)] // 405 + [InlineData(HttpStatusCode.NotAcceptable)] // 406 + [InlineData(HttpStatusCode.ProxyAuthenticationRequired)] // 407 + [InlineData(HttpStatusCode.Conflict)] // 409 + [InlineData(HttpStatusCode.Gone)] // 410 + [InlineData(HttpStatusCode.LengthRequired)] // 411 + [InlineData(HttpStatusCode.RequestEntityTooLarge)] // 413 + [InlineData(HttpStatusCode.RequestUriTooLong)] // 414 + [InlineData(HttpStatusCode.UnsupportedMediaType)] // 415 + [InlineData(HttpStatusCode.RequestedRangeNotSatisfiable)] // 416 + [InlineData(HttpStatusCode.ExpectationFailed)] // 417 + [InlineData(HttpStatusCode.UnprocessableContent)] // 422 + [InlineData(HttpStatusCode.UpgradeRequired)] // 426 + [InlineData(HttpStatusCode.RequestHeaderFieldsTooLarge)] // 431 + [InlineData(HttpStatusCode.UnavailableForLegalReasons)] // 451 + [InlineData(HttpStatusCode.NotImplemented)] // 501 + [InlineData(HttpStatusCode.HttpVersionNotSupported)] // 505 + [InlineData(HttpStatusCode.LoopDetected)] // 508 + [InlineData(HttpStatusCode.NotExtended)] // 510 + [InlineData(HttpStatusCode.NetworkAuthenticationRequired)] // 511 + public void ItRecognizesFatalErrors(HttpStatusCode? statusCode) + { + Assert.False(statusCode.IsTransientError()); + Assert.True(statusCode.IsFatalError()); + } +}