Skip to content

Commit f7b8b95

Browse files
committed
fix: Subworkflows do not work well with HostAsAgent
Subworkflows run into issues with Checkpointing and the Chat Protocol:
* The concurrency rework made subtle changes in behaviour that introduced a hang when using subworkflows with ChatProtocol and streaming execution.
* The ResetAsync() implementation in WorkflowHostExecutor was improperly resetting the joinContext - this was happening on checkpoint restore _after_ the join context had already been attached.
* Subworkflows cannot be used as the start node when hosted AsAgent, due to the inability to treat a Catch-All handler as a Chat Protocol.
* Subworkflow ownership issue when used in non-concurrent mode after finishing a run.

Also fixes:
* When ChatMessages are output by executors that are not agents, there is no corresponding AgentResponseUpdate/AgentResponse event.

Breaking Changes:
* [BREAKING CHANGE] The runId parameter is removed from ResumeAsync and ResumeStreamAsync; the run ID is now always taken from the supplied CheckpointInfo. Previously it was possible to provide the wrong RunId when resuming from CheckpointInfo (even though the data already exists on CheckpointInfo).
1 parent b773830 commit f7b8b95

File tree

23 files changed

+344
-87
lines changed

23 files changed

+344
-87
lines changed

dotnet/samples/GettingStarted/Workflows/Checkpoint/CheckpointAndRehydrate/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ private static async Task Main()
7373
CheckpointInfo savedCheckpoint = checkpoints[CheckpointIndex];
7474

7575
await using Checkpointed<StreamingRun> newCheckpointedRun =
76-
await InProcessExecution.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager, checkpointedRun.Run.RunId);
76+
await InProcessExecution.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager);
7777

