Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using DurableTask.Core.Entities;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Google.Protobuf;
using Microsoft.Azure.WebJobs.Host.Scale;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
Expand Down Expand Up @@ -598,6 +599,18 @@ public virtual bool TryGetTargetScaler(
{
targetScaler = null;
return false;
}

/// <summary>
/// Streams the history of the specified orchestration instance as an enumerable of serialized history chunks.
/// </summary>
/// <param name="instanceId">The instance ID of the orchestration.</param>
/// <param name="jsonFormatter">The JSON formatter used to serialize the history chunks.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The enumerable of history chunks representing the orchestration's history.</returns>
public virtual Task<IEnumerable<string>> StreamOrchestrationHistoryAsync(string instanceId, JsonFormatter jsonFormatter, CancellationToken cancellationToken)
{
throw this.GetNotImplementedException(nameof(this.StreamOrchestrationHistoryAsync));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we would expect a PR at other backends repo? Can you link those in the PR description too when they are done?

}
}
}
78 changes: 78 additions & 0 deletions src/WebJobs.Extensions.DurableTask/HistoryEventJsonConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

#nullable enable
using System;
using DurableTask.Core.History;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
/// <summary>
/// Provides custom JSON deserialization for <see cref="HistoryEvent"> objects, mapping each event type to its corresponding
/// concrete class. This converter enables polymorphic deserialization of history events based on the EventType
/// property in the JSON payload.
/// </summary>
/// <remarks>
/// This converter only supports reading (deserialization) and does not support writing (serialization) of <see cref="HistoryEvent"> objects.
/// When deserializing, the EventType property in the JSON must be present and correspond to a known <see cref="EventType">;
/// otherwise, a <see cref="JsonSerializationException"/> (for a missing EventType property) or <see cref="NotSupportedException"/>
/// (for an unknown EventType) will be thrown.
/// </remarks>
public class HistoryEventJsonConverter : JsonConverter
{
/// <inheritdoc/>
public override bool CanWrite => false;

/// <inheritdoc/>
public override bool CanConvert(Type objectType)
{
return objectType == typeof(HistoryEvent);
}

/// <inheritdoc/>
/// <throws><see cref="JsonSerializationException"/> If the EventType property is missing in the JSON object attempted to be deserialized.</throws>
/// <throws><see cref="InvalidOperationException"/>If the EventType property does not correspond to a known <see cref="EventType"/>.</throws>
public override object? ReadJson(JsonReader reader, Type objectType, object? existingValue, JsonSerializer serializer)
{
var jo = JObject.Load(reader);
int eventType = jo["EventType"]?.Value<int>()
?? throw new JsonSerializationException("EventType missing");

Type concreteType = (EventType)eventType switch
{
EventType.ExecutionStarted => typeof(ExecutionStartedEvent),
EventType.ExecutionCompleted => typeof(ExecutionCompletedEvent),
EventType.TaskScheduled => typeof(TaskScheduledEvent),
EventType.TaskCompleted => typeof(TaskCompletedEvent),
EventType.TaskFailed => typeof(TaskFailedEvent),
EventType.SubOrchestrationInstanceCreated => typeof(SubOrchestrationInstanceCreatedEvent),
EventType.SubOrchestrationInstanceCompleted => typeof(SubOrchestrationInstanceCompletedEvent),
EventType.SubOrchestrationInstanceFailed => typeof(SubOrchestrationInstanceFailedEvent),
EventType.TimerCreated => typeof(TimerCreatedEvent),
EventType.TimerFired => typeof(TimerFiredEvent),
EventType.OrchestratorStarted => typeof(OrchestratorStartedEvent),
EventType.OrchestratorCompleted => typeof(OrchestratorCompletedEvent),
EventType.EventSent => typeof(EventSentEvent),
EventType.EventRaised => typeof(EventRaisedEvent),
EventType.GenericEvent => typeof(GenericEvent),
EventType.ContinueAsNew => typeof(ContinueAsNewEvent),
EventType.ExecutionTerminated => typeof(ExecutionTerminatedEvent),
EventType.ExecutionSuspended => typeof(ExecutionSuspendedEvent),
EventType.ExecutionResumed => typeof(ExecutionResumedEvent),
EventType.ExecutionRewound => typeof(ExecutionRewoundEvent),
EventType.HistoryState => typeof(HistoryStateEvent),
_ => throw new NotSupportedException($"Unknown HistoryEvent type {eventType}")
};

return jo.ToObject(concreteType, serializer);
}

/// <inheritdoc/>
public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer)
{
throw new NotImplementedException();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't support write because we don't need this converter to write, right?

}
}
}
34 changes: 23 additions & 11 deletions src/WebJobs.Extensions.DurableTask/ProtobufUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,9 @@ public static P.HistoryEvent ToHistoryEventProto(HistoryEvent e)
var completedEvent = (ExecutionCompletedEvent)e;
payload.ExecutionCompleted = new P.ExecutionCompletedEvent
{
OrchestrationStatus = P.OrchestrationStatus.Completed,
OrchestrationStatus = (P.OrchestrationStatus)completedEvent.OrchestrationStatus,
Result = completedEvent.Result,
};
break;
case EventType.ExecutionFailed:
var failedEvent = (ExecutionCompletedEvent)e;
payload.ExecutionCompleted = new P.ExecutionCompletedEvent
{
OrchestrationStatus = P.OrchestrationStatus.Failed,
Result = failedEvent.Result,
FailureDetails = GetFailureDetails(completedEvent.FailureDetails),
};
break;
case EventType.ExecutionStarted:
Expand Down Expand Up @@ -108,7 +101,16 @@ public static P.HistoryEvent ToHistoryEventProto(HistoryEvent e)
TraceParent = startedEvent.ParentTraceContext.TraceParent,
TraceState = startedEvent.ParentTraceContext.TraceState,
},
};
};

