-
Notifications
You must be signed in to change notification settings - Fork 501
Add InvokeAsync for chained durable function calls (DOTNET-8661) #2374
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
GarrettBeatty
wants to merge
4
commits into
feature/durablefunction
Choose a base branch
from
gcbeatty/durable-invoke
base: feature/durablefunction
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
182 changes: 182 additions & 0 deletions
182
Libraries/src/Amazon.Lambda.DurableExecution/Internal/InvokeOperation.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,182 @@ | ||
| using System.IO; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add license header
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. #2398 added in this separate PR instead |
||
| using System.Text; | ||
| using Amazon.Lambda.Core; | ||
| using SdkChainedInvokeOptions = Amazon.Lambda.Model.ChainedInvokeOptions; | ||
| using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate; | ||
|
|
||
| namespace Amazon.Lambda.DurableExecution.Internal; | ||
|
|
||
| /// <summary> | ||
| /// Durable chained-invoke operation. Schedules an asynchronous invocation of | ||
| /// another durable Lambda function via the durable execution service and | ||
| /// suspends the parent workflow until the chained execution reaches a terminal | ||
| /// state. The service drives the chained function and re-invokes the parent | ||
| /// with an updated operation status. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// Replay branches — example: | ||
| /// <c>await ctx.InvokeAsync<Req, Resp>("arn:...:fn:prod", req, "process_payment")</c> | ||
| /// <list type="bullet"> | ||
| /// <item><b>Fresh</b>: serialize payload → sync-flush <c>CHAINED_INVOKE START</c> | ||
| /// (carrying <see cref="SdkChainedInvokeOptions"/>) → suspend with | ||
| /// <see cref="TerminationReason.InvokePending"/>.</item> | ||
| /// <item><b>SUCCEEDED</b>: deserialize and return cached result from | ||
| /// <c>ChainedInvokeDetails.Result</c>; the chained function is NOT | ||
| /// re-invoked.</item> | ||
| /// <item><b>FAILED</b>: throw <see cref="InvokeFailedException"/> populated | ||
| /// from the recorded error.</item> | ||
| /// <item><b>TIMED_OUT</b>: throw <see cref="InvokeTimedOutException"/>.</item> | ||
| /// <item><b>STOPPED</b>: throw <see cref="InvokeStoppedException"/>.</item> | ||
| /// <item><b>STARTED</b> / <b>PENDING</b>: chained execution is still in | ||
| /// flight; re-suspend without re-checkpointing — the original | ||
| /// <c>START</c> remains authoritative.</item> | ||
| /// </list> | ||
| /// Mirrors <see cref="WaitOperation"/>'s "sync-flush START → suspend" idiom; | ||
| /// the chained function executes out-of-process so there is nothing to run | ||
| /// locally on either fresh or replay paths besides the suspend wiring. | ||
| /// Serialization is delegated to the <see cref="ILambdaSerializer"/> registered | ||
| /// on <see cref="ILambdaContext.Serializer"/>; AOT-safe and reflection-based | ||
| /// callers share the same code path (the AOT story is determined by the | ||
| /// registered serializer). | ||
| /// </remarks> | ||
| internal sealed class InvokeOperation<TPayload, TResult> : DurableOperation<TResult> | ||
| { | ||
| private readonly string _functionName; | ||
| private readonly TPayload _payload; | ||
| private readonly InvokeConfig? _config; | ||
| private readonly ILambdaSerializer _serializer; | ||
|
|
||
| public InvokeOperation( | ||
| string operationId, | ||
| string? name, | ||
| string? parentId, | ||
| string functionName, | ||
| TPayload payload, | ||
| InvokeConfig? config, | ||
| ILambdaSerializer serializer, | ||
| ExecutionState state, | ||
| TerminationManager termination, | ||
| string durableExecutionArn, | ||
| CheckpointBatcher? batcher = null) | ||
| : base(operationId, name, parentId, state, termination, durableExecutionArn, batcher) | ||
| { | ||
| _functionName = functionName; | ||
| _payload = payload; | ||
| _config = config; | ||
| _serializer = serializer; | ||
| } | ||
|
|
||
| protected override string OperationType => OperationTypes.ChainedInvoke; | ||
|
|
||
| protected override async Task<TResult> StartAsync(CancellationToken cancellationToken) | ||
| { | ||
| cancellationToken.ThrowIfCancellationRequested(); | ||
|
|
||
| var serializedPayload = SerializeValue(_payload); | ||
|
|
||
| // The service is what actually invokes the chained function, so it | ||
| // must receive this START before we suspend. If we only batched it | ||
| // locally and the parent process were recycled at suspend, the START | ||
| // would be lost and the chained function would never run. | ||
| await EnqueueAsync(new SdkOperationUpdate | ||
| { | ||
| Id = OperationId, | ||
| ParentId = ParentId, | ||
| Type = OperationTypes.ChainedInvoke, | ||
| Action = OperationAction.START, | ||
| SubType = OperationSubTypes.ChainedInvoke, | ||
| Name = Name, | ||
| Payload = serializedPayload, | ||
| ChainedInvokeOptions = new SdkChainedInvokeOptions | ||
| { | ||
| FunctionName = _functionName, | ||
| TenantId = _config?.TenantId | ||
| } | ||
| }, cancellationToken); | ||
|
|
||
| return await Termination.SuspendAndAwait<TResult>( | ||
| TerminationReason.InvokePending, $"invoke:{Name ?? _functionName}"); | ||
| } | ||
|
|
||
| protected override Task<TResult> ReplayAsync(Operation existing, CancellationToken cancellationToken) | ||
| { | ||
| switch (existing.Status) | ||
| { | ||
| case OperationStatuses.Succeeded: | ||
| return Task.FromResult(DeserializeResult(existing.ChainedInvokeDetails?.Result)); | ||
|
|
||
| case OperationStatuses.Failed: | ||
| throw BuildFailed(existing); | ||
|
|
||
| case OperationStatuses.TimedOut: | ||
| throw BuildTimedOut(existing); | ||
|
|
||
| case OperationStatuses.Stopped: | ||
| throw BuildStopped(existing); | ||
|
|
||
| case OperationStatuses.Started: | ||
| case OperationStatuses.Pending: | ||
| // Chained function is still running. Just suspend again — | ||
| // the original START is already on the service, so don't | ||
| // re-checkpoint it. Whenever the service re-invokes us next, | ||
| // it will include the updated status. | ||
| return Termination.SuspendAndAwait<TResult>( | ||
| TerminationReason.InvokePending, $"invoke:{Name ?? _functionName}"); | ||
|
|
||
| default: | ||
| throw new NonDeterministicExecutionException( | ||
| $"Chained invoke operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay."); | ||
| } | ||
| } | ||
|
|
||
| private string SerializeValue(TPayload value) | ||
| { | ||
| using var ms = new MemoryStream(); | ||
| _serializer.Serialize(value, ms); | ||
| return Encoding.UTF8.GetString(ms.ToArray()); | ||
| } | ||
|
|
||
| private TResult DeserializeResult(string? serialized) | ||
| { | ||
| if (serialized == null) return default!; | ||
| var bytes = Encoding.UTF8.GetBytes(serialized); | ||
| using var ms = new MemoryStream(bytes); | ||
| return _serializer.Deserialize<TResult>(ms); | ||
| } | ||
|
|
||
| private InvokeFailedException BuildFailed(Operation failedOp) | ||
| { | ||
| var err = failedOp.ChainedInvokeDetails?.Error; | ||
| return new InvokeFailedException(err?.ErrorMessage ?? "Chained invoke failed.") | ||
| { | ||
| FunctionName = _functionName, | ||
| ErrorType = err?.ErrorType, | ||
| ErrorData = err?.ErrorData, | ||
| OriginalStackTrace = err?.StackTrace | ||
| }; | ||
| } | ||
|
|
||
| private InvokeTimedOutException BuildTimedOut(Operation failedOp) | ||
| { | ||
| var err = failedOp.ChainedInvokeDetails?.Error; | ||
| return new InvokeTimedOutException(err?.ErrorMessage ?? "Chained invoke timed out.") | ||
| { | ||
| FunctionName = _functionName, | ||
| ErrorType = err?.ErrorType, | ||
| ErrorData = err?.ErrorData, | ||
| OriginalStackTrace = err?.StackTrace | ||
| }; | ||
| } | ||
|
|
||
| private InvokeStoppedException BuildStopped(Operation failedOp) | ||
| { | ||
| var err = failedOp.ChainedInvokeDetails?.Error; | ||
| return new InvokeStoppedException(err?.ErrorMessage ?? "Chained invoke was stopped.") | ||
| { | ||
| FunctionName = _functionName, | ||
| ErrorType = err?.ErrorType, | ||
| ErrorData = err?.ErrorData, | ||
| OriginalStackTrace = err?.StackTrace | ||
| }; | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
other sdks dont have this. this was a hallucination