/// <summary>
/// Processes the streamed response from a conversation endpoint invocation, buffering chunk
/// content into a channel so it can be consumed as an <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
internal sealed class ConversationStreamProcessor : IDisposable
{
    private bool disposed;
    // Unbounded so the background gRPC reader task never blocks on a slow consumer.
    private readonly Channel<string> outputChannel = Channel.CreateUnbounded<string>();

    /// <summary>
    /// Surfaces any exceptions encountered while asynchronously processing the outbound stream.
    /// </summary>
    internal event EventHandler<Exception>? OnException;

    /// <summary>
    /// Reads the chunks out asynchronously from the streaming source into the channel.
    /// </summary>
    /// <param name="call">The call made to the Dapr sidecar to process the response from.</param>
    /// <param name="cancellationToken">Token used to cancel the ongoing request.</param>
    public Task ProcessStreamAsync(
        AsyncServerStreamingCall<Autogenerated.ConversationStreamResponse> call,
        CancellationToken cancellationToken)
    {
        // Start reading from the gRPC call and writing to the output channel on a background task.
        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken).ConfigureAwait(false))
                {
                    // The response is a oneof of chunk/complete messages; only chunk messages carry
                    // content. Accessing Chunk on a completion message yields null in the generated
                    // C# code, so guard on the oneof case to avoid a NullReferenceException.
                    if (response.ResponseTypeCase == Autogenerated.ConversationStreamResponse.ResponseTypeOneofCase.Chunk)
                    {
                        await outputChannel.Writer.WriteAsync(response.Chunk.Content, cancellationToken).ConfigureAwait(false);
                    }
                }

                // Normal end-of-stream: complete the channel so readers finish cleanly.
                outputChannel.Writer.Complete();
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Expected cancellation exception — unblock any pending readers.
                outputChannel.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                OnException?.Invoke(this, ex);
                // Fault the channel so consumers of GetProcessedDataAsync observe the failure
                // instead of a silently truncated stream.
                outputChannel.Writer.TryComplete(ex);
            }
        }, cancellationToken);
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                // Ensure readers unblock even if the processor is disposed mid-stream.
                outputChannel.Writer.TryComplete();
            }

            disposed = true;
        }
    }

    /// <summary>
    /// Retrieves the processed content of the operation from the Dapr sidecar and returns it as an
    /// enumerable stream.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The streamed content chunks in arrival order.</returns>
    public async IAsyncEnumerable<string> GetProcessedDataAsync([EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await foreach (var data in outputChannel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
        {
            yield return data;
        }
    }
}
/// <summary>
/// Sends various inputs to the large language model via the Conversational building block on the Dapr sidecar.
/// </summary>
/// <param name="daprConversationComponentName">The name of the Dapr conversation component.</param>
/// <param name="inputs">The input values to send.</param>
/// <param name="options">Optional options used to configure the conversation.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The response(s) provided by the LLM provider.</returns>
public override async Task<DaprConversationResponse> ConverseAsync(string daprConversationComponentName,
    IReadOnlyList<DaprConversationInput> inputs, ConversationOptions? options = null,
    CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrEmpty(daprConversationComponentName);

    //Build out the common request and gRPC options to the endpoint
    var (request, grpcCallOptions) = BuildRequest(daprConversationComponentName, inputs, options, cancellationToken);

    var result = await Client.ConverseAlpha1Async(request, grpcCallOptions).ConfigureAwait(false);
    var outputs = result.Outputs.Select(output => new DaprConversationResult(output.Result)
    {
        Parameters = output.Parameters.ToDictionary(kvp => kvp.Key, parameter => parameter.Value)
    }).ToList();

    return new DaprConversationResponse(outputs);
}

/// <summary>
/// Sends various inputs to the large language model via the Conversational building block on the Dapr sidecar
/// and gets a streamed response back.
/// </summary>
/// <param name="daprConversationComponentName">The name of the Dapr conversation component.</param>
/// <param name="inputs">The input values to send.</param>
/// <param name="options">Optional options used to configure the conversation.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The response provided as a stream by the LLM provider.</returns>
public override async IAsyncEnumerable<string> ConverseAsStreamAsync(
    string daprConversationComponentName,
    IReadOnlyList<DaprConversationInput> inputs,
    ConversationOptions? options = null,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrEmpty(daprConversationComponentName);

    //Build out the common request and gRPC options to the endpoint
    var (request, grpcCallOptions) = BuildRequest(daprConversationComponentName, inputs, options, cancellationToken);

    // AsyncServerStreamingCall<T> is IDisposable — dispose it when enumeration completes or is
    // abandoned so the underlying gRPC call is cancelled and its resources are released.
    using var streamResponse = Client.ConverseStreamAlpha1(request, grpcCallOptions);
    using var streamProcessor = new ConversationStreamProcessor();

    // Capture stream-processing failures instead of rethrowing inside the event handler: the
    // handler runs on the processor's background reader task, where a thrown exception would be
    // unobserved and never reach the consumer of this enumerable.
    Exception? streamException = null;
    EventHandler<Exception> exceptionHandler = (_, ex) => streamException ??= ex;

    streamProcessor.OnException += exceptionHandler;
    try
    {
        await streamProcessor.ProcessStreamAsync(streamResponse, cancellationToken).ConfigureAwait(false);

        await foreach (var content in streamProcessor.GetProcessedDataAsync(cancellationToken).ConfigureAwait(false))
        {
            yield return content;
        }
    }
    finally
    {
        streamProcessor.OnException -= exceptionHandler;
    }

    // Surface any failure observed while draining the stream to the caller on its own thread.
    if (streamException is not null)
    {
        throw streamException;
    }
}
/// <summary>
/// Represents token usage statistics for a conversation operation.
/// </summary>
public record DaprConversationUsage
{
    /// <summary>
    /// The number of tokens in the prompt, when reported by the provider.
    /// </summary>
    public int? PromptTokens { get; init; }

    /// <summary>
    /// The number of tokens in the completion, when reported by the provider.
    /// </summary>
    public int? CompletionTokens { get; init; }

    /// <summary>
    /// The total number of tokens used, when reported by the provider.
    /// </summary>
    public int? TotalTokens { get; init; }
}
message ConversationUsage {
  // Number of tokens in the prompt
  optional int32 prompt_tokens = 1 [json_name = "promptTokens"];
  // Number of tokens in the completion
  optional int32 completion_tokens = 2 [json_name = "completionTokens"];
  // Total number of tokens used
  optional int32 total_tokens = 3 [json_name = "totalTokens"];
}