Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

<!-- DurableTask Packages -->
<ItemGroup>
<PackageVersion Include="Microsoft.Azure.DurableTask.Core" Version="3.1.0" />
<PackageVersion Include="Microsoft.Azure.DurableTask.Core" Version="3.2.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.2.2" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion eng/targets/Release.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
</PropertyGroup>

<PropertyGroup>
<VersionPrefix>1.10.0</VersionPrefix>
<VersionPrefix>1.11.0</VersionPrefix>
<VersionSuffix></VersionSuffix>
</PropertyGroup>

Expand Down
13 changes: 11 additions & 2 deletions src/Client/Grpc/GrpcDurableEntityClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Diagnostics;
using Microsoft.DurableTask.Client.Entities;
using Microsoft.DurableTask.Entities;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -54,8 +55,16 @@ public override async Task SignalEntityAsync(
RequestId = requestId.ToString(),
Name = operationName,
Input = this.dataConverter.Serialize(input),
ScheduledTime = scheduledTime?.ToTimestamp(),
};
ScheduledTime = scheduledTime?.ToTimestamp(),
RequestTime = DateTimeOffset.UtcNow.ToTimestamp(),
};

if (Activity.Current is { } activity)
{
request.ParentTraceContext ??= new P.TraceContext();
request.ParentTraceContext.TraceParent = activity.Id;
request.ParentTraceContext.TraceState = activity.TraceStateString;
}

// TODO this.logger.LogSomething
try
Expand Down
3 changes: 2 additions & 1 deletion src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft Corporation.
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Diagnostics;
Expand Down Expand Up @@ -95,6 +95,7 @@
Version = version,
InstanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N"),
Input = this.DataConverter.Serialize(input),
RequestTime = DateTimeOffset.UtcNow.ToTimestamp(),
};

// Add tags to the collection
Expand All @@ -108,7 +109,7 @@

if (Activity.Current?.Id != null || Activity.Current?.TraceStateString != null)
{
if (request.ParentTraceContext == null)

Check warning on line 112 in src/Client/Grpc/GrpcDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Dereference of a possibly null reference.

Check warning on line 112 in src/Client/Grpc/GrpcDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.
{
request.ParentTraceContext = new P.TraceContext();
}
Expand All @@ -126,7 +127,7 @@

DateTimeOffset? startAt = options?.StartAt;
this.logger.SchedulingOrchestration(
request.InstanceId,

Check warning on line 130 in src/Client/Grpc/GrpcDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Dereference of a possibly null reference.

Check warning on line 130 in src/Client/Grpc/GrpcDurableTaskClient.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.
orchestratorName,
sizeInBytes: request.Input != null ? Encoding.UTF8.GetByteCount(request.Input) : 0,
startAt.GetValueOrDefault(DateTimeOffset.UtcNow));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Diagnostics;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.Entities;
using DurableTask.Core.Tracing;
using Microsoft.DurableTask.Client.Entities;
using Microsoft.DurableTask.Entities;

Expand Down Expand Up @@ -90,7 +92,10 @@ public override async Task SignalEntityAsync(
EntityMessageEvent.GetCappedScheduledTime(
DateTime.UtcNow,
this.options.Entities.MaxSignalDelayTimeOrDefault,
scheduledTime?.UtcDateTime));
scheduledTime?.UtcDateTime),
Activity.Current is { } activity ? new DistributedTraceContext(activity.Id!, activity.TraceStateString) : null,
requestTime: DateTimeOffset.UtcNow,
createTrace: true);

await this.options.Client!.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using DurableTask.Core;
using DurableTask.Core.History;
using DurableTask.Core.Query;
Expand Down Expand Up @@ -166,6 +168,16 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
};

string? serializedInput = this.DataConverter.Serialize(input);

var tags = new Dictionary<string, string>();
if (options?.Tags != null)
{
tags = options.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
}

tags[OrchestrationTags.CreateTraceForNewOrchestration] = "true";
tags[OrchestrationTags.RequestTime] = DateTimeOffset.UtcNow.ToString(CultureInfo.InvariantCulture);

