Skip to content

Commit 27b9a6e

Browse files
sophiatevSophia Tevosyan
andauthored
Add API to get orchestration history (#516)
* first commit * more updated * removed the API from the shim client * updated changelog --------- Co-authored-by: Sophia Tevosyan <[email protected]>
1 parent 8c8f184 commit 27b9a6e

File tree

4 files changed

+69
-3
lines changed

4 files changed

+69
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
- Update project to target .net 8.0 and .net 10 and upgrade dependencies by Tomer Rosenthal ([#510](https://github.com/microsoft/durabletask-dotnet/pull/510))
1414
- Support worker features announcement by wangbill ([#502](https://github.com/microsoft/durabletask-dotnet/pull/502))
1515
- Introduce custom copilot review instructions by halspang ([#503](https://github.com/microsoft/durabletask-dotnet/pull/503))
16+
- Add API to get orchestration history ([#516](https://github.com/microsoft/durabletask-dotnet/pull/516))
1617

1718
## v1.17.1
1819
- Fix Worker Registry and Add Documentation Notes by nytian in [#462](https://github.com/microsoft/durabletask-dotnet/pull/495)

src/Client/Core/DurableTaskClient.cs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License.
33

44
using System.ComponentModel;
5+
using DurableTask.Core.History;
56
using Microsoft.DurableTask.Client.Entities;
67
using Microsoft.DurableTask.Internal;
78

@@ -456,15 +457,34 @@ public virtual Task<string> RestartAsync(
456457
/// <exception cref="NotSupportedException">Thrown if this implementation of <see cref="DurableTaskClient"/> does not
457458
/// support rewinding orchestrations.</exception>
458459
/// <exception cref="NotImplementedException">Thrown if the backend storage provider does not support rewinding orchestrations.</exception>
459-
/// <exception cref="ArgumentException">Thrown if an orchestration with the specified <paramref name="instanceId"/> does not exist.</exception>
460+
/// <exception cref="ArgumentException">Thrown if an orchestration with the specified <paramref name="instanceId"/> does not exist,
461+
/// or if <paramref name="instanceId"/> is the instance ID of an entity.</exception>
460462
/// <exception cref="InvalidOperationException">Thrown if a precondition of the operation fails, for example if the specified
461463
/// orchestration is not in a "Failed" state.</exception>
462464
/// <exception cref="OperationCanceledException">Thrown if the operation is canceled via the <paramref name="cancellation"/> token.</exception>
463465
public virtual Task RewindInstanceAsync(
464466
string instanceId,
465467
string reason,
466468
CancellationToken cancellation = default)
467-
=> throw new NotSupportedException($"{this.GetType()} does not support orchestration rewind.");
469+
=> throw new NotSupportedException($"{this.GetType()} does not support orchestration rewind.");
470+
471+
/// <summary>
472+
/// Retrieves the history of the specified orchestration instance as a list of <see cref="HistoryEvent"/> objects.
473+
/// </summary>
474+
/// <param name="instanceId">The instance ID of the orchestration.</param>
475+
/// <param name="cancellation">The cancellation token.</param>
476+
/// <returns>The list of <see cref="HistoryEvent"/> objects representing the orchestration's history.</returns>
477+
/// <exception cref="NotSupportedException">Thrown if this implementation of <see cref="DurableTaskClient"/> does not
478+
/// support retrieving orchestration history.</exception>
479+
/// <exception cref="ArgumentNullException">Thrown if <paramref name="instanceId"/> is null.</exception>
480+
/// <exception cref="ArgumentException">Thrown if an orchestration with the specified <paramref name="instanceId"/> does not exist,
481+
/// or if <paramref name="instanceId"/> is the instance ID of an entity.</exception>
482+
/// <exception cref="OperationCanceledException">Thrown if the operation is canceled via the <paramref name="cancellation"/> token.</exception>
483+
/// <exception cref="InvalidOperationException">Thrown if an internal error occurs when attempting to retrieve the orchestration history.</exception>
484+
public virtual Task<IList<HistoryEvent>> GetOrchestrationHistoryAsync(
485+
string instanceId,
486+
CancellationToken cancellation = default)
487+
=> throw new NotSupportedException($"{this.GetType()} does not support retrieving orchestration history.");
468488

469489
// TODO: Create task hub
470490

src/Client/Grpc/GrpcDurableTaskClient.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System.Diagnostics;
55
using System.Text;
6+
using DurableTask.Core.History;
67
using Google.Protobuf.WellKnownTypes;
78
using Microsoft.DurableTask.Client.Entities;
89
using Microsoft.DurableTask.Tracing;
@@ -468,6 +469,49 @@ public override async Task RewindInstanceAsync(
468469
}
469470
}
470471

472+
/// <inheritdoc/>
473+
public override async Task<IList<HistoryEvent>> GetOrchestrationHistoryAsync(
474+
string instanceId,
475+
CancellationToken cancellation = default)
476+
{
477+
Check.NotNullOrEmpty(instanceId);
478+
Check.NotEntity(this.options.EnableEntitySupport, instanceId);
479+
480+
P.StreamInstanceHistoryRequest streamRequest = new()
481+
{
482+
InstanceId = instanceId,
483+
ForWorkItemProcessing = false,
484+
};
485+
486+
try
487+
{
488+
using AsyncServerStreamingCall<P.HistoryChunk> streamResponse =
489+
this.sidecarClient.StreamInstanceHistory(streamRequest, cancellationToken: cancellation);
490+
491+
List<HistoryEvent> pastEvents = [];
492+
while (await streamResponse.ResponseStream.MoveNext(cancellation))
493+
{
494+
pastEvents.AddRange(streamResponse.ResponseStream.Current.Events.Select(DurableTask.ProtoUtils.ConvertHistoryEvent));
495+
}
496+
497+
return pastEvents;
498+
}
499+
catch (RpcException e) when (e.StatusCode == StatusCode.NotFound)
500+
{
501+
throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found.", e);
502+
}
503+
catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
504+
{
505+
throw new OperationCanceledException(
506+
$"The {nameof(this.GetOrchestrationHistoryAsync)} operation was canceled.", e, cancellation);
507+
}
508+
catch (RpcException e) when (e.StatusCode == StatusCode.Internal)
509+
{
510+
throw new InvalidOperationException(
511+
$"An error occurred while retrieving the history for orchestration with instanceId {instanceId}.", e);
512+
}
513+
}
514+
471515
static AsyncDisposable GetCallInvoker(GrpcDurableTaskClientOptions options, out CallInvoker callInvoker)
472516
{
473517
if (options.Channel is GrpcChannel c)

src/Shared/Grpc/ProtoUtils.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto, EntityCon
8080
historyEvent = new ExecutionCompletedEvent(
8181
proto.EventId,
8282
proto.ExecutionCompleted.Result,
83-
proto.ExecutionCompleted.OrchestrationStatus.ToCore());
83+
proto.ExecutionCompleted.OrchestrationStatus.ToCore(),
84+
proto.ExecutionCompleted.FailureDetails.ToCore());
8485
break;
8586
case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated:
8687
historyEvent = new ExecutionTerminatedEvent(proto.EventId, proto.ExecutionTerminated.Input);

0 commit comments

Comments
 (0)