Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/201-dotnet-serverless-custom-handler/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public MyHandler(
public string StepName { get; }

/// <inheritdoc />
public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync(
public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync(
DataPipeline pipeline, CancellationToken cancellationToken = default)
{
/* ... your custom ...
Expand All @@ -64,6 +64,6 @@ public MyHandler(
// Remove this - here only to avoid build errors
await Task.Delay(0, cancellationToken).ConfigureAwait(false);

return (true, pipeline);
return (ReturnType.Success, pipeline);
}
}
4 changes: 2 additions & 2 deletions examples/202-dotnet-custom-handler-as-a-service/MyHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public Task StopAsync(CancellationToken cancellationToken = default)
}

/// <inheritdoc />
public async Task<(bool success, DataPipeline updatedPipeline)> InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken = default)
public async Task<(ReturnType returnType, DataPipeline updatedPipeline)> InvokeAsync(DataPipeline pipeline, CancellationToken cancellationToken = default)
{
/* ... your custom ...
* ... handler ...
Expand All @@ -49,6 +49,6 @@ public Task StopAsync(CancellationToken cancellationToken = default)
// Remove this - here only to avoid build errors
await Task.Delay(0, cancellationToken).ConfigureAwait(false);

return (true, pipeline);
return (ReturnType.Success, pipeline);
}
}
3 changes: 2 additions & 1 deletion extensions/Anthropic/Client/RawAnthropicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ internal async IAsyncEnumerable<StreamingResponseMessage> CallClaudeStreamingAsy
if (!response.IsSuccessStatusCode)
{
var responseError = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
throw new KernelMemoryException($"Failed to send request: {response.StatusCode} - {responseError}");
throw new KernelMemoryException($"Failed to send request: {response.StatusCode} - {responseError}",
isTransient: response.StatusCode.IsTransientError());
}

var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
Expand Down
17 changes: 12 additions & 5 deletions extensions/AzureAIDocIntel/AzureAIDocIntelEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,19 @@ public AzureAIDocIntelEngine(
///<inheritdoc/>
public async Task<string> ExtractTextFromImageAsync(Stream imageContent, CancellationToken cancellationToken = default)
{
// Start the OCR operation
var operation = await this._recognizerClient.AnalyzeDocumentAsync(WaitUntil.Completed, "prebuilt-read", imageContent, cancellationToken: cancellationToken).ConfigureAwait(false);
try
{
// Start the OCR operation
var operation = await this._recognizerClient.AnalyzeDocumentAsync(WaitUntil.Completed, "prebuilt-read", imageContent, cancellationToken: cancellationToken).ConfigureAwait(false);

// Wait for the result
Response<AnalyzeResult> operationResponse = await operation.WaitForCompletionAsync(cancellationToken).ConfigureAwait(false);
// Wait for the result
Response<AnalyzeResult> operationResponse = await operation.WaitForCompletionAsync(cancellationToken).ConfigureAwait(false);

return operationResponse.Value.Content;
return operationResponse.Value.Content;
}
catch (RequestFailedException e)
{
throw new AzureAIDocIntelException(e.Message, e, isTransient: HttpErrors.IsTransientError(e.Status));
}
}
}
26 changes: 26 additions & 0 deletions extensions/AzureAIDocIntel/AzureAIDocIntelException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Microsoft. All rights reserved.

using System;

namespace Microsoft.KernelMemory.DataFormats.AzureAIDocIntel;

public class AzureAIDocIntelException : KernelMemoryException
{
/// <inheritdoc />
public AzureAIDocIntelException(bool? isTransient = null)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public AzureAIDocIntelException(string message, bool? isTransient = null) : base(message)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public AzureAIDocIntelException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
{
this.IsTransient = isTransient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@ namespace Microsoft.KernelMemory.MemoryDb.AzureAISearch;
public class AzureAISearchMemoryException : KernelMemoryException
{
/// <inheritdoc />
public AzureAISearchMemoryException()
public AzureAISearchMemoryException(bool? isTransient = null)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public AzureAISearchMemoryException(string? message) : base(message)
public AzureAISearchMemoryException(string message, bool? isTransient = null) : base(message)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public AzureAISearchMemoryException(string? message, Exception? innerException) : base(message, innerException)
public AzureAISearchMemoryException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
{
this.IsTransient = isTransient;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,41 @@ public void Validate(bool vectorSizeRequired = false)
{
if (this.Fields.Count == 0)
{
throw new KernelMemoryException("The schema is empty");
throw new AzureAISearchMemoryException("The schema is empty", isTransient: false);
}

if (this.Fields.All(x => x.Type != MemoryDbField.FieldType.Vector))
{
throw new KernelMemoryException("The schema doesn't contain a vector field");
throw new AzureAISearchMemoryException("The schema doesn't contain a vector field", isTransient: false);
}

int keys = this.Fields.Count(x => x.IsKey);
switch (keys)
{
case 0:
throw new KernelMemoryException("The schema doesn't contain a key field");
throw new AzureAISearchMemoryException("The schema doesn't contain a key field", isTransient: false);
case > 1:
throw new KernelMemoryException("The schema cannot contain more than one key");
throw new AzureAISearchMemoryException("The schema cannot contain more than one key", isTransient: false);
}

if (vectorSizeRequired && this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.Vector, VectorSize: 0 }))
{
throw new KernelMemoryException("Vector fields must have a size greater than zero defined");
throw new AzureAISearchMemoryException("Vector fields must have a size greater than zero defined", isTransient: false);
}

if (this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.Bool, IsKey: true }))
{
throw new KernelMemoryException("Boolean fields cannot be used as unique keys");
throw new AzureAISearchMemoryException("Boolean fields cannot be used as unique keys", isTransient: false);
}

if (this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.ListOfStrings, IsKey: true }))
{
throw new KernelMemoryException("Collection fields cannot be used as unique keys");
throw new AzureAISearchMemoryException("Collection fields cannot be used as unique keys", isTransient: false);
}

if (this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.Vector, IsKey: true }))
{
throw new KernelMemoryException("Vector fields cannot be used as unique keys");
throw new AzureAISearchMemoryException("Vector fields cannot be used as unique keys", isTransient: false);
}
}
}
26 changes: 26 additions & 0 deletions extensions/AzureOpenAI/AzureOpenAI/AzureOpenAIException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Microsoft. All rights reserved.

using System;

namespace Microsoft.KernelMemory.AI.AzureOpenAI;

public class AzureOpenAIException : KernelMemoryException
{
/// <inheritdoc />
public AzureOpenAIException(bool? isTransient = null)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public AzureOpenAIException(string message, bool? isTransient = null) : base(message)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public AzureOpenAIException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
{
this.IsTransient = isTransient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Microsoft.KernelMemory.AI.AzureOpenAI.Internals;
using Microsoft.KernelMemory.AI.OpenAI;
using Microsoft.KernelMemory.Diagnostics;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.AI.Embeddings;
using Microsoft.SemanticKernel.Connectors.AzureOpenAI;

Expand Down Expand Up @@ -121,15 +122,29 @@ public IReadOnlyList<string> GetTokens(string text)
public Task<Embedding> GenerateEmbeddingAsync(string text, CancellationToken cancellationToken = default)
{
this._log.LogTrace("Generating embedding");
return this._client.GenerateEmbeddingAsync(text, cancellationToken);
try
{
return this._client.GenerateEmbeddingAsync(text, cancellationToken);
}
catch (HttpOperationException e)
{
throw new AzureOpenAIException(e.Message, e, isTransient: e.StatusCode.IsTransientError());
}
}

/// <inheritdoc/>
public async Task<Embedding[]> GenerateEmbeddingBatchAsync(IEnumerable<string> textList, CancellationToken cancellationToken = default)
{
var list = textList.ToList();
this._log.LogTrace("Generating embeddings, batch size '{0}'", list.Count);
IList<ReadOnlyMemory<float>> embeddings = await this._client.GenerateEmbeddingsAsync(list, cancellationToken: cancellationToken).ConfigureAwait(false);
return embeddings.Select(e => new Embedding(e)).ToArray();
try
{
IList<ReadOnlyMemory<float>> embeddings = await this._client.GenerateEmbeddingsAsync(list, cancellationToken: cancellationToken).ConfigureAwait(false);
return embeddings.Select(e => new Embedding(e)).ToArray();
}
catch (HttpOperationException e)
{
throw new AzureOpenAIException(e.Message, e, isTransient: e.StatusCode.IsTransientError());
}
}
}
14 changes: 12 additions & 2 deletions extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Microsoft.Extensions.Logging;
using Microsoft.KernelMemory.AI.AzureOpenAI.Internals;
Expand Down Expand Up @@ -140,8 +141,17 @@ public async IAsyncEnumerable<string> GenerateTextAsync(
}

this._log.LogTrace("Sending chat message generation request");
IAsyncEnumerable<StreamingTextContent> result = this._client.GetStreamingTextContentsAsync(prompt, skOptions, cancellationToken: cancellationToken);
await foreach (StreamingTextContent x in result)
IAsyncEnumerable<StreamingTextContent> result;
try
{
result = this._client.GetStreamingTextContentsAsync(prompt, skOptions, cancellationToken: cancellationToken);
}
catch (HttpOperationException e)
{
throw new AzureOpenAIException(e.Message, e, isTransient: e.StatusCode.IsTransientError());
}

await foreach (StreamingTextContent x in result.WithCancellation(cancellationToken))
{
if (x.Text == null) { continue; }

Expand Down
40 changes: 28 additions & 12 deletions extensions/AzureQueues/AzureQueuesPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Microsoft.Extensions.Logging;
using Microsoft.KernelMemory.Diagnostics;
using Microsoft.KernelMemory.DocumentStorage;
using Microsoft.KernelMemory.Pipeline;
using Microsoft.KernelMemory.Pipeline.Queue;
using Timer = System.Timers.Timer;

Expand Down Expand Up @@ -180,7 +181,7 @@ public async Task EnqueueAsync(string message, CancellationToken cancellationTok
}

/// <inheritdoc />
public void OnDequeue(Func<string, Task<bool>> processMessageAction)
public void OnDequeue(Func<string, Task<ReturnType>> processMessageAction)
{
this.Received += async (object sender, MessageEventArgs args) =>
{
Expand All @@ -191,20 +192,30 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)

try
{
ReturnType returnType = await processMessageAction.Invoke(message.MessageText).ConfigureAwait(false);
if (message.DequeueCount <= this._config.MaxRetriesBeforePoisonQueue)
{
bool success = await processMessageAction.Invoke(message.MessageText).ConfigureAwait(false);
if (success)
switch (returnType)
{
this._log.LogTrace("Message '{0}' successfully processed, deleting message", message.MessageId);
await this.DeleteMessageAsync(message, cancellationToken: default).ConfigureAwait(false);
}
else
{
var backoffDelay = TimeSpan.FromSeconds(1 * message.DequeueCount);
this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue with a delay of {1} msecs",
message.MessageId, backoffDelay.TotalMilliseconds);
await this.UnlockMessageAsync(message, backoffDelay, cancellationToken: default).ConfigureAwait(false);
case ReturnType.Success:
this._log.LogTrace("Message '{0}' successfully processed, deleting message", message.MessageId);
await this.DeleteMessageAsync(message, cancellationToken: default).ConfigureAwait(false);
break;

case ReturnType.TransientError:
var backoffDelay = TimeSpan.FromSeconds(1 * message.DequeueCount);
this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue with a delay of {1} msecs",
message.MessageId, backoffDelay.TotalMilliseconds);
await this.UnlockMessageAsync(message, backoffDelay, cancellationToken: default).ConfigureAwait(false);
break;

case ReturnType.FatalError:
this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.MessageId);
await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false);
break;

default:
throw new ArgumentOutOfRangeException($"Unknown {returnType:G} result");
}
}
else
Expand All @@ -213,6 +224,11 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)
await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false);
}
}
catch (KernelMemoryException e) when (e.IsTransient.HasValue && !e.IsTransient.Value)
{
this._log.LogError(e, "Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.MessageId);
await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false);
}
#pragma warning disable CA1031 // Must catch all to handle queue properly
catch (Exception e)
{
Expand Down
26 changes: 26 additions & 0 deletions extensions/OpenAI/OpenAI/OpenAIException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Microsoft. All rights reserved.

using System;

namespace Microsoft.KernelMemory.AI.OpenAI;

public class OpenAIException : KernelMemoryException
{
/// <inheritdoc />
public OpenAIException(bool? isTransient = null)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public OpenAIException(string message, bool? isTransient = null) : base(message)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public OpenAIException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
{
this.IsTransient = isTransient;
}
}
Loading
Loading