7878
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync())
7979
{

dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocol.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,20 @@ public static class ChatProtocolExtensions
2020
/// Determines whether the specified protocol descriptor represents the Agent Workflow Chat Protocol.
2121
/// </summary>
2222
/// <param name="descriptor">The protocol descriptor to evaluate.</param>
23+
/// <param name="allowCatchAll">If <see langword="true"/>, will allow protocols handling all inputs to be treated
24+
/// as a Chat Protocol</param>
2325
/// <returns><see langword="true"/> if the protocol descriptor represents a supported chat protocol; otherwise, <see
2426
/// langword="false"/>.</returns>
25-
public static bool IsChatProtocol(this ProtocolDescriptor descriptor)
27+
public static bool IsChatProtocol(this ProtocolDescriptor descriptor, bool allowCatchAll = false)
2628
{
2729
bool foundListChatMessageInput = false;
2830
bool foundTurnTokenInput = false;
2931

32+
if (allowCatchAll && descriptor.AcceptsAll)
33+
{
34+
return true;
35+
}
36+
3037
// We require that the workflow be a ChatProtocol; right now that is defined as accepting at
3138
// least List<ChatMessage> as input (pending polymorphism/interface-input support), as well as
3239
// TurnToken. Since output is mediated by events, which we forward, we don't need to validate
@@ -50,9 +57,11 @@ public static bool IsChatProtocol(this ProtocolDescriptor descriptor)
5057
/// Throws an exception if the specified protocol descriptor does not represent a valid chat protocol.
5158
/// </summary>
5259
/// <param name="descriptor">The protocol descriptor to validate as a chat protocol. Cannot be null.</param>
53-
public static void ThrowIfNotChatProtocol(this ProtocolDescriptor descriptor)
60+
/// <param name="allowCatchAll">If <see langword="true"/>, will allow protocols handling all inputs to be treated
61+
/// as a Chat Protocol</param>
62+
public static void ThrowIfNotChatProtocol(this ProtocolDescriptor descriptor, bool allowCatchAll = false)
5463
{
55-
if (!descriptor.IsChatProtocol())
64+
if (!descriptor.IsChatProtocol(allowCatchAll))
5665
{
5766
throw new InvalidOperationException("Workflow does not support ChatProtocol: At least List<ChatMessage>" +
5867
" and TurnToken must be supported as input.");

dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocolExecutor.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,20 @@ public abstract class ChatProtocolExecutor : StatefulExecutor<List<ChatMessage>>
2929
private static readonly Func<List<ChatMessage>> s_initFunction = () => [];
3030
private readonly ChatRole? _stringMessageChatRole;
3131

32+
private static readonly StatefulExecutorOptions s_baseExecutorOptions = new()
33+
{
34+
AutoSendMessageHandlerResultObject = false,
35+
AutoYieldOutputHandlerResultObject = false
36+
};
37+
3238
/// <summary>
3339
/// Initializes a new instance of the <see cref="ChatProtocolExecutor"/> class.
3440
/// </summary>
3541
/// <param name="id">The unique identifier for this executor instance. Cannot be null or empty.</param>
3642
/// <param name="options">Optional configuration settings for the executor. If null, default options are used.</param>
3743
/// <param name="declareCrossRunShareable">Declare that this executor may be used simultaneously by multiple runs safely.</param>
3844
protected ChatProtocolExecutor(string id, ChatProtocolExecutorOptions? options = null, bool declareCrossRunShareable = false)
39-
: base(id, () => [], declareCrossRunShareable: declareCrossRunShareable)
45+
: base(id, () => [], s_baseExecutorOptions, declareCrossRunShareable)
4046
{
4147
this._stringMessageChatRole = options?.StringMessageChatRole;
4248
}

dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepJoinContext.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ internal interface ISuperStepJoinContext
1313

1414
ValueTask ForwardWorkflowEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default);
1515
ValueTask SendMessageAsync<TMessage>(string senderId, [DisallowNull] TMessage message, CancellationToken cancellationToken = default);
16+
ValueTask YieldOutputAsync<TOutput>(string senderId, [DisallowNull] TOutput output, CancellationToken cancellationToken = default);
1617

1718
ValueTask<string> AttachSuperstepAsync(ISuperStepRunner superStepRunner, CancellationToken cancellationToken = default);
1819
ValueTask<bool> DetachSuperstepAsync(string id);

dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ public ProtocolDescriptor DescribeProtocol()
210210
// TODO: Once burden of annotating yield/output messages becomes easier for the non-Auto case,
211211
// we should (1) start checking for validity on output/send side, and (2) add the Yield/Send
212212
// types to the ProtocolDescriptor.
213-
return new(this.InputTypes);
213+
return new(this.InputTypes, this.Router.HasCatchAll);
214214
}
215215

216216
/// <summary>

dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowExecutionEnvironment.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,9 @@ public interface IWorkflowExecutionEnvironment
7575
/// <param name="workflow">The workflow to be executed. Must not be <c>null</c>.</param>
7676
/// <param name="fromCheckpoint">The <see cref="CheckpointInfo"/> corresponding to the checkpoint from which to resume.</param>
7777
/// <param name="checkpointManager">The <see cref="CheckpointManager"/> to use with this run.</param>
78-
/// <param name="runId">An optional unique identifier for the run. If not provided, a new identifier will be generated.</param>
7978
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
8079
/// <returns>A <see cref="StreamingRun"/> that provides access to the results of the streaming run.</returns>
81-
ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default);
80+
ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default);
8281

8382
/// <summary>
8483
/// Initiates a non-streaming execution of the workflow with the specified input.
@@ -117,9 +116,8 @@ public interface IWorkflowExecutionEnvironment
117116
/// <param name="workflow">The workflow to be executed. Must not be <c>null</c>.</param>
118117
/// <param name="fromCheckpoint">The <see cref="CheckpointInfo"/> corresponding to the checkpoint from which to resume.</param>
119118
/// <param name="checkpointManager">The <see cref="CheckpointManager"/> to use with this run.</param>
120-
/// <param name="runId">An optional unique identifier for the run. If not provided, a new identifier will be generated.</param>
121119
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
122120
/// <returns>A <see cref="ValueTask{Run}"/> that represents the asynchronous operation. The result contains a <see
123121
/// cref="Run"/> for managing and interacting with the streaming run.</returns>
124-
ValueTask<Checkpointed<Run>> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default);
122+
ValueTask<Checkpointed<Run>> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default);
125123
}

dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessExecutionEnvironment.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ internal ValueTask<AsyncRunHandle> BeginRunAsync(Workflow workflow, ICheckpointM
3030
return runner.BeginStreamAsync(this.ExecutionMode, cancellationToken);
3131
}
3232

33-
internal ValueTask<AsyncRunHandle> ResumeRunAsync(Workflow workflow, ICheckpointManager? checkpointManager, string? runId, CheckpointInfo fromCheckpoint, IEnumerable<Type> knownValidInputTypes, CancellationToken cancellationToken)
33+
internal ValueTask<AsyncRunHandle> ResumeRunAsync(Workflow workflow, ICheckpointManager? checkpointManager, CheckpointInfo fromCheckpoint, IEnumerable<Type> knownValidInputTypes, CancellationToken cancellationToken)
3434
{
35-
InProcessRunner runner = InProcessRunner.CreateTopLevelRunner(workflow, checkpointManager, runId, this.EnableConcurrentRuns, knownValidInputTypes);
35+
InProcessRunner runner = InProcessRunner.CreateTopLevelRunner(workflow, checkpointManager, fromCheckpoint.RunId, this.EnableConcurrentRuns, knownValidInputTypes);
3636
return runner.ResumeStreamAsync(this.ExecutionMode, fromCheckpoint, cancellationToken);
3737
}
3838

@@ -95,10 +95,9 @@ public async ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(
9595
Workflow workflow,
9696
CheckpointInfo fromCheckpoint,
9797
CheckpointManager checkpointManager,
98-
string? runId = null,
9998
CancellationToken cancellationToken = default)
10099
{
101-
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, runId: runId, fromCheckpoint, [], cancellationToken)
100+
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, fromCheckpoint, [], cancellationToken)
102101
.ConfigureAwait(false);
103102

104103
return await runHandle.WithCheckpointingAsync<StreamingRun>(() => new(new StreamingRun(runHandle)))
@@ -172,10 +171,9 @@ public async ValueTask<Checkpointed<Run>> ResumeAsync(
172171
Workflow workflow,
173172
CheckpointInfo fromCheckpoint,
174173
CheckpointManager checkpointManager,
175-
string? runId = null,
176174
CancellationToken cancellationToken = default)
177175
{
178-
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, runId: runId, fromCheckpoint, [], cancellationToken)
176+
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, fromCheckpoint, [], cancellationToken)
179177
.ConfigureAwait(false);
180178

181179
return await runHandle.WithCheckpointingAsync<Run>(() => new(new Run(runHandle)))

dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ internal sealed class InProcessRunnerContext : IRunnerContext
2424
private int _runEnded;
2525
private readonly string _runId;
2626
private readonly Workflow _workflow;
27+
private readonly object? _previousOwnership;
28+
private bool _ownsWorkflow;
2729

2830
private readonly EdgeMap _edgeMap;
2931
private readonly OutputFilter _outputFilter;
@@ -54,7 +56,10 @@ public InProcessRunnerContext(
5456
else
5557
{
5658
workflow.TakeOwnership(this, existingOwnershipSignoff: existingOwnershipSignoff);
59+
this._previousOwnership = existingOwnershipSignoff;
60+
this._ownsWorkflow = true;
5761
}
62+
5863
this._workflow = workflow;
5964
this._runId = runId;
6065

@@ -211,10 +216,27 @@ await this._edgeMap.PrepareDeliveryForEdgeAsync(edge, envelope)
211216
}
212217
}
213218

