Skip to content

Commit ea2935e

Browse files
committed
Fix setting conversationId
1 parent 171392a commit ea2935e

File tree

5 files changed

+23
-18
lines changed

5 files changed

+23
-18
lines changed

src/api/Elastic.Documentation.Api.Core/AskAi/AskAiUsecase.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
using System.Diagnostics;
66
using System.Text.Json;
7-
using Elastic.Documentation.Api.Core;
87
using Microsoft.Extensions.Logging;
98

109
namespace Elastic.Documentation.Api.Core.AskAi;
@@ -23,6 +22,8 @@ public async Task<Stream> AskAi(AskAiRequest askAiRequest, Cancel ctx)
2322
_ = activity?.SetTag("gen_ai.operation.name", "chat");
2423
_ = activity?.SetTag("gen_ai.provider.name", streamTransformer.AgentProvider); // agent-builder or llm-gateway
2524
_ = activity?.SetTag("gen_ai.agent.id", streamTransformer.AgentId); // docs-agent or docs_assistant
25+
if (askAiRequest.ThreadId is not null)
26+
_ = activity?.SetTag("gen_ai.conversation.id", askAiRequest.ThreadId);
2627
var inputMessages = new[]
2728
{
2829
new InputMessage("user", [new MessagePart("text", askAiRequest.Message)])
@@ -33,7 +34,7 @@ public async Task<Stream> AskAi(AskAiRequest askAiRequest, Cancel ctx)
3334
logger.LogInformation("Streaming AskAI response");
3435
var rawStream = await askAiGateway.AskAi(askAiRequest, ctx);
3536
// The stream transformer will handle disposing the activity when streaming completes
36-
var transformedStream = await streamTransformer.TransformAsync(rawStream, activity, ctx);
37+
var transformedStream = await streamTransformer.TransformAsync(rawStream, askAiRequest.ThreadId, activity, ctx);
3738
return transformedStream;
3839
}
3940
}

src/api/Elastic.Documentation.Api.Core/AskAi/IStreamTransformer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ public interface IStreamTransformer
2323
/// Transforms a raw SSE stream into a stream of AskAiEvent objects
2424
/// </summary>
2525
/// <param name="rawStream">Raw SSE stream from gateway (Agent Builder, LLM Gateway, etc.)</param>
26+
/// <param name="threadId">Thread/conversation ID (if known)</param>
2627
/// <param name="parentActivity">Parent activity to track the streaming operation (will be disposed when stream completes)</param>
2728
/// <param name="cancellationToken">Cancellation token</param>
2829
/// <returns>Stream containing SSE-formatted AskAiEvent objects</returns>
29-
Task<Stream> TransformAsync(Stream rawStream, System.Diagnostics.Activity? parentActivity, CancellationToken cancellationToken = default);
30+
Task<Stream> TransformAsync(Stream rawStream, string? threadId, System.Diagnostics.Activity? parentActivity, CancellationToken cancellationToken = default);
3031
}

