Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private static async Task Main()
CheckpointInfo savedCheckpoint = checkpoints[CheckpointIndex];

await using Checkpointed<StreamingRun> newCheckpointedRun =
await InProcessExecution.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager, checkpointedRun.Run.RunId);
await InProcessExecution.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager);

await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync())
{
Expand Down
15 changes: 12 additions & 3 deletions dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ public static class ChatProtocolExtensions
/// Determines whether the specified protocol descriptor represents the Agent Workflow Chat Protocol.
/// </summary>
/// <param name="descriptor">The protocol descriptor to evaluate.</param>
/// <param name="allowCatchAll">If <see langword="true"/>, will allow protocols handling all inputs to be treated
/// as a Chat Protocol</param>
/// <returns><see langword="true"/> if the protocol descriptor represents a supported chat protocol; otherwise, <see
/// langword="false"/>.</returns>
public static bool IsChatProtocol(this ProtocolDescriptor descriptor)
public static bool IsChatProtocol(this ProtocolDescriptor descriptor, bool allowCatchAll = false)
{
bool foundListChatMessageInput = false;
bool foundTurnTokenInput = false;

if (allowCatchAll && descriptor.AcceptsAll)
{
return true;
}

// We require that the workflow be a ChatProtocol; right now that is defined as accepting at
// least List<ChatMessage> as input (pending polymorphism/interface-input support), as well as
// TurnToken. Since output is mediated by events, which we forward, we don't need to validate
Expand All @@ -50,9 +57,11 @@ public static bool IsChatProtocol(this ProtocolDescriptor descriptor)
/// Throws an exception if the specified protocol descriptor does not represent a valid chat protocol.
/// </summary>
/// <param name="descriptor">The protocol descriptor to validate as a chat protocol. Cannot be null.</param>
public static void ThrowIfNotChatProtocol(this ProtocolDescriptor descriptor)
/// <param name="allowCatchAll">If <see langword="true"/>, will allow protocols handling all inputs to be treated
/// as a Chat Protocol</param>
public static void ThrowIfNotChatProtocol(this ProtocolDescriptor descriptor, bool allowCatchAll = false)
{
if (!descriptor.IsChatProtocol())
if (!descriptor.IsChatProtocol(allowCatchAll))
{
throw new InvalidOperationException("Workflow does not support ChatProtocol: At least List<ChatMessage>" +
" and TurnToken must be supported as input.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,20 @@ public abstract class ChatProtocolExecutor : StatefulExecutor<List<ChatMessage>>
private static readonly Func<List<ChatMessage>> s_initFunction = () => [];
private readonly ChatRole? _stringMessageChatRole;

private static readonly StatefulExecutorOptions s_baseExecutorOptions = new()
{
AutoSendMessageHandlerResultObject = false,
AutoYieldOutputHandlerResultObject = false
};

/// <summary>
/// Initializes a new instance of the <see cref="ChatProtocolExecutor"/> class.
/// </summary>
/// <param name="id">The unique identifier for this executor instance. Cannot be null or empty.</param>
/// <param name="options">Optional configuration settings for the executor. If null, default options are used.</param>
/// <param name="declareCrossRunShareable">Declare that this executor may be used simultaneously by multiple runs safely.</param>
protected ChatProtocolExecutor(string id, ChatProtocolExecutorOptions? options = null, bool declareCrossRunShareable = false)
: base(id, () => [], declareCrossRunShareable: declareCrossRunShareable)
: base(id, () => [], s_baseExecutorOptions, declareCrossRunShareable)
{
this._stringMessageChatRole = options?.StringMessageChatRole;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ internal interface ISuperStepJoinContext

ValueTask ForwardWorkflowEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default);
ValueTask SendMessageAsync<TMessage>(string senderId, [DisallowNull] TMessage message, CancellationToken cancellationToken = default);
ValueTask YieldOutputAsync<TOutput>(string senderId, [DisallowNull] TOutput output, CancellationToken cancellationToken = default);

ValueTask<string> AttachSuperstepAsync(ISuperStepRunner superStepRunner, CancellationToken cancellationToken = default);
ValueTask<bool> DetachSuperstepAsync(string id);
Expand Down
2 changes: 1 addition & 1 deletion dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public ProtocolDescriptor DescribeProtocol()
// TODO: Once burden of annotating yield/output messages becomes easier for the non-Auto case,
// we should (1) start checking for validity on output/send side, and (2) add the Yield/Send
// types to the ProtocolDescriptor.
return new(this.InputTypes);
return new(this.InputTypes, this.Router.HasCatchAll);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,9 @@ public interface IWorkflowExecutionEnvironment
/// <param name="workflow">The workflow to be executed. Must not be <c>null</c>.</param>
/// <param name="fromCheckpoint">The <see cref="CheckpointInfo"/> corresponding to the checkpoint from which to resume.</param>
/// <param name="checkpointManager">The <see cref="CheckpointManager"/> to use with this run.</param>
/// <param name="runId">An optional unique identifier for the run. If not provided, a new identifier will be generated.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A <see cref="StreamingRun"/> that provides access to the results of the streaming run.</returns>
ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default);
ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a non-streaming execution of the workflow with the specified input.
Expand Down Expand Up @@ -117,9 +116,8 @@ public interface IWorkflowExecutionEnvironment
/// <param name="workflow">The workflow to be executed. Must not be <c>null</c>.</param>
/// <param name="fromCheckpoint">The <see cref="CheckpointInfo"/> corresponding to the checkpoint from which to resume.</param>
/// <param name="checkpointManager">The <see cref="CheckpointManager"/> to use with this run.</param>
/// <param name="runId">An optional unique identifier for the run. If not provided, a new identifier will be generated.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A <see cref="ValueTask{Run}"/> that represents the asynchronous operation. The result contains a <see
/// cref="Run"/> for managing and interacting with the streaming run.</returns>
ValueTask<Checkpointed<Run>> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default);
ValueTask<Checkpointed<Run>> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ internal ValueTask<AsyncRunHandle> BeginRunAsync(Workflow workflow, ICheckpointM
return runner.BeginStreamAsync(this.ExecutionMode, cancellationToken);
}

internal ValueTask<AsyncRunHandle> ResumeRunAsync(Workflow workflow, ICheckpointManager? checkpointManager, string? runId, CheckpointInfo fromCheckpoint, IEnumerable<Type> knownValidInputTypes, CancellationToken cancellationToken)
internal ValueTask<AsyncRunHandle> ResumeRunAsync(Workflow workflow, ICheckpointManager? checkpointManager, CheckpointInfo fromCheckpoint, IEnumerable<Type> knownValidInputTypes, CancellationToken cancellationToken)
{
InProcessRunner runner = InProcessRunner.CreateTopLevelRunner(workflow, checkpointManager, runId, this.EnableConcurrentRuns, knownValidInputTypes);
InProcessRunner runner = InProcessRunner.CreateTopLevelRunner(workflow, checkpointManager, fromCheckpoint.RunId, this.EnableConcurrentRuns, knownValidInputTypes);
return runner.ResumeStreamAsync(this.ExecutionMode, fromCheckpoint, cancellationToken);
}

Expand Down Expand Up @@ -95,10 +95,9 @@ public async ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(
Workflow workflow,
CheckpointInfo fromCheckpoint,
CheckpointManager checkpointManager,
string? runId = null,
CancellationToken cancellationToken = default)
{
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, runId: runId, fromCheckpoint, [], cancellationToken)
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, fromCheckpoint, [], cancellationToken)
.ConfigureAwait(false);

return await runHandle.WithCheckpointingAsync<StreamingRun>(() => new(new StreamingRun(runHandle)))
Expand Down Expand Up @@ -172,10 +171,9 @@ public async ValueTask<Checkpointed<Run>> ResumeAsync(
Workflow workflow,
CheckpointInfo fromCheckpoint,
CheckpointManager checkpointManager,
string? runId = null,
CancellationToken cancellationToken = default)
{
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, runId: runId, fromCheckpoint, [], cancellationToken)
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, fromCheckpoint, [], cancellationToken)
.ConfigureAwait(false);

return await runHandle.WithCheckpointingAsync<Run>(() => new(new Run(runHandle)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ internal sealed class InProcessRunnerContext : IRunnerContext
private int _runEnded;
private readonly string _runId;
private readonly Workflow _workflow;
private readonly object? _previousOwnership;
private bool _ownsWorkflow;

private readonly EdgeMap _edgeMap;
private readonly OutputFilter _outputFilter;
Expand Down Expand Up @@ -54,7 +56,10 @@ public InProcessRunnerContext(
else
{
workflow.TakeOwnership(this, existingOwnershipSignoff: existingOwnershipSignoff);
this._previousOwnership = existingOwnershipSignoff;
this._ownsWorkflow = true;
}

this._workflow = workflow;
this._runId = runId;

Expand Down Expand Up @@ -211,10 +216,27 @@ await this._edgeMap.PrepareDeliveryForEdgeAsync(edge, envelope)
}
}

private async ValueTask YieldOutputAsync(string sourceId, object output, CancellationToken cancellationToken = default)
{
this.CheckEnded();
Throw.IfNull(output);

Executor sourceExecutor = await this.EnsureExecutorAsync(sourceId, tracer: null, cancellationToken).ConfigureAwait(false);
if (!sourceExecutor.CanOutput(output.GetType()))
{
throw new InvalidOperationException($"Cannot output object of type {output.GetType().Name}. Expecting one of [{string.Join(", ", sourceExecutor.OutputTypes)}].");
}

if (this._outputFilter.CanOutput(sourceId, output))
{
await this.AddEventAsync(new WorkflowOutputEvent(output, sourceId), cancellationToken).ConfigureAwait(false);
}
}

public IWorkflowContext Bind(string executorId, Dictionary<string, string>? traceContext = null)
{
this.CheckEnded();
return new BoundContext(this, executorId, this._outputFilter, traceContext);
return new BoundContext(this, executorId, traceContext);
}

public ValueTask PostAsync(ExternalRequest request)
Expand All @@ -241,7 +263,6 @@ public bool CompleteRequest(string requestId)
private sealed class BoundContext(
InProcessRunnerContext RunnerContext,
string ExecutorId,
OutputFilter outputFilter,
Dictionary<string, string>? traceContext) : IWorkflowContext
{
public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default) => RunnerContext.AddEventAsync(workflowEvent, cancellationToken);
Expand All @@ -251,21 +272,9 @@ public ValueTask SendMessageAsync(object message, string? targetId = null, Cance
return RunnerContext.SendMessageAsync(ExecutorId, message, targetId, cancellationToken);
}

public async ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default)
public ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default)
{
RunnerContext.CheckEnded();
Throw.IfNull(output);

Executor sourceExecutor = await RunnerContext.EnsureExecutorAsync(ExecutorId, tracer: null, cancellationToken).ConfigureAwait(false);
if (!sourceExecutor.CanOutput(output.GetType()))
{
throw new InvalidOperationException($"Cannot output object of type {output.GetType().Name}. Expecting one of [{string.Join(", ", sourceExecutor.OutputTypes)}].");
}

if (outputFilter.CanOutput(ExecutorId, output))
{
await this.AddEventAsync(new WorkflowOutputEvent(output, ExecutorId), cancellationToken).ConfigureAwait(false);
}
return RunnerContext.YieldOutputAsync(ExecutorId, output, cancellationToken);
}

