Skip to content

Commit a610a47

Browse files
.NET: Add support for background responses to A2A agent (#2381)
* add support for baackground responses to a2a agent * fix line endings * address pr review comments * address pr review comments * update sample to net10.0 * Update dotnet/samples/GettingStarted/A2A/A2AAgent_PollingForTaskCompletion/README.md Co-authored-by: Roger Barreto <[email protected]> * Update dotnet/src/Microsoft.Agents.AI.A2A/Extensions/A2AAgentTaskExtensions.cs Co-authored-by: Roger Barreto <[email protected]> * Update dotnet/samples/GettingStarted/A2A/A2AAgent_PollingForTaskCompletion/Program.cs Co-authored-by: Roger Barreto <[email protected]> * address pr review feedback * add clarification regarding background responses --------- Co-authored-by: Roger Barreto <[email protected]>
1 parent bcbf1b3 commit a610a47

File tree

17 files changed

+1358
-55
lines changed

17 files changed

+1358
-55
lines changed

dotnet/agent-framework-dotnet.slnx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
<Folder Name="/Samples/GettingStarted/A2A/">
4141
<File Path="samples/GettingStarted/A2A/README.md" />
4242
<Project Path="samples/GettingStarted/A2A/A2AAgent_AsFunctionTools/A2AAgent_AsFunctionTools.csproj" />
43+
<Project Path="samples/GettingStarted/A2A/A2AAgent_PollingForTaskCompletion/A2AAgent_PollingForTaskCompletion.csproj" />
4344
</Folder>
4445
<Folder Name="/Samples/GettingStarted/AgentProviders/">
4546
<File Path="samples/GettingStarted/AgentProviders/README.md" />
@@ -389,4 +390,4 @@
389390
<Project Path="tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests.csproj" />
390391
<Project Path="tests/Microsoft.Agents.AI.Workflows.UnitTests/Microsoft.Agents.AI.Workflows.UnitTests.csproj" />
391392
</Folder>
392-
</Solution>
393+
</Solution>
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net10.0</TargetFramework>
6+
7+
<Nullable>enable</Nullable>
8+
<ImplicitUsings>enable</ImplicitUsings>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<PackageReference Include="A2A" />
13+
<PackageReference Include="Azure.AI.OpenAI" />
14+
<PackageReference Include="Azure.Identity" />
15+
<PackageReference Include="Microsoft.Extensions.Hosting" />
16+
<PackageReference Include="System.Net.ServerSentEvents" />
17+
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
18+
</ItemGroup>
19+
20+
<ItemGroup>
21+
<ProjectReference Include="..\..\..\..\src\Microsoft.Agents.AI.A2A\Microsoft.Agents.AI.A2A.csproj" />
22+
<ProjectReference Include="..\..\..\..\src\Microsoft.Agents.AI.OpenAI\Microsoft.Agents.AI.OpenAI.csproj" />
23+
</ItemGroup>
24+
25+
</Project>
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
// This sample demonstrates how to poll for long-running task completion using continuation tokens with an A2A AI agent.
4+
5+
using A2A;
6+
using Microsoft.Agents.AI;
7+
8+
var a2aAgentHost = Environment.GetEnvironmentVariable("A2A_AGENT_HOST") ?? throw new InvalidOperationException("A2A_AGENT_HOST is not set.");
9+
10+
// Initialize an A2ACardResolver to get an A2A agent card.
11+
A2ACardResolver agentCardResolver = new(new Uri(a2aAgentHost));
12+
13+
// Get the agent card
14+
AgentCard agentCard = await agentCardResolver.GetAgentCardAsync();
15+
16+
// Create an instance of the AIAgent for an existing A2A agent specified by the agent card.
17+
AIAgent agent = agentCard.GetAIAgent();
18+
19+
AgentThread thread = agent.GetNewThread();
20+
21+
// Start the initial run with a long-running task.
22+
AgentRunResponse response = await agent.RunAsync("Conduct a comprehensive analysis of quantum computing applications in cryptography, including recent breakthroughs, implementation challenges, and future roadmap. Please include diagrams and visual representations to illustrate complex concepts.", thread);
23+
24+
// Poll until the response is complete.
25+
while (response.ContinuationToken is { } token)
26+
{
27+
// Wait before polling again.
28+
await Task.Delay(TimeSpan.FromSeconds(2));
29+
30+
// Continue with the token.
31+
response = await agent.RunAsync(thread, options: new AgentRunOptions { ContinuationToken = token });
32+
}
33+
34+
// Display the result
35+
Console.WriteLine(response);
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Polling for A2A Agent Task Completion
2+
3+
This sample demonstrates how to poll for long-running task completion using continuation tokens with an A2A AI agent, following the background responses pattern.
4+
5+
The sample:
6+
7+
- Connects to an A2A agent server specified in the `A2A_AGENT_HOST` environment variable
8+
- Sends a request to the agent that may take time to complete
9+
- Polls the agent at regular intervals using continuation tokens until a final response is received
10+
- Displays the final result
11+
12+
This pattern is useful when an AI model cannot complete a complex task in a single response and needs multiple rounds of processing.
13+
14+
# Prerequisites
15+
16+
Before you begin, ensure you have the following prerequisites:
17+
18+
- .NET 10.0 SDK or later
19+
- An A2A agent server running and accessible via HTTP
20+
21+
Set the following environment variable:
22+
23+
```powershell
24+
$env:A2A_AGENT_HOST="http://localhost:5000" # Replace with your A2A agent server host
25+
```

dotnet/samples/GettingStarted/A2A/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ See the README.md for each sample for the prerequisites for that sample.
1414
|Sample|Description|
1515
|---|---|
1616
|[A2A Agent As Function Tools](./A2AAgent_AsFunctionTools/)|This sample demonstrates how to represent an A2A agent as a set of function tools, where each function tool corresponds to a skill of the A2A agent, and register these function tools with another AI agent so it can leverage the A2A agent's skills.|
17+
|[A2A Agent Polling For Task Completion](./A2AAgent_PollingForTaskCompletion/)|This sample demonstrates how to poll for long-running task completion using continuation tokens with an A2A agent.|
1718

1819
## Running the samples from the console
1920

dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs

Lines changed: 159 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Linq;
6+
using System.Net.ServerSentEvents;
67
using System.Runtime.CompilerServices;
78
using System.Text.Json;
89
using System.Threading;
@@ -74,48 +75,63 @@ public override async Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> m
7475
{
7576
_ = Throw.IfNull(messages);
7677

77-
var a2aMessage = messages.ToA2AMessage();
78-
7978
thread ??= this.GetNewThread();
8079
if (thread is not A2AAgentThread typedThread)
8180
{
8281
throw new InvalidOperationException("The provided thread is not compatible with the agent. Only threads created by the agent can be used.");
8382
}
8483

85-
// Linking the message to the existing conversation, if any.
86-
a2aMessage.ContextId = typedThread.ContextId;
87-
8884
this._logger.LogA2AAgentInvokingAgent(nameof(RunAsync), this.Id, this.Name);
8985

90-
var a2aResponse = await this._a2aClient.SendMessageAsync(new MessageSendParams { Message = a2aMessage }, cancellationToken).ConfigureAwait(false);
86+
A2AResponse? a2aResponse = null;
87+
88+
if (GetContinuationToken(messages, options) is { } token)
89+
{
90+
a2aResponse = await this._a2aClient.GetTaskAsync(token.TaskId, cancellationToken).ConfigureAwait(false);
91+
}
92+
else
93+
{
94+
var a2aMessage = CreateA2AMessage(typedThread, messages);
95+
96+
a2aResponse = await this._a2aClient.SendMessageAsync(new MessageSendParams { Message = a2aMessage }, cancellationToken).ConfigureAwait(false);
97+
}
9198

9299
this._logger.LogAgentChatClientInvokedAgent(nameof(RunAsync), this.Id, this.Name);
93100

94101
if (a2aResponse is AgentMessage message)
95102
{
96-
UpdateThreadConversationId(typedThread, message.ContextId);
103+
UpdateThread(typedThread, message.ContextId);
97104

98105
return new AgentRunResponse
99106
{
100107
AgentId = this.Id,
101108
ResponseId = message.MessageId,
102109
RawRepresentation = message,
103110
Messages = [message.ToChatMessage()],
104-
AdditionalProperties = message.Metadata.ToAdditionalProperties(),
111+
AdditionalProperties = message.Metadata?.ToAdditionalProperties(),
105112
};
106113
}
114+
107115
if (a2aResponse is AgentTask agentTask)
108116
{
109-
UpdateThreadConversationId(typedThread, agentTask.ContextId);
117+
UpdateThread(typedThread, agentTask.ContextId, agentTask.Id);
110118

111-
return new AgentRunResponse
119+
var response = new AgentRunResponse
112120
{
113121
AgentId = this.Id,
114122
ResponseId = agentTask.Id,
115123
RawRepresentation = agentTask,
116-
Messages = agentTask.ToChatMessages(),
117-
AdditionalProperties = agentTask.Metadata.ToAdditionalProperties(),
124+
Messages = agentTask.ToChatMessages() ?? [],
125+
ContinuationToken = CreateContinuationToken(agentTask.Id, agentTask.Status.State),
126+
AdditionalProperties = agentTask.Metadata?.ToAdditionalProperties(),
118127
};
128+
129+
if (agentTask.ToChatMessages() is { Count: > 0 } taskMessages)
130+
{
131+
response.Messages = taskMessages;
132+
}
133+
134+
return response;
119135
}
120136

121137
throw new NotSupportedException($"Only Message and AgentTask responses are supported from A2A agents. Received: {a2aResponse.GetType().FullName ?? "null"}");
@@ -126,43 +142,67 @@ public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync
126142
{
127143
_ = Throw.IfNull(messages);
128144

129-
var a2aMessage = messages.ToA2AMessage();
130-
131145
thread ??= this.GetNewThread();
132146
if (thread is not A2AAgentThread typedThread)
133147
{
134148
throw new InvalidOperationException("The provided thread is not compatible with the agent. Only threads created by the agent can be used.");
135149
}
136150

137-
// Linking the message to the existing conversation, if any.
138-
a2aMessage.ContextId = typedThread.ContextId;
139-
140151
this._logger.LogA2AAgentInvokingAgent(nameof(RunStreamingAsync), this.Id, this.Name);
141152

142-
var a2aSseEvents = this._a2aClient.SendMessageStreamingAsync(new MessageSendParams { Message = a2aMessage }, cancellationToken).ConfigureAwait(false);
153+
ConfiguredCancelableAsyncEnumerable<SseItem<A2AEvent>> a2aSseEvents;
154+
155+
if (options?.ContinuationToken is not null)
156+
{
157+
// Task stream resumption is not well defined in the A2A v2.* specification, leaving it to the agent implementations.
158+
// The v3.0 specification improves this by defining task stream reconnection that allows obtaining the same stream
159+
// from the beginning, but it does not define stream resumption from a specific point in the stream.
160+
// Therefore, the code should be updated once the A2A .NET library supports the A2A v3.0 specification,
161+
// and AF has the necessary model to allow consumers to know whether they need to resume the stream and add new updates to
162+
// the existing ones or reconnect the stream and obtain all updates again.
163+
// For more details, see the following issue: https://github.com/microsoft/agent-framework/issues/1764
164+
throw new InvalidOperationException("Reconnecting to task streams using continuation tokens is not supported yet.");
165+
// a2aSseEvents = this._a2aClient.SubscribeToTaskAsync(token.TaskId, cancellationToken).ConfigureAwait(false);
166+
}
167+
168+
var a2aMessage = CreateA2AMessage(typedThread, messages);
169+
170+
a2aSseEvents = this._a2aClient.SendMessageStreamingAsync(new MessageSendParams { Message = a2aMessage }, cancellationToken).ConfigureAwait(false);
143171

144172
this._logger.LogAgentChatClientInvokedAgent(nameof(RunStreamingAsync), this.Id, this.Name);
145173

174+
string? contextId = null;
175+
string? taskId = null;
176+
146177
await foreach (var sseEvent in a2aSseEvents)
147178
{
148-
if (sseEvent.Data is not AgentMessage message)
179+
if (sseEvent.Data is AgentMessage message)
149180
{
150-
throw new NotSupportedException($"Only message responses are supported from A2A agents. Received: {sseEvent.Data?.GetType().FullName ?? "null"}");
181+
contextId = message.ContextId;
182+
183+
yield return this.ConvertToAgentResponseUpdate(message);
151184
}
185+
else if (sseEvent.Data is AgentTask task)
186+
{
187+
contextId = task.ContextId;
188+
taskId = task.Id;
152189

153-
UpdateThreadConversationId(typedThread, message.ContextId);
190+
yield return this.ConvertToAgentResponseUpdate(task);
191+
}
192+
else if (sseEvent.Data is TaskUpdateEvent taskUpdateEvent)
193+
{
194+
contextId = taskUpdateEvent.ContextId;
195+
taskId = taskUpdateEvent.TaskId;
154196

155-
yield return new AgentRunResponseUpdate
197+
yield return this.ConvertToAgentResponseUpdate(taskUpdateEvent);
198+
}
199+
else
156200
{
157-
AgentId = this.Id,
158-
ResponseId = message.MessageId,
159-
RawRepresentation = message,
160-
Role = ChatRole.Assistant,
161-
MessageId = message.MessageId,
162-
Contents = [.. message.Parts.Select(part => part.ToAIContent()).OfType<AIContent>()],
163-
AdditionalProperties = message.Metadata.ToAdditionalProperties(),
164-
};
201+
throw new NotSupportedException($"Only message, task, task update events are supported from A2A agents. Received: {sseEvent.Data.GetType().FullName ?? "null"}");
202+
}
165203
}
204+
205+
UpdateThread(typedThread, contextId, taskId);
166206
}
167207

168208
/// <inheritdoc/>
@@ -177,7 +217,7 @@ public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync
177217
/// <inheritdoc/>
178218
public override string? Description => this._description ?? base.Description;
179219

180-
private static void UpdateThreadConversationId(A2AAgentThread? thread, string? contextId)
220+
private static void UpdateThread(A2AAgentThread? thread, string? contextId, string? taskId = null)
181221
{
182222
if (thread is null)
183223
{
@@ -194,5 +234,93 @@ private static void UpdateThreadConversationId(A2AAgentThread? thread, string? c
194234

195235
// Assign a server-generated context Id to the thread if it's not already set.
196236
thread.ContextId ??= contextId;
237+
thread.TaskId = taskId;
238+
}
239+
240+
private static AgentMessage CreateA2AMessage(A2AAgentThread typedThread, IEnumerable<ChatMessage> messages)
241+
{
242+
var a2aMessage = messages.ToA2AMessage();
243+
244+
// Linking the message to the existing conversation, if any.
245+
// See: https://github.com/a2aproject/A2A/blob/main/docs/topics/life-of-a-task.md#group-related-interactions
246+
a2aMessage.ContextId = typedThread.ContextId;
247+
248+
// Link the message as a follow-up to an existing task, if any.
249+
// See: https://github.com/a2aproject/A2A/blob/main/docs/topics/life-of-a-task.md#task-refinements
250+
a2aMessage.ReferenceTaskIds = typedThread.TaskId is null ? null : [typedThread.TaskId];
251+
252+
return a2aMessage;
253+
}
254+
255+
private static A2AContinuationToken? GetContinuationToken(IEnumerable<ChatMessage> messages, AgentRunOptions? options = null)
256+
{
257+
if (options?.ContinuationToken is ResponseContinuationToken token)
258+
{
259+
if (messages.Any())
260+
{
261+
throw new InvalidOperationException("Messages are not allowed when continuing a background response using a continuation token.");
262+
}
263+
264+
return A2AContinuationToken.FromToken(token);
265+
}
266+
267+
return null;
268+
}
269+
270+
private static A2AContinuationToken? CreateContinuationToken(string taskId, TaskState state)
271+
{
272+
if (state == TaskState.Submitted || state == TaskState.Working)
273+
{
274+
return new A2AContinuationToken(taskId);
275+
}
276+
277+
return null;
278+
}
279+
280+
private AgentRunResponseUpdate ConvertToAgentResponseUpdate(AgentMessage message)
281+
{
282+
return new AgentRunResponseUpdate
283+
{
284+
AgentId = this.Id,
285+
ResponseId = message.MessageId,
286+
RawRepresentation = message,
287+
Role = ChatRole.Assistant,
288+
MessageId = message.MessageId,
289+
Contents = message.Parts.ConvertAll(part => part.ToAIContent()),
290+
AdditionalProperties = message.Metadata?.ToAdditionalProperties(),
291+
};
292+
}
293+
294+
private AgentRunResponseUpdate ConvertToAgentResponseUpdate(AgentTask task)
295+
{
296+
return new AgentRunResponseUpdate
297+
{
298+
AgentId = this.Id,
299+
ResponseId = task.Id,
300+
RawRepresentation = task,
301+
Role = ChatRole.Assistant,
302+
Contents = task.ToAIContents(),
303+
AdditionalProperties = task.Metadata?.ToAdditionalProperties(),
304+
};
305+
}
306+
307+
private AgentRunResponseUpdate ConvertToAgentResponseUpdate(TaskUpdateEvent taskUpdateEvent)
308+
{
309+
AgentRunResponseUpdate responseUpdate = new()
310+
{
311+
AgentId = this.Id,
312+
ResponseId = taskUpdateEvent.TaskId,
313+
RawRepresentation = taskUpdateEvent,
314+
Role = ChatRole.Assistant,
315+
AdditionalProperties = taskUpdateEvent.Metadata?.ToAdditionalProperties() ?? [],
316+
};
317+
318+
if (taskUpdateEvent is TaskArtifactUpdateEvent artifactUpdateEvent)
319+
{
320+
responseUpdate.Contents = artifactUpdateEvent.Artifact.ToAIContents();
321+
responseUpdate.RawRepresentation = artifactUpdateEvent;
322+
}
323+
324+
return responseUpdate;
197325
}
198326
}

0 commit comments

Comments
 (0)