diff --git a/Directory.Packages.props b/Directory.Packages.props index 81e3d633f..9a87896a6 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -28,7 +28,7 @@ - + diff --git a/eng/targets/Release.props b/eng/targets/Release.props index 61031c15c..697fc3f70 100644 --- a/eng/targets/Release.props +++ b/eng/targets/Release.props @@ -17,7 +17,7 @@ - 1.10.0 + 1.11.0 diff --git a/src/Client/Grpc/GrpcDurableEntityClient.cs b/src/Client/Grpc/GrpcDurableEntityClient.cs index b5e3371d0..fd8e63926 100644 --- a/src/Client/Grpc/GrpcDurableEntityClient.cs +++ b/src/Client/Grpc/GrpcDurableEntityClient.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Diagnostics; using Microsoft.DurableTask.Client.Entities; using Microsoft.DurableTask.Entities; using Microsoft.Extensions.Logging; @@ -54,8 +55,16 @@ public override async Task SignalEntityAsync( RequestId = requestId.ToString(), Name = operationName, Input = this.dataConverter.Serialize(input), - ScheduledTime = scheduledTime?.ToTimestamp(), - }; + ScheduledTime = scheduledTime?.ToTimestamp(), + RequestTime = DateTimeOffset.UtcNow.ToTimestamp(), + }; + + if (Activity.Current is { } activity) + { + request.ParentTraceContext ??= new P.TraceContext(); + request.ParentTraceContext.TraceParent = activity.Id; + request.ParentTraceContext.TraceState = activity.TraceStateString; + } // TODO this.logger.LogSomething try diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 642ca7dbc..e1caea2f5 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft Corporation. +// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. using System.Diagnostics; @@ -95,6 +95,7 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( Version = version, InstanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N"), Input = this.DataConverter.Serialize(input), + RequestTime = DateTimeOffset.UtcNow.ToTimestamp(), }; // Add tags to the collection diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs index 10571e376..4d798479d 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableEntityClient.cs @@ -1,8 +1,10 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Diagnostics; using DurableTask.Core; -using DurableTask.Core.Entities; +using DurableTask.Core.Entities; +using DurableTask.Core.Tracing; using Microsoft.DurableTask.Client.Entities; using Microsoft.DurableTask.Entities; @@ -90,7 +92,10 @@ public override async Task SignalEntityAsync( EntityMessageEvent.GetCappedScheduledTime( DateTime.UtcNow, this.options.Entities.MaxSignalDelayTimeOrDefault, - scheduledTime?.UtcDateTime)); + scheduledTime?.UtcDateTime), + Activity.Current is { } activity ? new DistributedTraceContext(activity.Id!, activity.TraceStateString) : null, + requestTime: DateTimeOffset.UtcNow, + createTrace: true); await this.options.Client!.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage()); } diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index bcf8eb98a..bb77aab21 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.Globalization; using DurableTask.Core; using DurableTask.Core.History; using DurableTask.Core.Query; @@ -166,6 +168,16 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( }; string? serializedInput = this.DataConverter.Serialize(input); + + var tags = new Dictionary(); + if (options?.Tags != null) + { + tags = options.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); + } + + tags[OrchestrationTags.CreateTraceForNewOrchestration] = "true"; + tags[OrchestrationTags.RequestTime] = DateTimeOffset.UtcNow.ToString(CultureInfo.InvariantCulture); + TaskMessage message = new() { OrchestrationInstance = instance, @@ -175,7 +187,8 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( Version = options?.Version ?? string.Empty, OrchestrationInstance = instance, ScheduledStartTime = options?.StartAt?.UtcDateTime, - Tags = options?.Tags != null ? options.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value) : null, + ParentTraceContext = Activity.Current is { } activity ? new Core.Tracing.DistributedTraceContext(activity.Id!, activity.TraceStateString) : null, + Tags = tags, }, }; diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 88928c3ba..b2ca147b5 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -95,6 +95,7 @@ message TaskScheduledEvent { google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; TraceContext parentTraceContext = 4; + map tags = 5; } message TaskCompletedEvent { @@ -256,6 +257,7 @@ message ScheduleTaskAction { string name = 1; google.protobuf.StringValue version = 2; google.protobuf.StringValue input = 3; + map tags = 4; } message CreateSubOrchestrationAction { @@ -343,6 +345,7 @@ message CreateInstanceRequest { google.protobuf.StringValue executionId = 7; map tags = 8; TraceContext parentTraceContext = 9; + google.protobuf.Timestamp requestTime = 10; } message OrchestrationIdReusePolicy { @@ -490,6 +493,8 @@ message SignalEntityRequest { google.protobuf.StringValue input = 3; string requestId = 4; google.protobuf.Timestamp scheduledTime = 5; + TraceContext parentTraceContext = 6; + google.protobuf.Timestamp requestTime = 7; } message SignalEntityResponse { @@ -575,6 +580,7 @@ message OperationRequest { string operation = 1; string requestId = 2; google.protobuf.StringValue input = 3; + TraceContext traceContext = 4; } message OperationResult { @@ -591,10 +597,14 @@ message OperationInfo { message OperationResultSuccess { google.protobuf.StringValue result = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; } message OperationResultFailure { TaskFailureDetails failureDetails = 1; + google.protobuf.Timestamp startTimeUtc = 2; + google.protobuf.Timestamp endTimeUtc = 3; } message OperationAction { @@ -610,6 +620,8 @@ message SendSignalAction { string name = 2; google.protobuf.StringValue input = 3; google.protobuf.Timestamp scheduledTime = 4; + google.protobuf.Timestamp requestTime = 5; + TraceContext parentTraceContext = 6; } message StartNewOrchestrationAction { @@ -618,6 +630,8 @@ message StartNewOrchestrationAction { google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; google.protobuf.Timestamp scheduledTime = 5; + google.protobuf.Timestamp requestTime = 6; + TraceContext parentTraceContext = 7; } message AbandonActivityTaskRequest { diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index fc90a4409..121ec0ec7 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-04-23 23:27:00 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fbe5bb20835678099fc51a44993ed9b045dee5a6/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-06-02 21:12:34 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fd9369c6a03d6af4e95285e432b7c4e943c06970/protos/orchestrator_service.proto diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index 56907bc95..b73d24374 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft Corporation. +// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. using System.Buffers; @@ -11,6 +11,7 @@ using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.History; +using DurableTask.Core.Tracing; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using DTCore = DurableTask.Core; @@ -590,6 +591,10 @@ internal static void ToEntityBatchRequest( Operation = operationRequest.Operation, Input = operationRequest.Input, Id = Guid.Parse(operationRequest.RequestId), + TraceContext = operationRequest.TraceContext != null ? + new DistributedTraceContext( + operationRequest.TraceContext.TraceParent, + operationRequest.TraceContext.TraceState) : null, }; } @@ -612,12 +617,16 @@ internal static void ToEntityBatchRequest( return new OperationResult() { Result = operationResult.Success.Result, + StartTimeUtc = operationResult.Success.StartTimeUtc?.ToDateTime(), + EndTimeUtc = operationResult.Success.EndTimeUtc?.ToDateTime(), }; case P.OperationResult.ResultTypeOneofCase.Failure: return new OperationResult() { FailureDetails = operationResult.Failure.FailureDetails.ToCore(), + StartTimeUtc = operationResult.Failure.StartTimeUtc?.ToDateTime(), + EndTimeUtc = operationResult.Failure.EndTimeUtc?.ToDateTime(), }; default: @@ -645,6 +654,8 @@ internal static void ToEntityBatchRequest( Success = new P.OperationResultSuccess() { Result = operationResult.Result, + StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(), + EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(), }, }; } @@ -655,6 +666,8 @@ internal static void ToEntityBatchRequest( Failure = new P.OperationResultFailure() { FailureDetails = ToProtobuf(operationResult.FailureDetails), + StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(), + EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(), }, }; } @@ -683,6 +696,11 @@ internal static void ToEntityBatchRequest( Input = operationAction.SendSignal.Input, InstanceId = operationAction.SendSignal.InstanceId, ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(), + RequestTime = operationAction.SendSignal.RequestTime?.ToDateTimeOffset(), + ParentTraceContext = operationAction.SendSignal.ParentTraceContext != null ? + new DistributedTraceContext( + operationAction.SendSignal.ParentTraceContext.TraceParent, + operationAction.SendSignal.ParentTraceContext.TraceState) : null, }; case P.OperationAction.OperationActionTypeOneofCase.StartNewOrchestration: @@ -694,6 +712,11 @@ internal static void ToEntityBatchRequest( InstanceId = operationAction.StartNewOrchestration.InstanceId, Version = operationAction.StartNewOrchestration.Version, ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(), + RequestTime = operationAction.StartNewOrchestration.RequestTime?.ToDateTimeOffset(), + ParentTraceContext = operationAction.StartNewOrchestration.ParentTraceContext != null ? + new DistributedTraceContext( + operationAction.StartNewOrchestration.ParentTraceContext.TraceParent, + operationAction.StartNewOrchestration.ParentTraceContext.TraceState) : null, }; default: throw new NotSupportedException($"Deserialization of {operationAction.OperationActionTypeCase} is not supported."); @@ -725,6 +748,14 @@ internal static void ToEntityBatchRequest( Input = sendSignalAction.Input, InstanceId = sendSignalAction.InstanceId, ScheduledTime = sendSignalAction.ScheduledTime?.ToTimestamp(), + RequestTime = sendSignalAction.RequestTime?.ToTimestamp(), + ParentTraceContext = sendSignalAction.ParentTraceContext != null ? + new P.TraceContext + { + TraceParent = sendSignalAction.ParentTraceContext.TraceParent, + TraceState = sendSignalAction.ParentTraceContext.TraceState, + } + : null, }; break; @@ -737,6 +768,14 @@ internal static void ToEntityBatchRequest( Version = startNewOrchestrationAction.Version, InstanceId = startNewOrchestrationAction.InstanceId, ScheduledTime = startNewOrchestrationAction.ScheduledStartTime?.ToTimestamp(), + RequestTime = startNewOrchestrationAction.RequestTime?.ToTimestamp(), + ParentTraceContext = startNewOrchestrationAction.ParentTraceContext != null ? + new P.TraceContext + { + TraceParent = startNewOrchestrationAction.ParentTraceContext.TraceParent, + TraceState = startNewOrchestrationAction.ParentTraceContext.TraceState, + } + : null, }; break; } diff --git a/src/Worker/Core/Shims/TaskEntityShim.cs b/src/Worker/Core/Shims/TaskEntityShim.cs index 8f47a88b2..bad6c7f08 100644 --- a/src/Worker/Core/Shims/TaskEntityShim.cs +++ b/src/Worker/Core/Shims/TaskEntityShim.cs @@ -4,6 +4,7 @@ using DurableTask.Core; using DurableTask.Core.Entities; using DurableTask.Core.Entities.OperationFormat; +using DurableTask.Core.Tracing; using Microsoft.DurableTask.Entities; using Microsoft.Extensions.Logging; using DTCore = DurableTask.Core; @@ -57,14 +58,25 @@ public override async Task ExecuteOperationBatchAsync(EntityB List results = new(); foreach (OperationRequest current in operations.Operations!) - { - this.operation.SetNameAndInput(current.Operation!, current.Input); + { + var startTime = DateTime.UtcNow; + this.operation.SetNameAndInput(current.Operation!, current.Input); + + // The trace context of the current operation becomes the parent trace context of the TaskEntityContext. + // That way, if processing this operation request leads to the TaskEntityContext signaling another entity or starting an orchestration, + // then the parent trace context of these actions will be set to the trace context of whatever operation request triggered them + this.context.ParentTraceContext = current.TraceContext; try - { + { object? result = await this.taskEntity.RunAsync(this.operation); string? serializedResult = this.dataConverter.Serialize(result); - results.Add(new OperationResult() { Result = serializedResult }); + results.Add(new OperationResult() + { + Result = serializedResult, + StartTimeUtc = startTime, + EndTimeUtc = DateTime.UtcNow, + }); // the user code completed without exception, so we commit the current state and actions. this.state.Commit(); @@ -75,7 +87,9 @@ public override async Task ExecuteOperationBatchAsync(EntityB this.logger.OperationError(applicationException, this.entityId, current.Operation!); results.Add(new OperationResult() { - FailureDetails = new FailureDetails(applicationException), + FailureDetails = new FailureDetails(applicationException), + StartTimeUtc = startTime, + EndTimeUtc = DateTime.UtcNow, }); // the user code threw an unhandled exception, so we roll back the state and the actions. @@ -169,7 +183,9 @@ class ContextShim : TaskEntityContext readonly DataConverter dataConverter; List operationActions; - int checkpointPosition; + int checkpointPosition; + + DistributedTraceContext? parentTraceContext; public ContextShim(EntityInstanceId entityInstanceId, DataConverter dataConverter) { @@ -182,7 +198,13 @@ public ContextShim(EntityInstanceId entityInstanceId, DataConverter dataConverte public int CurrentPosition => this.operationActions.Count; - public override EntityInstanceId Id => this.entityInstanceId; + public override EntityInstanceId Id => this.entityInstanceId; + + public DistributedTraceContext? ParentTraceContext + { + get => this.parentTraceContext; + set => this.parentTraceContext = value; + } public void Commit() { @@ -209,7 +231,9 @@ public override void SignalEntity(EntityInstanceId id, string operationName, obj InstanceId = id.ToString(), Name = operationName, Input = this.dataConverter.Serialize(input), - ScheduledTime = options?.SignalTime?.UtcDateTime, + ScheduledTime = options?.SignalTime?.UtcDateTime, + RequestTime = DateTimeOffset.UtcNow, + ParentTraceContext = this.parentTraceContext, }); } @@ -224,7 +248,9 @@ public override string ScheduleNewOrchestration(TaskName name, object? input = n Version = options?.Version ?? string.Empty, InstanceId = instanceId, Input = this.dataConverter.Serialize(input), - ScheduledStartTime = options?.StartAt?.UtcDateTime, + ScheduledStartTime = options?.StartAt?.UtcDateTime, + RequestTime = DateTimeOffset.UtcNow, + ParentTraceContext = this.parentTraceContext, }); return instanceId; } diff --git a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs index 35ad033ca..330fd1888 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs @@ -203,7 +203,9 @@ Guid SendOperationMessage(string instanceId, string operationName, object? input oneWay, guid, EntityMessageEvent.GetCappedScheduledTime(this.wrapper.innerContext.CurrentUtcDateTime, this.wrapper.invocationContext.Options.MaximumTimerInterval, scheduledTime?.UtcDateTime), - serializedInput); + serializedInput, + requestTime: DateTimeOffset.UtcNow, + createTrace: true); if (!this.wrapper.IsReplaying) {