diff --git a/src/Dapr.AI/Conversation/ConversationStreamProcessor.cs b/src/Dapr.AI/Conversation/ConversationStreamProcessor.cs
new file mode 100644
index 000000000..00b13e720
--- /dev/null
+++ b/src/Dapr.AI/Conversation/ConversationStreamProcessor.cs
@@ -0,0 +1,101 @@
+// ------------------------------------------------------------------------
+// Copyright 2025 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+using System.Runtime.CompilerServices;
+using System.Threading.Channels;
+using Grpc.Core;
+using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
+
+namespace Dapr.AI.Conversation;
+
+///
+/// Provides the implementation to process the streamed response from a conversation endpoint invocation.
+///
+internal sealed class ConversationStreamProcessor : IDisposable
+{
+ private bool disposed;
+ private readonly Channel outputChannel = Channel.CreateUnbounded();
+
+ ///
+ /// Surfaces any exceptions encountered while asynchronously processing the outbound stream.
+ ///
+ internal event EventHandler? OnException;
+
+ ///
+ /// Reads the chunks out asynchronously from the streaming source into the channel.
+ ///
+ /// The call made to the Dapr sidecar to process the response from.
+ /// Token used to cancel the ongoing request.
+ public Task ProcessStreamAsync(
+ AsyncServerStreamingCall call,
+ CancellationToken cancellationToken)
+ {
+ // Start reading from the gRPC call and writing to the output channel.
+ _ = Task.Run(async () =>
+ {
+ try
+ {
+ await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken))
+ {
+ await outputChannel.Writer.WriteAsync(response.Chunk.Content, cancellationToken);
+ }
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ // Expected cancellation exception
+ }
+ catch (Exception ex)
+ {
+ OnException?.Invoke(this, ex);
+ }
+ finally
+ {
+ outputChannel.Writer.Complete();
+ }
+ }, cancellationToken);
+ return Task.CompletedTask;
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ private void Dispose(bool disposing)
+ {
+ if (!disposed)
+ {
+ if (disposing)
+ {
+ outputChannel.Writer.TryComplete();
+ }
+
+ disposed = true;
+ }
+ }
+
+ ///
+ /// Retrieves the processed content from the operation from the Dapr sidecar and returns as an
+ /// enumerable stream.
+ ///
+ /// Cancellation token.
+ ///
+ public async IAsyncEnumerable GetProcessedDataAsync([EnumeratorCancellation] CancellationToken cancellationToken)
+ {
+ await foreach (var data in outputChannel.Reader.ReadAllAsync(cancellationToken))
+ {
+ yield return data;
+ }
+ }
+}
diff --git a/src/Dapr.AI/Conversation/DaprConversationClient.cs b/src/Dapr.AI/Conversation/DaprConversationClient.cs
index b081427b0..07e26b5fc 100644
--- a/src/Dapr.AI/Conversation/DaprConversationClient.cs
+++ b/src/Dapr.AI/Conversation/DaprConversationClient.cs
@@ -80,4 +80,19 @@ protected DaprConversationClient(Autogenerated.DaprClient client,
public abstract Task ConverseAsync(string daprConversationComponentName,
IReadOnlyList inputs, ConversationOptions? options = null,
CancellationToken cancellationToken = default);
+
+ ///
+ /// Sends various inputs to the large language model via the Conversational building block on the Dapr sidecar
+ /// and get a streamed response back.
+ ///
+ /// The name of the Dapr conversation component.
+ /// The input values to send.
+ /// Optional options used to configure the conversation.
+ /// Cancellation token.
+ /// The response provided as a stream by the LLM provider.
+ public abstract IAsyncEnumerable ConverseAsStreamAsync(
+ string daprConversationComponentName,
+ IReadOnlyList inputs,
+ ConversationOptions? options = null,
+ CancellationToken cancellationToken = default);
}
diff --git a/src/Dapr.AI/Conversation/DaprConversationGrpcClient.cs b/src/Dapr.AI/Conversation/DaprConversationGrpcClient.cs
index 6a1a5f438..fefd4a3a9 100644
--- a/src/Dapr.AI/Conversation/DaprConversationGrpcClient.cs
+++ b/src/Dapr.AI/Conversation/DaprConversationGrpcClient.cs
@@ -11,8 +11,10 @@
// limitations under the License.
// ------------------------------------------------------------------------
+using System.Runtime.CompilerServices;
using Dapr.Common;
using Dapr.Common.Extensions;
+using Grpc.Core;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
namespace Dapr.AI.Conversation;
@@ -35,6 +37,63 @@ internal sealed class DaprConversationGrpcClient(Autogenerated.Dapr.DaprClient c
/// The response(s) provided by the LLM provider.
public override async Task ConverseAsync(string daprConversationComponentName, IReadOnlyList inputs, ConversationOptions? options = null,
CancellationToken cancellationToken = default)
+ {
+ ArgumentException.ThrowIfNullOrEmpty(daprConversationComponentName);
+
+ //Build out the common request and gRPC options to the endpoint
+ var (request, grpcCallOptions) = BuildRequest(daprConversationComponentName, inputs, options, cancellationToken);
+
+ var result = await Client.ConverseAlpha1Async(request, grpcCallOptions).ConfigureAwait(false);
+ var outputs = result.Outputs.Select(output => new DaprConversationResult(output.Result)
+ {
+ Parameters = output.Parameters.ToDictionary(kvp => kvp.Key, parameter => parameter.Value)
+ }).ToList();
+
+ return new DaprConversationResponse(outputs);
+ }
+
+ ///
+ /// Sends various inputs to the large language model via the Conversational building block on the Dapr sidecar
+ /// and get a streamed response back.
+ ///
+ /// The name of the Dapr conversation component.
+ /// The input values to send.
+ /// Optional options used to configure the conversation.
+ /// Cancellation token.
+ /// The response provided as a stream by the LLM provider.
+ public override async IAsyncEnumerable ConverseAsStreamAsync(
+ string daprConversationComponentName,
+ IReadOnlyList inputs,
+ ConversationOptions? options = null,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentException.ThrowIfNullOrEmpty(daprConversationComponentName);
+
+ EventHandler exceptionHandler = (_, ex) => throw ex;
+
+ //Build out the common request and gRPC options to the endpoint
+ var (request, grpcCallOptions) = BuildRequest(daprConversationComponentName, inputs, options, cancellationToken);
+
+ var streamResponse = Client.ConverseStreamAlpha1(request, grpcCallOptions);
+ using var streamProcessor = new ConversationStreamProcessor();
+ try
+ {
+ streamProcessor.OnException += exceptionHandler;
+ await streamProcessor.ProcessStreamAsync(streamResponse, cancellationToken);
+
+ await foreach (var content in streamProcessor.GetProcessedDataAsync(cancellationToken))
+ {
+ yield return content;
+ }
+ }
+ finally
+ {
+ streamProcessor.OnException -= exceptionHandler;
+ }
+ }
+
+ private (Autogenerated.ConversationRequest request, CallOptions grpcCallOptions) BuildRequest(string daprConversationComponentName,
+ IReadOnlyList inputs, ConversationOptions? options, CancellationToken cancellationToken)
{
var request = new Autogenerated.ConversationRequest
{
@@ -70,18 +129,12 @@ public override async Task ConverseAsync(string daprCo
Role = input.Role.GetValueFromEnumMember()
});
}
-
- var grpCCallOptions =
+
+ var grpcCallOptions =
DaprClientUtilities.ConfigureGrpcCallOptions(typeof(DaprConversationClient).Assembly, this.DaprApiToken,
cancellationToken);
- var result = await Client.ConverseAlpha1Async(request, grpCCallOptions).ConfigureAwait(false);
- var outputs = result.Outputs.Select(output => new DaprConversationResult(output.Result)
- {
- Parameters = output.Parameters.ToDictionary(kvp => kvp.Key, parameter => parameter.Value)
- }).ToList();
-
- return new DaprConversationResponse(outputs);
+ return (request, grpcCallOptions);
}
///
diff --git a/src/Dapr.AI/Conversation/DaprConversationUsage.cs b/src/Dapr.AI/Conversation/DaprConversationUsage.cs
new file mode 100644
index 000000000..82527d4b5
--- /dev/null
+++ b/src/Dapr.AI/Conversation/DaprConversationUsage.cs
@@ -0,0 +1,35 @@
+// ------------------------------------------------------------------------
+// Copyright 2025 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+namespace Dapr.AI.Conversation;
+
+/// <summary>
+/// Represents token usage statistics.
+/// </summary>
+public record DaprConversationUsage
+{
+    /// <summary>
+    /// The number of tokens in the prompt.
+    /// </summary>
+    public int? PromptTokens { get; init; }
+
+    /// <summary>
+    /// The number of tokens in the completion.
+    /// </summary>
+    public int? CompletionTokens { get; init; }
+
+    /// <summary>
+    /// The total number of tokens used.
+    /// </summary>
+    public int? TotalTokens { get; init; }
+}
diff --git a/src/Dapr.Protos/Protos/dapr/proto/runtime/v1/dapr.proto b/src/Dapr.Protos/Protos/dapr/proto/runtime/v1/dapr.proto
index a0a7b124a..802ced8d9 100644
--- a/src/Dapr.Protos/Protos/dapr/proto/runtime/v1/dapr.proto
+++ b/src/Dapr.Protos/Protos/dapr/proto/runtime/v1/dapr.proto
@@ -220,6 +220,9 @@ service Dapr {
// Converse with a LLM service
rpc ConverseAlpha1(ConversationRequest) returns (ConversationResponse) {}
+
+  // Converse with a LLM service using streaming. The stream carries content
+  // chunks followed by a final completion message (see ConversationStreamResponse).
+  rpc ConverseStreamAlpha1(ConversationRequest) returns (stream ConversationStreamResponse) {}
}
// InvokeServiceRequest represents the request message for Service invocation.
@@ -1357,4 +1360,41 @@ message ConversationResponse {
// An array of results.
repeated ConversationResult outputs = 2;
+
+ // Usage statistics if available
+ optional ConversationUsage usage = 3;
+}
+
+// ConversationStreamResponse is the streaming response for Conversation.
+message ConversationStreamResponse {
+  oneof response_type {
+    // An incremental content chunk from the model.
+    ConversationStreamChunk chunk = 1;
+    // Terminal message signaling that the stream has finished.
+    ConversationStreamComplete complete = 2;
+  }
+}
+
+// ConversationStreamChunk represents a streaming content chunk.
+message ConversationStreamChunk {
+  // Streaming content chunk emitted incrementally by the LLM provider.
+  string content = 1;
+}
+
+// ConversationStreamComplete indicates the streaming conversation has completed.
+message ConversationStreamComplete {
+  // Final context ID, if the provider returned one for the conversation.
+  optional string contextID = 1;
+  // Usage statistics if available
+  optional ConversationUsage usage = 2;
}
+
+
+
+// ConversationUsage represents token usage statistics.
+message ConversationUsage {
+  // Number of tokens in the prompt
+  optional int32 prompt_tokens = 1 [json_name = "promptTokens"];
+  // Number of tokens in the completion
+  optional int32 completion_tokens = 2 [json_name = "completionTokens"];
+  // Total number of tokens used
+  optional int32 total_tokens = 3 [json_name = "totalTokens"];
+}
\ No newline at end of file