219+
private async ValueTask YieldOutputAsync(string sourceId, object output, CancellationToken cancellationToken = default)
220+
{
221+
this.CheckEnded();
222+
Throw.IfNull(output);
223+
224+
Executor sourceExecutor = await this.EnsureExecutorAsync(sourceId, tracer: null, cancellationToken).ConfigureAwait(false);
225+
if (!sourceExecutor.CanOutput(output.GetType()))
226+
{
227+
throw new InvalidOperationException($"Cannot output object of type {output.GetType().Name}. Expecting one of [{string.Join(", ", sourceExecutor.OutputTypes)}].");
228+
}
229+
230+
if (this._outputFilter.CanOutput(sourceId, output))
231+
{
232+
await this.AddEventAsync(new WorkflowOutputEvent(output, sourceId), cancellationToken).ConfigureAwait(false);
233+
}
234+
}
235+
214236
public IWorkflowContext Bind(string executorId, Dictionary<string, string>? traceContext = null)
215237
{
216238
this.CheckEnded();
217-
return new BoundContext(this, executorId, this._outputFilter, traceContext);
239+
return new BoundContext(this, executorId, traceContext);
218240
}
219241

220242
public ValueTask PostAsync(ExternalRequest request)
@@ -241,7 +263,6 @@ public bool CompleteRequest(string requestId)
241263
private sealed class BoundContext(
242264
InProcessRunnerContext RunnerContext,
243265
string ExecutorId,
244-
OutputFilter outputFilter,
245266
Dictionary<string, string>? traceContext) : IWorkflowContext
246267
{
247268
public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default) => RunnerContext.AddEventAsync(workflowEvent, cancellationToken);
@@ -251,21 +272,9 @@ public ValueTask SendMessageAsync(object message, string? targetId = null, Cance
251272
return RunnerContext.SendMessageAsync(ExecutorId, message, targetId, cancellationToken);
252273
}
253274

254-
public async ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default)
275+
public ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default)
255276
{
256-
RunnerContext.CheckEnded();
257-
Throw.IfNull(output);
258-
259-
Executor sourceExecutor = await RunnerContext.EnsureExecutorAsync(ExecutorId, tracer: null, cancellationToken).ConfigureAwait(false);
260-
if (!sourceExecutor.CanOutput(output.GetType()))
261-
{
262-
throw new InvalidOperationException($"Cannot output object of type {output.GetType().Name}. Expecting one of [{string.Join(", ", sourceExecutor.OutputTypes)}].");
263-
}
264-
265-
if (outputFilter.CanOutput(ExecutorId, output))
266-
{
267-
await this.AddEventAsync(new WorkflowOutputEvent(output, ExecutorId), cancellationToken).ConfigureAwait(false);
268-
}
277+
return RunnerContext.YieldOutputAsync(ExecutorId, output, cancellationToken);
269278
}
270279

271280
public ValueTask RequestHaltAsync() => this.AddEventAsync(new RequestHaltEvent());
@@ -389,7 +398,9 @@ public async ValueTask EndRunAsync()
389398
{
390399
foreach (string executorId in this._executors.Keys)
391400
{
392-
Task<Executor> executor = this._executors[executorId];
401+
Task<Executor> executorTask = this._executors[executorId];
402+
Executor executor = await executorTask.ConfigureAwait(false);
403+
393404
if (executor is IAsyncDisposable asyncDisposable)
394405
{
395406
await asyncDisposable.DisposeAsync().ConfigureAwait(false);
@@ -400,9 +411,10 @@ public async ValueTask EndRunAsync()
400411
}
401412
}
402413

403-
if (!this.ConcurrentRunsEnabled)
414+
if (this._ownsWorkflow)
404415
{
405-
await this._workflow.ReleaseOwnershipAsync(this).ConfigureAwait(false);
416+
await this._workflow.ReleaseOwnershipAsync(this, this._previousOwnership).ConfigureAwait(false);
417+
this._ownsWorkflow = false;
406418
}
407419
}
408420
}
@@ -429,4 +441,7 @@ ValueTask ISuperStepJoinContext.ForwardWorkflowEventAsync(WorkflowEvent workflow
429441

430442
ValueTask ISuperStepJoinContext.SendMessageAsync<TMessage>(string senderId, [DisallowNull] TMessage message, CancellationToken cancellationToken)
431443
=> this.SendMessageAsync(senderId, Throw.IfNull(message), cancellationToken: cancellationToken);
444+
445+
ValueTask ISuperStepJoinContext.YieldOutputAsync<TOutput>(string senderId, [DisallowNull] TOutput output, CancellationToken cancellationToken)
446+
=> this.YieldOutputAsync(senderId, Throw.IfNull(output), cancellationToken);
432447
}

