From 8f7b97461a5286417fce7653b11764a18764fbff Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 1 Dec 2025 10:33:02 -0800 Subject: [PATCH 1/3] initial implementation --- .../DurabilityProvider.cs | 14 + .../HistoryEventJsonConverter.cs | 61 +++ .../ProtobufUtils.cs | 34 +- .../TaskHubGrpcServer.cs | 89 +++++ .../FunctionsDurableTaskClient.cs | 10 +- .../GetOrchestrationHistory.cs | 214 ++++++++++ .../Tests/DistributedTracingEntitiesTests.cs | 5 +- .../Tests/GetOrchestrationHistoryTests.cs | 368 ++++++++++++++++++ 8 files changed, 781 insertions(+), 14 deletions(-) create mode 100644 src/WebJobs.Extensions.DurableTask/HistoryEventJsonConverter.cs create mode 100644 test/e2e/Apps/BasicDotNetIsolated/GetOrchestrationHistory.cs create mode 100644 test/e2e/Tests/Tests/GetOrchestrationHistoryTests.cs diff --git a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs index 789e88685..b97326646 100644 --- a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs @@ -9,6 +9,7 @@ using DurableTask.Core.Entities; using DurableTask.Core.History; using DurableTask.Core.Query; +using Google.Protobuf; using Microsoft.Azure.WebJobs.Host.Scale; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -598,6 +599,19 @@ public virtual bool TryGetTargetScaler( { targetScaler = null; return false; + } + + /// + /// Streams the history of the specified orchestration instance as an enumerable of serialized history chunks. + /// + /// The instance ID of the orchestration. + /// The maximum size (in bytes) of each history chunk. + /// The JSON formatter used to serialize the history chunks. + /// The cancellation token. + /// The enumerable of history chunks representing the orchestration's history. + public virtual Task> StreamOrchestrationHistoryAsync(string instanceId, JsonFormatter jsonFormatter, CancellationToken cancellationToken) + { + throw this.GetNotImplementedException(nameof(this.StreamOrchestrationHistoryAsync)); } } } diff --git a/src/WebJobs.Extensions.DurableTask/HistoryEventJsonConverter.cs b/src/WebJobs.Extensions.DurableTask/HistoryEventJsonConverter.cs new file mode 100644 index 000000000..ebebcd81f --- /dev/null +++ b/src/WebJobs.Extensions.DurableTask/HistoryEventJsonConverter.cs @@ -0,0 +1,61 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for license information. + +#nullable enable +using System; +using DurableTask.Core.History; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microsoft.Azure.WebJobs.Extensions.DurableTask +{ + public class HistoryEventJsonConverter : JsonConverter + { + public override bool CanWrite => false; + + public override bool CanConvert(Type objectType) + { + return objectType == typeof(HistoryEvent); + } + + public override object? ReadJson(JsonReader reader, Type objectType, object? existingValue, JsonSerializer serializer) + { + var jo = JObject.Load(reader); + int eventType = jo["EventType"]?.Value() + ?? throw new JsonSerializationException("EventType missing"); + + Type concreteType = (EventType)eventType switch + { + EventType.ExecutionStarted => typeof(ExecutionStartedEvent), + EventType.ExecutionCompleted => typeof(ExecutionCompletedEvent), + EventType.TaskScheduled => typeof(TaskScheduledEvent), + EventType.TaskCompleted => typeof(TaskCompletedEvent), + EventType.TaskFailed => typeof(TaskFailedEvent), + EventType.SubOrchestrationInstanceCreated => typeof(SubOrchestrationInstanceCreatedEvent), + EventType.SubOrchestrationInstanceCompleted => typeof(SubOrchestrationInstanceCompletedEvent), + EventType.SubOrchestrationInstanceFailed => typeof(SubOrchestrationInstanceFailedEvent), + EventType.TimerCreated => typeof(TimerCreatedEvent), + EventType.TimerFired => typeof(TimerFiredEvent), + EventType.OrchestratorStarted => typeof(OrchestratorStartedEvent), + EventType.OrchestratorCompleted => typeof(OrchestratorCompletedEvent), + EventType.EventSent => typeof(EventSentEvent), + EventType.EventRaised => typeof(EventRaisedEvent), + EventType.GenericEvent => typeof(GenericEvent), + EventType.ContinueAsNew => typeof(ContinueAsNewEvent), + EventType.ExecutionTerminated => typeof(ExecutionTerminatedEvent), + EventType.ExecutionSuspended => typeof(ExecutionSuspendedEvent), + EventType.ExecutionResumed => typeof(ExecutionResumedEvent), + EventType.ExecutionRewound => typeof(ExecutionRewoundEvent), + EventType.HistoryState => typeof(HistoryStateEvent), + _ => throw new NotSupportedException($"Unknown HistoryEvent type {eventType}") + }; + + return jo.ToObject(concreteType, serializer); + } + + public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer) + { + throw new NotImplementedException(); + } + } +} diff --git a/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs b/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs index 0975c5e98..3edf18137 100644 --- a/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs +++ b/src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs @@ -66,16 +66,9 @@ public static P.HistoryEvent ToHistoryEventProto(HistoryEvent e) var completedEvent = (ExecutionCompletedEvent)e; payload.ExecutionCompleted = new P.ExecutionCompletedEvent { - OrchestrationStatus = P.OrchestrationStatus.Completed, + OrchestrationStatus = (P.OrchestrationStatus)completedEvent.OrchestrationStatus, Result = completedEvent.Result, - }; - break; - case EventType.ExecutionFailed: - var failedEvent = (ExecutionCompletedEvent)e; - payload.ExecutionCompleted = new P.ExecutionCompletedEvent - { - OrchestrationStatus = P.OrchestrationStatus.Failed, - Result = failedEvent.Result, + FailureDetails = GetFailureDetails(completedEvent.FailureDetails), }; break; case EventType.ExecutionStarted: @@ -108,7 +101,16 @@ public static P.HistoryEvent ToHistoryEventProto(HistoryEvent e) TraceParent = startedEvent.ParentTraceContext.TraceParent, TraceState = startedEvent.ParentTraceContext.TraceState, }, - }; + }; + + if (startedEvent.Tags != null) + { + foreach (KeyValuePair tag in startedEvent.Tags) + { + payload.ExecutionStarted.Tags[tag.Key] = tag.Value; + } + } + break; case EventType.ExecutionTerminated: var terminatedEvent = (ExecutionTerminatedEvent)e; @@ -124,7 +126,16 @@ public static P.HistoryEvent ToHistoryEventProto(HistoryEvent e) Name = taskScheduledEvent.Name, Version = taskScheduledEvent.Version, Input = taskScheduledEvent.Input, - }; + }; + + if (taskScheduledEvent.Tags != null) + { + foreach (KeyValuePair tag in taskScheduledEvent.Tags) + { + payload.TaskScheduled.Tags[tag.Key] = tag.Value; + } + } + break; case EventType.TaskCompleted: var taskCompletedEvent = (TaskCompletedEvent)e; @@ -252,6 +263,7 @@ public static OrchestratorAction ToOrchestratorAction(P.OrchestratorAction a) Input = a.ScheduleTask.Input, Name = a.ScheduleTask.Name, Version = a.ScheduleTask.Version, + Tags = a.ScheduleTask.Tags.ToDictionary(), }; case P.OrchestratorAction.OrchestratorActionTypeOneofCase.CreateSubOrchestration: return new CreateSubOrchestrationAction diff --git a/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs b/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs index 486ac4eaa..009021137 100644 --- a/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs +++ b/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs @@ -3,7 +3,9 @@ #nullable enable using System; +using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Threading; using System.Threading.Tasks; using DurableTask.Core; @@ -12,9 +14,11 @@ using DurableTask.Core.History; using DurableTask.Core.Query; using DurableTask.Core.Serializing.Internal; +using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using Grpc.Core; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation; +using Newtonsoft.Json; using DTCore = DurableTask.Core; using P = Microsoft.DurableTask.Protobuf; @@ -22,6 +26,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { internal class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBase { + private const int MaxHistoryChunkSizeInBytes = 2 * 1024 * 1024; // 2 MB private readonly DurableTaskExtension extension; public TaskHubGrpcServer(DurableTaskExtension extension) @@ -64,6 +69,7 @@ public override Task Hello(Empty request, ServerCallContext context) Version = request.Version != null ? request.Version : this.extension.Options.DefaultVersion, OrchestrationInstance = instance, ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(), + Tags = request.Tags.ToDictionary(), }; // Get the parent trace context from CreateInstanceRequest @@ -475,6 +481,89 @@ private static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationStat }; } + public async override Task StreamInstanceHistory( + P.StreamInstanceHistoryRequest request, + IServerStreamWriter responseStream, + ServerCallContext context) + { + if (await this.GetClient(context).GetStatusAsync(request.InstanceId, showInput: false) is null) + { + throw new RpcException(new Status(StatusCode.NotFound, $"Orchestration instance with ID {request.InstanceId} was not found.")); + } + + try + { + // First, try to use the streaming API if it's implemented. + try + { + IEnumerable historyChunks = await this.GetDurabilityProvider(context).StreamOrchestrationHistoryAsync( + request.InstanceId, + new JsonFormatter(new JsonFormatter.Settings(formatDefaultValues: true)), + context.CancellationToken); + + JsonParser jsonParser = new (JsonParser.Settings.Default.WithIgnoreUnknownFields(true)); + foreach (string chunk in historyChunks) + { + context.CancellationToken.ThrowIfCancellationRequested(); + await responseStream.WriteAsync(jsonParser.Parse(chunk)); + } + } + + // Otherwise default to the older non-streaming implementation. + catch (NotImplementedException) + { + string jsonHistory = await this.GetDurabilityProvider(context).GetOrchestrationHistoryAsync( + request.InstanceId, + executionId: null); + + // Throw exception or return an empty list? + List? historyEvents = JsonConvert.DeserializeObject>( + jsonHistory, + new JsonSerializerSettings() + { + Converters = { new HistoryEventJsonConverter() }, + }) + ?? throw new RpcException(new Status(StatusCode.Internal, "Failed to deserialize orchestration history.")); + + int currentChunkSizeInBytes = 0; + + P.HistoryChunk chunk = new (); + + foreach (HistoryEvent historyEvent in historyEvents) + { + context.CancellationToken.ThrowIfCancellationRequested(); + P.HistoryEvent result = ProtobufUtils.ToHistoryEventProto(historyEvent); + + int currentEventSize = result.CalculateSize(); + if (currentChunkSizeInBytes + currentEventSize > MaxHistoryChunkSizeInBytes) + { + // If we exceeded the chunk size threshold, send what we have so far. + await responseStream.WriteAsync(chunk); + chunk = new (); + currentChunkSizeInBytes = 0; + } + + chunk.Events.Add(result); + currentChunkSizeInBytes += currentEventSize; + } + + // Send the last chunk, which may be smaller than the maximum chunk size. + if (chunk.Events.Count > 0) + { + await responseStream.WriteAsync(chunk); + } + } + } + catch (OperationCanceledException) + { + throw new RpcException(new Status(StatusCode.Cancelled, $"Orchestration history streaming cancelled for instance {request.InstanceId}")); + } + catch (Exception ex) + { + throw new RpcException(new Status(StatusCode.Internal, $"Failed to stream orchestration history for instance {request.InstanceId}: {ex.Message}")); + } + } + private static P.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails) { if (failureDetails == null) diff --git a/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs b/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs index 1bd0e98cd..344ac1bce 100644 --- a/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs +++ b/src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs @@ -1,9 +1,10 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using DurableTask.Core.History; using Microsoft.DurableTask; using Microsoft.DurableTask.Client; using Microsoft.DurableTask.Client.Entities; @@ -115,4 +116,11 @@ public override Task RewindInstanceAsync( { return this.inner.RewindInstanceAsync(instanceId, reason, cancellation); } + + public override Task> GetOrchestrationHistoryAsync( + string instanceId, + CancellationToken cancellation = default) + { + return this.inner.GetOrchestrationHistoryAsync(instanceId, cancellation); + } } diff --git a/test/e2e/Apps/BasicDotNetIsolated/GetOrchestrationHistory.cs b/test/e2e/Apps/BasicDotNetIsolated/GetOrchestrationHistory.cs new file mode 100644 index 000000000..f948a81c8 --- /dev/null +++ b/test/e2e/Apps/BasicDotNetIsolated/GetOrchestrationHistory.cs @@ -0,0 +1,214 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Net; +using DurableTask.Core.History; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Http; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; + +namespace Microsoft.Azure.Durable.Tests.E2E; + +public static class GetOrchestrationHistory +{ + public static readonly EntityInstanceId entityId = new(nameof(SimpleEntity), "singleton"); + + [Function(nameof(ParentOrchestration))] + public static async Task ParentOrchestration( + [OrchestrationTrigger] TaskOrchestrationContext context) + { + ComplexInput? input = context.GetInput(); + + if (input == null) + { + throw new ArgumentNullException(nameof(input)); + } + + if (input.OrchestrationType == "succeed") + { + // Try setting various fields to null to ensure serialization of the history works as expected. + input.Tags = null; + await context.CallSubOrchestratorAsync( + nameof(CallLargeOutputTasksSubOrchestration), + input, + new SubOrchestrationOptions { InstanceId = input.SubOrchestrationInstanceId } + ); + } + else + { + // Try setting various fields to null to ensure serialization of the history works as expected. + input.OrchestrationType = null; + await context.CallSubOrchestratorAsync( + nameof(FailSubOrchestration), + input, + new SubOrchestrationOptions { InstanceId = input.SubOrchestrationInstanceId, Tags = input.Tags } + ); + } + + return input; + } + + [Function(nameof(FailSubOrchestration))] + public static async Task FailSubOrchestration( + [OrchestrationTrigger] TaskOrchestrationContext context) + { + await context.CallActivityAsync(nameof(ThrowExceptionActivity), new TaskOptions { Tags = context.GetInput()?.Tags }); + } + + [Function(nameof(CallLargeOutputTasksSubOrchestration))] + public static async Task CallLargeOutputTasksSubOrchestration( + [OrchestrationTrigger] TaskOrchestrationContext context) + { + ComplexInput? input = context.GetInput(); + if (input == null) + { + throw new ArgumentNullException(nameof(input)); + } + + await context.CallActivityAsync(nameof(LargeOutputActivity), input.OutputSize); + + if (input.CallEntities) + { + await context.Entities.SignalEntityAsync(entityId, "set", input.OutputSize); + // Add a timer to give the signal some more time to be processed before we read the entity state. + // We could make this a "call" rather than a "signal", but this ensures we get more history event types in the orchestration history. + await context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(5), CancellationToken.None); + await context.Entities.CallEntityAsync(entityId, "get"); + } + else + { + await context.CallActivityAsync(nameof(LargeOutputActivity), input.OutputSize); + } + return input; + } + + [Function(nameof(LargeOutputActivity))] + public static string LargeOutputActivity([ActivityTrigger] int outputSize, FunctionContext executionContext) + { + return new string('a', outputSize); + } + + [Function(nameof(ThrowExceptionActivity))] + public static string ThrowExceptionActivity([ActivityTrigger] FunctionContext executionContext) + { + throw new Exception("Failure!"); + } + + [Function(nameof(GetInstanceHistory))] + public static async Task GetInstanceHistory( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req, + [DurableClient] DurableTaskClient client, + string instanceId) + { + try + { + IList history = await client.GetOrchestrationHistoryAsync(instanceId); + HttpResponseData response = req.CreateResponse(HttpStatusCode.OK); + + // The WriteAsJsonAsync method does not serialize the HistoryEvent polymorphic types correctly, so we use WriteStringAsync instead + // and use JsonConvert to serialize the history ourselves. + await response.WriteStringAsync(JsonConvert.SerializeObject(history)); + return response; + } + catch (ArgumentException) + { + return req.CreateResponse(HttpStatusCode.NotFound); + } + } + + [Function(nameof(GetOrchestrationHistory_HttpStart))] + public static async Task GetOrchestrationHistory_HttpStart( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req, + [DurableClient] DurableTaskClient client, + FunctionContext executionContext, + string orchestrationType, + string subOrchestrationInstanceId, + int outputSize, + bool callEntities, + string tagsKey, + string tagsValue) + { + ILogger logger = executionContext.GetLogger(nameof(GetOrchestrationHistory_HttpStart)); + Dictionary tags = new() { { tagsKey, tagsValue } }; + + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync( + nameof(ParentOrchestration), + new ComplexInput( + orchestrationType, + subOrchestrationInstanceId, + outputSize, + callEntities, + tags), + new StartOrchestrationOptions { Tags = tags }); + + logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId); + + // Returns an HTTP 202 response with an instance management payload. + // See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration + return await client.CreateCheckStatusResponseAsync(req, instanceId); + } + + [Function(nameof(SimpleEntity))] + public static Task SimpleEntity([EntityTrigger] TaskEntityDispatcher dispatcher) + { + return dispatcher.DispatchAsync(operation => + { + switch (operation.Name) + { + case "get": + return new(operation.State.GetState()); + case "set": + int size = operation.GetInput(); + operation.State.SetState(new string('a', size)); + break; + default: + throw new InvalidOperationException($"Unknown operation '{operation.Name}'"); + } + return default; + }); + } + + public class ComplexInput( + string? orchestrationType, + string subOrchestrationInstanceId, + int outputSize, + bool callEntities, + Dictionary? tags) + { + public bool CallEntities { get; set; } = callEntities; + + public string? OrchestrationType { get; set; } = orchestrationType; + + public string SubOrchestrationInstanceId { get; set; } = subOrchestrationInstanceId; + + public int OutputSize { get; set; } = outputSize; + + public Dictionary? Tags { get; set; } = tags; + + public override bool Equals(object? obj) + { + if (obj is not ComplexInput other) + { + return false; + } + return other.CallEntities == this.CallEntities + && ((other.OrchestrationType is null && this.OrchestrationType is null) + || (other.OrchestrationType is not null && this.OrchestrationType is not null + && other.OrchestrationType.Equals(this.OrchestrationType))) + && other.SubOrchestrationInstanceId.Equals(this.SubOrchestrationInstanceId) + && other.OutputSize == this.OutputSize + && ((other.Tags is null && this.Tags is null) + || (other.Tags is not null && this.Tags is not null + && other.Tags.OrderBy(x => x.Key).SequenceEqual(this.Tags.OrderBy(x => x.Key)))); + } + + public override int GetHashCode() + { + return HashCode.Combine(this.CallEntities, this.OrchestrationType, this.SubOrchestrationInstanceId, this.OutputSize, this.Tags); + } + } +} diff --git a/test/e2e/Tests/Tests/DistributedTracingEntitiesTests.cs b/test/e2e/Tests/Tests/DistributedTracingEntitiesTests.cs index 8108cfaba..f538a14df 100644 --- a/test/e2e/Tests/Tests/DistributedTracingEntitiesTests.cs +++ b/test/e2e/Tests/Tests/DistributedTracingEntitiesTests.cs @@ -73,8 +73,9 @@ public async Task DistributedTracingEntitiesTest() HttpResponseMessage result = await HttpHelpers.InvokeHttpTrigger("GetActivityInfoOrchestration_Output", $"?id={orchestrationId}"); Assert.Equal(HttpStatusCode.OK, result.StatusCode); - var remainingIds = (await result.Content.ReadAsStringAsync()).Replace("\r", "").Replace("\n", "").Replace("\"", "").Replace("[", "").Replace("]", "").Replace(" ", ""); - ids.AddRange(remainingIds.Split(",")); + List? remainingIds = JsonSerializer.Deserialize>((await result.Content.ReadAsStringAsync())); + Assert.NotNull(remainingIds); + ids.AddRange(remainingIds); Assert.Equal(8, ids.Count); Assert.True(ids.All(traceId => traceId.Equals(activity.TraceId.ToString()))); } diff --git a/test/e2e/Tests/Tests/GetOrchestrationHistoryTests.cs b/test/e2e/Tests/Tests/GetOrchestrationHistoryTests.cs new file mode 100644 index 000000000..b5bdf359e --- /dev/null +++ b/test/e2e/Tests/Tests/GetOrchestrationHistoryTests.cs @@ -0,0 +1,368 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Net; +using DurableTask.Core; +using DurableTask.Core.History; +using Microsoft.Azure.WebJobs.Extensions.DurableTask; +using Newtonsoft.Json; +using Xunit; +using Xunit.Abstractions; +using static Microsoft.Azure.Durable.Tests.E2E.GetOrchestrationHistory; + +namespace Microsoft.Azure.Durable.Tests.DotnetIsolatedE2E; + +[Collection(Constants.FunctionAppCollectionName)] +public class GetOrchestrationHistoryTests +{ + private readonly FunctionAppFixture fixture; + private readonly ITestOutputHelper output; + // Make the results of the activity/entity calls around 1 MB so the orchestration history exceeds the max limit of the history chunk size (2 MB) + // We make it slightly smaller than 1 MB to avoid exceeding the current payload size limit in DTS, but just large enough to force chunking + private const int OutputSize = 1024 * 1024 - 5; + private const string TagsKey = "key"; + private const string TagsValue = "value"; + + private readonly Dictionary tags = new() { { TagsKey, TagsValue } }; + + public GetOrchestrationHistoryTests(FunctionAppFixture fixture, ITestOutputHelper testOutputHelper) + { + this.fixture = fixture; + this.fixture.TestLogs.UseTestLogger(testOutputHelper); + this.output = testOutputHelper; + } + + [Fact] + [Trait("Java", "Skip")] // The GetOrchestrationHistory API is not implemented in Java + [Trait("Python", "Skip")] // The GetOrchestrationHistory API is not implemented in Python + [Trait("PowerShell", "Skip")] // The GetOrchestrationHistory API is not implemented in PowerShell + [Trait("Node", "Skip")] // The GetOrchestrationHistory API is not implemented in Node + public async Task GetOrchestrationHistory_FailedOrchestration() + { + bool isNotMSSQL = this.fixture.GetDurabilityProvider() != FunctionAppFixture.ConfiguredDurabilityProviderType.MSSQL; + // The other backends currently do not serialize tags when sending the history, or the failure details of an ExecutionCompletedEvent + bool checkTagsAndFailureDetails = this.fixture.GetDurabilityProvider() == FunctionAppFixture.ConfiguredDurabilityProviderType.AzureStorage; + string subOrchestrationInstanceId = Guid.NewGuid().ToString(); + + using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger( + "GetOrchestrationHistory_HttpStart", + $"?orchestrationType=fail&subOrchestrationInstanceId={subOrchestrationInstanceId}&outputSize={OutputSize}&callEntities={isNotMSSQL.ToString().ToLower()}&tagsKey={TagsKey}&tagsValue={TagsValue}"); + Assert.Equal(HttpStatusCode.Accepted, response.StatusCode); + string instanceId = await DurableHelpers.ParseInstanceIdAsync(response); + string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response); + + await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Failed", 30); + + using HttpResponseMessage getOrchestrationHistoryResponse = await HttpHelpers.InvokeHttpTrigger("GetInstanceHistory", $"?instanceId={instanceId}"); + Assert.Equal(HttpStatusCode.OK, getOrchestrationHistoryResponse.StatusCode); + string jsonHistory = await getOrchestrationHistoryResponse.Content.ReadAsStringAsync(); + List? historyEvents = JsonConvert.DeserializeObject>( + jsonHistory, + new JsonSerializerSettings() + { + // I had to make the HistoryEventJsonConverter public to use it here. Is this a good reason? + Converters = { new HistoryEventJsonConverter() }, + }); + Assert.NotNull(historyEvents); + + // Confirm the correct count and sequence of events + Assert.Equal(8, historyEvents.Count); + Assert.Equal(EventType.OrchestratorStarted, historyEvents[0].EventType); + Assert.Equal(EventType.ExecutionStarted, historyEvents[1].EventType); + + // Confirm the fields of the ExecutionStartedEvent (name, orchestration input, and orchestration tags) + var parentExecutionStartedEvent = (ExecutionStartedEvent)historyEvents[1]; + Assert.Equal("ParentOrchestration", parentExecutionStartedEvent.Name); + Assert.Equal(new ComplexInput("fail", subOrchestrationInstanceId, OutputSize, isNotMSSQL, this.tags), + JsonConvert.DeserializeObject(parentExecutionStartedEvent.Input)); + if (checkTagsAndFailureDetails) + { + Assert.NotNull(parentExecutionStartedEvent.Tags); + Assert.Contains(TagsKey, parentExecutionStartedEvent.Tags.Keys); + Assert.Contains(TagsValue, parentExecutionStartedEvent.Tags.Values); + } + + Assert.Equal(EventType.SubOrchestrationInstanceCreated, historyEvents[2].EventType); + var subOrchestrationInstanceCreatedEvent = (SubOrchestrationInstanceCreatedEvent)historyEvents[2]; + Assert.Equal("FailSubOrchestration", subOrchestrationInstanceCreatedEvent.Name); + // MSSQL does not include the instance ID field in the SubOrchestrationInstanceCreatedEvent + if (isNotMSSQL) + { + Assert.Equal(subOrchestrationInstanceId, subOrchestrationInstanceCreatedEvent.InstanceId); + } + Assert.Equal(EventType.OrchestratorCompleted, historyEvents[3].EventType); + + Assert.Equal(EventType.OrchestratorStarted, historyEvents[4].EventType); + Assert.Equal(EventType.SubOrchestrationInstanceFailed, historyEvents[5].EventType); + Assert.Equal(subOrchestrationInstanceCreatedEvent.EventId, ((SubOrchestrationInstanceFailedEvent)historyEvents[5]).TaskScheduledId); + Assert.Equal(EventType.ExecutionCompleted, historyEvents[6].EventType); + Assert.Equal(EventType.OrchestratorCompleted, historyEvents[7].EventType); + + // Now confirm the failure details field of the SubOrchestrationInstanceFailed and ExecutionCompleted events + FailureDetails? parentFailureDetails = ((ExecutionCompletedEvent)historyEvents[6]).FailureDetails; + FailureDetails? subOrchestrationFailureDetails = ((SubOrchestrationInstanceFailedEvent)historyEvents[5]).FailureDetails; + + Assert.NotNull(subOrchestrationFailureDetails); + Assert.Equal("Microsoft.DurableTask.TaskFailedException", subOrchestrationFailureDetails.ErrorType); + Assert.NotNull(subOrchestrationFailureDetails.InnerFailure); + // The inner failure for the suborchestration failed event will be the actual exception thrown by the Activity, whereas the inner failure of the + // execution completed event will be the suborchestration task failing + Assert.Equal("System.Exception", subOrchestrationFailureDetails.InnerFailure.ErrorType); + Assert.Equal("Failure!", subOrchestrationFailureDetails.InnerFailure.ErrorMessage); + + if (checkTagsAndFailureDetails) + { + Assert.NotNull(parentFailureDetails); + Assert.Equal("Microsoft.DurableTask.TaskFailedException", parentFailureDetails.ErrorType); + Assert.NotNull(parentFailureDetails.InnerFailure); + Assert.Equal("Microsoft.DurableTask.TaskFailedException", parentFailureDetails.InnerFailure.ErrorType); + Assert.Equal(subOrchestrationFailureDetails.ErrorMessage, parentFailureDetails.InnerFailure.ErrorMessage); + // Finally, the doubly nested inner failure of the execution completed event will correspond to the Activity failing + Assert.NotNull(parentFailureDetails.InnerFailure.InnerFailure); + Assert.Equal("Failure!", parentFailureDetails.InnerFailure.InnerFailure.ErrorMessage); + } + + using HttpResponseMessage getSubOrchestrationHistoryResponse = await HttpHelpers.InvokeHttpTrigger("GetInstanceHistory", $"?instanceId={subOrchestrationInstanceId}"); + Assert.Equal(HttpStatusCode.OK, getSubOrchestrationHistoryResponse.StatusCode); + string subOrchestrationJsonHistory = await getSubOrchestrationHistoryResponse.Content.ReadAsStringAsync(); + List? subOrchestrationHistoryEvents = JsonConvert.DeserializeObject>( + subOrchestrationJsonHistory, + new JsonSerializerSettings() + { + // I had to make the HistoryEventJsonConverter public to use it here. Is this a good reason? + Converters = { new HistoryEventJsonConverter() }, + }); + Assert.NotNull(subOrchestrationHistoryEvents); + + // Confirm the correct count and sequence of events for the suborchestration + Assert.Equal(8, subOrchestrationHistoryEvents.Count); + Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[0].EventType); + Assert.Equal(EventType.ExecutionStarted, subOrchestrationHistoryEvents[1].EventType); + + // Confirm the fields of the ExecutionStartedEvent for the suborchestration (name, orchestration input, parent information, task ID) + var subOrchestrationExecutionStartedEvent = (ExecutionStartedEvent)subOrchestrationHistoryEvents[1]; + Assert.Equal("FailSubOrchestration", subOrchestrationExecutionStartedEvent.Name); + Assert.Equal(new ComplexInput(null, subOrchestrationInstanceId, OutputSize, isNotMSSQL, this.tags), + JsonConvert.DeserializeObject(subOrchestrationExecutionStartedEvent.Input)); + Assert.Equal(parentExecutionStartedEvent.OrchestrationInstance.InstanceId, subOrchestrationExecutionStartedEvent.ParentInstance.OrchestrationInstance.InstanceId); + Assert.Equal(subOrchestrationInstanceCreatedEvent.EventId, subOrchestrationExecutionStartedEvent.ParentInstance.TaskScheduleId); + // MSSQL currently only adds the instance ID and task scheduled ID fields to the parent instance object + if (isNotMSSQL) + { + Assert.Equal("ParentOrchestration", subOrchestrationExecutionStartedEvent.ParentInstance.Name); + Assert.Equal(parentExecutionStartedEvent.OrchestrationInstance.ExecutionId, subOrchestrationExecutionStartedEvent.ParentInstance.OrchestrationInstance.ExecutionId); + } + + Assert.Equal(EventType.TaskScheduled, subOrchestrationHistoryEvents[2].EventType); + var taskScheduledEvent = (TaskScheduledEvent)subOrchestrationHistoryEvents[2]; + Assert.Equal("ThrowExceptionActivity", taskScheduledEvent.Name); + if (checkTagsAndFailureDetails) + { + Assert.NotNull(taskScheduledEvent.Tags); + Assert.Contains(TagsKey, taskScheduledEvent.Tags.Keys); + Assert.Contains(TagsValue, taskScheduledEvent.Tags.Values); + } + Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[3].EventType); + + Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[4].EventType); + Assert.Equal(EventType.TaskFailed, subOrchestrationHistoryEvents[5].EventType); + // Confirm the event ID of the TaskScheduledEvent matches the TaskScheduledId field of the TaskFailedEvent + Assert.Equal(taskScheduledEvent.EventId, ((TaskFailedEvent)subOrchestrationHistoryEvents[5]).TaskScheduledId); + Assert.Equal(EventType.ExecutionCompleted, subOrchestrationHistoryEvents[6].EventType); + Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[7].EventType); + + // Now confirm the failure details field of the TaskFailed and ExecutionCompleted events + subOrchestrationFailureDetails = ((ExecutionCompletedEvent)subOrchestrationHistoryEvents[6]).FailureDetails; + FailureDetails? taskFailureDetails = ((TaskFailedEvent)subOrchestrationHistoryEvents[5]).FailureDetails; + + Assert.NotNull(taskFailureDetails); + Assert.Equal("System.Exception", taskFailureDetails.ErrorType); + Assert.Equal("Failure!", taskFailureDetails.ErrorMessage); + + if (checkTagsAndFailureDetails) + { + Assert.NotNull(subOrchestrationFailureDetails); + Assert.Equal("Microsoft.DurableTask.TaskFailedException", subOrchestrationFailureDetails.ErrorType); + Assert.NotNull(subOrchestrationFailureDetails.InnerFailure); + // The inner failure for the suborchestration failed event will be the actual exception thrown by the Activity + Assert.Equal(taskFailureDetails.ErrorType, subOrchestrationFailureDetails.InnerFailure.ErrorType); + Assert.Equal(taskFailureDetails.ErrorMessage, subOrchestrationFailureDetails.InnerFailure.ErrorMessage); + } + } + + [Fact] + [Trait("Java", "Skip")] // The GetOrchestrationHistory API is not implemented in Java + [Trait("Python", "Skip")] // The GetOrchestrationHistory API is not implemented in Python + [Trait("PowerShell", "Skip")] // The GetOrchestrationHistory API is not implemented in PowerShell + [Trait("Node", "Skip")] // The GetOrchestrationHistory API is not implemented in Node + public async Task GetOrchestrationHistory_LargeHistory() + { + bool isNotMSSQL = this.fixture.GetDurabilityProvider() != FunctionAppFixture.ConfiguredDurabilityProviderType.MSSQL; + // The other backends currently do not serialize tags when sending the history, or the failure details of an ExecutionCompletedEvent + bool checkTagsAndFailureDetails = this.fixture.GetDurabilityProvider() == FunctionAppFixture.ConfiguredDurabilityProviderType.AzureStorage; + string subOrchestrationInstanceId = Guid.NewGuid().ToString(); + + using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger( + "GetOrchestrationHistory_HttpStart", + $"?orchestrationType=succeed&subOrchestrationInstanceId={subOrchestrationInstanceId}&outputSize={OutputSize}&callEntities={isNotMSSQL.ToString().ToLower()}&tagsKey={TagsKey}&tagsValue={TagsValue}"); + Assert.Equal(HttpStatusCode.Accepted, response.StatusCode); + string instanceId = await DurableHelpers.ParseInstanceIdAsync(response); + string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response); + + await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Completed", 30); + + using HttpResponseMessage getOrchestrationHistoryResponse = await HttpHelpers.InvokeHttpTrigger("GetInstanceHistory", $"?instanceId={instanceId}"); + Assert.Equal(HttpStatusCode.OK, getOrchestrationHistoryResponse.StatusCode); + string jsonHistory = await getOrchestrationHistoryResponse.Content.ReadAsStringAsync(); + List? historyEvents = JsonConvert.DeserializeObject>( + jsonHistory, + new JsonSerializerSettings() + { + // I had to make the HistoryEventJsonConverter public to use it here. Is this a good reason? + Converters = { new HistoryEventJsonConverter() }, + }); + Assert.NotNull(historyEvents); + + // Confirm the correct count and sequence of events + Assert.Equal(8, historyEvents.Count); + Assert.Equal(EventType.OrchestratorStarted, historyEvents[0].EventType); + Assert.Equal(EventType.ExecutionStarted, historyEvents[1].EventType); + + // Confirm the fields of the ExecutionStartedEvent (name, orchestration input, and orchestration tags) + var parentExecutionStartedEvent = (ExecutionStartedEvent)historyEvents[1]; + Assert.Equal("ParentOrchestration", parentExecutionStartedEvent.Name); + Assert.Equal(new ComplexInput("succeed", subOrchestrationInstanceId, OutputSize, isNotMSSQL, this.tags), + JsonConvert.DeserializeObject(parentExecutionStartedEvent.Input)); + if (checkTagsAndFailureDetails) + { + Assert.NotNull(parentExecutionStartedEvent.Tags); + Assert.Contains(TagsKey, parentExecutionStartedEvent.Tags.Keys); + Assert.Contains(TagsValue, parentExecutionStartedEvent.Tags.Values); + } + + Assert.Equal(EventType.SubOrchestrationInstanceCreated, historyEvents[2].EventType); + var subOrchestrationInstanceCreatedEvent = (SubOrchestrationInstanceCreatedEvent)historyEvents[2]; + Assert.Equal("CallLargeOutputTasksSubOrchestration", subOrchestrationInstanceCreatedEvent.Name); + // MSSQL does not include the instance ID field in the SubOrchestrationInstanceCreatedEvent + if (isNotMSSQL) + { + Assert.Equal(subOrchestrationInstanceId, subOrchestrationInstanceCreatedEvent.InstanceId); + } + Assert.Equal(EventType.OrchestratorCompleted, historyEvents[3].EventType); + + Assert.Equal(EventType.OrchestratorStarted, historyEvents[4].EventType); + Assert.Equal(EventType.SubOrchestrationInstanceCompleted, historyEvents[5].EventType); + Assert.Equal(EventType.ExecutionCompleted, historyEvents[6].EventType); + Assert.Equal(EventType.OrchestratorCompleted, historyEvents[7].EventType); + + // Now confirm the fields of the SubOrchestrationInstanceCompleted and ExecutionCompleted events (the result, task ID, and absence of failure details) + var executionCompletedEvent = (ExecutionCompletedEvent)historyEvents[6]; + var subOrchestrationCompletedEvent = (SubOrchestrationInstanceCompletedEvent)historyEvents[5]; + ComplexInput result = new("succeed", subOrchestrationInstanceId, OutputSize, isNotMSSQL, null); + Assert.Null(executionCompletedEvent.FailureDetails); + Assert.NotNull(executionCompletedEvent.Result); + Assert.Equal(result, JsonConvert.DeserializeObject(executionCompletedEvent.Result)); + Assert.Equal(subOrchestrationInstanceCreatedEvent.EventId, subOrchestrationCompletedEvent.TaskScheduledId); + Assert.Equal(result, JsonConvert.DeserializeObject(subOrchestrationCompletedEvent.Result)); + + // The suborchestration calls Activities/entities with large outputs, so it should force multiple history chunks in the streaming process + using HttpResponseMessage getSubOrchestrationHistoryResponse = await HttpHelpers.InvokeHttpTrigger("GetInstanceHistory", $"?instanceId={subOrchestrationInstanceId}"); + Assert.Equal(HttpStatusCode.OK, getSubOrchestrationHistoryResponse.StatusCode); + string subOrchestrationJsonHistory = await getSubOrchestrationHistoryResponse.Content.ReadAsStringAsync(); + List? subOrchestrationHistoryEvents = JsonConvert.DeserializeObject>( + subOrchestrationJsonHistory, + new JsonSerializerSettings() + { + // I had to make the HistoryEventJsonConverter public to use it here. Is this a good reason? + Converters = { new HistoryEventJsonConverter() }, + }); + Assert.NotNull(subOrchestrationHistoryEvents); + + // Confirm the correct count and sequence of events for the suborchestration + Assert.Equal(isNotMSSQL ? 17 : 12, subOrchestrationHistoryEvents.Count); + Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[0].EventType); + Assert.Equal(EventType.ExecutionStarted, subOrchestrationHistoryEvents[1].EventType); + + // Confirm the fields of the ExecutionStartedEvent for the suborchestration (name, orchestration input, parent information, task ID) + var subOrchestrationExecutionStartedEvent = (ExecutionStartedEvent)subOrchestrationHistoryEvents[1]; + Assert.Equal("CallLargeOutputTasksSubOrchestration", subOrchestrationExecutionStartedEvent.Name); + Assert.Equal(result, JsonConvert.DeserializeObject(subOrchestrationExecutionStartedEvent.Input)); + Assert.Equal(parentExecutionStartedEvent.OrchestrationInstance.InstanceId, subOrchestrationExecutionStartedEvent.ParentInstance.OrchestrationInstance.InstanceId); + Assert.Equal(subOrchestrationInstanceCreatedEvent.EventId, subOrchestrationExecutionStartedEvent.ParentInstance.TaskScheduleId); + // MSSQL currently only adds the instance ID and task scheduled ID fields to the parent instance object + if (isNotMSSQL) + { + Assert.Equal("ParentOrchestration", subOrchestrationExecutionStartedEvent.ParentInstance.Name); + Assert.Equal(parentExecutionStartedEvent.OrchestrationInstance.ExecutionId, subOrchestrationExecutionStartedEvent.ParentInstance.OrchestrationInstance.ExecutionId); + } + + Assert.Equal(EventType.TaskScheduled, subOrchestrationHistoryEvents[2].EventType); + Assert.Equal("LargeOutputActivity", ((TaskScheduledEvent)subOrchestrationHistoryEvents[2]).Name); + Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[3].EventType); + + Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[4].EventType); + Assert.Equal(EventType.TaskCompleted, subOrchestrationHistoryEvents[5].EventType); + var taskCompletedEvent = (TaskCompletedEvent)subOrchestrationHistoryEvents[5]; + // Confirm the event ID of the TaskScheduledEvent matches the TaskScheduledId field of the TaskCompletedEvent + Assert.Equal(subOrchestrationHistoryEvents[2].EventId, taskCompletedEvent.TaskScheduledId); + Assert.Equal($"\"{new string('a', OutputSize)}\"", taskCompletedEvent.Result); + + ExecutionCompletedEvent subOrchestrationExecutionCompletedEvent; + if (isNotMSSQL) + { + Assert.Equal(EventType.EventSent, subOrchestrationHistoryEvents[6].EventType); + Assert.Equal(entityId.ToString(), ((EventSentEvent)subOrchestrationHistoryEvents[6]).InstanceId); + Assert.Equal(EventType.TimerCreated, subOrchestrationHistoryEvents[7].EventType); + Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[8].EventType); + + Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[9].EventType); + Assert.Equal(EventType.TimerFired, subOrchestrationHistoryEvents[10].EventType); + // Confirm the event ID of the TimerCreatedEvent matches the TimerId field of the TimerFiredEvent + Assert.Equal(subOrchestrationHistoryEvents[7].EventId, ((TimerFiredEvent)subOrchestrationHistoryEvents[10]).TimerId); + Assert.Equal(EventType.EventSent, subOrchestrationHistoryEvents[11].EventType); + Assert.Equal(entityId.ToString(), ((EventSentEvent)subOrchestrationHistoryEvents[11]).InstanceId); + Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[12].EventType); + + Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[13].EventType); + Assert.Equal(EventType.EventRaised, subOrchestrationHistoryEvents[14].EventType); + Assert.Equal(EventType.ExecutionCompleted, subOrchestrationHistoryEvents[15].EventType); + Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[16].EventType); + subOrchestrationExecutionCompletedEvent = (ExecutionCompletedEvent)subOrchestrationHistoryEvents[15]; + } + else + { + Assert.Equal(EventType.TaskScheduled, subOrchestrationHistoryEvents[6].EventType); + Assert.Equal("LargeOutputActivity", ((TaskScheduledEvent)subOrchestrationHistoryEvents[6]).Name); + Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[7].EventType); + + Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[8].EventType); + taskCompletedEvent = (TaskCompletedEvent)subOrchestrationHistoryEvents[9]; + // Confirm the event ID of the TaskScheduledEvent matches the TaskScheduledId field of the TaskCompletedEvent + Assert.Equal(subOrchestrationHistoryEvents[6].EventId, taskCompletedEvent.TaskScheduledId); + Assert.Equal($"\"{new string('a', OutputSize)}\"", taskCompletedEvent.Result); + Assert.Equal(EventType.ExecutionCompleted, subOrchestrationHistoryEvents[10].EventType); + Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[11].EventType); + subOrchestrationExecutionCompletedEvent = (ExecutionCompletedEvent)subOrchestrationHistoryEvents[10]; + } + + // Confirm the details of the ExecutionCompleted event for the suborchestration (the result and absence of failure details) + Assert.Null(subOrchestrationExecutionCompletedEvent.FailureDetails); + Assert.NotNull(subOrchestrationExecutionCompletedEvent.Result); + Assert.Equal(result, JsonConvert.DeserializeObject(subOrchestrationExecutionCompletedEvent.Result)); + } + + [Fact] + [Trait("Java", "Skip")] // The GetOrchestrationHistory API is not implemented in Java + [Trait("Python", "Skip")] // The GetOrchestrationHistory API is not implemented in Python + [Trait("PowerShell", "Skip")] // The GetOrchestrationHistory API is not implemented in PowerShell + [Trait("Node", "Skip")] // The GetOrchestrationHistory API is not implemented in Node + public async Task GetOrchestrationHistory_InvalidInstanceId_ThrowsArgumentException() + { + string nonExistentInstanceId = Guid.NewGuid().ToString(); + HttpResponseMessage getOrchestrationHistoryResponse = await HttpHelpers.InvokeHttpTrigger("GetInstanceHistory", $"?instanceId={nonExistentInstanceId}"); + Assert.Equal(HttpStatusCode.NotFound, getOrchestrationHistoryResponse.StatusCode); + getOrchestrationHistoryResponse = await HttpHelpers.InvokeHttpTrigger("GetInstanceHistory", $"?instanceId={entityId}"); + Assert.Equal(HttpStatusCode.NotFound, getOrchestrationHistoryResponse.StatusCode); + getOrchestrationHistoryResponse.Dispose(); + } +} From 8f52e573dd8918f2fe3016d81db935d1264da651 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 2 Dec 2025 12:40:38 -0800 Subject: [PATCH 2/3] added more comments and exception cases --- .../TaskHubGrpcServer.cs | 3 +-- .../Tests/GetOrchestrationHistoryTests.cs | 25 +++++++++++++------ 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs b/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs index 009021137..21e6a670e 100644 --- a/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs +++ b/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs @@ -516,14 +516,13 @@ public async override Task StreamInstanceHistory( request.InstanceId, executionId: null); - // Throw exception or return an empty list? List? historyEvents = JsonConvert.DeserializeObject>( jsonHistory, new JsonSerializerSettings() { Converters = { new HistoryEventJsonConverter() }, }) - ?? throw new RpcException(new Status(StatusCode.Internal, "Failed to deserialize orchestration history.")); + ?? throw new Exception($"Failed to deserialize orchestration history."); int currentChunkSizeInBytes = 0; diff --git a/test/e2e/Tests/Tests/GetOrchestrationHistoryTests.cs b/test/e2e/Tests/Tests/GetOrchestrationHistoryTests.cs index b5bdf359e..e20900698 100644 --- a/test/e2e/Tests/Tests/GetOrchestrationHistoryTests.cs +++ b/test/e2e/Tests/Tests/GetOrchestrationHistoryTests.cs @@ -67,9 +67,10 @@ public async Task GetOrchestrationHistory_FailedOrchestration() // Confirm the correct count and sequence of events Assert.Equal(8, historyEvents.Count); + + // OrchestratorStarted, ExecutionStarted, SubOrchestrationInstanceCreated, OrchestratorCompleted Assert.Equal(EventType.OrchestratorStarted, historyEvents[0].EventType); Assert.Equal(EventType.ExecutionStarted, historyEvents[1].EventType); - // Confirm the fields of the ExecutionStartedEvent (name, orchestration input, and orchestration tags) var parentExecutionStartedEvent = (ExecutionStartedEvent)historyEvents[1]; Assert.Equal("ParentOrchestration", parentExecutionStartedEvent.Name); @@ -81,7 +82,6 @@ public async Task GetOrchestrationHistory_FailedOrchestration() Assert.Contains(TagsKey, parentExecutionStartedEvent.Tags.Keys); Assert.Contains(TagsValue, parentExecutionStartedEvent.Tags.Values); } - Assert.Equal(EventType.SubOrchestrationInstanceCreated, historyEvents[2].EventType); var subOrchestrationInstanceCreatedEvent = (SubOrchestrationInstanceCreatedEvent)historyEvents[2]; Assert.Equal("FailSubOrchestration", subOrchestrationInstanceCreatedEvent.Name); @@ -92,6 +92,7 @@ public async Task GetOrchestrationHistory_FailedOrchestration() } Assert.Equal(EventType.OrchestratorCompleted, historyEvents[3].EventType); + // OrchestratorStarted, SubOrchestrationInstanceFailed, ExecutionCompleted, OrchestratorCompleted Assert.Equal(EventType.OrchestratorStarted, historyEvents[4].EventType); Assert.Equal(EventType.SubOrchestrationInstanceFailed, historyEvents[5].EventType); Assert.Equal(subOrchestrationInstanceCreatedEvent.EventId, ((SubOrchestrationInstanceFailedEvent)historyEvents[5]).TaskScheduledId); @@ -136,9 +137,10 @@ public async Task GetOrchestrationHistory_FailedOrchestration() // Confirm the correct count and sequence of events for the suborchestration Assert.Equal(8, subOrchestrationHistoryEvents.Count); + + // OrchestratorStarted, ExecutionStarted, TaskScheduled, OrchestratorCompleted Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[0].EventType); Assert.Equal(EventType.ExecutionStarted, subOrchestrationHistoryEvents[1].EventType); - // Confirm the fields of the ExecutionStartedEvent for the suborchestration (name, orchestration input, parent information, task ID) var subOrchestrationExecutionStartedEvent = (ExecutionStartedEvent)subOrchestrationHistoryEvents[1]; Assert.Equal("FailSubOrchestration", subOrchestrationExecutionStartedEvent.Name); @@ -152,7 +154,6 @@ public async Task GetOrchestrationHistory_FailedOrchestration() Assert.Equal("ParentOrchestration", subOrchestrationExecutionStartedEvent.ParentInstance.Name); Assert.Equal(parentExecutionStartedEvent.OrchestrationInstance.ExecutionId, subOrchestrationExecutionStartedEvent.ParentInstance.OrchestrationInstance.ExecutionId); } - Assert.Equal(EventType.TaskScheduled, subOrchestrationHistoryEvents[2].EventType); var taskScheduledEvent = (TaskScheduledEvent)subOrchestrationHistoryEvents[2]; Assert.Equal("ThrowExceptionActivity", taskScheduledEvent.Name); @@ -164,6 +165,7 @@ public async Task GetOrchestrationHistory_FailedOrchestration() } Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[3].EventType); + // OrchestratorStarted, TaskFailed, ExecutionCompleted, OrchestratorCompleted Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[4].EventType); Assert.Equal(EventType.TaskFailed, subOrchestrationHistoryEvents[5].EventType); // Confirm the event ID of the TaskScheduledEvent matches the TaskScheduledId field of the TaskFailedEvent @@ -225,9 +227,10 @@ public async Task GetOrchestrationHistory_LargeHistory() // Confirm the correct count and sequence of events Assert.Equal(8, historyEvents.Count); + + // OrchestratorStarted, ExecutionStarted, SubOrchestrationInstanceCreated, OrchestratorCompleted Assert.Equal(EventType.OrchestratorStarted, historyEvents[0].EventType); Assert.Equal(EventType.ExecutionStarted, historyEvents[1].EventType); - // Confirm the fields of the ExecutionStartedEvent (name, orchestration input, and orchestration tags) var parentExecutionStartedEvent = (ExecutionStartedEvent)historyEvents[1]; Assert.Equal("ParentOrchestration", parentExecutionStartedEvent.Name); @@ -239,7 +242,6 @@ public async Task GetOrchestrationHistory_LargeHistory() Assert.Contains(TagsKey, parentExecutionStartedEvent.Tags.Keys); Assert.Contains(TagsValue, parentExecutionStartedEvent.Tags.Values); } - Assert.Equal(EventType.SubOrchestrationInstanceCreated, historyEvents[2].EventType); var subOrchestrationInstanceCreatedEvent = (SubOrchestrationInstanceCreatedEvent)historyEvents[2]; Assert.Equal("CallLargeOutputTasksSubOrchestration", subOrchestrationInstanceCreatedEvent.Name); @@ -250,6 +252,7 @@ public async Task GetOrchestrationHistory_LargeHistory() } Assert.Equal(EventType.OrchestratorCompleted, historyEvents[3].EventType); + // OrchestratorStarted, SubOrchestrationInstanceCompleted, ExecutionCompleted, OrchestratorCompleted Assert.Equal(EventType.OrchestratorStarted, historyEvents[4].EventType); Assert.Equal(EventType.SubOrchestrationInstanceCompleted, historyEvents[5].EventType); Assert.Equal(EventType.ExecutionCompleted, historyEvents[6].EventType); @@ -280,9 +283,10 @@ public async Task GetOrchestrationHistory_LargeHistory() // Confirm the correct count and sequence of events for the suborchestration Assert.Equal(isNotMSSQL ? 17 : 12, subOrchestrationHistoryEvents.Count); + + // OrchestratorStarted, ExecutionStarted, TaskScheduled, OrchestratorCompleted Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[0].EventType); Assert.Equal(EventType.ExecutionStarted, subOrchestrationHistoryEvents[1].EventType); - // Confirm the fields of the ExecutionStartedEvent for the suborchestration (name, orchestration input, parent information, task ID) var subOrchestrationExecutionStartedEvent = (ExecutionStartedEvent)subOrchestrationHistoryEvents[1]; Assert.Equal("CallLargeOutputTasksSubOrchestration", subOrchestrationExecutionStartedEvent.Name); @@ -295,11 +299,11 @@ public async Task GetOrchestrationHistory_LargeHistory() Assert.Equal("ParentOrchestration", subOrchestrationExecutionStartedEvent.ParentInstance.Name); Assert.Equal(parentExecutionStartedEvent.OrchestrationInstance.ExecutionId, subOrchestrationExecutionStartedEvent.ParentInstance.OrchestrationInstance.ExecutionId); } - Assert.Equal(EventType.TaskScheduled, subOrchestrationHistoryEvents[2].EventType); Assert.Equal("LargeOutputActivity", ((TaskScheduledEvent)subOrchestrationHistoryEvents[2]).Name); Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[3].EventType); + // OrchestratorStarted, TaskCompleted Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[4].EventType); Assert.Equal(EventType.TaskCompleted, subOrchestrationHistoryEvents[5].EventType); var taskCompletedEvent = (TaskCompletedEvent)subOrchestrationHistoryEvents[5]; @@ -310,11 +314,13 @@ public async Task GetOrchestrationHistory_LargeHistory() ExecutionCompletedEvent subOrchestrationExecutionCompletedEvent; if (isNotMSSQL) { + // EventSentEvent, TimerCreated, OrchestratorCompleted Assert.Equal(EventType.EventSent, subOrchestrationHistoryEvents[6].EventType); Assert.Equal(entityId.ToString(), ((EventSentEvent)subOrchestrationHistoryEvents[6]).InstanceId); Assert.Equal(EventType.TimerCreated, subOrchestrationHistoryEvents[7].EventType); Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[8].EventType); + // OrchestratorStarted, TimerFired, EventSentEvent, OrchestratorCompleted Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[9].EventType); Assert.Equal(EventType.TimerFired, subOrchestrationHistoryEvents[10].EventType); // Confirm the event ID of the TimerCreatedEvent matches the TimerId field of the TimerFiredEvent @@ -323,6 +329,7 @@ public async Task GetOrchestrationHistory_LargeHistory() Assert.Equal(entityId.ToString(), ((EventSentEvent)subOrchestrationHistoryEvents[11]).InstanceId); Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[12].EventType); + // OrchestratorStarted, EventRaised, ExecutionCompleted, OrchestratorCompleted Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[13].EventType); Assert.Equal(EventType.EventRaised, subOrchestrationHistoryEvents[14].EventType); Assert.Equal(EventType.ExecutionCompleted, subOrchestrationHistoryEvents[15].EventType); @@ -331,10 +338,12 @@ public async Task GetOrchestrationHistory_LargeHistory() } else { + // TaskScheduled, OrchestratorCompleted Assert.Equal(EventType.TaskScheduled, subOrchestrationHistoryEvents[6].EventType); Assert.Equal("LargeOutputActivity", ((TaskScheduledEvent)subOrchestrationHistoryEvents[6]).Name); Assert.Equal(EventType.OrchestratorCompleted, subOrchestrationHistoryEvents[7].EventType); + // OrchestratorStarted, TaskCompleted, ExecutionCompleted, OrchestratorCompleted Assert.Equal(EventType.OrchestratorStarted, subOrchestrationHistoryEvents[8].EventType); taskCompletedEvent = (TaskCompletedEvent)subOrchestrationHistoryEvents[9]; // Confirm the event ID of the TaskScheduledEvent matches the TaskScheduledId field of the TaskCompletedEvent From c903db281174931a79b06254162899a1c14d79db Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 2 Dec 2025 13:02:40 -0800 Subject: [PATCH 3/3] updated comments --- .../DurabilityProvider.cs | 1 - .../HistoryEventJsonConverter.cs | 17 +++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs index b97326646..218e4787e 100644 --- a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs @@ -605,7 +605,6 @@ public virtual bool TryGetTargetScaler( /// Streams the history of the specified orchestration instance as an enumerable of serialized history chunks. /// /// The instance ID of the orchestration. - /// The maximum size (in bytes) of each history chunk. /// The JSON formatter used to serialize the history chunks. /// The cancellation token. /// The enumerable of history chunks representing the orchestration's history. diff --git a/src/WebJobs.Extensions.DurableTask/HistoryEventJsonConverter.cs b/src/WebJobs.Extensions.DurableTask/HistoryEventJsonConverter.cs index ebebcd81f..e5640b902 100644 --- a/src/WebJobs.Extensions.DurableTask/HistoryEventJsonConverter.cs +++ b/src/WebJobs.Extensions.DurableTask/HistoryEventJsonConverter.cs @@ -9,15 +9,31 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask { + /// + /// Provides custom JSON deserialization for objects, mapping each event type to its corresponding + /// concrete class. This converter enables polymorphic deserialization of history events based on the EventType + /// property in the JSON payload. + /// + /// + /// This converter only supports reading (deserialization) and does not support writing (serialization) of objects. + /// When deserializing, the EventType property in the JSON must be present and correspond to a known ; + /// otherwise, a (for a missing EventType property) or + /// (for an unknown EventType) will be thrown. + /// public class HistoryEventJsonConverter : JsonConverter { + /// public override bool CanWrite => false; + /// public override bool CanConvert(Type objectType) { return objectType == typeof(HistoryEvent); } + /// + /// If the EventType property is missing in the JSON object attempted to be deserialized. + /// If the EventType property does not correspond to a known . public override object? ReadJson(JsonReader reader, Type objectType, object? existingValue, JsonSerializer serializer) { var jo = JObject.Load(reader); @@ -53,6 +69,7 @@ public override bool CanConvert(Type objectType) return jo.ToObject(concreteType, serializer); } + /// public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer) { throw new NotImplementedException();