diff --git a/.gitignore b/.gitignore index dfcfd56f4..4a1224346 100644 --- a/.gitignore +++ b/.gitignore @@ -348,3 +348,6 @@ MigrationBackup/ # Ionide (cross platform F# VS Code tools) working folder .ionide/ + +# Rider (cross platform .NET/C# tools) working folder +.idea/ diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index e1caea2f5..c38682a3c 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -5,6 +5,7 @@ using System.Text; using Google.Protobuf.WellKnownTypes; using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Tracing; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -107,24 +108,6 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( } } - if (Activity.Current?.Id != null || Activity.Current?.TraceStateString != null) - { - if (request.ParentTraceContext == null) - { - request.ParentTraceContext = new P.TraceContext(); - } - - if (Activity.Current?.Id != null) - { - request.ParentTraceContext.TraceParent = Activity.Current?.Id; - } - - if (Activity.Current?.TraceStateString != null) - { - request.ParentTraceContext.TraceState = Activity.Current?.TraceStateString; - } - } - DateTimeOffset? startAt = options?.StartAt; this.logger.SchedulingOrchestration( request.InstanceId, @@ -138,6 +121,8 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime()); } + using Activity? newActivity = TraceHelper.StartActivityForNewOrchestration(request); + P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync( request, cancellationToken: cancellation); return result.InstanceId; @@ -159,6 +144,8 @@ public override async Task RaiseEventAsync( Input = this.DataConverter.Serialize(eventPayload), }; + using Activity? traceActivity = TraceHelper.StartActivityForNewEventRaisedFromClient(request, instanceId); + await this.sidecarClient.RaiseEventAsync(request, cancellationToken: cancellation); } diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index b2ca147b5..95bfeedc8 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -193,7 +193,7 @@ message EntityOperationCalledEvent { } message EntityLockRequestedEvent { - string criticalSectionId = 1; + string criticalSectionId = 1; repeated string lockSet = 2; int32 position = 3; google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories @@ -218,7 +218,7 @@ message EntityUnlockSentEvent { message EntityLockGrantedEvent { string criticalSectionId = 1; } - + message HistoryEvent { int32 eventId = 1; google.protobuf.Timestamp timestamp = 2; @@ -245,8 +245,8 @@ message HistoryEvent { ExecutionResumedEvent executionResumed = 22; EntityOperationSignaledEvent entityOperationSignaled = 23; EntityOperationCalledEvent entityOperationCalled = 24; - EntityOperationCompletedEvent entityOperationCompleted = 25; - EntityOperationFailedEvent entityOperationFailed = 26; + EntityOperationCompletedEvent entityOperationCompleted = 25; + EntityOperationFailedEvent entityOperationFailed = 26; EntityLockRequestedEvent entityLockRequested = 27; EntityLockGrantedEvent entityLockGranted = 28; EntityUnlockSentEvent entityUnlockSent = 29; @@ -258,6 +258,7 @@ message ScheduleTaskAction { google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; map tags = 4; + TraceContext parentTraceContext = 5; } message CreateSubOrchestrationAction { @@ -265,6 +266,7 @@ message CreateSubOrchestrationAction { string name = 2; google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; + TraceContext parentTraceContext = 5; } message CreateTimerAction { @@ -314,6 +316,11 @@ message OrchestratorAction { } } +message OrchestrationTraceContext { + google.protobuf.StringValue spanID = 1; + google.protobuf.Timestamp spanStartTime = 2; +} + message OrchestratorRequest { string instanceId = 1; google.protobuf.StringValue executionId = 2; @@ -322,6 +329,8 @@ message OrchestratorRequest { OrchestratorEntityParameters entityParameters = 5; bool requiresHistoryStreaming = 6; map properties = 7; + + OrchestrationTraceContext orchestrationTraceContext = 8; } message OrchestratorResponse { @@ -333,6 +342,8 @@ message OrchestratorResponse { // The number of work item events that were processed by the orchestrator. // This field is optional. If not set, the service should assume that the orchestrator processed all events. google.protobuf.Int32Value numEventsProcessed = 5; + + OrchestrationTraceContext orchestrationTraceContext = 6; } message CreateInstanceRequest { @@ -498,7 +509,7 @@ message SignalEntityRequest { } message SignalEntityResponse { - // no payload + // no payload } message GetEntityRequest { @@ -673,16 +684,16 @@ service TaskHubSidecarService { // Waits for an orchestration instance to reach a running or completion state. rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse); - + // Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.). rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse); // Raises an event to a running orchestration instance. rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse); - + // Terminates a running orchestration instance. rpc TerminateInstance(TerminateRequest) returns (TerminateResponse); - + // Suspends a running orchestration instance. rpc SuspendInstance(SuspendRequest) returns (SuspendResponse); @@ -764,7 +775,7 @@ message CompleteTaskResponse { } message HealthPing { - // No payload + // No payload } message StreamInstanceHistoryRequest { @@ -777,4 +788,4 @@ message StreamInstanceHistoryRequest { message HistoryChunk { repeated HistoryEvent events = 1; -} +} \ No newline at end of file diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 121ec0ec7..5c0de3577 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-06-02 21:12:34 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fd9369c6a03d6af4e95285e432b7c4e943c06970/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-08-08 16:46:11 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/e88acbd07ae38b499dbe8c4e333e9e3feeb2a9cc/protos/orchestrator_service.proto diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index 868ecc661..54957e72f 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -3,6 +3,7 @@ using System.Buffers; using System.Buffers.Text; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Text; using DurableTask.Core; @@ -15,6 +16,7 @@ using Google.Protobuf.WellKnownTypes; using DTCore = DurableTask.Core; using P = Microsoft.DurableTask.Protobuf; +using TraceHelper = Microsoft.DurableTask.Tracing.TraceHelper; namespace Microsoft.DurableTask; @@ -268,6 +270,7 @@ internal static Timestamp ToTimestamp(this DateTime dateTime) /// Constructs a . /// /// The orchestrator instance ID. + /// The orchestrator execution ID. /// The orchestrator customer status or null if no custom status. /// The orchestrator actions. /// @@ -275,14 +278,17 @@ internal static Timestamp ToTimestamp(this DateTime dateTime) /// value that was provided by the corresponding that triggered the orchestrator execution. /// /// The entity conversion state, or null if no conversion is required. + /// The that represents orchestration execution. /// The orchestrator response. /// When an orchestrator action is unknown. internal static P.OrchestratorResponse ConstructOrchestratorResponse( string instanceId, + string executionId, string? customStatus, IEnumerable actions, string completionToken, - EntityConversionState? entityConversionState) + EntityConversionState? entityConversionState, + Activity? orchestrationActivity) { Check.NotNull(actions); var response = new P.OrchestratorResponse @@ -290,21 +296,46 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse( InstanceId = instanceId, CustomStatus = customStatus, CompletionToken = completionToken, + OrchestrationTraceContext = + new() + { + SpanID = orchestrationActivity?.SpanId.ToString(), + SpanStartTime = orchestrationActivity?.StartTimeUtc.ToTimestamp(), + }, }; foreach (OrchestratorAction action in actions) { var protoAction = new P.OrchestratorAction { Id = action.Id }; + P.TraceContext? CreateTraceContext() + { + if (orchestrationActivity is null) + { + return null; + } + + ActivitySpanId clientSpanId = ActivitySpanId.CreateRandom(); + ActivityContext clientActivityContext = new(orchestrationActivity.TraceId, clientSpanId, orchestrationActivity.ActivityTraceFlags, orchestrationActivity.TraceStateString); + + return new P.TraceContext + { + TraceParent = $"00-{clientActivityContext.TraceId}-{clientActivityContext.SpanId}-0{clientActivityContext.TraceFlags:d}", + TraceState = clientActivityContext.TraceState, + }; + } + switch (action.OrchestratorActionType) { case OrchestratorActionType.ScheduleOrchestrator: var scheduleTaskAction = (ScheduleTaskOrchestratorAction)action; + protoAction.ScheduleTask = new P.ScheduleTaskAction { Name = scheduleTaskAction.Name, Version = scheduleTaskAction.Version, Input = scheduleTaskAction.Input, + ParentTraceContext = CreateTraceContext(), }; if (scheduleTaskAction.Tags != null) @@ -324,6 +355,7 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse( InstanceId = subOrchestrationAction.InstanceId, Name = subOrchestrationAction.Name, Version = subOrchestrationAction.Version, + ParentTraceContext = CreateTraceContext(), }; break; case OrchestratorActionType.CreateTimer: @@ -378,6 +410,12 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse( Name = sendEventAction.EventName, Data = sendEventAction.EventData, }; + + // Distributed Tracing: start a new trace activity derived from the orchestration + // for an EventRaisedEvent (external event) + using Activity? traceActivity = TraceHelper.StartTraceActivityForEventRaisedFromWorker(sendEventAction, instanceId, executionId); + + traceActivity?.Stop(); } break; diff --git a/src/Shared/Grpc/Tracing/DiagnosticActivityExtensions.cs b/src/Shared/Grpc/Tracing/DiagnosticActivityExtensions.cs new file mode 100644 index 000000000..34c88e1d5 --- /dev/null +++ b/src/Shared/Grpc/Tracing/DiagnosticActivityExtensions.cs @@ -0,0 +1,108 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Diagnostics; +using System.Linq.Expressions; +using System.Reflection; + +// NOTE: Modified from https://github.com/Azure/durabletask/blob/main/src/DurableTask.Core/Tracing/DiagnosticActivityExtensions.cs +namespace Microsoft.DurableTask.Tracing; + +/// +/// Replica from System.Diagnostics.DiagnosticSource >= 6.0.0. +/// +enum ActivityStatusCode +{ + /// + /// The default value indicating the status code is not initialized. + /// + Unset = 0, + + /// + /// Indicates the operation has been validated and completed successfully. + /// + Ok = 1, + + /// + /// Indicates an error was encountered during the operation. + /// + Error = 2, +} + +/// +/// Extensions for . +/// +static class DiagnosticActivityExtensions +{ + static readonly Action SetSpanIdMethod; + static readonly Action SetStatusMethod; + + static DiagnosticActivityExtensions() + { + BindingFlags flags = BindingFlags.NonPublic | BindingFlags.Instance; + SetSpanIdMethod = (typeof(Activity).GetField("_spanId", flags) + ?? throw new InvalidOperationException("The field Activity._spanId was not found.")) + .CreateSetter(); + SetStatusMethod = CreateSetStatus(); + } + + /// + /// Explicitly sets the span ID for the given activity. + /// + /// The activity on which to set the span ID. + /// The span ID to set. + public static void SetSpanId(this Activity activity, string spanId) + => SetSpanIdMethod(activity, spanId); + + /// + /// Explicitly sets the status code and description for the given activity. + /// + /// The activity on which to set the span ID. + /// The status to set. + /// The description to set. + public static void SetStatus(this Activity activity, ActivityStatusCode status, string description) + => SetStatusMethod(activity, status, description); + + static Action CreateSetStatus() + { + MethodInfo? method = typeof(Activity).GetMethod("SetStatus"); + + if (method is null) + { + return (activity, status, description) => + { +#pragma warning disable CA1510 + if (activity is null) + { + throw new ArgumentNullException(nameof(activity)); + } +#pragma warning restore CA1510 + + string? str = status switch + { + ActivityStatusCode.Unset => "UNSET", + ActivityStatusCode.Ok => "OK", + ActivityStatusCode.Error => "ERROR", + _ => null, + }; + + activity.SetTag("otel.status_code", str); + activity.SetTag("otel.status_description", description); + }; + } + + /* + building expression tree to effectively perform: + (activity, status, description) => activity.SetStatus((ActivityStatusCode)(int)status, description); + */ + + ParameterExpression targetExp = Expression.Parameter(typeof(Activity), "target"); + ParameterExpression status = Expression.Parameter(typeof(ActivityStatusCode), "status"); + ParameterExpression description = Expression.Parameter(typeof(string), "description"); + UnaryExpression convert = Expression.Convert(status, typeof(int)); + convert = Expression.Convert(convert, method.GetParameters().First().ParameterType); + MethodCallExpression callExp = Expression.Call(targetExp, method, convert, description); + return Expression.Lambda>(callExp, targetExp, status, description) + .Compile(); + } +} diff --git a/src/Shared/Grpc/Tracing/FieldInfoExtensionMethods.cs b/src/Shared/Grpc/Tracing/FieldInfoExtensionMethods.cs new file mode 100644 index 000000000..508f4dc12 --- /dev/null +++ b/src/Shared/Grpc/Tracing/FieldInfoExtensionMethods.cs @@ -0,0 +1,51 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Linq.Expressions; +using System.Reflection; + +// NOTE: Modified from https://github.com/Azure/durabletask/blob/main/src/DurableTask.Core/Tracing/FieldInfoExtensionMethods.cs +namespace Microsoft.DurableTask.Tracing; + +/// +/// Extensions for . +/// +static class FieldInfoExtensionMethods +{ + /// + /// Create a re-usable setter for a . + /// When cached and reused, This is quicker than using . + /// + /// The target type of the object. + /// The value type of the field. + /// The field info. + /// A re-usable action to set the field. + internal static Action CreateSetter(this FieldInfo fieldInfo) + { +#pragma warning disable CA1510 + if (fieldInfo == null) + { + throw new ArgumentNullException(nameof(fieldInfo)); + } +#pragma warning restore CA1510 + + if (fieldInfo.DeclaringType is null) + { + throw new ArgumentException("FieldInfo.DeclaringType cannot be null.", nameof(fieldInfo)); + } + + ParameterExpression targetExp = Expression.Parameter(typeof(TTarget), "target"); + Expression source = targetExp; + + if (typeof(TTarget) != fieldInfo.DeclaringType) + { + source = Expression.Convert(targetExp, fieldInfo.DeclaringType); + } + + // Creating the setter to set the value to the field + ParameterExpression valueExp = Expression.Parameter(typeof(TValue), "value"); + MemberExpression fieldExp = Expression.Field(source, fieldInfo); + BinaryExpression assignExp = Expression.Assign(fieldExp, valueExp); + return Expression.Lambda>(assignExp, targetExp, valueExp).Compile(); + } +} diff --git a/src/Shared/Grpc/Tracing/Schema.cs b/src/Shared/Grpc/Tracing/Schema.cs new file mode 100644 index 000000000..377778172 --- /dev/null +++ b/src/Shared/Grpc/Tracing/Schema.cs @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Tracing; + +/// +/// Schema for tracing events related to Durable Task operations. +/// +/// +/// Adapted from "https://github.com/Azure/durabletask/blob/main/src/DurableTask.Core/Tracing/Schema.cs". +/// +static class Schema +{ + /// + /// Tags for tracing events related to orchestrations. + /// + public static class Task + { + /// + /// The type of activity being executed, such as "orchestration", "activity", or "event". + /// + public const string Type = "durabletask.type"; + + /// + /// The name of the orchestration, activity, or event associated with the tracing event. + /// + public const string Name = "durabletask.task.name"; + + /// + /// The version of the orchestration or activity being executed. + /// + public const string Version = "durabletask.task.version"; + + /// + /// The ID of the orchestration instance associated with the tracing event. + /// + public const string InstanceId = "durabletask.task.instance_id"; + + /// + /// The execution ID of the orchestration instance associated with the tracing event. + /// + public const string ExecutionId = "durabletask.task.execution_id"; + + /// + /// The runtime status of the completed orchestration associated with the trace event. + /// + public const string Status = "durabletask.task.status"; + + /// + /// The event ID of the task being executed. + /// + public const string TaskId = "durabletask.task.task_id"; + + /// + /// The ID of the orchestration instance for which the event will be raised. + /// + public const string EventTargetInstanceId = "durabletask.event.target_instance_id"; + + /// + /// The time at which the timer is scheduled to fire. + /// + public const string FireAt = "durabletask.fire_at"; + } +} diff --git a/src/Shared/Grpc/Tracing/TraceActivityConstants.cs b/src/Shared/Grpc/Tracing/TraceActivityConstants.cs new file mode 100644 index 000000000..cba4bf524 --- /dev/null +++ b/src/Shared/Grpc/Tracing/TraceActivityConstants.cs @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +// Modified from https://github.com/Azure/durabletask/blob/main/src/DurableTask.Core/Tracing/TraceActivityConstants.cs. +namespace Microsoft.DurableTask.Tracing; + +/// +/// Constants for trace activity names used in Durable Task Framework. +/// +static class TraceActivityConstants +{ + /// + /// The name of the activity that represents orchestration operations. + /// + public const string Orchestration = "orchestration"; + + /// + /// The name of the activity that represents activity operations. + /// + public const string Activity = "activity"; + + /// + /// The name of the activity that represents entity operations. + /// + public const string Event = "event"; + + /// + /// The name of the activity that represents timer operations. + /// + public const string Timer = "timer"; + + /// + /// The name of the activity that represents an operation to create an orchestration. + /// + public const string CreateOrchestration = "create_orchestration"; + + /// + /// The name of the activity that represents an operation to raise an event. + /// + public const string OrchestrationEvent = "orchestration_event"; +} diff --git a/src/Shared/Grpc/Tracing/TraceHelper.cs b/src/Shared/Grpc/Tracing/TraceHelper.cs new file mode 100644 index 000000000..1283ff126 --- /dev/null +++ b/src/Shared/Grpc/Tracing/TraceHelper.cs @@ -0,0 +1,468 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Diagnostics; +using DurableTask.Core.Command; +using Google.Protobuf.WellKnownTypes; +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.DurableTask.Tracing; + +/// +/// Methods for starting and managing trace activities related to Durable Task operations. +/// +/// +/// Adapted from "https://github.com/Azure/durabletask/blob/main/src/DurableTask.Core/Tracing/TraceHelper.cs". +/// +static class TraceHelper +{ + const string Source = "Microsoft.DurableTask"; + + static readonly ActivitySource ActivityTraceSource = new ActivitySource(Source); + + /// + /// Starts a new trace activity for scheduling an orchestration from the client. + /// + /// The orchestration's creation request. + /// + /// Returns a newly started with orchestration-specific metadata. + /// + public static Activity? StartActivityForNewOrchestration(P.CreateInstanceRequest createInstanceRequest) + { + Activity? newActivity = ActivityTraceSource.StartActivity( + name: CreateSpanName( + TraceActivityConstants.CreateOrchestration, + createInstanceRequest.Name, + createInstanceRequest.Version), + kind: ActivityKind.Producer); + + if (newActivity != null) + { + newActivity.SetTag(Schema.Task.Type, TraceActivityConstants.Orchestration); + newActivity.SetTag(Schema.Task.Name, createInstanceRequest.Name); + newActivity.SetTag(Schema.Task.InstanceId, createInstanceRequest.InstanceId); + newActivity.SetTag(Schema.Task.ExecutionId, createInstanceRequest.ExecutionId); + + if (!string.IsNullOrEmpty(createInstanceRequest.Version)) + { + newActivity.SetTag(Schema.Task.Version, createInstanceRequest.Version); + } + + createInstanceRequest.ParentTraceContext ??= new P.TraceContext(); + createInstanceRequest.ParentTraceContext.TraceParent = newActivity.Id!; + createInstanceRequest.ParentTraceContext.TraceState = newActivity.TraceStateString; + } + + return newActivity; + } + + /// + /// Starts a new trace activity for orchestration execution. + /// + /// The orchestration's execution started event. + /// The orchestration trace context containing span metadata. + /// + /// Returns a newly started with orchestration-specific metadata. + /// + public static Activity? StartTraceActivityForOrchestrationExecution( + P.ExecutionStartedEvent? startEvent, + P.OrchestrationTraceContext? orchestrationTraceContext) + { + if (startEvent == null) + { + return null; + } + + if (startEvent.ParentTraceContext is null + || !ActivityContext.TryParse( + startEvent.ParentTraceContext.TraceParent, + startEvent.ParentTraceContext.TraceState, + out ActivityContext activityContext)) + { + return null; + } + + string activityName = CreateSpanName(TraceActivityConstants.Orchestration, startEvent.Name, startEvent.Version); + ActivityKind activityKind = ActivityKind.Server; + DateTimeOffset startTime = orchestrationTraceContext?.SpanStartTime?.ToDateTimeOffset() ?? default; + + Activity? activity = ActivityTraceSource.StartActivity( + activityName, + kind: activityKind, + parentContext: activityContext, + startTime: startTime); + + if (activity == null) + { + return null; + } + + activity.SetTag(Schema.Task.Type, TraceActivityConstants.Orchestration); + activity.SetTag(Schema.Task.Name, startEvent.Name); + activity.SetTag(Schema.Task.InstanceId, startEvent.OrchestrationInstance.InstanceId); + + if (!string.IsNullOrEmpty(startEvent.Version)) + { + activity.SetTag(Schema.Task.Version, startEvent.Version); + } + + if (orchestrationTraceContext?.SpanID != null) + { + activity.SetSpanId(orchestrationTraceContext.SpanID!); + } + + return activity; + } + + /// + /// Starts a new trace activity for (task) activity execution. + /// + /// The associated request to start a (task) activity. + /// + /// Returns a newly started with (task) activity and orchestration-specific metadata. + /// + public static Activity? StartTraceActivityForTaskExecution( + P.ActivityRequest request) + { + if (request.ParentTraceContext is null + || !ActivityContext.TryParse( + request.ParentTraceContext.TraceParent, + request.ParentTraceContext.TraceState, + out ActivityContext activityContext)) + { + return null; + } + + Activity? newActivity = ActivityTraceSource.StartActivity( + CreateSpanName(TraceActivityConstants.Activity, request.Name, request.Version), + kind: ActivityKind.Server, + parentContext: activityContext); + + if (newActivity == null) + { + return null; + } + + newActivity.SetTag(Schema.Task.Type, TraceActivityConstants.Activity); + newActivity.SetTag(Schema.Task.Name, request.Name); + newActivity.SetTag(Schema.Task.InstanceId, request.OrchestrationInstance.InstanceId); + newActivity.SetTag(Schema.Task.TaskId, request.TaskId); + + if (!string.IsNullOrEmpty(request.Version)) + { + newActivity.SetTag(Schema.Task.Version, request.Version); + } + + return newActivity; + } + + /// + /// Emits a new trace activity for a (task) activity that successfully completes. + /// + /// The ID of the associated orchestration. + /// The associated . + /// The associated . + public static void EmitTraceActivityForTaskCompleted( + string? instanceId, + P.HistoryEvent? historyEvent, + P.TaskScheduledEvent? taskScheduledEvent) + { + // The parent of this is the parent orchestration span ID. It should be the client span which started this + Activity? activity = StartTraceActivityForSchedulingTask(instanceId, historyEvent, taskScheduledEvent); + + activity?.Dispose(); + } + + /// + /// Emits a new trace activity for a (task) activity that fails. + /// + /// The ID of the associated orchestration. + /// The associated . + /// The associated . + /// The associated . + public static void EmitTraceActivityForTaskFailed( + string? instanceId, + P.HistoryEvent? historyEvent, + P.TaskScheduledEvent? taskScheduledEvent, + P.TaskFailedEvent? failedEvent) + { + Activity? activity = StartTraceActivityForSchedulingTask(instanceId, historyEvent, taskScheduledEvent); + + if (activity is null) + { + return; + } + + if (failedEvent != null) + { + string statusDescription = failedEvent.FailureDetails?.ErrorMessage ?? "Unspecified task activity failure"; + activity?.SetStatus(ActivityStatusCode.Error, statusDescription); + } + + activity?.Dispose(); + } + + /// + /// Emits a new trace activity for sub-orchestration execution when the sub-orchestration + /// completes successfully. + /// + /// The ID of the associated orchestration. + /// The associated . + /// The associated . + public static void EmitTraceActivityForSubOrchestrationCompleted( + string? instanceId, + P.HistoryEvent? historyEvent, + P.SubOrchestrationInstanceCreatedEvent? createdEvent) + { + // The parent of this is the parent orchestration span ID. It should be the client span which started this + Activity? activity = CreateTraceActivityForSchedulingSubOrchestration(instanceId, historyEvent, createdEvent); + + activity?.Dispose(); + } + + /// + /// Emits a new trace activity for sub-orchestration execution when the sub-orchestration fails. + /// + /// The ID of the associated orchestration. + /// The associated . + /// The associated . + /// The associated . + public static void EmitTraceActivityForSubOrchestrationFailed( + string? instanceId, + P.HistoryEvent? historyEvent, + P.SubOrchestrationInstanceCreatedEvent? createdEvent, + P.SubOrchestrationInstanceFailedEvent? failedEvent) + { + Activity? activity = CreateTraceActivityForSchedulingSubOrchestration(instanceId, historyEvent, createdEvent); + + if (activity is null) + { + return; + } + + if (failedEvent != null) + { + string statusDescription = failedEvent.FailureDetails.ErrorMessage + ?? "Unspecified sub-orchestration failure"; + activity?.SetStatus(ActivityStatusCode.Error, statusDescription); + } + + activity?.Dispose(); + } + + /// + /// Emits a new trace activity for events raised from the worker. + /// + /// The associated . + /// The instance ID of the associated orchestration. + /// The execution ID of the associated orchestration. + /// + /// Returns a newly started with (task) activity and orchestration-specific metadata. + /// + public static Activity? StartTraceActivityForEventRaisedFromWorker( + SendEventOrchestratorAction eventRaisedEvent, + string? instanceId, + string? executionId) + { + Activity? newActivity = ActivityTraceSource.StartActivity( + CreateSpanName(TraceActivityConstants.OrchestrationEvent, eventRaisedEvent.EventName, null), + kind: ActivityKind.Producer, + parentContext: Activity.Current?.Context ?? default); + + if (newActivity == null) + { + return null; + } + + newActivity.AddTag(Schema.Task.Type, TraceActivityConstants.Event); + newActivity.AddTag(Schema.Task.Name, eventRaisedEvent.EventName); + newActivity.AddTag(Schema.Task.InstanceId, instanceId); + newActivity.AddTag(Schema.Task.ExecutionId, executionId); + + if (!string.IsNullOrEmpty(eventRaisedEvent.Instance?.InstanceId)) + { + newActivity.AddTag(Schema.Task.EventTargetInstanceId, eventRaisedEvent.Instance!.InstanceId); + } + + return newActivity; + } + + /// + /// Creates a new trace activity for events created from the client. + /// + /// The associated . + /// The ID of the associated orchestration. + /// + /// Returns a newly started with (task) activity and orchestration-specific metadata. + /// + public static Activity? StartActivityForNewEventRaisedFromClient(P.RaiseEventRequest eventRaised, string instanceId) + { + Activity? newActivity = ActivityTraceSource.StartActivity( + CreateSpanName(TraceActivityConstants.OrchestrationEvent, eventRaised.Name, null), + kind: ActivityKind.Producer, + parentContext: Activity.Current?.Context ?? default, + tags: new KeyValuePair[] + { + new(Schema.Task.Type, TraceActivityConstants.Event), + new(Schema.Task.Name, eventRaised.Name), + new(Schema.Task.EventTargetInstanceId, instanceId), + }); + + return newActivity; + } + + /// + /// Emits a new trace activity for timers. + /// + /// The ID of the associated orchestration. + /// The name of the orchestration invoking the timer. + /// The timer's start time. + /// The associated . + public static void EmitTraceActivityForTimer( + string? instanceId, + string orchestrationName, + DateTime startTime, + P.TimerFiredEvent timerFiredEvent) + { + Activity? newActivity = ActivityTraceSource.StartActivity( + CreateTimerSpanName(orchestrationName), + kind: ActivityKind.Internal, + startTime: startTime, + parentContext: Activity.Current?.Context ?? default); + + if (newActivity is not null) + { + newActivity.AddTag(Schema.Task.Type, TraceActivityConstants.Timer); + newActivity.AddTag(Schema.Task.Name, orchestrationName); + newActivity.AddTag(Schema.Task.InstanceId, instanceId); + newActivity.AddTag(Schema.Task.FireAt, timerFiredEvent.FireAt.ToDateTime().ToString("o")); + newActivity.AddTag(Schema.Task.TaskId, timerFiredEvent.TimerId); + + newActivity.Dispose(); + } + } + + static string CreateSpanName(string spanDescription, string? taskName, string? taskVersion) + { + if (!string.IsNullOrEmpty(taskVersion)) + { + return $"{spanDescription}:{taskName}@({taskVersion})"; + } + else + { + return $"{spanDescription}:{taskName}"; + } + } + + /// + /// Starts a new trace activity for (task) activity that represents the time between when the task message + /// is enqueued and when the response message is received. + /// + /// The ID of the associated instance. + /// The associated . + /// The associated . + /// + /// Returns a newly started with (task) activity and orchestration-specific metadata. + /// + static Activity? StartTraceActivityForSchedulingTask( + string? instanceId, + P.HistoryEvent? historyEvent, + P.TaskScheduledEvent? taskScheduledEvent) + { + if (taskScheduledEvent == null) + { + return null; + } + + Activity? newActivity = ActivityTraceSource.StartActivity( + CreateSpanName(TraceActivityConstants.Activity, taskScheduledEvent.Name, taskScheduledEvent.Version), + kind: ActivityKind.Client, + startTime: historyEvent?.Timestamp?.ToDateTimeOffset() ?? default, + parentContext: Activity.Current?.Context ?? default); + + if (newActivity == null) + { + return null; + } + + if (taskScheduledEvent.ParentTraceContext != null) + { + if (ActivityContext.TryParse( + taskScheduledEvent.ParentTraceContext.TraceParent, + taskScheduledEvent.ParentTraceContext?.TraceState, + out ActivityContext parentContext)) + { + newActivity.SetSpanId(parentContext.SpanId.ToString()); + } + } + + newActivity.AddTag(Schema.Task.Type, TraceActivityConstants.Activity); + newActivity.AddTag(Schema.Task.Name, taskScheduledEvent.Name); + newActivity.AddTag(Schema.Task.InstanceId, instanceId); + newActivity.AddTag(Schema.Task.TaskId, historyEvent?.EventId); + + if (!string.IsNullOrEmpty(taskScheduledEvent.Version)) + { + newActivity.AddTag(Schema.Task.Version, taskScheduledEvent.Version); + } + + return newActivity; + } + + /// + /// Starts a new trace activity for sub-orchestrations. Represents the time between enqueuing + /// the sub-orchestration message and it completing. + /// + /// The ID of the associated orchestration. + /// The associated . + /// The associated . + /// + /// Returns a newly started with (task) activity and orchestration-specific metadata. + /// + static Activity? CreateTraceActivityForSchedulingSubOrchestration( + string? instanceId, + P.HistoryEvent? historyEvent, + P.SubOrchestrationInstanceCreatedEvent? createdEvent) + { + if (instanceId == null || createdEvent == null) + { + return null; + } + + Activity? activity = ActivityTraceSource.StartActivity( + CreateSpanName(TraceActivityConstants.Orchestration, createdEvent.Name, createdEvent.Version), + kind: ActivityKind.Client, + startTime: historyEvent?.Timestamp?.ToDateTimeOffset() ?? default, + parentContext: Activity.Current?.Context ?? default); + + if (activity == null) + { + return null; + } + + if (createdEvent.ParentTraceContext != null + && ActivityContext.TryParse( + createdEvent.ParentTraceContext.TraceParent, + createdEvent.ParentTraceContext.TraceState, + out ActivityContext parentContext)) + { + activity.SetSpanId(parentContext.SpanId.ToString()); + } + + activity.SetTag(Schema.Task.Type, TraceActivityConstants.Orchestration); + activity.SetTag(Schema.Task.Name, createdEvent.Name); + activity.SetTag(Schema.Task.InstanceId, instanceId); + + if (!string.IsNullOrEmpty(createdEvent.Version)) + { + activity.SetTag(Schema.Task.Version, createdEvent.Version); + } + + return activity; + } + + static string CreateTimerSpanName(string orchestrationName) + { + return $"{TraceActivityConstants.Orchestration}:{orchestrationName}:{TraceActivityConstants.Timer}"; + } +} diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index df2e7ea83..2a4610cf8 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Diagnostics; using System.Text; using DurableTask.Core; using DurableTask.Core.Entities; @@ -8,10 +9,12 @@ using DurableTask.Core.History; using Microsoft.DurableTask.Abstractions; using Microsoft.DurableTask.Entities; +using Microsoft.DurableTask.Tracing; using Microsoft.DurableTask.Worker.Shims; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using static Microsoft.DurableTask.Protobuf.TaskHubSidecarService; +using ActivityStatusCode = System.Diagnostics.ActivityStatusCode; using DTCore = DurableTask.Core; using P = Microsoft.DurableTask.Protobuf; @@ -359,6 +362,109 @@ async Task OnRunOrchestratorAsync( string completionToken, CancellationToken cancellationToken) { + var executionStartedEvent = + request + .NewEvents + .Concat(request.PastEvents) + .Where(e => e.EventTypeCase == P.HistoryEvent.EventTypeOneofCase.ExecutionStarted) + .Select(e => e.ExecutionStarted) + .FirstOrDefault(); + + Activity? traceActivity = TraceHelper.StartTraceActivityForOrchestrationExecution( + executionStartedEvent, + request.OrchestrationTraceContext); + + if (executionStartedEvent is not null) + { + P.HistoryEvent? GetSuborchestrationInstanceCreatedEvent(int eventId) + { + var subOrchestrationEvent = + request + .PastEvents + .Where(x => x.EventTypeCase == P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated) + .FirstOrDefault(x => x.EventId == eventId); + + return subOrchestrationEvent; + } + + P.HistoryEvent? GetTaskScheduledEvent(int eventId) + { + var taskScheduledEvent = + request + .PastEvents + .Where(x => x.EventTypeCase == P.HistoryEvent.EventTypeOneofCase.TaskScheduled) + .LastOrDefault(x => x.EventId == eventId); + + return taskScheduledEvent; + } + + foreach (var newEvent in request.NewEvents) + { + switch (newEvent.EventTypeCase) + { + case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted: + { + P.HistoryEvent? subOrchestrationInstanceCreatedEvent = + GetSuborchestrationInstanceCreatedEvent( + newEvent.SubOrchestrationInstanceCompleted.TaskScheduledId); + + TraceHelper.EmitTraceActivityForSubOrchestrationCompleted( + request.InstanceId, + subOrchestrationInstanceCreatedEvent, + subOrchestrationInstanceCreatedEvent?.SubOrchestrationInstanceCreated); + break; + } + + case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceFailed: + { + P.HistoryEvent? subOrchestrationInstanceCreatedEvent = + GetSuborchestrationInstanceCreatedEvent( + newEvent.SubOrchestrationInstanceFailed.TaskScheduledId); + + TraceHelper.EmitTraceActivityForSubOrchestrationFailed( + request.InstanceId, + subOrchestrationInstanceCreatedEvent, + subOrchestrationInstanceCreatedEvent?.SubOrchestrationInstanceCreated, + newEvent.SubOrchestrationInstanceFailed); + break; + } + + case P.HistoryEvent.EventTypeOneofCase.TaskCompleted: + { + P.HistoryEvent? taskScheduledEvent = + GetTaskScheduledEvent(newEvent.TaskCompleted.TaskScheduledId); + + TraceHelper.EmitTraceActivityForTaskCompleted( + request.InstanceId, + taskScheduledEvent, + taskScheduledEvent?.TaskScheduled); + break; + } + + case P.HistoryEvent.EventTypeOneofCase.TaskFailed: + { + P.HistoryEvent? taskScheduledEvent = + GetTaskScheduledEvent(newEvent.TaskFailed.TaskScheduledId); + + TraceHelper.EmitTraceActivityForTaskFailed( + request.InstanceId, + taskScheduledEvent, + taskScheduledEvent?.TaskScheduled, + newEvent.TaskFailed); + break; + } + + case P.HistoryEvent.EventTypeOneofCase.TimerFired: + TraceHelper.EmitTraceActivityForTimer( + request.InstanceId, + executionStartedEvent.Name, + newEvent.Timestamp.ToDateTime(), + newEvent.TimerFired); + break; + } + } + } + OrchestratorExecutionResult? result = null; P.TaskFailureDetails? failureDetails = null; TaskName name = new("(unknown)"); @@ -460,10 +566,12 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync( { response = ProtoUtils.ConstructOrchestratorResponse( request.InstanceId, + request.ExecutionId, result.CustomStatus, result.Actions, completionToken, - entityConversionState); + entityConversionState, + traceActivity); } else if (versioning != null && failureDetails != null && versionFailure) { @@ -521,6 +629,25 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync( }; } + var completeOrchestrationAction = response.Actions.FirstOrDefault( + a => a.CompleteOrchestration is not null); + + if (completeOrchestrationAction is not null) + { + if (completeOrchestrationAction.CompleteOrchestration.OrchestrationStatus == P.OrchestrationStatus.Failed) + { + traceActivity?.SetStatus( + ActivityStatusCode.Error, + completeOrchestrationAction.CompleteOrchestration.Result); + } + + traceActivity?.SetTag( + Schema.Task.Status, + completeOrchestrationAction.CompleteOrchestration.OrchestrationStatus.ToString()); + + traceActivity?.Dispose(); + } + this.Logger.SendingOrchestratorResponse( name, response.InstanceId, @@ -532,6 +659,8 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync( async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, CancellationToken cancellation) { + using Activity? traceActivity = TraceHelper.StartTraceActivityForTaskExecution(request); + OrchestrationInstance instance = request.OrchestrationInstance.ToCore(); string rawInput = request.Input; @@ -570,6 +699,8 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, int outputSizeInBytes = 0; if (failureDetails != null) { + traceActivity?.SetStatus(ActivityStatusCode.Error, failureDetails.ErrorMessage); + outputSizeInBytes = failureDetails.GetApproximateByteCount(); } else if (output != null) @@ -590,6 +721,9 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, CompletionToken = completionToken, }; + // Stop the trace activity here to avoid including the completion time in the latency calculation + traceActivity?.Stop(); + await this.client.CompleteActivityTaskAsync(response, cancellationToken: cancellation); } diff --git a/src/Worker/Grpc/GrpcOrchestrationRunner.cs b/src/Worker/Grpc/GrpcOrchestrationRunner.cs index 9a6e3c5fc..064b38c65 100644 --- a/src/Worker/Grpc/GrpcOrchestrationRunner.cs +++ b/src/Worker/Grpc/GrpcOrchestrationRunner.cs @@ -117,10 +117,12 @@ public static string LoadAndRun( P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse( request.InstanceId, + request.ExecutionId, result.CustomStatus, result.Actions, completionToken: string.Empty, /* doesn't apply */ - entityConversionState: null); + entityConversionState: null, + orchestrationActivity: null); byte[] responseBytes = response.ToByteArray(); return Convert.ToBase64String(responseBytes); } diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/GrpcCreateSubOrchestrationAction.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/GrpcCreateSubOrchestrationAction.cs new file mode 100644 index 000000000..ce916d7ae --- /dev/null +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/GrpcCreateSubOrchestrationAction.cs @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core.Command; +using DurableTask.Core.Tracing; + +namespace Microsoft.DurableTask.Sidecar.Dispatcher; + +public class GrpcCreateSubOrchestrationAction : CreateSubOrchestrationAction +{ + public DistributedTraceContext? ParentTraceContext { get; set; } +} diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/GrpcOrchestratorExecutionResult.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/GrpcOrchestratorExecutionResult.cs new file mode 100644 index 000000000..302e13d44 --- /dev/null +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/GrpcOrchestratorExecutionResult.cs @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; + +namespace Microsoft.DurableTask.Sidecar.Dispatcher; + +public class GrpcOrchestratorExecutionResult : OrchestratorExecutionResult +{ + public string? OrchestrationActivitySpanId { get; set; } + public DateTimeOffset? OrchestrationActivityStartTime { get; set; } +} diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/GrpcScheduleTaskOrchestratorAction.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/GrpcScheduleTaskOrchestratorAction.cs new file mode 100644 index 000000000..bd8a5af40 --- /dev/null +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/GrpcScheduleTaskOrchestratorAction.cs @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core.Command; +using DurableTask.Core.Tracing; + +namespace Microsoft.DurableTask.Sidecar.Dispatcher; + +public class GrpcScheduleTaskOrchestratorAction : ScheduleTaskOrchestratorAction +{ + public DistributedTraceContext? ParentTraceContext { get; set; } +} diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/GrpcSubOrchestrationInstanceCreatedEvent.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/GrpcSubOrchestrationInstanceCreatedEvent.cs new file mode 100644 index 000000000..466f891c4 --- /dev/null +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/GrpcSubOrchestrationInstanceCreatedEvent.cs @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Runtime.Serialization; +using DurableTask.Core.History; +using DurableTask.Core.Tracing; + +namespace Microsoft.DurableTask.Sidecar.Dispatcher; + +public class GrpcSubOrchestrationInstanceCreatedEvent : SubOrchestrationInstanceCreatedEvent +{ + public GrpcSubOrchestrationInstanceCreatedEvent(int eventId) + : base(eventId) + { + } + + [DataMember] + public DistributedTraceContext? ParentTraceContext { get; set; } +} diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/ITaskExecutor.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/ITaskExecutor.cs index 535b95021..2a8f144e8 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/ITaskExecutor.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/ITaskExecutor.cs @@ -18,7 +18,7 @@ interface ITaskExecutor /// Returns a task containing the result of the orchestrator execution. These are effectively the side-effects of the /// orchestrator code, such as calling activities, scheduling timers, etc. /// - Task ExecuteOrchestrator( + Task ExecuteOrchestrator( OrchestrationInstance instance, IEnumerable pastEvents, IEnumerable newEvents); diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/TaskOrchestrationDispatcher.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/TaskOrchestrationDispatcher.cs index 20cae03e3..3b16d1c46 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/TaskOrchestrationDispatcher.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Dispatcher/TaskOrchestrationDispatcher.cs @@ -62,7 +62,7 @@ protected override async Task ExecuteWorkItemAsync(TaskOrchestrationWorkItem wor // Execute the orchestrator code and get back a set of new actions to take. // IMPORTANT: This IEnumerable may be lazily evaluated and should only be enumerated once! - OrchestratorExecutionResult result = await this.taskExecutor.ExecuteOrchestrator( + GrpcOrchestratorExecutionResult result = await this.taskExecutor.ExecuteOrchestrator( instance, workItem.OrchestrationRuntimeState.PastEvents, workItem.OrchestrationRuntimeState.NewEvents); @@ -89,6 +89,14 @@ protected override async Task ExecuteWorkItemAsync(TaskOrchestrationWorkItem wor continue; } + ExecutionStartedEvent? executionStartedEvent = workItem.OrchestrationRuntimeState.ExecutionStartedEvent; + + if (executionStartedEvent?.ParentTraceContext is not null) + { + executionStartedEvent.ParentTraceContext.ActivityStartTime = result.OrchestrationActivityStartTime; + executionStartedEvent.ParentTraceContext.SpanId = result.OrchestrationActivitySpanId; + } + // Commit the changes to the durable store await this.service.CompleteTaskOrchestrationWorkItemAsync( workItem, @@ -197,6 +205,11 @@ void ApplyOrchestratorActions( scheduleTaskAction.Version, scheduleTaskAction.Input); + if (action is GrpcScheduleTaskOrchestratorAction { ParentTraceContext: not null } grpcAction) + { + scheduledEvent.ParentTraceContext ??= new(grpcAction.ParentTraceContext.TraceParent, grpcAction.ParentTraceContext.TraceState); + } + newActivityMessages ??= new List(); newActivityMessages.Add(new TaskMessage { @@ -224,12 +237,15 @@ void ApplyOrchestratorActions( } else if (action is CreateSubOrchestrationAction subOrchestrationAction) { - runtimeState.AddEvent(new SubOrchestrationInstanceCreatedEvent(subOrchestrationAction.Id) + var grpcAction = action as GrpcCreateSubOrchestrationAction; + + runtimeState.AddEvent(new GrpcSubOrchestrationInstanceCreatedEvent(subOrchestrationAction.Id) { Name = subOrchestrationAction.Name, Version = subOrchestrationAction.Version, InstanceId = subOrchestrationAction.InstanceId, Input = subOrchestrationAction.Input, + ParentTraceContext = grpcAction?.ParentTraceContext }); ExecutionStartedEvent startedEvent = new(-1, subOrchestrationAction.Input) @@ -248,6 +264,7 @@ void ApplyOrchestratorActions( Version = runtimeState.Version, TaskScheduleId = subOrchestrationAction.Id, }, + ParentTraceContext = grpcAction?.ParentTraceContext, Tags = subOrchestrationAction.Tags, }; diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs index 6c9ab0b95..ae8c16a19 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs @@ -6,8 +6,10 @@ using DurableTask.Core.Command; using DurableTask.Core.History; using DurableTask.Core.Query; +using DurableTask.Core.Tracing; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; +using Microsoft.DurableTask.Sidecar.Dispatcher; using Proto = Microsoft.DurableTask.Protobuf; namespace Microsoft.DurableTask.Sidecar.Grpc; @@ -95,7 +97,7 @@ public static Proto.HistoryEvent ToHistoryEventProto(HistoryEvent e) { TraceParent = startedEvent.ParentTraceContext.TraceParent, TraceState = startedEvent.ParentTraceContext.TraceState, - }, + } }; break; case EventType.ExecutionTerminated: @@ -144,6 +146,16 @@ public static Proto.HistoryEvent ToHistoryEventProto(HistoryEvent e) Name = subOrchestrationCreated.Name, Version = subOrchestrationCreated.Version, }; + + if (subOrchestrationCreated is GrpcSubOrchestrationInstanceCreatedEvent { ParentTraceContext: not null } grpcEvent) + { + payload.SubOrchestrationInstanceCreated.ParentTraceContext = new Proto.TraceContext + { + TraceParent = grpcEvent.ParentTraceContext.TraceParent, + TraceState = grpcEvent.ParentTraceContext.TraceState, + }; + } + break; case EventType.SubOrchestrationInstanceCompleted: var subOrchestrationCompleted = (SubOrchestrationInstanceCompletedEvent)e; @@ -237,20 +249,26 @@ public static OrchestratorAction ToOrchestratorAction(Proto.OrchestratorAction a switch (a.OrchestratorActionTypeCase) { case Proto.OrchestratorAction.OrchestratorActionTypeOneofCase.ScheduleTask: - return new ScheduleTaskOrchestratorAction + return new GrpcScheduleTaskOrchestratorAction { Id = a.Id, Input = a.ScheduleTask.Input, Name = a.ScheduleTask.Name, Version = a.ScheduleTask.Version, + ParentTraceContext = a.ScheduleTask.ParentTraceContext is not null + ? new DistributedTraceContext(a.ScheduleTask.ParentTraceContext.TraceParent, a.ScheduleTask.ParentTraceContext.TraceState) + : null, }; case Proto.OrchestratorAction.OrchestratorActionTypeOneofCase.CreateSubOrchestration: - return new CreateSubOrchestrationAction + return new GrpcCreateSubOrchestrationAction { Id = a.Id, Input = a.CreateSubOrchestration.Input, Name = a.CreateSubOrchestration.Name, InstanceId = a.CreateSubOrchestration.InstanceId, + ParentTraceContext = a.CreateSubOrchestration.ParentTraceContext is not null + ? new DistributedTraceContext(a.CreateSubOrchestration.ParentTraceContext.TraceParent, a.CreateSubOrchestration.ParentTraceContext.TraceState) + : null, Tags = null, // TODO Version = a.CreateSubOrchestration.Version, }; diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs index 192a5a77f..b035ed53f 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs @@ -20,7 +20,7 @@ public class TaskHubGrpcServer : P.TaskHubSidecarService.TaskHubSidecarServiceBa { static readonly Task EmptyCompleteTaskResponse = Task.FromResult(new P.CompleteTaskResponse()); - readonly ConcurrentDictionary> pendingOrchestratorTasks = new(StringComparer.OrdinalIgnoreCase); + readonly ConcurrentDictionary> pendingOrchestratorTasks = new(StringComparer.OrdinalIgnoreCase); readonly ConcurrentDictionary> pendingActivityTasks = new(StringComparer.OrdinalIgnoreCase); readonly ILogger log; @@ -143,6 +143,9 @@ await this.client.CreateTaskOrchestrationAsync( Version = request.Version, OrchestrationInstance = instance, Tags = request.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value), + ParentTraceContext = request.ParentTraceContext is not null + ? new(request.ParentTraceContext.TraceParent, request.ParentTraceContext.TraceState) + : null }, OrchestrationInstance = instance, }); @@ -359,16 +362,18 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, { if (!this.pendingOrchestratorTasks.TryRemove( request.InstanceId, - out TaskCompletionSource? tcs)) + out TaskCompletionSource? tcs)) { // TODO: Log? throw new RpcException(new Status(StatusCode.NotFound, $"Orchestration not found")); } - OrchestratorExecutionResult result = new() + GrpcOrchestratorExecutionResult result = new() { Actions = request.Actions.Select(ProtobufUtils.ToOrchestratorAction), CustomStatus = request.CustomStatus, + OrchestrationActivitySpanId = request.OrchestrationTraceContext?.SpanID, + OrchestrationActivityStartTime = request.OrchestrationTraceContext?.SpanStartTime?.ToDateTimeOffset(), }; tcs.TrySetResult(result); @@ -458,14 +463,24 @@ public override async Task GetWorkItems(P.GetWorkItemsRequest request, IServerSt /// Invoked by the when a work item is available, proxies the call to execute an orchestrator over a gRPC channel. /// /// - async Task ITaskExecutor.ExecuteOrchestrator( + async Task ITaskExecutor.ExecuteOrchestrator( OrchestrationInstance instance, IEnumerable pastEvents, IEnumerable newEvents) { + var executionStartedEvent = pastEvents.OfType().FirstOrDefault(); + + P.OrchestrationTraceContext? orchestrationTraceContext = executionStartedEvent?.ParentTraceContext?.SpanId is not null + ? new P.OrchestrationTraceContext + { + SpanID = executionStartedEvent.ParentTraceContext.SpanId, + SpanStartTime = executionStartedEvent.ParentTraceContext.ActivityStartTime?.ToTimestamp(), + } + : null; + // Create a task completion source that represents the async completion of the orchestrator execution. // This must be done before we start the orchestrator execution. - TaskCompletionSource tcs = + TaskCompletionSource tcs = this.CreateTaskCompletionSourceForOrchestrator(instance.InstanceId); try @@ -477,6 +492,7 @@ await this.SendWorkItemToClientAsync(new P.WorkItem InstanceId = instance.InstanceId, ExecutionId = instance.ExecutionId, NewEvents = { newEvents.Select(ProtobufUtils.ToHistoryEventProto) }, + OrchestrationTraceContext = orchestrationTraceContext, PastEvents = { pastEvents.Select(ProtobufUtils.ToHistoryEventProto) }, } }); @@ -517,6 +533,13 @@ await this.SendWorkItemToClientAsync(new P.WorkItem InstanceId = instance.InstanceId, ExecutionId = instance.ExecutionId, }, + ParentTraceContext = activityEvent.ParentTraceContext is not null + ? new() + { + TraceParent = activityEvent.ParentTraceContext.TraceParent, + TraceState = activityEvent.ParentTraceContext.TraceState, + } + : null, } }); } @@ -558,9 +581,9 @@ async Task SendWorkItemToClientAsync(P.WorkItem workItem) } } - TaskCompletionSource CreateTaskCompletionSourceForOrchestrator(string instanceId) + TaskCompletionSource CreateTaskCompletionSourceForOrchestrator(string instanceId) { - TaskCompletionSource tcs = new(); + TaskCompletionSource tcs = new(); this.pendingOrchestratorTasks.TryAdd(instanceId, tcs); return tcs; } diff --git a/test/Grpc.IntegrationTests/IntegrationTestBase.cs b/test/Grpc.IntegrationTests/IntegrationTestBase.cs index fdebdb244..642107efc 100644 --- a/test/Grpc.IntegrationTests/IntegrationTestBase.cs +++ b/test/Grpc.IntegrationTests/IntegrationTestBase.cs @@ -37,6 +37,8 @@ public IntegrationTestBase(ITestOutputHelper output, GrpcSidecarFixture sidecarF /// public CancellationToken TimeoutToken => this.testTimeoutSource.Token; + public ICollection ExportedItems = new List(); + void IDisposable.Dispose() { this.testTimeoutSource.Dispose(); @@ -57,14 +59,15 @@ protected async Task StartWorkerAsync(ActionConfigures the durable task client builder. protected IHostBuilder CreateHostBuilder(Action workerConfigure, Action? clientConfigure) { - return Host.CreateDefaultBuilder() + var host = Host.CreateDefaultBuilder() .ConfigureLogging(b => { b.ClearProviders(); b.AddProvider(this.logProvider); b.SetMinimumLevel(LogLevel.Debug); - }) - .ConfigureServices(services => + + }) + .ConfigureServices((context, services) => { services.AddDurableTaskWorker(b => { @@ -79,6 +82,8 @@ protected IHostBuilder CreateHostBuilder(Action worke clientConfigure?.Invoke(b); }); }); + + return host; } protected IReadOnlyCollection GetLogs() diff --git a/test/Grpc.IntegrationTests/TracingIntegrationTests.cs b/test/Grpc.IntegrationTests/TracingIntegrationTests.cs new file mode 100644 index 000000000..6bd5c421b --- /dev/null +++ b/test/Grpc.IntegrationTests/TracingIntegrationTests.cs @@ -0,0 +1,699 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Diagnostics; +using FluentAssertions; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Worker; +using Xunit.Abstractions; + +namespace Microsoft.DurableTask.Grpc.Tests; + +public class TracingIntegrationTests : IntegrationTestBase +{ + public TracingIntegrationTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) + : base(output, sidecarFixture) + { + } + + static ActivityListener CreateListener(string[] sources, ICollection activities) + { + ActivityListener listener = new(); + + listener.ShouldListenTo = s => sources.Contains(s.Name); + listener.Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllData; + listener.ActivityStopped = a => activities.Add(a); + + ActivitySource.AddActivityListener(listener); + + return listener; + } + + const string TestActivitySourceName = nameof(TracingIntegrationTests); + const string CoreActivitySourceName = "Microsoft.DurableTask"; + + static readonly string[] ActivitySourceNames = [TestActivitySourceName, CoreActivitySourceName]; + + static readonly ActivitySource TestActivitySource = new(TestActivitySourceName); + + [Fact] + public async Task MultiTaskOrchestration() + { + var activities = new List(); + + using var listener = CreateListener(ActivitySourceNames, activities); + + string orchestratorName = nameof(MultiTaskOrchestration); + string activityName = nameof(TestActivityAsync); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc( + orchestratorName, + async (ctx, input) => + { + await ctx.CallActivityAsync(nameof(TestActivityAsync), true); + await ctx.CallActivityAsync(nameof(TestActivityAsync), true); + + return true; + }) + .AddActivityFunc(activityName, (_, input) => TestActivityAsync(input))); + }); + + string instanceId; + + using (var activity = TestActivitySource.StartActivity("Test")) + { + instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: true, cancellation: this.TimeoutToken); + + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); + } + + var testActivity = activities.Single(a => a.Source == TestActivitySource && a.OperationName == "Test"); + var createActivity = activities.Single(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"create_orchestration:{orchestratorName}"); + + // The creation activity should be parented to the test activity. + createActivity.ParentId.Should().Be(testActivity.Id); + createActivity.ParentSpanId.Should().Be(testActivity.SpanId); + createActivity.TagObjects.Should().ContainKey("durabletask.task.instance_id").WhoseValue.Should().Be(instanceId); + createActivity.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(orchestratorName); + createActivity.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("orchestration"); + + var orchestrationActivities = activities.Where(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration:{orchestratorName}").ToList(); + + orchestrationActivities.Should().HaveCountGreaterThan(0); + + // The orchestration activities should be the same "logical" orchestration activity. + orchestrationActivities.Select(a => a.StartTimeUtc).Distinct().Should().HaveCount(1); + orchestrationActivities.Select(a => a.Id).Distinct().Should().HaveCount(1); + orchestrationActivities.Select(a => a.SpanId).Distinct().Should().HaveCount(1); + + // The orchestration activities should be parented to the create activity. + orchestrationActivities + .Should().AllSatisfy(a => + { + a.Kind.Should().Be(ActivityKind.Server); + a.ParentId.Should().Be(createActivity.Id); + a.ParentSpanId.Should().Be(createActivity.SpanId); + + a.TagObjects.Should().ContainKey("durabletask.task.instance_id").WhoseValue.Should().Be(instanceId); + a.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(orchestratorName); + a.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("orchestration"); + a.TagObjects.Should().ContainKey("durabletask.task.status").WhoseValue.Should().Be("Completed"); + }); + + var orchestrationActivity = orchestrationActivities.First(); + + var clientActivityActivities = activities.Where(a => a.Kind == ActivityKind.Client && a.Source.Name == CoreActivitySourceName && a.OperationName == $"activity:{activityName}").ToList(); + + // The "client" (i.e. scheduled) task activities should be parented to the orchestration activity. + clientActivityActivities + .Should().HaveCount(2) + .And.AllSatisfy(a => + { + a.ParentId.Should().Be(orchestrationActivity.Id); + a.ParentSpanId.Should().Be(orchestrationActivity.SpanId); + + }); + + var serverActivityActivities = activities.Where(a => a.Kind == ActivityKind.Server && a.Source.Name == CoreActivitySourceName && a.OperationName == $"activity:{activityName}").ToList(); + + // The "server" (i.e. executed) task activities should be parented to the client activity activities. + serverActivityActivities + .Should().HaveCount(clientActivityActivities.Count) + .And.AllSatisfy(a => + { + a.ParentId.Should().BeOneOf(clientActivityActivities.Select(aa => aa.Id)); + a.ParentSpanId.ToString().Should().BeOneOf(clientActivityActivities.Select(aa => aa.SpanId.ToString())); + + a.TagObjects.Should().ContainKey("durabletask.task.instance_id").WhoseValue.Should().Be(instanceId); + a.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(activityName); + a.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("activity"); + a.TagObjects.Should().ContainKey("durabletask.task.task_id").WhoseValue.Should().NotBeNull(); + }); + + var activityExecutionActivities = activities.Where(a => a.Source.Name == TestActivitySourceName && a.OperationName == nameof(TestActivityAsync)).ToList(); + + // The activity execution activities should be parented to the server activity activities. + activityExecutionActivities + .Should().HaveCount(serverActivityActivities.Count) + .And.AllSatisfy(a => + { + a.ParentId.Should().BeOneOf(serverActivityActivities.Select(aa => aa.Id)); + a.ParentSpanId.ToString().Should().BeOneOf(serverActivityActivities.Select(aa => aa.SpanId.ToString())); + }); + } + + [Fact] + public async Task TaskOrchestrationWithActivityFailure() + { + var activities = new List(); + + using var listener = CreateListener(ActivitySourceNames, activities); + + string orchestratorName = nameof(TaskOrchestrationWithActivityFailure); + string activityName = nameof(TestActivityAsync); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc( + orchestratorName, + async (ctx, input) => + { + await ctx.CallActivityAsync(nameof(TestActivityAsync), false); + + return true; + }) + .AddActivityFunc(activityName, (_, input) => TestActivityAsync(input))); + }); + + string instanceId; + + using (var activity = TestActivitySource.StartActivity("Test")) + { + instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: true, cancellation: this.TimeoutToken); + + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); + } + + var testActivity = activities.Single(a => a.Source == TestActivitySource && a.OperationName == "Test"); + var createActivity = activities.Single(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"create_orchestration:{orchestratorName}"); + + // The creation activity should be parented to the test activity. + createActivity.ParentId.Should().Be(testActivity.Id); + createActivity.ParentSpanId.Should().Be(testActivity.SpanId); + + var orchestrationActivities = activities.Where(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration:{orchestratorName}").ToList(); + + orchestrationActivities.Should().HaveCountGreaterThan(0); + + // The orchestration activities should be the same "logical" orchestration activity. + orchestrationActivities.Select(a => a.StartTimeUtc).Distinct().Should().HaveCount(1); + orchestrationActivities.Select(a => a.Id).Distinct().Should().HaveCount(1); + orchestrationActivities.Select(a => a.SpanId).Distinct().Should().HaveCount(1); + + // The orchestration activities should be parented to the create activity. + orchestrationActivities + .Should().AllSatisfy(a => + { + a.Kind.Should().Be(ActivityKind.Server); + a.ParentId.Should().Be(createActivity.Id); + a.ParentSpanId.Should().Be(createActivity.SpanId); + a.Status.Should().Be(ActivityStatusCode.Error); + + a.TagObjects.Should().ContainKey("durabletask.task.status").WhoseValue.Should().Be("Failed"); + }); + + var orchestrationActivity = orchestrationActivities.First(); + + var clientActivityActivities = activities.Where(a => a.Kind == ActivityKind.Client && a.Source.Name == CoreActivitySourceName && a.OperationName == $"activity:{activityName}").ToList(); + + // The "client" (i.e. scheduled) task activities should be parented to the orchestration activity. + clientActivityActivities + .Should().HaveCount(1) + .And.AllSatisfy(a => + { + a.ParentId.Should().Be(orchestrationActivity.Id); + a.ParentSpanId.Should().Be(orchestrationActivity.SpanId); + a.Status.Should().Be(ActivityStatusCode.Error); + }); + + var serverActivityActivities = activities.Where(a => a.Kind == ActivityKind.Server && a.Source.Name == CoreActivitySourceName && a.OperationName == $"activity:{activityName}").ToList(); + + // The "server" (i.e. executed) task activities should be parented to the client activity activities. + serverActivityActivities + .Should().HaveCount(clientActivityActivities.Count) + .And.AllSatisfy(a => + { + a.ParentId.Should().BeOneOf(clientActivityActivities.Select(aa => aa.Id)); + a.ParentSpanId.ToString().Should().BeOneOf(clientActivityActivities.Select(aa => aa.SpanId.ToString())); + a.Status.Should().Be(ActivityStatusCode.Error); + }); + + var activityExecutionActivities = activities.Where(a => a.Source.Name == TestActivitySourceName && a.OperationName == nameof(TestActivityAsync)).ToList(); + + // The activity execution activities should be parented to the server activity activities. + activityExecutionActivities + .Should().HaveCount(serverActivityActivities.Count) + .And.AllSatisfy(a => + { + a.ParentId.Should().BeOneOf(serverActivityActivities.Select(aa => aa.Id)); + a.ParentSpanId.ToString().Should().BeOneOf(serverActivityActivities.Select(aa => aa.SpanId.ToString())); + a.Status.Should().Be(ActivityStatusCode.Error); + }); + } + + [Fact] + public async Task TaskWithSuborchestration() + { + var activities = new List(); + + using var listener = CreateListener(ActivitySourceNames, activities); + + string orchestratorName = nameof(TaskWithSuborchestration); + string subOrchestratorName = "SubOrchestration"; + string activityName = nameof(TestActivityAsync); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc( + orchestratorName, + async (ctx, input) => + { + await ctx.CallSubOrchestratorAsync(subOrchestratorName, input: true); + + return true; + }) + .AddOrchestratorFunc( + subOrchestratorName, + async (ctx, input) => + { + await ctx.CallActivityAsync(nameof(TestActivityAsync), input); + + return true; + }) + .AddActivityFunc(activityName, (_, input) => TestActivityAsync(input))); + }); + + string instanceId; + + using (var activity = TestActivitySource.StartActivity("Test")) + { + instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: true, cancellation: this.TimeoutToken); + + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); + } + + var testActivity = activities.Single(a => a.Source == TestActivitySource && a.OperationName == "Test"); + var createActivity = activities.Single(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"create_orchestration:{orchestratorName}"); + + // The creation activity should be parented to the test activity. + createActivity.ParentId.Should().Be(testActivity.Id); + createActivity.ParentSpanId.Should().Be(testActivity.SpanId); + createActivity.TagObjects.Should().ContainKey("durabletask.task.instance_id").WhoseValue.Should().Be(instanceId); + createActivity.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(orchestratorName); + createActivity.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("orchestration"); + + var orchestrationActivities = activities.Where(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration:{orchestratorName}").ToList(); + + orchestrationActivities.Should().HaveCountGreaterThan(0); + + // The orchestration activities should be the same "logical" orchestration activity. + orchestrationActivities.Select(a => a.StartTimeUtc).Distinct().Should().HaveCount(1); + orchestrationActivities.Select(a => a.Id).Distinct().Should().HaveCount(1); + orchestrationActivities.Select(a => a.SpanId).Distinct().Should().HaveCount(1); + + // The orchestration activities should be parented to the create activity. + orchestrationActivities + .Should().AllSatisfy(a => + { + a.Kind.Should().Be(ActivityKind.Server); + a.ParentId.Should().Be(createActivity.Id); + a.ParentSpanId.Should().Be(createActivity.SpanId); + + a.TagObjects.Should().ContainKey("durabletask.task.instance_id").WhoseValue.Should().Be(instanceId); + a.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(orchestratorName); + a.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("orchestration"); + }); + + var orchestrationActivity = orchestrationActivities.First(); + + var clientSuborchestrationActivities = activities.Where(a => a.Kind == ActivityKind.Client && a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration:{subOrchestratorName}").ToList(); + + // The client suborchestration activities should be parented to the orchestration activity. + clientSuborchestrationActivities + .Should().HaveCount(1) + .And.AllSatisfy(a => + { + a.ParentId.Should().Be(orchestrationActivity.Id); + a.ParentSpanId.Should().Be(orchestrationActivity.SpanId); + + a.TagObjects.Should().ContainKey("durabletask.task.instance_id").WhoseValue.Should().Be(instanceId); + a.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(subOrchestratorName); + a.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("orchestration"); + }); + + var clientSuborchestrationActivity = clientSuborchestrationActivities.First(); + + var serverSuborchestrationActivities = activities.Where(a => a.Kind == ActivityKind.Server && a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration:{subOrchestratorName}").ToList(); + + // The server suborchestration activities should be parented to the client orchestration activity. + serverSuborchestrationActivities + .Should().HaveCountGreaterThan(0) + .And.AllSatisfy(a => + { + a.ParentId.Should().Be(clientSuborchestrationActivity.Id); + a.ParentSpanId.Should().Be(clientSuborchestrationActivity.SpanId); + + a.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(subOrchestratorName); + a.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("orchestration"); + }); + } + + [Fact] + public async Task TaskWithSuborchestrationFailure() + { + var activities = new List(); + + using var listener = CreateListener(ActivitySourceNames, activities); + + string orchestratorName = nameof(TaskWithSuborchestrationFailure); + string subOrchestratorName = "SubOrchestration"; + string activityName = nameof(TestActivityAsync); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc( + orchestratorName, + async (ctx, input) => + { + await ctx.CallSubOrchestratorAsync(subOrchestratorName, input: false); + + return true; + }) + .AddOrchestratorFunc( + subOrchestratorName, + async (ctx, input) => + { + await ctx.CallActivityAsync(nameof(TestActivityAsync), input); + + return true; + }) + .AddActivityFunc(activityName, (_, input) => TestActivityAsync(input))); + }); + + string instanceId; + + using (var activity = TestActivitySource.StartActivity("Test")) + { + instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: true, cancellation: this.TimeoutToken); + + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); + } + + var testActivity = activities.Single(a => a.Source == TestActivitySource && a.OperationName == "Test"); + var createActivity = activities.Single(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"create_orchestration:{orchestratorName}"); + + // The creation activity should be parented to the test activity. + createActivity.ParentId.Should().Be(testActivity.Id); + createActivity.ParentSpanId.Should().Be(testActivity.SpanId); + createActivity.TagObjects.Should().ContainKey("durabletask.task.instance_id").WhoseValue.Should().Be(instanceId); + createActivity.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(orchestratorName); + createActivity.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("orchestration"); + + var orchestrationActivities = activities.Where(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration:{orchestratorName}").ToList(); + + orchestrationActivities.Should().HaveCountGreaterThan(0); + + // The orchestration activities should be the same "logical" orchestration activity. + orchestrationActivities.Select(a => a.StartTimeUtc).Distinct().Should().HaveCount(1); + orchestrationActivities.Select(a => a.Id).Distinct().Should().HaveCount(1); + orchestrationActivities.Select(a => a.SpanId).Distinct().Should().HaveCount(1); + + // The orchestration activities should be parented to the create activity. + orchestrationActivities + .Should().AllSatisfy(a => + { + a.Kind.Should().Be(ActivityKind.Server); + a.ParentId.Should().Be(createActivity.Id); + a.ParentSpanId.Should().Be(createActivity.SpanId); + a.Status.Should().Be(ActivityStatusCode.Error); + + a.TagObjects.Should().ContainKey("durabletask.task.instance_id").WhoseValue.Should().Be(instanceId); + a.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(orchestratorName); + a.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("orchestration"); + }); + + var orchestrationActivity = orchestrationActivities.First(); + + var clientSuborchestrationActivities = activities.Where(a => a.Kind == ActivityKind.Client && a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration:{subOrchestratorName}").ToList(); + + // The client suborchestration activities should be parented to the orchestration activity. + clientSuborchestrationActivities + .Should().HaveCount(1) + .And.AllSatisfy(a => + { + a.ParentId.Should().Be(orchestrationActivity.Id); + a.ParentSpanId.Should().Be(orchestrationActivity.SpanId); + a.Status.Should().Be(ActivityStatusCode.Error); + + a.TagObjects.Should().ContainKey("durabletask.task.instance_id").WhoseValue.Should().Be(instanceId); + a.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(subOrchestratorName); + a.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("orchestration"); + }); + + var clientSuborchestrationActivity = clientSuborchestrationActivities.First(); + + var serverSuborchestrationActivities = activities.Where(a => a.Kind == ActivityKind.Server && a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration:{subOrchestratorName}").ToList(); + + // The server suborchestration activities should be parented to the client orchestration activity. + serverSuborchestrationActivities + .Should().HaveCountGreaterThan(0) + .And.AllSatisfy(a => + { + a.ParentId.Should().Be(clientSuborchestrationActivity.Id); + a.ParentSpanId.Should().Be(clientSuborchestrationActivity.SpanId); + a.Status.Should().Be(ActivityStatusCode.Error); + + a.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(subOrchestratorName); + a.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("orchestration"); + }); + } + + [Fact] + public async Task TaskOrchestrationWithSentEvent() + { + var activities = new List(); + + using var listener = CreateListener(ActivitySourceNames, activities); + + string orchestratorName = nameof(TaskOrchestrationWithSentEvent); + string activityName = nameof(TestActivityAsync); + string targetInstanceId = "TestInstanceId"; + string eventName = "TestEvent"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc( + orchestratorName, + (ctx, input) => + { + ctx.SendEvent(targetInstanceId, eventName, "TestData"); + + return true; + }) + .AddActivityFunc(activityName, (_, input) => TestActivityAsync(input))); + }); + + string instanceId; + + using (var activity = TestActivitySource.StartActivity("Test")) + { + instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: true, cancellation: this.TimeoutToken); + + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); + } + + var testActivity = activities.Single(a => a.Source == TestActivitySource && a.OperationName == "Test"); + var createActivity = activities.Single(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"create_orchestration:{orchestratorName}"); + + // The creation activity should be parented to the test activity. + createActivity.ParentId.Should().Be(testActivity.Id); + createActivity.ParentSpanId.Should().Be(testActivity.SpanId); + + var orchestrationActivities = activities.Where(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration:{orchestratorName}").ToList(); + + orchestrationActivities.Should().HaveCountGreaterThan(0); + + // The orchestration activities should be the same "logical" orchestration activity. + orchestrationActivities.Select(a => a.StartTimeUtc).Distinct().Should().HaveCount(1); + orchestrationActivities.Select(a => a.Id).Distinct().Should().HaveCount(1); + orchestrationActivities.Select(a => a.SpanId).Distinct().Should().HaveCount(1); + + // The orchestration activities should be parented to the create activity. + orchestrationActivities + .Should().AllSatisfy(a => + { + a.Kind.Should().Be(ActivityKind.Server); + a.ParentId.Should().Be(createActivity.Id); + a.ParentSpanId.Should().Be(createActivity.SpanId); + }); + + var orchestrationActivity = orchestrationActivities.First(); + + var sentEventActivities = activities.Where(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration_event:{eventName}").ToList(); + + // The "client" (i.e. scheduled) task activities should be parented to the orchestration activity. + sentEventActivities + .Should().HaveCount(1) + .And.AllSatisfy(a => + { + a.Kind.Should().Be(ActivityKind.Producer); + a.ParentId.Should().Be(orchestrationActivity.Id); + a.ParentSpanId.Should().Be(orchestrationActivity.SpanId); + + a.TagObjects.Should().ContainKey("durabletask.event.target_instance_id").WhoseValue.Should().Be(targetInstanceId); + a.TagObjects.Should().ContainKey("durabletask.task.instance_id").WhoseValue.Should().Be(instanceId); + a.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(eventName); + a.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("event"); + }); + } + + [Fact] + public async Task TaskOrchestrationWithTimer() + { + var activities = new List(); + + using var listener = CreateListener(ActivitySourceNames, activities); + + string orchestratorName = nameof(TaskOrchestrationWithTimer); + string activityName = nameof(TestActivityAsync); + + DateTime fireAt = default; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc( + orchestratorName, + async (ctx, input) => + { + fireAt = ctx.CurrentUtcDateTime.AddSeconds(1); + + await ctx.CreateTimer(fireAt, CancellationToken.None); + + return true; + }) + .AddActivityFunc(activityName, (_, input) => TestActivityAsync(input))); + }); + + string instanceId; + + using (var activity = TestActivitySource.StartActivity("Test")) + { + instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: true, cancellation: this.TimeoutToken); + + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, this.TimeoutToken); + } + + fireAt.Should().NotBe(default); + + var testActivity = activities.Single(a => a.Source == TestActivitySource && a.OperationName == "Test"); + var createActivity = activities.Single(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"create_orchestration:{orchestratorName}"); + + // The creation activity should be parented to the test activity. + createActivity.ParentId.Should().Be(testActivity.Id); + createActivity.ParentSpanId.Should().Be(testActivity.SpanId); + + var orchestrationActivities = activities.Where(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration:{orchestratorName}").ToList(); + + orchestrationActivities.Should().HaveCountGreaterThan(0); + + // The orchestration activities should be the same "logical" orchestration activity. + orchestrationActivities.Select(a => a.StartTimeUtc).Distinct().Should().HaveCount(1); + orchestrationActivities.Select(a => a.Id).Distinct().Should().HaveCount(1); + orchestrationActivities.Select(a => a.SpanId).Distinct().Should().HaveCount(1); + + // The orchestration activities should be parented to the create activity. + orchestrationActivities + .Should().AllSatisfy(a => + { + a.Kind.Should().Be(ActivityKind.Server); + a.ParentId.Should().Be(createActivity.Id); + a.ParentSpanId.Should().Be(createActivity.SpanId); + }); + + var orchestrationActivity = orchestrationActivities.First(); + + var timerActivities = activities.Where(a => a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration:{orchestratorName}:timer").ToList(); + + // The "client" (i.e. scheduled) task activities should be parented to the orchestration activity. + timerActivities + .Should().HaveCount(1) + .And.AllSatisfy(a => + { + a.Kind.Should().Be(ActivityKind.Internal); + a.ParentId.Should().Be(orchestrationActivity.Id); + a.ParentSpanId.Should().Be(orchestrationActivity.SpanId); + + a.TagObjects.Should().ContainKey("durabletask.fire_at").WhoseValue.Should().Be(fireAt.ToString("O")); + a.TagObjects.Should().ContainKey("durabletask.task.instance_id").WhoseValue.Should().Be(instanceId); + a.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(orchestratorName); + a.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("timer"); + a.TagObjects.Should().ContainKey("durabletask.task.task_id").WhoseValue.Should().NotBeNull(); + }); + } + + [Fact] + public async Task ClientRaiseEvent() + { + var activities = new List(); + + using var listener = CreateListener(ActivitySourceNames, activities); + + string orchestratorName = nameof(this.ClientRaiseEvent); + string eventName = "TestEvent"; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc( + orchestratorName, + (ctx, input) => + { + return true; + })); + }); + + string instanceId; + + using (var activity = TestActivitySource.StartActivity("Test")) + { + instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: true, + cancellation: this.TimeoutToken); + + await server.Client.RaiseEventAsync(instanceId, eventName, "TestData", this.TimeoutToken); + + OrchestrationMetadata metadata = + await server.Client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true, + this.TimeoutToken); + } + + var testActivity = activities.Single(a => a.Source == TestActivitySource && a.OperationName == "Test"); + + var raiseEventActivity = activities.Single(a => + a.Source.Name == CoreActivitySourceName && a.OperationName == $"orchestration_event:{eventName}"); + + // The raise event activity should be parented to the test activity. + raiseEventActivity.Kind.Should().Be(ActivityKind.Producer); + raiseEventActivity.ParentId.Should().Be(testActivity.Id); + raiseEventActivity.ParentSpanId.Should().Be(testActivity.SpanId); + + raiseEventActivity.TagObjects.Should().ContainKey("durabletask.event.target_instance_id").WhoseValue.Should().Be(instanceId); + raiseEventActivity.TagObjects.Should().ContainKey("durabletask.task.name").WhoseValue.Should().Be(eventName); + raiseEventActivity.TagObjects.Should().ContainKey("durabletask.type").WhoseValue.Should().Be("event"); + } + + static Task TestActivityAsync(bool shouldSucceed) + { + using var activity = TestActivitySource.StartActivity(); + + if (shouldSucceed) + { + activity?.SetStatus(ActivityStatusCode.Ok); + + return Task.FromResult(true); + } + else + { + activity?.SetStatus(ActivityStatusCode.Error); + + throw new InvalidOperationException("Test activity failed."); + } + } +}