diff --git a/dotnet/samples/GettingStarted/Workflows/Checkpoint/CheckpointAndRehydrate/Program.cs b/dotnet/samples/GettingStarted/Workflows/Checkpoint/CheckpointAndRehydrate/Program.cs index bfa8741ecb..093024a873 100644 --- a/dotnet/samples/GettingStarted/Workflows/Checkpoint/CheckpointAndRehydrate/Program.cs +++ b/dotnet/samples/GettingStarted/Workflows/Checkpoint/CheckpointAndRehydrate/Program.cs @@ -73,7 +73,7 @@ private static async Task Main() CheckpointInfo savedCheckpoint = checkpoints[CheckpointIndex]; await using Checkpointed newCheckpointedRun = - await InProcessExecution.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager, checkpointedRun.Run.RunId); + await InProcessExecution.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager); await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync()) { diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocol.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocol.cs index ff8140ee3a..5a328bc8c8 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocol.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocol.cs @@ -20,13 +20,20 @@ public static class ChatProtocolExtensions /// Determines whether the specified protocol descriptor represents the Agent Workflow Chat Protocol. /// /// The protocol descriptor to evaluate. + /// If , will allow protocols handling all inputs to be treated + /// as a Chat Protocol /// if the protocol descriptor represents a supported chat protocol; otherwise, . - public static bool IsChatProtocol(this ProtocolDescriptor descriptor) + public static bool IsChatProtocol(this ProtocolDescriptor descriptor, bool allowCatchAll = false) { bool foundListChatMessageInput = false; bool foundTurnTokenInput = false; + if (allowCatchAll && descriptor.AcceptsAll) + { + return true; + } + // We require that the workflow be a ChatProtocol; right now that is defined as accepting at // least List as input (pending polymorphism/interface-input support), as well as // TurnToken. Since output is mediated by events, which we forward, we don't need to validate @@ -50,9 +57,11 @@ public static bool IsChatProtocol(this ProtocolDescriptor descriptor) /// Throws an exception if the specified protocol descriptor does not represent a valid chat protocol. /// /// The protocol descriptor to validate as a chat protocol. Cannot be null. - public static void ThrowIfNotChatProtocol(this ProtocolDescriptor descriptor) + /// If , will allow protocols handling all inputs to be treated + /// as a Chat Protocol + public static void ThrowIfNotChatProtocol(this ProtocolDescriptor descriptor, bool allowCatchAll = false) { - if (!descriptor.IsChatProtocol()) + if (!descriptor.IsChatProtocol(allowCatchAll)) { throw new InvalidOperationException("Workflow does not support ChatProtocol: At least List" + " and TurnToken must be supported as input."); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocolExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocolExecutor.cs index 238734b598..8fe11f696f 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocolExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocolExecutor.cs @@ -29,6 +29,12 @@ public abstract class ChatProtocolExecutor : StatefulExecutor> private static readonly Func> s_initFunction = () => []; private readonly ChatRole? _stringMessageChatRole; + private static readonly StatefulExecutorOptions s_baseExecutorOptions = new() + { + AutoSendMessageHandlerResultObject = false, + AutoYieldOutputHandlerResultObject = false + }; + /// /// Initializes a new instance of the class. /// @@ -36,7 +42,7 @@ public abstract class ChatProtocolExecutor : StatefulExecutor> /// Optional configuration settings for the executor. If null, default options are used. /// Declare that this executor may be used simultaneously by multiple runs safely. protected ChatProtocolExecutor(string id, ChatProtocolExecutorOptions? options = null, bool declareCrossRunShareable = false) - : base(id, () => [], declareCrossRunShareable: declareCrossRunShareable) + : base(id, () => [], s_baseExecutorOptions, declareCrossRunShareable) { this._stringMessageChatRole = options?.StringMessageChatRole; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepJoinContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepJoinContext.cs index f4af19bcfd..8dacca61c0 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepJoinContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepJoinContext.cs @@ -13,6 +13,7 @@ internal interface ISuperStepJoinContext ValueTask ForwardWorkflowEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default); ValueTask SendMessageAsync(string senderId, [DisallowNull] TMessage message, CancellationToken cancellationToken = default); + ValueTask YieldOutputAsync(string senderId, [DisallowNull] TOutput output, CancellationToken cancellationToken = default); ValueTask AttachSuperstepAsync(ISuperStepRunner superStepRunner, CancellationToken cancellationToken = default); ValueTask DetachSuperstepAsync(string id); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs index 647dbcd852..741f49e2ab 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs @@ -210,7 +210,7 @@ public ProtocolDescriptor DescribeProtocol() // TODO: Once burden of annotating yield/output messages becomes easier for the non-Auto case, // we should (1) start checking for validity on output/send side, and (2) add the Yield/Send // types to the ProtocolDescriptor. - return new(this.InputTypes); + return new(this.InputTypes, this.Router.HasCatchAll); } /// diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowExecutionEnvironment.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowExecutionEnvironment.cs index b8e8b37fa5..1b82308ae7 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowExecutionEnvironment.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowExecutionEnvironment.cs @@ -75,10 +75,9 @@ public interface IWorkflowExecutionEnvironment /// The workflow to be executed. Must not be null. /// The corresponding to the checkpoint from which to resume. /// The to use with this run. - /// An optional unique identifier for the run. If not provided, a new identifier will be generated. /// The to monitor for cancellation requests. The default is . /// A that provides access to the results of the streaming run. - ValueTask> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default); + ValueTask> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default); /// /// Initiates a non-streaming execution of the workflow with the specified input. @@ -117,9 +116,8 @@ public interface IWorkflowExecutionEnvironment /// The workflow to be executed. Must not be null. /// The corresponding to the checkpoint from which to resume. /// The to use with this run. - /// An optional unique identifier for the run. If not provided, a new identifier will be generated. /// The to monitor for cancellation requests. The default is . /// A that represents the asynchronous operation. The result contains a for managing and interacting with the streaming run. - ValueTask> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default); + ValueTask> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessExecutionEnvironment.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessExecutionEnvironment.cs index a4d40ff127..47dee1e1e0 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessExecutionEnvironment.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessExecutionEnvironment.cs @@ -30,9 +30,9 @@ internal ValueTask BeginRunAsync(Workflow workflow, ICheckpointM return runner.BeginStreamAsync(this.ExecutionMode, cancellationToken); } - internal ValueTask ResumeRunAsync(Workflow workflow, ICheckpointManager? checkpointManager, string? runId, CheckpointInfo fromCheckpoint, IEnumerable knownValidInputTypes, CancellationToken cancellationToken) + internal ValueTask ResumeRunAsync(Workflow workflow, ICheckpointManager? checkpointManager, CheckpointInfo fromCheckpoint, IEnumerable knownValidInputTypes, CancellationToken cancellationToken) { - InProcessRunner runner = InProcessRunner.CreateTopLevelRunner(workflow, checkpointManager, runId, this.EnableConcurrentRuns, knownValidInputTypes); + InProcessRunner runner = InProcessRunner.CreateTopLevelRunner(workflow, checkpointManager, fromCheckpoint.RunId, this.EnableConcurrentRuns, knownValidInputTypes); return runner.ResumeStreamAsync(this.ExecutionMode, fromCheckpoint, cancellationToken); } @@ -95,10 +95,9 @@ public async ValueTask> ResumeStreamAsync( Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, - string? runId = null, CancellationToken cancellationToken = default) { - AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, runId: runId, fromCheckpoint, [], cancellationToken) + AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, fromCheckpoint, [], cancellationToken) .ConfigureAwait(false); return await runHandle.WithCheckpointingAsync(() => new(new StreamingRun(runHandle))) @@ -172,10 +171,9 @@ public async ValueTask> ResumeAsync( Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, - string? runId = null, CancellationToken cancellationToken = default) { - AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, runId: runId, fromCheckpoint, [], cancellationToken) + AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, checkpointManager, fromCheckpoint, [], cancellationToken) .ConfigureAwait(false); return await runHandle.WithCheckpointingAsync(() => new(new Run(runHandle))) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index 1750f779f2..2f2162b969 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -24,6 +24,8 @@ internal sealed class InProcessRunnerContext : IRunnerContext private int _runEnded; private readonly string _runId; private readonly Workflow _workflow; + private readonly object? _previousOwnership; + private bool _ownsWorkflow; private readonly EdgeMap _edgeMap; private readonly OutputFilter _outputFilter; @@ -54,7 +56,10 @@ public InProcessRunnerContext( else { workflow.TakeOwnership(this, existingOwnershipSignoff: existingOwnershipSignoff); + this._previousOwnership = existingOwnershipSignoff; + this._ownsWorkflow = true; } + this._workflow = workflow; this._runId = runId; @@ -211,10 +216,27 @@ await this._edgeMap.PrepareDeliveryForEdgeAsync(edge, envelope) } } + private async ValueTask YieldOutputAsync(string sourceId, object output, CancellationToken cancellationToken = default) + { + this.CheckEnded(); + Throw.IfNull(output); + + Executor sourceExecutor = await this.EnsureExecutorAsync(sourceId, tracer: null, cancellationToken).ConfigureAwait(false); + if (!sourceExecutor.CanOutput(output.GetType())) + { + throw new InvalidOperationException($"Cannot output object of type {output.GetType().Name}. Expecting one of [{string.Join(", ", sourceExecutor.OutputTypes)}]."); + } + + if (this._outputFilter.CanOutput(sourceId, output)) + { + await this.AddEventAsync(new WorkflowOutputEvent(output, sourceId), cancellationToken).ConfigureAwait(false); + } + } + public IWorkflowContext Bind(string executorId, Dictionary? traceContext = null) { this.CheckEnded(); - return new BoundContext(this, executorId, this._outputFilter, traceContext); + return new BoundContext(this, executorId, traceContext); } public ValueTask PostAsync(ExternalRequest request) @@ -241,7 +263,6 @@ public bool CompleteRequest(string requestId) private sealed class BoundContext( InProcessRunnerContext RunnerContext, string ExecutorId, - OutputFilter outputFilter, Dictionary? traceContext) : IWorkflowContext { public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default) => RunnerContext.AddEventAsync(workflowEvent, cancellationToken); @@ -251,21 +272,9 @@ public ValueTask SendMessageAsync(object message, string? targetId = null, Cance return RunnerContext.SendMessageAsync(ExecutorId, message, targetId, cancellationToken); } - public async ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default) + public ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default) { - RunnerContext.CheckEnded(); - Throw.IfNull(output); - - Executor sourceExecutor = await RunnerContext.EnsureExecutorAsync(ExecutorId, tracer: null, cancellationToken).ConfigureAwait(false); - if (!sourceExecutor.CanOutput(output.GetType())) - { - throw new InvalidOperationException($"Cannot output object of type {output.GetType().Name}. Expecting one of [{string.Join(", ", sourceExecutor.OutputTypes)}]."); - } - - if (outputFilter.CanOutput(ExecutorId, output)) - { - await this.AddEventAsync(new WorkflowOutputEvent(output, ExecutorId), cancellationToken).ConfigureAwait(false); - } + return RunnerContext.YieldOutputAsync(ExecutorId, output, cancellationToken); } public ValueTask RequestHaltAsync() => this.AddEventAsync(new RequestHaltEvent()); @@ -389,7 +398,9 @@ public async ValueTask EndRunAsync() { foreach (string executorId in this._executors.Keys) { - Task executor = this._executors[executorId]; + Task executorTask = this._executors[executorId]; + Executor executor = await executorTask.ConfigureAwait(false); + if (executor is IAsyncDisposable asyncDisposable) { await asyncDisposable.DisposeAsync().ConfigureAwait(false); @@ -400,9 +411,10 @@ public async ValueTask EndRunAsync() } } - if (!this.ConcurrentRunsEnabled) + if (this._ownsWorkflow) { - await this._workflow.ReleaseOwnershipAsync(this).ConfigureAwait(false); + await this._workflow.ReleaseOwnershipAsync(this, this._previousOwnership).ConfigureAwait(false); + this._ownsWorkflow = false; } } } @@ -429,4 +441,7 @@ ValueTask ISuperStepJoinContext.ForwardWorkflowEventAsync(WorkflowEvent workflow ValueTask ISuperStepJoinContext.SendMessageAsync(string senderId, [DisallowNull] TMessage message, CancellationToken cancellationToken) => this.SendMessageAsync(senderId, Throw.IfNull(message), cancellationToken: cancellationToken); + + ValueTask ISuperStepJoinContext.YieldOutputAsync(string senderId, [DisallowNull] TOutput output, CancellationToken cancellationToken) + => this.YieldOutputAsync(senderId, Throw.IfNull(output), cancellationToken); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProcessExecution.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProcessExecution.cs index dc110e7570..f21117a38a 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProcessExecution.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProcessExecution.cs @@ -57,9 +57,9 @@ public static ValueTask> StreamAsync(Workflow workflo public static ValueTask> StreamAsync(Workflow workflow, TInput input, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull => Default.StreamAsync(workflow, input, checkpointManager, runId, cancellationToken); - /// - public static ValueTask> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) - => Default.ResumeStreamAsync(workflow, fromCheckpoint, checkpointManager, runId, cancellationToken); + /// + public static ValueTask> ResumeStreamAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default) + => Default.ResumeStreamAsync(workflow, fromCheckpoint, checkpointManager, cancellationToken); /// public static ValueTask RunAsync(Workflow workflow, TInput input, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull @@ -69,7 +69,7 @@ public static ValueTask RunAsync(Workflow workflow, TInput input, s public static ValueTask> RunAsync(Workflow workflow, TInput input, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) where TInput : notnull => Default.RunAsync(workflow, input, checkpointManager, runId, cancellationToken); - /// - public static ValueTask> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, string? runId = null, CancellationToken cancellationToken = default) - => Default.ResumeAsync(workflow, fromCheckpoint, checkpointManager, runId, cancellationToken); + /// + public static ValueTask> ResumeAsync(Workflow workflow, CheckpointInfo fromCheckpoint, CheckpointManager checkpointManager, CancellationToken cancellationToken = default) + => Default.ResumeAsync(workflow, fromCheckpoint, checkpointManager, cancellationToken); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ProtocolDescriptor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ProtocolDescriptor.cs index 91adc4dbae..bb2663c100 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/ProtocolDescriptor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/ProtocolDescriptor.cs @@ -12,12 +12,18 @@ namespace Microsoft.Agents.AI.Workflows; public class ProtocolDescriptor { /// - /// Get the collection of types accepted by the or . + /// Get the collection of types explicitly accepted by the or . /// public IEnumerable Accepts { get; } - internal ProtocolDescriptor(IEnumerable acceptedTypes) + /// + /// Gets a value indicating whether the or has a "catch-all" handler. + /// + public bool AcceptsAll { get; set; } + + internal ProtocolDescriptor(IEnumerable acceptedTypes, bool acceptsAll) { this.Accepts = acceptedTypes.ToArray(); + this.AcceptsAll = acceptsAll; } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/OutputMessagesExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/OutputMessagesExecutor.cs index e727e30bac..b3c714406d 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/OutputMessagesExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/OutputMessagesExecutor.cs @@ -7,17 +7,16 @@ namespace Microsoft.Agents.AI.Workflows; -public static partial class AgentWorkflowBuilder +/// +/// Provides an executor that batches received chat messages that it then publishes as the final result +/// when receiving a . +/// +internal sealed class OutputMessagesExecutor(ChatProtocolExecutorOptions? options = null) : ChatProtocolExecutor(ExecutorId, options, declareCrossRunShareable: true), IResettableExecutor { - /// - /// Provides an executor that batches received chat messages that it then publishes as the final result - /// when receiving a . - /// - internal sealed class OutputMessagesExecutor() : ChatProtocolExecutor("OutputMessages", declareCrossRunShareable: true), IResettableExecutor - { - protected override ValueTask TakeTurnAsync(List messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default) - => context.YieldOutputAsync(messages, cancellationToken); + public const string ExecutorId = "OutputMessages"; - ValueTask IResettableExecutor.ResetAsync() => default; - } + protected override ValueTask TakeTurnAsync(List messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default) + => context.YieldOutputAsync(messages, cancellationToken); + + ValueTask IResettableExecutor.ResetAsync() => default; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/WorkflowHostExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/WorkflowHostExecutor.cs index 409f751107..ab8a499a75 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/WorkflowHostExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/WorkflowHostExecutor.cs @@ -79,7 +79,7 @@ internal async ValueTask EnsureRunnerAsync() // serialization because we will be relying on the parent workflow's checkpoint manager to do that, // if needed. For our purposes, all we need is to keep a faithful representation of the checkpointed // objects so we can emit them back to the parent workflow on checkpoint creation. - this._checkpointManager = new InMemoryCheckpointManager(); + this._checkpointManager ??= new InMemoryCheckpointManager(); } this._activeRunner = InProcessRunner.CreateSubworkflowRunner(this._workflow, @@ -124,7 +124,7 @@ internal async ValueTask EnsureRunSendMessageAsync(object? incomin if (incomingMessage != null) { - await runHandle.EnqueueUntypedAndRunAsync(incomingMessage, cancellationToken).ConfigureAwait(false); + await runHandle.EnqueueMessageUntypedAsync(incomingMessage, cancellationToken: cancellationToken).ConfigureAwait(false); } } else if (incomingMessage != null) @@ -132,7 +132,7 @@ internal async ValueTask EnsureRunSendMessageAsync(object? incomin runHandle = await activeRunner.BeginStreamAsync(ExecutionMode.Subworkflow, cancellationToken) .ConfigureAwait(false); - await runHandle.EnqueueUntypedAndRunAsync(incomingMessage, cancellationToken).ConfigureAwait(false); + await runHandle.EnqueueMessageUntypedAsync(incomingMessage, cancellationToken: cancellationToken).ConfigureAwait(false); } else { @@ -198,6 +198,13 @@ private async ValueTask ForwardWorkflowEventAsync(object? sender, WorkflowEvent { resultTask = this._joinContext.SendMessageAsync(this.Id, outputEvent.Data).AsTask(); } + + if (this._joinContext != null && + this._options.AutoYieldOutputHandlerResultObject + && outputEvent.Data != null) + { + resultTask = this._joinContext.YieldOutputAsync(this.Id, outputEvent.Data).AsTask(); + } break; case RequestHaltEvent requestHaltEvent: resultTask = this._joinContext?.ForwardWorkflowEventAsync(new RequestHaltEvent()).AsTask() ?? Task.CompletedTask; @@ -231,9 +238,10 @@ internal async ValueTask AttachSuperStepContextAsync(ISuperStepJoinContext joinC this._joinContext = Throw.IfNull(joinContext); } + private const string CheckpointManagerStateKey = nameof(CheckpointManager); protected internal override async ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default) { - await context.QueueStateUpdateAsync(nameof(CheckpointManager), this._checkpointManager, cancellationToken: cancellationToken).ConfigureAwait(false); + await context.QueueStateUpdateAsync(CheckpointManagerStateKey, this._checkpointManager, cancellationToken: cancellationToken).ConfigureAwait(false); await base.OnCheckpointingAsync(context, cancellationToken).ConfigureAwait(false); } @@ -242,7 +250,7 @@ protected internal override async ValueTask OnCheckpointRestoredAsync(IWorkflowC { await base.OnCheckpointRestoredAsync(context, cancellationToken).ConfigureAwait(false); - InMemoryCheckpointManager manager = await context.ReadStateAsync(nameof(InMemoryCheckpointManager), cancellationToken: cancellationToken).ConfigureAwait(false) ?? new(); + InMemoryCheckpointManager manager = await context.ReadStateAsync(CheckpointManagerStateKey, cancellationToken: cancellationToken).ConfigureAwait(false) ?? new(); if (this._checkpointManager == manager) { // We are restoring in the context of the same run; not need to rebuild the entire execution stack. @@ -254,7 +262,7 @@ protected internal override async ValueTask OnCheckpointRestoredAsync(IWorkflowC await this.ResetAsync().ConfigureAwait(false); } - StreamingRun run = await this.EnsureRunSendMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + await this.EnsureRunSendMessageAsync(resume: true, cancellationToken: cancellationToken).ConfigureAwait(false); } private async ValueTask ResetAsync() @@ -273,15 +281,10 @@ private async ValueTask ResetAsync() this._activeRunner = null; } - if (this._joinContext != null) + if (this._joinContext != null && this._joinId != null) { - if (this._joinId != null) - { - await this._joinContext.DetachSuperstepAsync(this._joinId).ConfigureAwait(false); - this._joinId = null; - } - - this._joinContext = null; + await this._joinContext.DetachSuperstepAsync(this._joinId).ConfigureAwait(false); + this._joinId = null; } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/SubworkflowBinding.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/SubworkflowBinding.cs index 1f29ffe426..389aa19afc 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/SubworkflowBinding.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/SubworkflowBinding.cs @@ -16,9 +16,9 @@ namespace Microsoft.Agents.AI.Workflows; /// public record SubworkflowBinding(Workflow WorkflowInstance, string Id, ExecutorOptions? ExecutorOptions = null) : ExecutorBinding(Throw.IfNull(Id), - CreateWorkflowExecutorFactory(WorkflowInstance, Id, ExecutorOptions), - typeof(WorkflowHostExecutor), - WorkflowInstance) + CreateWorkflowExecutorFactory(WorkflowInstance, Id, ExecutorOptions), + typeof(WorkflowHostExecutor), + WorkflowInstance) { private static Func> CreateWorkflowExecutorFactory(Workflow workflow, string id, ExecutorOptions? options) { diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs index 456838b9eb..dab109a6a2 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs @@ -166,9 +166,9 @@ internal void TakeOwnership(object ownerToken, bool subworkflow = false, object? [System.Diagnostics.CodeAnalysis.SuppressMessage("Maintainability", "CA1513:Use ObjectDisposedException throw helper", Justification = "Does not exist in NetFx 4.7.2")] - internal async ValueTask ReleaseOwnershipAsync(object ownerToken) + internal async ValueTask ReleaseOwnershipAsync(object ownerToken, object? targetOwnerToken) { - object? originalToken = Interlocked.CompareExchange(ref this._ownerToken, null, ownerToken) ?? + object? originalToken = Interlocked.CompareExchange(ref this._ownerToken, targetOwnerToken, ownerToken) ?? throw new InvalidOperationException("Attempting to release ownership of a Workflow that is not owned."); if (!ReferenceEquals(originalToken, ownerToken)) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostAgent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostAgent.cs index f20660bc51..290fe6cac4 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostAgent.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostAgent.cs @@ -19,11 +19,12 @@ internal sealed class WorkflowHostAgent : AIAgent private readonly CheckpointManager? _checkpointManager; private readonly IWorkflowExecutionEnvironment _executionEnvironment; private readonly bool _includeExceptionDetails; + private readonly bool _includeWorkflowOutputsInResponse; private readonly Task _describeTask; private readonly ConcurrentDictionary _assignedRunIds = []; - public WorkflowHostAgent(Workflow workflow, string? id = null, string? name = null, string? description = null, CheckpointManager? checkpointManager = null, IWorkflowExecutionEnvironment? executionEnvironment = null, bool includeExceptionDetails = false) + public WorkflowHostAgent(Workflow workflow, string? id = null, string? name = null, string? description = null, CheckpointManager? checkpointManager = null, IWorkflowExecutionEnvironment? executionEnvironment = null, bool includeExceptionDetails = false, bool includeWorkflowOutputsInResponse = false) { this._workflow = Throw.IfNull(workflow); @@ -32,6 +33,7 @@ public WorkflowHostAgent(Workflow workflow, string? id = null, string? name = nu : InProcessExecution.OffThread); this._checkpointManager = checkpointManager; this._includeExceptionDetails = includeExceptionDetails; + this._includeWorkflowOutputsInResponse = includeWorkflowOutputsInResponse; this._id = id; this.Name = name; @@ -60,14 +62,14 @@ private string GenerateNewId() private async ValueTask ValidateWorkflowAsync() { ProtocolDescriptor protocol = await this._describeTask.ConfigureAwait(false); - protocol.ThrowIfNotChatProtocol(); + protocol.ThrowIfNotChatProtocol(allowCatchAll: true); } public override ValueTask GetNewThreadAsync(CancellationToken cancellationToken = default) - => new(new WorkflowThread(this._workflow, this.GenerateNewId(), this._executionEnvironment, this._checkpointManager, this._includeExceptionDetails)); + => new(new WorkflowThread(this._workflow, this.GenerateNewId(), this._executionEnvironment, this._checkpointManager, this._includeExceptionDetails, this._includeWorkflowOutputsInResponse)); public override ValueTask DeserializeThreadAsync(JsonElement serializedThread, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default) - => new(new WorkflowThread(this._workflow, serializedThread, this._executionEnvironment, this._checkpointManager, this._includeExceptionDetails, jsonSerializerOptions)); + => new(new WorkflowThread(this._workflow, serializedThread, this._executionEnvironment, this._checkpointManager, this._includeExceptionDetails, this._includeWorkflowOutputsInResponse, jsonSerializerOptions)); private async ValueTask UpdateThreadAsync(IEnumerable messages, AgentThread? thread = null, CancellationToken cancellationToken = default) { diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostingExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostingExtensions.cs index c217039a35..36bce91fc3 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostingExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostingExtensions.cs @@ -23,6 +23,8 @@ public static class WorkflowHostingExtensions /// for the in-process environments. /// If , will include /// in the representing the workflow error. + /// If , will transform outgoing workflow outputs + /// into into content in s or the as appropriate. /// public static AIAgent AsAgent( this Workflow workflow, @@ -31,9 +33,10 @@ public static AIAgent AsAgent( string? description = null, CheckpointManager? checkpointManager = null, IWorkflowExecutionEnvironment? executionEnvironment = null, - bool includeExceptionDetails = false) + bool includeExceptionDetails = false, + bool includeWorkflowOutputsInResponse = false) { - return new WorkflowHostAgent(workflow, id, name, description, checkpointManager, executionEnvironment, includeExceptionDetails); + return new WorkflowHostAgent(workflow, id, name, description, checkpointManager, executionEnvironment, includeExceptionDetails, includeWorkflowOutputsInResponse); } internal static FunctionCallContent ToFunctionCall(this ExternalRequest request) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowThread.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowThread.cs index 6f00566b95..96d3f3b08a 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowThread.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowThread.cs @@ -19,15 +19,17 @@ internal sealed class WorkflowThread : AgentThread private readonly Workflow _workflow; private readonly IWorkflowExecutionEnvironment _executionEnvironment; private readonly bool _includeExceptionDetails; + private readonly bool _includeWorkflowOutputsInResponse; private readonly CheckpointManager _checkpointManager; private readonly InMemoryCheckpointManager? _inMemoryCheckpointManager; - public WorkflowThread(Workflow workflow, string runId, IWorkflowExecutionEnvironment executionEnvironment, CheckpointManager? checkpointManager = null, bool includeExceptionDetails = false) + public WorkflowThread(Workflow workflow, string runId, IWorkflowExecutionEnvironment executionEnvironment, CheckpointManager? checkpointManager = null, bool includeExceptionDetails = false, bool includeWorkflowOutputsInResponse = false) { this._workflow = Throw.IfNull(workflow); this._executionEnvironment = Throw.IfNull(executionEnvironment); this._includeExceptionDetails = includeExceptionDetails; + this._includeWorkflowOutputsInResponse = includeWorkflowOutputsInResponse; // If the user provided an external checkpoint manager, use that, otherwise rely on an in-memory one. // TODO: Implement persist-only-last functionality for in-memory checkpoint manager, to avoid unbounded @@ -38,10 +40,12 @@ public WorkflowThread(Workflow workflow, string runId, IWorkflowExecutionEnviron this.MessageStore = new WorkflowMessageStore(); } - public WorkflowThread(Workflow workflow, JsonElement serializedThread, IWorkflowExecutionEnvironment executionEnvironment, CheckpointManager? checkpointManager = null, bool includeExceptionDetails = false, JsonSerializerOptions? jsonSerializerOptions = null) + public WorkflowThread(Workflow workflow, JsonElement serializedThread, IWorkflowExecutionEnvironment executionEnvironment, CheckpointManager? checkpointManager = null, bool includeExceptionDetails = false, bool includeWorkflowOutputsInResponse = false, JsonSerializerOptions? jsonSerializerOptions = null) { this._workflow = Throw.IfNull(workflow); this._executionEnvironment = Throw.IfNull(executionEnvironment); + this._includeExceptionDetails = includeExceptionDetails; + this._includeWorkflowOutputsInResponse = includeWorkflowOutputsInResponse; JsonMarshaller marshaller = new(jsonSerializerOptions); ThreadState threadState = marshaller.Marshal(serializedThread); @@ -101,6 +105,23 @@ public AgentResponseUpdate CreateUpdate(string responseId, object raw, params AI return update; } + public AgentResponseUpdate CreateUpdate(string responseId, object raw, ChatMessage message) + { + Throw.IfNull(message); + + AgentResponseUpdate update = new(message.Role, message.Contents) + { + CreatedAt = message.CreatedAt ?? DateTimeOffset.UtcNow, + MessageId = message.MessageId ?? Guid.NewGuid().ToString("N"), + ResponseId = responseId, + RawRepresentation = raw + }; + + this.MessageStore.AddMessages(update.ToChatMessage()); + + return update; + } + private async ValueTask> CreateOrResumeRunAsync(List messages, CancellationToken cancellationToken = default) { // The workflow is validated to be a ChatProtocol workflow by the WorkflowHostAgent before creating the thread, @@ -112,7 +133,6 @@ await this._executionEnvironment .ResumeStreamAsync(this._workflow, this.LastCheckpoint, this._checkpointManager, - this.RunId, cancellationToken) .ConfigureAwait(false); @@ -184,6 +204,25 @@ IAsyncEnumerable InvokeStageAsync( this.LastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint; goto default; + case WorkflowOutputEvent output: + IEnumerable? updateMessages = output.Data switch + { + IEnumerable chatMessages => chatMessages, + ChatMessage chatMessage => [chatMessage], + _ => null + }; + + if (!this._includeWorkflowOutputsInResponse || updateMessages == null) + { + goto default; + } + + foreach (ChatMessage message in updateMessages) + { + yield return this.CreateUpdate(this.LastResponseId, evt, message); + } + break; + default: // Emit all other workflow events for observability (DevUI, logging, etc.) yield return new AgentResponseUpdate(ChatRole.Assistant, []) diff --git a/dotnet/src/Shared/Workflows/Execution/WorkflowRunner.cs b/dotnet/src/Shared/Workflows/Execution/WorkflowRunner.cs index b8666451f6..380ea5eaeb 100644 --- a/dotnet/src/Shared/Workflows/Execution/WorkflowRunner.cs +++ b/dotnet/src/Shared/Workflows/Execution/WorkflowRunner.cs @@ -95,7 +95,7 @@ public async Task ExecuteAsync(Func workflowProvider, string input) Debug.WriteLine($"RESTORE #{this.LastCheckpoint.CheckpointId}"); Notify("WORKFLOW: Restore", ConsoleColor.DarkYellow); - run = await InProcessExecution.ResumeStreamAsync(workflow, this.LastCheckpoint, checkpointManager, run.Run.RunId).ConfigureAwait(false); + run = await InProcessExecution.ResumeStreamAsync(workflow, this.LastCheckpoint, checkpointManager).ConfigureAwait(false); } else { diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.IntegrationTests/Framework/WorkflowHarness.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.IntegrationTests/Framework/WorkflowHarness.cs index afd9b18fb9..80d4c57da8 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.IntegrationTests/Framework/WorkflowHarness.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.IntegrationTests/Framework/WorkflowHarness.cs @@ -55,7 +55,7 @@ public async Task ResumeAsync(ExternalResponse response) { Console.WriteLine("\nRESUMING WORKFLOW..."); Assert.NotNull(this._lastCheckpoint); - Checkpointed run = await InProcessExecution.ResumeStreamAsync(workflow, this._lastCheckpoint, this.GetCheckpointManager(), runId); + Checkpointed run = await InProcessExecution.ResumeStreamAsync(workflow, this._lastCheckpoint, this.GetCheckpointManager()); IReadOnlyList workflowEvents = await MonitorAndDisposeWorkflowRunAsync(run, response).ToArrayAsync(); return new WorkflowEvents(workflowEvents); } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/05_Simple_Workflow_Checkpointing.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/05_Simple_Workflow_Checkpointing.cs index aab5fd0958..7216d44208 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/05_Simple_Workflow_Checkpointing.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/05_Simple_Workflow_Checkpointing.cs @@ -42,7 +42,7 @@ await environment.StreamAsync(workflow, NumberSignal.Init, checkpointManager) { await handle.DisposeAsync().ConfigureAwait(false); - checkpointed = await environment.ResumeStreamAsync(workflow, targetCheckpoint, checkpointManager, runId: handle.RunId, cancellationToken: CancellationToken.None) + checkpointed = await environment.ResumeStreamAsync(workflow, targetCheckpoint, checkpointManager, cancellationToken: CancellationToken.None) .ConfigureAwait(false); handle = checkpointed.Run; } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/13_Subworkflow_Checkpointing.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/13_Subworkflow_Checkpointing.cs new file mode 100644 index 0000000000..113731e679 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Sample/13_Subworkflow_Checkpointing.cs @@ -0,0 +1,95 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Threading.Tasks; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI.Workflows.Sample; + +internal static class Step13EntryPoint +{ + public static Workflow SubworkflowInstance + { + get + { + OutputMessagesExecutor output = new(new ChatProtocolExecutorOptions() { StringMessageChatRole = ChatRole.User }); + return new WorkflowBuilder(output).WithOutputFrom(output).Build(); + } + } + + public static Workflow WorkflowInstance + { + get + { + ExecutorBinding subworkflow = SubworkflowInstance.BindAsExecutor("EchoSubworkflow"); + return new WorkflowBuilder(subworkflow).WithOutputFrom(subworkflow).Build(); + } + } + + public static async ValueTask RunAsAgentAsync(TextWriter writer, string input, IWorkflowExecutionEnvironment environment, AgentThread? thread) + { + AIAgent hostAgent = WorkflowInstance.AsAgent("echo-workflow", "EchoW", executionEnvironment: environment, includeWorkflowOutputsInResponse: true); + + thread ??= await hostAgent.GetNewThreadAsync(); + AgentResponse response; + ResponseContinuationToken? continuationToken = null; + do + { + response = await hostAgent.RunAsync(input, thread, new AgentRunOptions { ContinuationToken = continuationToken }); + } while ((continuationToken = response.ContinuationToken) is { }); + + foreach (ChatMessage message in response.Messages) + { + writer.WriteLine($"{message.AuthorName}: {message.Text}"); + } + + return thread; + } + + public static async ValueTask RunAsync(TextWriter writer, string input, IWorkflowExecutionEnvironment environment, CheckpointManager checkpointManager, CheckpointInfo? resumeFrom) + { + await using Checkpointed checkpointed = await BeginAsync(); + StreamingRun run = checkpointed.Run; + + await run.TrySendMessageAsync(new TurnToken()); + + CheckpointInfo? lastCheckpoint = null; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + if (evt is WorkflowOutputEvent output) + { + if (output.Data is List messages) + { + foreach (ChatMessage message in messages) + { + writer.WriteLine($"{output.SourceId}: {message.Text}"); + } + } + else + { + Debug.Fail($"Unexpected output type: {(output.Data == null ? "null" : output.Data?.GetType().Name)}"); + } + } + else if (evt is SuperStepCompletedEvent stepCompleted) + { + lastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint; + } + } + + return lastCheckpoint!; + + async ValueTask> BeginAsync() + { + if (resumeFrom == null) + { + return await environment.StreamAsync(WorkflowInstance, input, checkpointManager); + } + + Checkpointed checkpointed = await environment.ResumeStreamAsync(WorkflowInstance, resumeFrom, checkpointManager); + await checkpointed.Run.TrySendMessageAsync(input); + return checkpointed; + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs index dbe3a56d06..214333f11e 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SampleSmokeTest.cs @@ -371,6 +371,72 @@ IEnumerable EchoesForInput(string input) Action CreateValidator(string expected) => actual => actual.Should().Be(expected); } + + [Theory] + [InlineData(ExecutionEnvironment.InProcess_Lockstep)] + [InlineData(ExecutionEnvironment.InProcess_OffThread)] + [InlineData(ExecutionEnvironment.InProcess_Concurrent)] + internal async Task Test_RunSample_Step13Async(ExecutionEnvironment environment) + { + IWorkflowExecutionEnvironment executionEnvironment = environment.ToWorkflowExecutionEnvironment(); + + CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); + CheckpointInfo? resumeFrom = null; + + await RunAndValidateAsync(1); + + // this should crash before fix + await RunAndValidateAsync(2); + + async ValueTask RunAndValidateAsync(int step) + { + using StringWriter writer = new(); + string input = $"[{step}] Hello, World!"; + + resumeFrom = await Step13EntryPoint.RunAsync(writer, input, executionEnvironment, checkpointManager, resumeFrom); + + string result = writer.ToString(); + string[] lines = result.Split([Environment.NewLine], StringSplitOptions.RemoveEmptyEntries); + + const string ExpectedSource = "EchoSubworkflow"; + Assert.Collection(lines, + line => Assert.Contains($"{ExpectedSource}: {input}", line) + ); + } + } + + [Theory] + [InlineData(ExecutionEnvironment.InProcess_Lockstep)] + [InlineData(ExecutionEnvironment.InProcess_OffThread)] + [InlineData(ExecutionEnvironment.InProcess_Concurrent)] + internal async Task Test_RunSample_Step13aAsync(ExecutionEnvironment environment) + { + IWorkflowExecutionEnvironment executionEnvironment = environment.ToWorkflowExecutionEnvironment(); + AgentThread? thread = null; + + await RunAndValidateAsync(1); + + // this should crash before fix + await RunAndValidateAsync(2); + + async ValueTask RunAndValidateAsync(int step) + { + using StringWriter writer = new(); + string input = $"[{step}] Hello, World!"; + + thread = await Step13EntryPoint.RunAsAgentAsync(writer, input, executionEnvironment, thread); + + string result = writer.ToString(); + string[] lines = result.Split([Environment.NewLine], StringSplitOptions.RemoveEmptyEntries); + + // We expect to get the message that was passed in directly; since we are passing it in as a string, there is no associated + // author information. The ExpectedSource is empty string. + const string ExpectedSource = ""; + Assert.Collection(lines, + line => Assert.Contains($"{ExpectedSource}: {input}", line) + ); + } + } } internal sealed class VerifyingPlaybackResponder diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs index 57375b8341..b90bd30c54 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs @@ -68,6 +68,9 @@ public ValueTask PostAsync(ExternalRequest request) } internal Dictionary> QueuedMessages { get; } = []; + + internal Dictionary> QueuedOutputs { get; } = []; + public ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null, CancellationToken cancellationToken = default) { if (!this.QueuedMessages.TryGetValue(sourceId, out List? deliveryQueue)) @@ -79,6 +82,17 @@ public ValueTask SendMessageAsync(string sourceId, object message, string? targe return default; } + public ValueTask YieldOutputAsync(string sourceId, object output, CancellationToken cancellationToken = default) + { + if (!this.QueuedOutputs.TryGetValue(sourceId, out List? outputQueue)) + { + this.QueuedOutputs[sourceId] = outputQueue = []; + } + + outputQueue.Add(output); + return default; + } + ValueTask IRunnerContext.AdvanceAsync(CancellationToken cancellationToken) => throw new NotImplementedException(); @@ -104,8 +118,11 @@ public ValueTask> GetStartingExecutorInputTypesAsync(Cancellat public ValueTask ForwardWorkflowEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default) => this.AddEventAsync(workflowEvent, cancellationToken); - public ValueTask SendMessageAsync(string senderId, [System.Diagnostics.CodeAnalysis.DisallowNull] TMessage message, CancellationToken cancellationToken = default) - => this.SendMessageAsync(senderId, message, cancellationToken); + ValueTask ISuperStepJoinContext.SendMessageAsync(string senderId, [System.Diagnostics.CodeAnalysis.DisallowNull] TMessage message, CancellationToken cancellationToken) + => this.SendMessageAsync(senderId, message, cancellationToken: cancellationToken); + + ValueTask ISuperStepJoinContext.YieldOutputAsync(string senderId, [System.Diagnostics.CodeAnalysis.DisallowNull] TOutput output, CancellationToken cancellationToken) + => this.YieldOutputAsync(senderId, output, cancellationToken); ValueTask ISuperStepJoinContext.AttachSuperstepAsync(ISuperStepRunner superStepRunner, CancellationToken cancellationToken) => new(string.Empty); ValueTask ISuperStepJoinContext.DetachSuperstepAsync(string joinId) => new(false);