dotnet/src/Microsoft.Agents.AI.Workflows/InProcessExecution.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ public static ValueTask<Checkpointed<StreamingRun>> StreamAsync(Workflow workflo
5757
public static ValueTask<Checkpointed<StreamingRun>> StreamAsync<TInput>(Workflow workflow, TInput input, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull
5858
=> Default.StreamAsync(workflow, input, checkpointManager, runId, cancellationToken);
5959

60-
/// <inheritdoc cref="IWorkflowExecutionEnvironment.ResumeStreamAsync(Workflow, CheckpointInfo, CheckpointManager, string?, CancellationToken)"/>
61-
public static ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default)
62-
=> Default.ResumeStreamAsync(workflow, fromCheckpoint, checkpointManager, runId, cancellationToken);
60+
/// <inheritdoc cref="IWorkflowExecutionEnvironment.ResumeStreamAsync(Workflow, CheckpointInfo, CheckpointManager, CancellationToken)"/>
61+
public static ValueTask<Checkpointed<StreamingRun>> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default)
62+
=> Default.ResumeStreamAsync(workflow, fromCheckpoint, checkpointManager, cancellationToken);
6363

6464
/// <inheritdoc cref="IWorkflowExecutionEnvironment.RunAsync{TInput}(Workflow, TInput, string?, CancellationToken)"/>
6565
public static ValueTask<Run> RunAsync<TInput>(Workflow workflow, TInput input, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull
@@ -69,7 +69,7 @@ public static ValueTask<Run> RunAsync<TInput>(Workflow workflow, TInput input, s
6969
public static ValueTask<Checkpointed<Run>> RunAsync<TInput>(Workflow workflow, TInput input, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull
7070
=> Default.RunAsync(workflow, input, checkpointManager, runId, cancellationToken);
7171

72-
/// <inheritdoc cref="IWorkflowExecutionEnvironment.ResumeAsync(Workflow, CheckpointInfo, CheckpointManager, string?, CancellationToken)"/>
73-
public static ValueTask<Checkpointed<Run>> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default)
74-
=> Default.ResumeAsync(workflow, fromCheckpoint, checkpointManager, runId, cancellationToken);
72+
/// <inheritdoc cref="IWorkflowExecutionEnvironment.ResumeAsync(Workflow, CheckpointInfo, CheckpointManager, CancellationToken)"/>
73+
public static ValueTask<Checkpointed<Run>> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default)
74+
=> Default.ResumeAsync(workflow, fromCheckpoint, checkpointManager, cancellationToken);
7575
}

dotnet/src/Microsoft.Agents.AI.Workflows/ProtocolDescriptor.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,18 @@ namespace Microsoft.Agents.AI.Workflows;
1212
public class ProtocolDescriptor
1313
{
1414
/// <summary>
15-
/// Get the collection of types accepted by the <see cref="Workflow"/> or <see cref="Executor"/>.
15+
/// Get the collection of types explicitly accepted by the <see cref="Workflow"/> or <see cref="Executor"/>.
1616
/// </summary>
1717
public IEnumerable<Type> Accepts { get; }
1818

19-
internal ProtocolDescriptor(IEnumerable<Type> acceptedTypes)
19+
/// <summary>
20+
/// Gets a value indicating whether the <see cref="Workflow"/> or <see cref="Executor"/> has a "catch-all" handler.
21+
/// </summary>
22+
public bool AcceptsAll { get; set; }
23+
24+
internal ProtocolDescriptor(IEnumerable<Type> acceptedTypes, bool acceptsAll)
2025
{
2126
this.Accepts = acceptedTypes.ToArray();
27+
this.AcceptsAll = acceptsAll;
2228
}
2329
}

0 commit comments

Comments
 (0)