Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions dotnet/src/Microsoft.Agents.AI.Workflows/AIAgentBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,27 @@
namespace Microsoft.Agents.AI.Workflows;

/// <summary>
/// Represents the workflow binding details for an AI agent, including configuration options for event emission.
/// Represents the workflow binding details for an AI agent, including configuration options for agent hosting behaviour.
/// </summary>
/// <param name="Agent">The AI agent.</param>
/// <param name="EmitEvents">Specifies whether the agent should emit events. If null, the default behavior is applied.</param>
public record AIAgentBinding(AIAgent Agent, bool EmitEvents = false)
/// <param name="Options">The options for configuring the AI agent host.
/// </param>
public record AIAgentBinding(AIAgent Agent, AIAgentHostOptions? Options = null)
: ExecutorBinding(Throw.IfNull(Agent).GetDescriptiveId(),
(_) => new(new AIAgentHostExecutor(Agent, EmitEvents)),
(_) => new(new AIAgentHostExecutor(Agent, Options ?? new())),
typeof(AIAgentHostExecutor),
Agent)
{
/// <summary>
/// Initializes a new instance of the AIAgentBinding class, associating it with the specified AI agent and
/// optionally enabling event emission.
/// </summary>
/// <param name="agent">The AI agent.</param>
/// <param name="emitEvents">Specifies whether the agent should emit events. If null, the default behavior is applied.</param>
public AIAgentBinding(AIAgent agent, bool emitEvents = false)
: this(agent, new AIAgentHostOptions { EmitAgentRunUpdateEvents = emitEvents })
{ }

/// <inheritdoc/>
public override bool IsSharedInstance => false;

Expand Down
47 changes: 47 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/AIAgentHostOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Microsoft. All rights reserved.

using Microsoft.Extensions.AI;

namespace Microsoft.Agents.AI.Workflows;

/// <summary>
/// .
/// </summary>
Comment on lines +8 to +9
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The XML documentation summary for AIAgentHostOptions is incomplete. It only contains a period "." without any descriptive text. Please provide a complete summary describing what this class represents and its purpose in configuring AI agent hosting behavior.

Suggested change
/// .
/// </summary>
/// Represents configuration options that control how AI agents are hosted, including
/// event emission and interception or forwarding of messages during agent execution.

Copilot uses AI. Check for mistakes.
public sealed class AIAgentHostOptions
{
/// <summary>
/// Gets or sets a value indicating whether agent streaming update events should be emitted during execution.
/// If <see langword="null"/>, the value will be taken from the <see cref="TurnToken"/> />
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The XML documentation contains a syntax error with an extra closing tag. The line should read: "If null, the value will be taken from the TurnToken" without the extra "/>" at the end.

Suggested change
/// If <see langword="null"/>, the value will be taken from the <see cref="TurnToken"/> />
/// If <see langword="null"/>, the value will be taken from the <see cref="TurnToken"/>.

Copilot uses AI. Check for mistakes.
/// </summary>
public bool? EmitAgentRunUpdateEvents { get; set; }

/// <summary>
/// Gets or sets a value indicating whether aggregated agent response events should be emitted during execution.
/// </summary>
public bool EmitAgentRunResponseEvents { get; set; }

/// <summary>
/// Gets or sets a value indicating whether <see cref="UserInputRequestContent"/> should be intercepted and sent
/// as a message to the workflow for handling, instead of being raised as a request.
/// </summary>
public bool InterceptUserInputRequests { get; set; }

/// <summary>
/// Gets or sets a value indicating whether <see cref="FunctionCallContent"/> without a corresponding
/// <see cref="FunctionResultContent"/> should be intercepted and sent as a message to the workflow for handling,
/// instead of being raised as a request.
/// </summary>
public bool InterceptUnterminatedFunctionCalls { get; set; }

/// <summary>
/// Gets or sets a value indicating whether other messages from other agents should be assigned to the
/// <see cref="ChatRole.User"/> role during execution.
/// </summary>
public bool ReassignOtherAgentsAsUsers { get; set; } = true;

/// <summary>
/// Gets or sets a value indicating whether incoming messages are automatically forwarded before new messages generated
/// by the agent during its turn.
/// </summary>
public bool ForwardIncomingMessages { get; set; } = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,29 @@ public static ChatMessage ToChatMessage(this AgentRunResponseUpdate update) =>
RawRepresentation = update.RawRepresentation ?? update,
};

public static ChatMessage ChatAssistantToUserIfNotFromNamed(this ChatMessage message, string agentName)
=> message.ChatAssistantToUserIfNotFromNamed(agentName, out _, false);

private static ChatMessage ChatAssistantToUserIfNotFromNamed(this ChatMessage message, string agentName, out bool changed, bool inplace = true)
{
changed = false;

if (message.Role == ChatRole.Assistant &&
message.AuthorName != agentName &&
message.Contents.All(c => c is TextContent or DataContent or UriContent or UsageContent))
{
if (!inplace)
{
message = message.Clone();
}

message.Role = ChatRole.User;
changed = true;
}

return message;
}

/// <summary>
/// Iterates through <paramref name="messages"/> looking for <see cref="ChatRole.Assistant"/> messages and swapping
/// any that have a different <see cref="ChatMessage.AuthorName"/> from <paramref name="targetAgentName"/> to
Expand All @@ -29,11 +52,9 @@ public static ChatMessage ToChatMessage(this AgentRunResponseUpdate update) =>
List<ChatMessage>? roleChanged = null;
foreach (var m in messages)
{
if (m.Role == ChatRole.Assistant &&
m.AuthorName != targetAgentName &&
m.Contents.All(c => c is TextContent or DataContent or UriContent or UsageContent))
m.ChatAssistantToUserIfNotFromNamed(targetAgentName, out bool changed);
if (changed)
{
m.Role = ChatRole.User;
(roleChanged ??= []).Add(m);
}
}
Expand Down
46 changes: 19 additions & 27 deletions dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Specialized;
Expand Down Expand Up @@ -35,38 +34,28 @@ public static Workflow BuildSequential(string workflowName, params IEnumerable<A

private static Workflow BuildSequentialCore(string? workflowName, params IEnumerable<AIAgent> agents)
{
Throw.IfNull(agents);
Throw.IfNullOrEmpty(agents);

// Create a builder that chains the agents together in sequence. The workflow simply begins
// with the first agent in the sequence.
WorkflowBuilder? builder = null;
ExecutorBinding? previous = null;
foreach (var agent in agents)

AIAgentHostOptions options = new()
{
AgentRunStreamingExecutor agentExecutor = new(agent, includeInputInOutput: true);

if (builder is null)
{
builder = new WorkflowBuilder(agentExecutor);
}
else
{
Debug.Assert(previous is not null);
builder.AddEdge(previous, agentExecutor);
}

previous = agentExecutor;
}
ReassignOtherAgentsAsUsers = true,
ForwardIncomingMessages = true,
};

List<ExecutorBinding> agentExecutors = agents.Select(agent => agent.BindAsExecutor(options)).ToList();

if (previous is null)
ExecutorBinding previous = agentExecutors[0];
WorkflowBuilder builder = new(previous);

foreach (ExecutorBinding next in agentExecutors.Skip(1))
{
Throw.ArgumentException(nameof(agents), "At least one agent must be provided to build a sequential workflow.");
builder.AddEdge(previous, next);
previous = next;
}

// Add an ending executor that batches up all messages from the last agent
// so that it's published as a single list result.
Debug.Assert(builder is not null);

OutputMessagesExecutor end = new();
builder = builder.AddEdge(previous, end).WithOutputFrom(end);
if (workflowName is not null)
Expand Down Expand Up @@ -125,9 +114,12 @@ private static Workflow BuildConcurrentCore(
// so that the final accumulator receives a single list of messages from each agent. Otherwise, the
// accumulator would not be able to determine what came from what agent, as there's currently no
// provenance tracking exposed in the workflow context passed to a handler.
ExecutorBinding[] agentExecutors = (from agent in agents select (ExecutorBinding)new AgentRunStreamingExecutor(agent, includeInputInOutput: false)).ToArray();
ExecutorBinding[] accumulators = [.. from agent in agentExecutors select (ExecutorBinding)new CollectChatMessagesExecutor($"Batcher/{agent.Id}")];

ExecutorBinding[] agentExecutors = (from agent in agents
select agent.BindAsExecutor(new AIAgentHostOptions() { ReassignOtherAgentsAsUsers = true })).ToArray();
ExecutorBinding[] accumulators = [.. from agent in agentExecutors select (ExecutorBinding)new AggregateTurnMessagesExecutor($"Batcher/{agent.Id}")];
builder.AddFanOutEdge(start, agentExecutors);

for (int i = 0; i < agentExecutors.Length; i++)
{
builder.AddEdge(agentExecutors[i], accumulators[i]);
Expand Down
56 changes: 50 additions & 6 deletions dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocolExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.AI;
Expand All @@ -18,6 +19,12 @@ public class ChatProtocolExecutorOptions
/// If set, the executor will accept string messages and convert them to chat messages with this role.
/// </summary>
public ChatRole? StringMessageChatRole { get; set; }

/// <summary>
/// Gets or sets a value indicating whether the executor should automatically send the <see cref="TurnToken"/>
/// after returning from <see cref="ChatProtocolExecutor.TakeTurnAsync(List{ChatMessage}, IWorkflowContext, bool?, CancellationToken)"/>
/// </summary>
public bool AutoSendTurnToken { get; set; } = true;
}

/// <summary>
Expand All @@ -26,8 +33,8 @@ public class ChatProtocolExecutorOptions
/// </summary>
public abstract class ChatProtocolExecutor : StatefulExecutor<List<ChatMessage>>
{
private static readonly Func<List<ChatMessage>> s_initFunction = () => [];
private readonly ChatRole? _stringMessageChatRole;
internal static readonly Func<List<ChatMessage>> s_initFunction = () => [];
private readonly ChatProtocolExecutorOptions _options;

/// <summary>
/// Initializes a new instance of the <see cref="ChatProtocolExecutor"/> class.
Expand All @@ -38,16 +45,28 @@ public abstract class ChatProtocolExecutor : StatefulExecutor<List<ChatMessage>>
protected ChatProtocolExecutor(string id, ChatProtocolExecutorOptions? options = null, bool declareCrossRunShareable = false)
: base(id, () => [], declareCrossRunShareable: declareCrossRunShareable)
{
this._stringMessageChatRole = options?.StringMessageChatRole;
this._options = options ?? new();
}

/// <summary>
/// Gets a value indicating whether string-based messages are by this <see cref="ChatProtocolExecutor"/>.
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The XML documentation contains a grammatical error. The phrase "are by this" is missing a verb. It should read "are supported by this" or "are handled by this ChatProtocolExecutor".

Suggested change
/// Gets a value indicating whether string-based messages are by this <see cref="ChatProtocolExecutor"/>.
/// Gets a value indicating whether string-based messages are supported by this <see cref="ChatProtocolExecutor"/>.

Copilot uses AI. Check for mistakes.
/// </summary>
[MemberNotNullWhen(true, nameof(StringMessageChatRole))]
protected bool SupportsStringMessage => this.StringMessageChatRole.HasValue;

/// <inheritdoc cref="ChatProtocolExecutorOptions.StringMessageChatRole"/>
protected ChatRole? StringMessageChatRole => this._options.StringMessageChatRole;

/// <inheritdoc cref="ChatProtocolExecutorOptions.AutoSendTurnToken"/>
protected bool AutoSendTurnToken => this._options.AutoSendTurnToken;

/// <inheritdoc/>
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
{
if (this._stringMessageChatRole.HasValue)
if (this.SupportsStringMessage)
{
routeBuilder = routeBuilder.AddHandler<string>(
(message, context) => this.AddMessageAsync(new(this._stringMessageChatRole.Value, message), context));
(message, context) => this.AddMessageAsync(new(this.StringMessageChatRole.Value, message), context));
}

return routeBuilder.AddHandler<ChatMessage>(this.AddMessageAsync)
Expand Down Expand Up @@ -111,14 +130,39 @@ public ValueTask TakeTurnAsync(TurnToken token, IWorkflowContext context, Cancel
await this.TakeTurnAsync(maybePendingMessages ?? s_initFunction(), context, token.EmitEvents, cancellationToken)
.ConfigureAwait(false);

await context.SendMessageAsync(token, cancellationToken: cancellationToken).ConfigureAwait(false);
if (this.AutoSendTurnToken)
{
await context.SendMessageAsync(token, cancellationToken: cancellationToken).ConfigureAwait(false);
}

// Rerun the initialStateFactory to reset the state to empty list. (We could return the empty list directly,
// but this is more consistent if the initial state factory becomes more complex.)
return s_initFunction();
}
}

/// <summary>
/// Processes the current set of turn messages using the specified asynchronous processing function.
/// </summary>
/// <remarks>If the provided list of chat messages is null, an initial empty list is supplied to the
/// processing function. If the processing function returns null, an empty list is used as the result.</remarks>
/// <param name="processFunc">A delegate that asynchronously processes a list of chat messages within the given workflow context and
/// cancellation token, returning the processed list of chat messages or null.</param>
/// <param name="context">The workflow context in which the messages are processed.</param>
/// <param name="cancellationToken">A token that can be used to cancel the asynchronous operation.</param>
/// <returns>A ValueTask that represents the asynchronous operation. The result contains the processed list of chat messages,
/// or an empty list if the processing function returns null.</returns>
protected ValueTask ProcessTurnMessagesAsync(Func<List<ChatMessage>, IWorkflowContext, CancellationToken, ValueTask<List<ChatMessage>?>> processFunc, IWorkflowContext context, CancellationToken cancellationToken)
{
return this.InvokeWithStateAsync(InvokeProcessFuncAsync, context, cancellationToken: cancellationToken);

async ValueTask<List<ChatMessage>?> InvokeProcessFuncAsync(List<ChatMessage>? maybePendingMessages, IWorkflowContext context, CancellationToken cancellationToken)
{
return (await processFunc(maybePendingMessages ?? s_initFunction(), context, cancellationToken).ConfigureAwait(false))
?? s_initFunction();
}
}

/// <summary>
/// When overridden in a derived class, processes the accumulated chat messages for a single turn.
/// </summary>
Expand Down
20 changes: 14 additions & 6 deletions dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeMap.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
Expand All @@ -12,7 +13,7 @@ internal sealed class EdgeMap
{
private readonly Dictionary<EdgeId, EdgeRunner> _edgeRunners = [];
private readonly Dictionary<EdgeId, IStatefulEdgeRunner> _statefulRunners = [];
private readonly Dictionary<string, ResponseEdgeRunner> _portEdgeRunners;
private readonly ConcurrentDictionary<string, ResponseEdgeRunner> _portEdgeRunners;

private readonly ResponseEdgeRunner _inputRunner;
private readonly IStepTracer? _stepTracer;
Expand Down Expand Up @@ -51,12 +52,16 @@ public EdgeMap(IRunnerContext runContext,
}
}

this._portEdgeRunners = workflowPorts.ToDictionary(
port => port.Id,
port => ResponseEdgeRunner.ForPort(runContext, port)
);
this._portEdgeRunners = new();
foreach (RequestPort port in workflowPorts)
{
if (!this.TryRegisterPort(runContext, port.Id, port))
{
throw new InvalidOperationException($"Duplicate port ID detected: {port.Id}");
}
}

this._inputRunner = new ResponseEdgeRunner(runContext, startExecutorId);
this._inputRunner = new ResponseEdgeRunner(runContext, startExecutorId, "");
this._stepTracer = stepTracer;
}

Expand All @@ -71,6 +76,9 @@ public EdgeMap(IRunnerContext runContext,
return edgeRunner.ChaseEdgeAsync(message, this._stepTracer);
}

public bool TryRegisterPort(IRunnerContext runContext, string executorId, RequestPort port)
=> this._portEdgeRunners.TryAdd(port.Id, ResponseEdgeRunner.ForPort(runContext, executorId, port));

public ValueTask<DeliveryMapping?> PrepareDeliveryForInputAsync(MessageEnvelope message)
{
return this._inputRunner.ChaseEdgeAsync(message, this._stepTracer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ internal interface IRunnerContext : IExternalRequestSink, ISuperStepJoinContext
ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null, CancellationToken cancellationToken = default);

ValueTask<StepContext> AdvanceAsync(CancellationToken cancellationToken = default);
IWorkflowContext Bind(string executorId, Dictionary<string, string>? traceContext = null);
IWorkflowContext BindWorkflowContext(string executorId, Dictionary<string, string>? traceContext = null);
ValueTask<Executor> EnsureExecutorAsync(string executorId, IStepTracer? tracer, CancellationToken cancellationToken = default);
}
Loading
Loading