if (startedEvent.Tags != null)
{
foreach (KeyValuePair<string, string> tag in startedEvent.Tags)
{
payload.ExecutionStarted.Tags[tag.Key] = tag.Value;
}
}

break;
case EventType.ExecutionTerminated:
var terminatedEvent = (ExecutionTerminatedEvent)e;
Expand All @@ -124,7 +126,16 @@ public static P.HistoryEvent ToHistoryEventProto(HistoryEvent e)
Name = taskScheduledEvent.Name,
Version = taskScheduledEvent.Version,
Input = taskScheduledEvent.Input,
};
};

if (taskScheduledEvent.Tags != null)
{
foreach (KeyValuePair<string, string> tag in taskScheduledEvent.Tags)
{
payload.TaskScheduled.Tags[tag.Key] = tag.Value;
}
}

break;
case EventType.TaskCompleted:
var taskCompletedEvent = (TaskCompletedEvent)e;
Expand Down Expand Up @@ -252,6 +263,7 @@ public static OrchestratorAction ToOrchestratorAction(P.OrchestratorAction a)
Input = a.ScheduleTask.Input,
Name = a.ScheduleTask.Name,
Version = a.ScheduleTask.Version,
Tags = a.ScheduleTask.Tags.ToDictionary(),
};
case P.OrchestratorAction.OrchestratorActionTypeOneofCase.CreateSubOrchestration:
return new CreateSubOrchestrationAction
Expand Down
88 changes: 88 additions & 0 deletions src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
Expand All @@ -12,16 +14,19 @@
using DurableTask.Core.History;
using DurableTask.Core.Query;
using DurableTask.Core.Serializing.Internal;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
using Newtonsoft.Json;
using DTCore = DurableTask.Core;
using P = Microsoft.DurableTask.Protobuf;

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
internal class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBase
{
private const int MaxHistoryChunkSizeInBytes = 2 * 1024 * 1024; // 2 MB
private readonly DurableTaskExtension extension;

public TaskHubGrpcServer(DurableTaskExtension extension)
Expand Down Expand Up @@ -64,6 +69,7 @@ public override Task<Empty> Hello(Empty request, ServerCallContext context)
Version = request.Version != null ? request.Version : this.extension.Options.DefaultVersion,
OrchestrationInstance = instance,
ScheduledStartTime = request.ScheduledStartTimestamp?.ToDateTime(),
Tags = request.Tags.ToDictionary(),
};