src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/AgentBuilderStreamTransformer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class AgentBuilderStreamTransformer(ILogger<AgentBuilderStreamTransformer
1818
{
1919
protected override string GetAgentId() => AgentBuilderAskAiGateway.ModelName;
2020
protected override string GetAgentProvider() => AgentBuilderAskAiGateway.ProviderName;
21-
protected override AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json)
21+
protected override AskAiEvent? TransformJsonEvent(string? threadId, string? eventType, JsonElement json)
2222
{
2323
var type = eventType ?? "message";
2424
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/LlmGatewayStreamTransformer.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class LlmGatewayStreamTransformer(ILogger<LlmGatewayStreamTransformer> lo
1818
{
1919
protected override string GetAgentId() => LlmGatewayAskAiGateway.ModelName;
2020
protected override string GetAgentProvider() => LlmGatewayAskAiGateway.ProviderName;
21-
protected override AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json)
21+
protected override AskAiEvent? TransformJsonEvent(string? threadId, string? eventType, JsonElement json)
2222
{
2323
// LLM Gateway format: ["custom", {type: "...", ...}]
2424
if (json.ValueKind != JsonValueKind.Array || json.GetArrayLength() < 2)
@@ -34,12 +34,13 @@ public class LlmGatewayStreamTransformer(ILogger<LlmGatewayStreamTransformer> lo
3434
var id = message.GetProperty("id").GetString()!;
3535
var messageData = message.GetProperty("data");
3636

37+
// LLM gateway does not emit conversation start events with thread IDs
38+
// so we create a synthetic conversation start event here
39+
if (threadId is null)
40+
return new AskAiEvent.ConversationStart(id, timestamp, Guid.NewGuid().ToString());
41+
3742
return type switch
3843
{
39-
"agent_start" =>
40-
// LLM Gateway doesn't provide conversation ID, so generate one
41-
new AskAiEvent.ConversationStart(id, timestamp, Guid.NewGuid().ToString()),
42-
4344
"ai_message_chunk" when messageData.TryGetProperty("content", out var content) =>
4445
new AskAiEvent.MessageChunk(id, timestamp, content.GetString()!),
4546

src/api/Elastic.Documentation.Api.Infrastructure/Adapters/AskAi/StreamTransformerBase.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public abstract class StreamTransformerBase(ILogger logger) : IStreamTransformer
5151
/// </summary>
5252
public string AgentProvider => GetAgentProvider();
5353

54-
public Task<Stream> TransformAsync(Stream rawStream, Activity? parentActivity, Cancel cancellationToken = default)
54+
public Task<Stream> TransformAsync(Stream rawStream, string? threadId, Activity? parentActivity, Cancel cancellationToken = default)
5555
{
5656
// Configure pipe for low-latency streaming
5757
var pipeOptions = new PipeOptions(
@@ -70,7 +70,7 @@ public Task<Stream> TransformAsync(Stream rawStream, Activity? parentActivity, C
7070
// Note: We intentionally don't await this task as we need to return the stream immediately
7171
// The pipe handles synchronization and backpressure between producer and consumer
7272
// Pass parent activity - it will be disposed when streaming completes
73-
_ = ProcessPipeAsync(reader, pipe.Writer, parentActivity, cancellationToken);
73+
_ = ProcessPipeAsync(reader, pipe.Writer, threadId, parentActivity, cancellationToken);
7474

7575
// Return the read side of the pipe as a stream
7676
return Task.FromResult(pipe.Reader.AsStream());
@@ -80,13 +80,13 @@ public Task<Stream> TransformAsync(Stream rawStream, Activity? parentActivity, C
8080
/// Process the pipe reader and write transformed events to the pipe writer.
8181
/// This runs concurrently with the consumer reading from the output stream.
8282
/// </summary>
83-
private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activity? parentActivity, CancellationToken cancellationToken)
83+
private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, string? threadId, Activity? parentActivity, CancellationToken cancellationToken)
8484
{
8585
try
8686
{
8787
try
8888
{
89-
await ProcessStreamAsync(reader, writer, parentActivity, cancellationToken);
89+
await ProcessStreamAsync(reader, writer, threadId, parentActivity, cancellationToken);
9090
}
9191
catch (OperationCanceledException ex)
9292
{
@@ -131,7 +131,7 @@ private async Task ProcessPipeAsync(PipeReader reader, PipeWriter writer, Activi
131131
/// Default implementation parses SSE events and JSON, then calls TransformJsonEvent.
132132
/// </summary>
133133
/// <returns>Stream processing result with metrics and captured output</returns>
134-
private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Activity? parentActivity, CancellationToken cancellationToken)
134+
private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, string? threadId, Activity? parentActivity, CancellationToken cancellationToken)
135135
{
136136
using var activity = StreamTransformerActivitySource.StartActivity(nameof(ProcessStreamAsync));
137137

@@ -141,7 +141,6 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti
141141
List<MessagePart> outputMessageParts = [];
142142
await foreach (var sseEvent in ParseSseEventsAsync(reader, cancellationToken))
143143
{
144-
using var parseActivity = StreamTransformerActivitySource.StartActivity("AskAI Event");
145144
AskAiEvent? transformedEvent;
146145
try
147146
{
@@ -150,7 +149,7 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti
150149
var root = doc.RootElement;
151150

152151
// Subclass transforms JsonElement to AskAiEvent
153-
transformedEvent = TransformJsonEvent(sseEvent.EventType, root);
152+
transformedEvent = TransformJsonEvent(threadId, sseEvent.EventType, root);
154153
}
155154
catch (JsonException ex)
156155
{
@@ -166,9 +165,11 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti
166165
continue;
167166
}
168167

168+
using var parseActivity = StreamTransformerActivitySource.StartActivity("AskAI Event");
169+
169170
// Set event type tag on parse_event activity
170171
_ = parseActivity?.SetTag("ask_ai.event.type", transformedEvent.GetType().Name);
171-
_ = parseActivity?.SetTag("gen_ai.respone.id", transformedEvent.Id);
172+
_ = parseActivity?.SetTag("gen_ai.response.id", transformedEvent.Id);
172173

173174
switch (transformedEvent)
174175
{
@@ -247,10 +248,11 @@ private async Task ProcessStreamAsync(PipeReader reader, PipeWriter writer, Acti
247248
/// Transform a parsed JSON event into an AskAiEvent.
248249
/// Subclasses implement provider-specific transformation logic.
249250
/// </summary>
251+
/// <param name="threadId">The conversation/thread ID, if available</param>
250252
/// <param name="eventType">The SSE event type (from "event:" field), or null if not present</param>
251253
/// <param name="json">The parsed JSON data from the "data:" field</param>
252254
/// <returns>The transformed AskAiEvent, or null to skip this event</returns>
253-
protected abstract AskAiEvent? TransformJsonEvent(string? eventType, JsonElement json);
255+
protected abstract AskAiEvent? TransformJsonEvent(string? threadId, string? eventType, JsonElement json);
254256

255257
/// <summary>
256258
/// Write a transformed event to the output stream

0 commit comments

Comments
 (0)