Skip to content

Commit 97f98dd

Browse files
Add streaming support to A2A agent handler
Add HandleNewMessageStreamingAsync to A2AAgentHandler that routes StreamingResponse requests through RunStreamingAsync, enqueuing an A2A Message for each AgentResponseUpdate. Add MessageConverter.ToParts(AgentResponseUpdate) extension to convert streaming update contents to A2A Parts with unsupported-content filtering. Add CreateMessageFromUpdate to map AgentResponseUpdate to A2A Message. Add 16 new tests covering the streaming path and converter. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 424d66a commit 97f98dd

4 files changed

Lines changed: 540 additions & 0 deletions

File tree

dotnet/src/Microsoft.Agents.AI.Hosting.A2A/A2AAgentHandler.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,19 @@ public A2AAgentHandler(
4242
/// <inheritdoc/>
4343
public Task ExecuteAsync(RequestContext context, AgentEventQueue eventQueue, CancellationToken cancellationToken)
4444
{
45+
// Handle task updates
4546
if (context.IsContinuation)
4647
{
4748
return this.HandleTaskUpdateAsync(context, eventQueue, cancellationToken);
4849
}
4950

51+
// Handle messages received via streaming endpoint
52+
if (context.StreamingResponse)
53+
{
54+
return this.HandleNewMessageStreamingAsync(context, eventQueue, cancellationToken);
55+
}
56+
57+
// Handle new messages received via non-streaming endpoint
5058
return this.HandleNewMessageAsync(context, eventQueue, cancellationToken);
5159
}
5260

@@ -108,6 +116,34 @@ private async Task HandleNewMessageAsync(RequestContext context, AgentEventQueue
108116
}
109117
}
110118

119+
private async Task HandleNewMessageStreamingAsync(RequestContext context, AgentEventQueue eventQueue, CancellationToken cancellationToken)
120+
{
121+
var contextId = context.ContextId ?? Guid.NewGuid().ToString("N");
122+
var session = await this._hostAgent.GetOrCreateSessionAsync(contextId, cancellationToken).ConfigureAwait(false);
123+
124+
// AIAgent does not support resuming from arbitrary prior tasks.
125+
// Throw explicitly so the client gets a clear error rather than a response
126+
// that silently ignores the referenced task context.
127+
if (context.Message?.ReferenceTaskIds is { Count: > 0 })
128+
{
129+
throw new NotSupportedException("ReferenceTaskIds is not supported. AIAgent cannot resume from arbitrary prior task context.");
130+
}
131+
132+
List<ChatMessage> chatMessages = context.Message is not null ? [context.Message.ToChatMessage()] : [];
133+
134+
var options = context.Metadata is { Count: > 0 }
135+
? new AgentRunOptions { AdditionalProperties = context.Metadata.ToAdditionalProperties() }
136+
: null;
137+
138+
await foreach (var update in this._hostAgent.RunStreamingAsync(chatMessages, session, options, cancellationToken).ConfigureAwait(false))
139+
{
140+
var message = CreateMessageFromUpdate(contextId, update);
141+
await eventQueue.EnqueueMessageAsync(message, cancellationToken).ConfigureAwait(false);
142+
}
143+
144+
await this._hostAgent.SaveSessionAsync(contextId, session, cancellationToken).ConfigureAwait(false);
145+
}
146+
111147
private async Task HandleTaskUpdateAsync(RequestContext context, AgentEventQueue eventQueue, CancellationToken cancellationToken)
112148
{
113149
var contextId = context.ContextId ?? Guid.NewGuid().ToString("N");
@@ -174,6 +210,16 @@ private static Message CreateMessageFromResponse(string contextId, AgentResponse
174210
Metadata = response.AdditionalProperties?.ToA2AMetadata()
175211
};
176212

213+
private static Message CreateMessageFromUpdate(string contextId, AgentResponseUpdate update) =>
214+
new()
215+
{
216+
MessageId = update.ResponseId ?? Guid.NewGuid().ToString("N"),
217+
ContextId = contextId,
218+
Role = Role.Agent,
219+
Parts = update.ToParts(),
220+
Metadata = update.AdditionalProperties?.ToA2AMetadata()
221+
};
222+
177223
private static List<ChatMessage> ExtractChatMessagesFromTaskHistory(AgentTask? agentTask)
178224
{
179225
if (agentTask?.History is not { Count: > 0 })

dotnet/src/Microsoft.Agents.AI.Hosting.A2A/Converters/MessageConverter.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,26 @@ namespace Microsoft.Agents.AI.Hosting.A2A.Converters;
88

99
internal static class MessageConverter
1010
{
11+
public static List<Part> ToParts(this AgentResponseUpdate update)
12+
{
13+
if (update.Contents is not { Count: > 0 })
14+
{
15+
return [];
16+
}
17+
18+
var parts = new List<Part>();
19+
foreach (var content in update.Contents)
20+
{
21+
var part = content.ToPart();
22+
if (part is not null)
23+
{
24+
parts.Add(part);
25+
}
26+
}
27+
28+
return parts;
29+
}
30+
1131
public static List<Part> ToParts(this IList<ChatMessage> chatMessages)
1232
{
1333
if (chatMessages is null || chatMessages.Count == 0)

0 commit comments

Comments
 (0)