public ValueTask RequestHaltAsync() => this.AddEventAsync(new RequestHaltEvent());
Expand Down Expand Up @@ -389,7 +398,9 @@ public async ValueTask EndRunAsync()
{
foreach (string executorId in this._executors.Keys)
{
Task<Executor> executor = this._executors[executorId];
Task<Executor> executorTask = this._executors[executorId];
Executor executor = await executorTask.ConfigureAwait(false);

if (executor is IAsyncDisposable asyncDisposable)
{
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
Expand All @@ -400,9 +411,10 @@ public async ValueTask EndRunAsync()
}
}

if (!this.ConcurrentRunsEnabled)
if (this._ownsWorkflow)
{
await this._workflow.ReleaseOwnershipAsync(this).ConfigureAwait(false);
await this._workflow.ReleaseOwnershipAsync(this, this._previousOwnership).ConfigureAwait(false);
this._ownsWorkflow = false;
}
}
}
Expand All @@ -429,4 +441,7 @@ ValueTask ISuperStepJoinContext.ForwardWorkflowEventAsync(WorkflowEvent workflow

ValueTask ISuperStepJoinContext.SendMessageAsync<TMessage>(string senderId, [DisallowNull] TMessage message, CancellationToken cancellationToken)
=> this.SendMessageAsync(senderId, Throw.IfNull(message), cancellationToken: cancellationToken);

ValueTask ISuperStepJoinContext.YieldOutputAsync<TOutput>(string senderId, [DisallowNull] TOutput output, CancellationToken cancellationToken)
=> this.YieldOutputAsync(senderId, Throw.IfNull(output), cancellationToken);
}
12 changes: 6 additions & 6 deletions dotnet/src/Microsoft.Agents.AI.Workflows/InProcessExecution.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ public static ValueTask<Checkpointed<StreamingRun>> StreamAsync(Workflow workflo
public static ValueTask<Checkpointed<StreamingRun>> StreamAsync<TInput>(Workflow workflow, TInput input, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull
=> Default.StreamAsync(workflow, input, checkpointManager, runId, cancellationToken);

/// <inheritdoc cref="IWorkflowExecutionEnvironment.ResumeStreamAsync(Workflow, CheckpointInfo, CheckpointManager, string?, CancellationToken)"/>
public static ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default)
=> Default.ResumeStreamAsync(workflow, fromCheckpoint, checkpointManager, runId, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.ResumeStreamAsync(Workflow, CheckpointInfo, CheckpointManager, CancellationToken)"/>
public static ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default)
=> Default.ResumeStreamAsync(workflow, fromCheckpoint, checkpointManager, cancellationToken);

/// <inheritdoc cref="IWorkflowExecutionEnvironment.RunAsync{TInput}(Workflow, TInput, string?, CancellationToken)"/>
public static ValueTask<Run> RunAsync<TInput>(Workflow workflow, TInput input, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull
Expand All @@ -69,7 +69,7 @@ public static ValueTask<Run> RunAsync<TInput>(Workflow workflow, TInput input, s
public static ValueTask<Checkpointed<Run>> RunAsync<TInput>(Workflow workflow, TInput input, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull
=> Default.RunAsync(workflow, input, checkpointManager, runId, cancellationToken);

/// <inheritdoc cref="IWorkflowExecutionEnvironment.ResumeAsync(Workflow, CheckpointInfo, CheckpointManager, string?, CancellationToken)"/>
public static ValueTask<Checkpointed<Run>> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default)
=> Default.ResumeAsync(workflow, fromCheckpoint, checkpointManager, runId, cancellationToken);
/// <inheritdoc cref="IWorkflowExecutionEnvironment.ResumeAsync(Workflow, CheckpointInfo, CheckpointManager, CancellationToken)"/>
public static ValueTask<Checkpointed<Run>> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default)
=> Default.ResumeAsync(workflow, fromCheckpoint, checkpointManager, cancellationToken);
}
10 changes: 8 additions & 2 deletions dotnet/src/Microsoft.Agents.AI.Workflows/ProtocolDescriptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ namespace Microsoft.Agents.AI.Workflows;
public class ProtocolDescriptor
{
/// <summary>
/// Get the collection of types accepted by the <see cref="Workflow"/> or <see cref="Executor"/>.
/// Get the collection of types explicitly accepted by the <see cref="Workflow"/> or <see cref="Executor"/>.
/// </summary>
public IEnumerable<Type> Accepts { get; }

internal ProtocolDescriptor(IEnumerable<Type> acceptedTypes)
/// <summary>
/// Gets a value indicating whether the <see cref="Workflow"/> or <see cref="Executor"/> has a "catch-all" handler.
/// </summary>
public bool AcceptsAll { get; set; }

internal ProtocolDescriptor(IEnumerable<Type> acceptedTypes, bool acceptsAll)
{
this.Accepts = acceptedTypes.ToArray();
this.AcceptsAll = acceptsAll;
}
}
Loading
Loading