TaskMessage message = new()
{
OrchestrationInstance = instance,
Expand All @@ -175,7 +187,8 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
Version = options?.Version ?? string.Empty,
OrchestrationInstance = instance,
ScheduledStartTime = options?.StartAt?.UtcDateTime,
Tags = options?.Tags != null ? options.Tags.ToDictionary(kvp => kvp.Key, kvp => kvp.Value) : null,
ParentTraceContext = Activity.Current is { } activity ? new Core.Tracing.DistributedTraceContext(activity.Id!, activity.TraceStateString) : null,
Tags = tags,
},
};

Expand Down
14 changes: 14 additions & 0 deletions src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ message TaskScheduledEvent {
google.protobuf.StringValue version = 2;
google.protobuf.StringValue input = 3;
TraceContext parentTraceContext = 4;
map<string, string> tags = 5;
}

message TaskCompletedEvent {
Expand Down Expand Up @@ -256,6 +257,7 @@ message ScheduleTaskAction {
string name = 1;
google.protobuf.StringValue version = 2;
google.protobuf.StringValue input = 3;
map<string, string> tags = 4;
}

message CreateSubOrchestrationAction {
Expand Down Expand Up @@ -343,6 +345,7 @@ message CreateInstanceRequest {
google.protobuf.StringValue executionId = 7;
map<string, string> tags = 8;
TraceContext parentTraceContext = 9;
google.protobuf.Timestamp requestTime = 10;
}

message OrchestrationIdReusePolicy {
Expand Down Expand Up @@ -490,6 +493,8 @@ message SignalEntityRequest {
google.protobuf.StringValue input = 3;
string requestId = 4;
google.protobuf.Timestamp scheduledTime = 5;
TraceContext parentTraceContext = 6;
google.protobuf.Timestamp requestTime = 7;
}

message SignalEntityResponse {
Expand Down Expand Up @@ -575,6 +580,7 @@ message OperationRequest {
string operation = 1;
string requestId = 2;
google.protobuf.StringValue input = 3;
TraceContext traceContext = 4;
}

message OperationResult {
Expand All @@ -591,10 +597,14 @@ message OperationInfo {

message OperationResultSuccess {
google.protobuf.StringValue result = 1;
google.protobuf.Timestamp startTimeUtc = 2;
google.protobuf.Timestamp endTimeUtc = 3;
}

message OperationResultFailure {
TaskFailureDetails failureDetails = 1;
google.protobuf.Timestamp startTimeUtc = 2;
google.protobuf.Timestamp endTimeUtc = 3;
}

message OperationAction {
Expand All @@ -610,6 +620,8 @@ message SendSignalAction {
string name = 2;
google.protobuf.StringValue input = 3;
google.protobuf.Timestamp scheduledTime = 4;
google.protobuf.Timestamp requestTime = 5;
TraceContext parentTraceContext = 6;
}

message StartNewOrchestrationAction {
Expand All @@ -618,6 +630,8 @@ message StartNewOrchestrationAction {
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
google.protobuf.Timestamp scheduledTime = 5;
google.protobuf.Timestamp requestTime = 6;
TraceContext parentTraceContext = 7;
}

message AbandonActivityTaskRequest {
Expand Down
4 changes: 2 additions & 2 deletions src/Grpc/versions.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# The following files were downloaded from branch main at 2025-04-23 23:27:00 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fbe5bb20835678099fc51a44993ed9b045dee5a6/protos/orchestrator_service.proto
# The following files were downloaded from branch main at 2025-06-02 21:12:34 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/fd9369c6a03d6af4e95285e432b7c4e943c06970/protos/orchestrator_service.proto
41 changes: 40 additions & 1 deletion src/Shared/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) Microsoft Corporation.
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Buffers;
Expand All @@ -11,6 +11,7 @@
using DurableTask.Core.Entities;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.History;
using DurableTask.Core.Tracing;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using DTCore = DurableTask.Core;
Expand Down Expand Up @@ -590,6 +591,10 @@ internal static void ToEntityBatchRequest(
Operation = operationRequest.Operation,
Input = operationRequest.Input,
Id = Guid.Parse(operationRequest.RequestId),
TraceContext = operationRequest.TraceContext != null ?
new DistributedTraceContext(
operationRequest.TraceContext.TraceParent,
operationRequest.TraceContext.TraceState) : null,
};
}

Expand All @@ -612,12 +617,16 @@ internal static void ToEntityBatchRequest(
return new OperationResult()
{
Result = operationResult.Success.Result,
StartTimeUtc = operationResult.Success.StartTimeUtc?.ToDateTime(),
EndTimeUtc = operationResult.Success.EndTimeUtc?.ToDateTime(),
};

case P.OperationResult.ResultTypeOneofCase.Failure:
return new OperationResult()
{
FailureDetails = operationResult.Failure.FailureDetails.ToCore(),
StartTimeUtc = operationResult.Failure.StartTimeUtc?.ToDateTime(),
EndTimeUtc = operationResult.Failure.EndTimeUtc?.ToDateTime(),
};

default:
Expand Down Expand Up @@ -645,6 +654,8 @@ internal static void ToEntityBatchRequest(
Success = new P.OperationResultSuccess()
{
Result = operationResult.Result,
StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(),
EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(),
},
};
}
Expand All @@ -655,6 +666,8 @@ internal static void ToEntityBatchRequest(
Failure = new P.OperationResultFailure()
{
FailureDetails = ToProtobuf(operationResult.FailureDetails),
StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(),
EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(),
},
};
}
Expand Down Expand Up @@ -683,6 +696,11 @@ internal static void ToEntityBatchRequest(
Input = operationAction.SendSignal.Input,
InstanceId = operationAction.SendSignal.InstanceId,
ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(),
RequestTime = operationAction.SendSignal.RequestTime?.ToDateTimeOffset(),
ParentTraceContext = operationAction.SendSignal.ParentTraceContext != null ?
new DistributedTraceContext(
operationAction.SendSignal.ParentTraceContext.TraceParent,
operationAction.SendSignal.ParentTraceContext.TraceState) : null,
};

case P.OperationAction.OperationActionTypeOneofCase.StartNewOrchestration:
Expand All @@ -694,6 +712,11 @@ internal static void ToEntityBatchRequest(
InstanceId = operationAction.StartNewOrchestration.InstanceId,
Version = operationAction.StartNewOrchestration.Version,
ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(),
RequestTime = operationAction.StartNewOrchestration.RequestTime?.ToDateTimeOffset(),
ParentTraceContext = operationAction.StartNewOrchestration.ParentTraceContext != null ?
new DistributedTraceContext(
operationAction.StartNewOrchestration.ParentTraceContext.TraceParent,
operationAction.StartNewOrchestration.ParentTraceContext.TraceState) : null,
};
default:
throw new NotSupportedException($"Deserialization of {operationAction.OperationActionTypeCase} is not supported.");
Expand Down Expand Up @@ -725,6 +748,14 @@ internal static void ToEntityBatchRequest(
Input = sendSignalAction.Input,
InstanceId = sendSignalAction.InstanceId,
ScheduledTime = sendSignalAction.ScheduledTime?.ToTimestamp(),
RequestTime = sendSignalAction.RequestTime?.ToTimestamp(),
ParentTraceContext = sendSignalAction.ParentTraceContext != null ?
new P.TraceContext
{
TraceParent = sendSignalAction.ParentTraceContext.TraceParent,
TraceState = sendSignalAction.ParentTraceContext.TraceState,
}
: null,
};
break;

Expand All @@ -737,6 +768,14 @@ internal static void ToEntityBatchRequest(
Version = startNewOrchestrationAction.Version,
InstanceId = startNewOrchestrationAction.InstanceId,
ScheduledTime = startNewOrchestrationAction.ScheduledStartTime?.ToTimestamp(),
RequestTime = startNewOrchestrationAction.RequestTime?.ToTimestamp(),
ParentTraceContext = startNewOrchestrationAction.ParentTraceContext != null ?
new P.TraceContext
{
TraceParent = startNewOrchestrationAction.ParentTraceContext.TraceParent,
TraceState = startNewOrchestrationAction.ParentTraceContext.TraceState,
}
: null,
};
break;
}
Expand Down
Loading
Loading