// Get the parent trace context from CreateInstanceRequest
Expand Down Expand Up @@ -475,6 +481,88 @@ private static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationStat
};
}

public async override Task StreamInstanceHistory(
P.StreamInstanceHistoryRequest request,
IServerStreamWriter<P.HistoryChunk> responseStream,
ServerCallContext context)
{
if (await this.GetClient(context).GetStatusAsync(request.InstanceId, showInput: false) is null)
{
throw new RpcException(new Status(StatusCode.NotFound, $"Orchestration instance with ID {request.InstanceId} was not found."));
}

try
{
// First, try to use the streaming API if it's implemented.
try
{
IEnumerable<string> historyChunks = await this.GetDurabilityProvider(context).StreamOrchestrationHistoryAsync(
request.InstanceId,
new JsonFormatter(new JsonFormatter.Settings(formatDefaultValues: true)),
context.CancellationToken);

JsonParser jsonParser = new (JsonParser.Settings.Default.WithIgnoreUnknownFields(true));
foreach (string chunk in historyChunks)
{
context.CancellationToken.ThrowIfCancellationRequested();
await responseStream.WriteAsync(jsonParser.Parse<P.HistoryChunk>(chunk));
}
}

// Otherwise default to the older non-streaming implementation.
catch (NotImplementedException)
{
string jsonHistory = await this.GetDurabilityProvider(context).GetOrchestrationHistoryAsync(
request.InstanceId,
executionId: null);

List<HistoryEvent>? historyEvents = JsonConvert.DeserializeObject<List<HistoryEvent>>(
jsonHistory,
new JsonSerializerSettings()
{
Converters = { new HistoryEventJsonConverter() },
})
?? throw new Exception($"Failed to deserialize orchestration history.");

int currentChunkSizeInBytes = 0;

P.HistoryChunk chunk = new ();

foreach (HistoryEvent historyEvent in historyEvents)
{
context.CancellationToken.ThrowIfCancellationRequested();
P.HistoryEvent result = ProtobufUtils.ToHistoryEventProto(historyEvent);

int currentEventSize = result.CalculateSize();
if (currentChunkSizeInBytes + currentEventSize > MaxHistoryChunkSizeInBytes)
{
// If we exceeded the chunk size threshold, send what we have so far.
await responseStream.WriteAsync(chunk);
chunk = new ();
currentChunkSizeInBytes = 0;
}

chunk.Events.Add(result);
currentChunkSizeInBytes += currentEventSize;
}

// Send the last chunk, which may be smaller than the maximum chunk size.
if (chunk.Events.Count > 0)
{
await responseStream.WriteAsync(chunk);
}
}
}
catch (OperationCanceledException)
{
throw new RpcException(new Status(StatusCode.Cancelled, $"Orchestration history streaming cancelled for instance {request.InstanceId}"));
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.Internal, $"Failed to stream orchestration history for instance {request.InstanceId}: {ex.Message}"));
}
}

private static P.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails)
{
if (failureDetails == null)
Expand Down
10 changes: 9 additions & 1 deletion src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core.History;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Client.Entities;
Expand Down Expand Up @@ -115,4 +116,11 @@
{
return this.inner.RewindInstanceAsync(instanceId, reason, cancellation);
}

public override Task<IList<HistoryEvent>> GetOrchestrationHistoryAsync(

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-linux

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-linux

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / e2e-dts

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / e2e-dts

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / e2e-mssql

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / e2e-mssql

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-windows

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override

Check failure on line 120 in src/Worker.Extensions.DurableTask/FunctionsDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / e2e-azurestorage-windows

'FunctionsDurableTaskClient.GetOrchestrationHistoryAsync(string, CancellationToken)': no suitable method found to override
string instanceId,
CancellationToken cancellation = default)
{
return this.inner.GetOrchestrationHistoryAsync(instanceId, cancellation);
}
}
Loading
Loading