Skip to content

Commit 7e155d8

Browse files
committed
Update to latest protobuf definitions
1 parent 52e18fc commit 7e155d8

File tree

10 files changed

+74
-27
lines changed

10 files changed

+74
-27
lines changed

eng/targets/Release.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
</PropertyGroup>
1818

1919
<PropertyGroup>
20-
<VersionPrefix>1.4.0</VersionPrefix>
20+
<VersionPrefix>1.5.0</VersionPrefix>
2121
<VersionSuffix></VersionSuffix>
2222
</PropertyGroup>
2323

src/Abstractions/Abstractions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="6.0.0" />
1212
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
1313
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.1" />
14-
<PackageReference Include="System.Text.Json" Version="6.0.0" />
14+
<PackageReference Include="System.Text.Json" Version="6.0.10" />
1515
</ItemGroup>
1616

1717
<ItemGroup>

src/Shared/Grpc/ProtoUtils.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,18 +223,21 @@ internal static Timestamp ToTimestamp(this DateTime dateTime)
223223
/// <param name="instanceId">The orchestrator instance ID.</param>
224224
/// <param name="customStatus">The orchestrator customer status or <c>null</c> if no custom status.</param>
225225
/// <param name="actions">The orchestrator actions.</param>
226+
/// <param name="completionToken">The completion token from the work item.</param>
226227
/// <returns>The orchestrator response.</returns>
227228
/// <exception cref="NotSupportedException">When an orchestrator action is unknown.</exception>
228229
internal static P.OrchestratorResponse ConstructOrchestratorResponse(
229230
string instanceId,
230231
string? customStatus,
231-
IEnumerable<OrchestratorAction> actions)
232+
IEnumerable<OrchestratorAction> actions,
233+
string completionToken)
232234
{
233235
Check.NotNull(actions);
234236
var response = new P.OrchestratorResponse
235237
{
236238
InstanceId = instanceId,
237239
CustomStatus = customStatus,
240+
CompletionToken = completionToken,
238241
};
239242

240243
foreach (OrchestratorAction action in actions)
@@ -605,9 +608,12 @@ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status)
605608
/// Converts a <see cref="EntityBatchResult" /> to <see cref="P.EntityBatchResult" />.
606609
/// </summary>
607610
/// <param name="entityBatchResult">The operation result to convert.</param>
611+
/// <param name="completionToken">The completion token from the work item.</param>
608612
/// <returns>The converted operation result.</returns>
609613
[return: NotNullIfNotNull(nameof(entityBatchResult))]
610-
internal static P.EntityBatchResult? ToEntityBatchResult(this EntityBatchResult? entityBatchResult)
614+
internal static P.EntityBatchResult? ToEntityBatchResult(
615+
this EntityBatchResult? entityBatchResult,
616+
string completionToken)
611617
{
612618
if (entityBatchResult == null)
613619
{
@@ -618,6 +624,7 @@ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status)
618624
{
619625
EntityState = entityBatchResult.EntityState,
620626
FailureDetails = entityBatchResult.FailureDetails.ToProtobuf(),
627+
CompletionToken = completionToken,
621628
};
622629

623630
foreach (OperationAction action in entityBatchResult.Actions!)

src/Worker/Core/DurableTaskWorkerOptions.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,21 @@ public DataConverter DataConverter
8383
/// </remarks>
8484
public TimeSpan MaximumTimerInterval { get; set; } = TimeSpan.FromDays(3);
8585

86+
/// <summary>
87+
/// Gets or sets the maximum number of concurrent activity work items that can be processed by the worker.
88+
/// </summary>
89+
public int MaximumConcurrentActivityWorkItems { get; set; } = 100 * Environment.ProcessorCount;
90+
91+
/// <summary>
92+
/// Gets or sets the maximum number of concurrent orchestration work items that can be processed by the worker.
93+
/// </summary>
94+
public int MaximumConcurrentOrchestrationWorkItems { get; set; } = 100 * Environment.ProcessorCount;
95+
96+
/// <summary>
97+
/// Gets or sets the maximum number of concurrent entity work items that can be processed by the worker.
98+
/// </summary>
99+
public int MaximumConcurrentEntityWorkItems { get; set; } = 100 * Environment.ProcessorCount;
100+
86101
/// <summary>
87102
/// Gets a value indicating whether <see cref="DataConverter" /> was explicitly set or not.
88103
/// </summary>

src/Worker/Core/Worker.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ The worker is responsible for processing durable task work items.</PackageDescri
1111
<ItemGroup>
1212
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
1313
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
14-
<PackageReference Include="System.Text.Json" Version="6.0.0" />
14+
<PackageReference Include="System.Text.Json" Version="6.0.10" />
1515
</ItemGroup>
1616

1717
<ItemGroup>

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient sidec
3434
{
3535
this.worker = worker;
3636
this.sidecar = sidecar;
37-
this.shimFactory = new DurableTaskShimFactory(this.worker.options, this.worker.loggerFactory);
37+
this.shimFactory = new DurableTaskShimFactory(this.worker.grpcOptions, this.worker.loggerFactory);
3838
}
3939

4040
ILogger Logger => this.worker.logger;
@@ -102,7 +102,7 @@ static OrchestrationRuntimeState BuildRuntimeState(P.OrchestratorRequest request
102102

103103
if (runtimeState.ExecutionStartedEvent == null)
104104
{
105-
// TODO: What's the right way to handle this? Callback to the sidecar with a retryable error request?
105+
// TODO: What's the right way to handle this? Callback to the sidecar with a retriable error request?
106106
throw new InvalidOperationException("The provided orchestration history was incomplete");
107107
}
108108

@@ -133,8 +133,16 @@ static string GetActionsListForLogging(IReadOnlyList<P.OrchestratorAction> actio
133133
await this.sidecar!.HelloAsync(EmptyMessage, cancellationToken: cancellation);
134134
this.Logger.EstablishedWorkItemConnection();
135135

136+
DurableTaskWorkerOptions workerOptions = this.worker.workerOptions;
137+
136138
// Get the stream for receiving work-items
137-
return this.sidecar!.GetWorkItems(new P.GetWorkItemsRequest(), cancellationToken: cancellation);
139+
return this.sidecar!.GetWorkItems(
140+
new P.GetWorkItemsRequest
141+
{
142+
MaxConcurrentActivityWorkItems = workerOptions.MaximumConcurrentActivityWorkItems,
143+
MaxConcurrentOrchestrationWorkItems = workerOptions.MaximumConcurrentOrchestrationWorkItems,
144+
},
145+
cancellationToken: cancellation);
138146
}
139147

140148
async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, CancellationToken cancellation)
@@ -145,16 +153,25 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
145153
{
146154
if (workItem.RequestCase == P.WorkItem.RequestOneofCase.OrchestratorRequest)
147155
{
148-
this.RunBackgroundTask(workItem, () => this.OnRunOrchestratorAsync(
149-
workItem.OrchestratorRequest));
156+
this.RunBackgroundTask(
157+
workItem,
158+
() => this.OnRunOrchestratorAsync(workItem.OrchestratorRequest, workItem.CompletionToken));
150159
}
151160
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.ActivityRequest)
152161
{
153-
this.RunBackgroundTask(workItem, () => this.OnRunActivityAsync(workItem.ActivityRequest));
162+
this.RunBackgroundTask(
163+
workItem,
164+
() => this.OnRunActivityAsync(workItem.ActivityRequest, workItem.CompletionToken));
154165
}
155166
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequest)
156167
{
157-
this.RunBackgroundTask(workItem, () => this.OnRunEntityBatchAsync(workItem.EntityRequest));
168+
this.RunBackgroundTask(
169+
workItem,
170+
() => this.OnRunEntityBatchAsync(workItem.EntityRequest, workItem.CompletionToken));
171+
}
172+
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.HealthPing)
173+
{
174+
// No-op
158175
}
159176
else
160177
{
@@ -188,7 +205,7 @@ void RunBackgroundTask(P.WorkItem? workItem, Func<Task> handler)
188205
});
189206
}
190207

191-
async Task OnRunOrchestratorAsync(P.OrchestratorRequest request)
208+
async Task OnRunOrchestratorAsync(P.OrchestratorRequest request, string completionToken)
192209
{
193210
OrchestratorExecutionResult? result = null;
194211
P.TaskFailureDetails? failureDetails = null;
@@ -248,14 +265,16 @@ async Task OnRunOrchestratorAsync(P.OrchestratorRequest request)
248265
response = ProtoUtils.ConstructOrchestratorResponse(
249266
request.InstanceId,
250267
result.CustomStatus,
251-
result.Actions);
268+
result.Actions,
269+
completionToken);
252270
}
253271
else
254272
{
255273
// This is the case for failures that happened *outside* the orchestrator executor
256274
response = new P.OrchestratorResponse
257275
{
258276
InstanceId = request.InstanceId,
277+
CompletionToken = completionToken,
259278
Actions =
260279
{
261280
new P.OrchestratorAction
@@ -279,7 +298,7 @@ async Task OnRunOrchestratorAsync(P.OrchestratorRequest request)
279298
await this.sidecar.CompleteOrchestratorTaskAsync(response);
280299
}
281300

282-
async Task OnRunActivityAsync(P.ActivityRequest request)
301+
async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken)
283302
{
284303
OrchestrationInstance instance = request.OrchestrationInstance.ToCore();
285304
string rawInput = request.Input;
@@ -336,12 +355,13 @@ async Task OnRunActivityAsync(P.ActivityRequest request)
336355
TaskId = request.TaskId,
337356
Result = output,
338357
FailureDetails = failureDetails,
358+
CompletionToken = completionToken,
339359
};
340360

341361
await this.sidecar.CompleteActivityTaskAsync(response);
342362
}
343363

344-
async Task OnRunEntityBatchAsync(P.EntityBatchRequest request)
364+
async Task OnRunEntityBatchAsync(P.EntityBatchRequest request, string completionToken)
345365
{
346366
var coreEntityId = DTCore.Entities.EntityId.FromString(request.InstanceId);
347367
EntityId entityId = new(coreEntityId.Name, coreEntityId.Key);
@@ -400,7 +420,7 @@ async Task OnRunEntityBatchAsync(P.EntityBatchRequest request)
400420
}
401421

402422
// convert the result to protobuf format and send it back
403-
P.EntityBatchResult response = batchResult.ToEntityBatchResult();
423+
P.EntityBatchResult response = batchResult.ToEntityBatchResult(completionToken);
404424
await this.sidecar.CompleteEntityTaskAsync(response);
405425
}
406426
}

src/Worker/Grpc/GrpcDurableTaskWorker.cs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ namespace Microsoft.DurableTask.Worker.Grpc;
1212
/// </summary>
1313
sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
1414
{
15-
readonly GrpcDurableTaskWorkerOptions options;
15+
readonly GrpcDurableTaskWorkerOptions grpcOptions;
16+
readonly DurableTaskWorkerOptions workerOptions;
1617
readonly IServiceProvider services;
1718
readonly ILoggerFactory loggerFactory;
1819
readonly ILogger logger;
@@ -22,18 +23,21 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
2223
/// </summary>
2324
/// <param name="name">The name of the worker.</param>
2425
/// <param name="factory">The task factory.</param>
25-
/// <param name="options">The gRPC worker options.</param>
26+
/// <param name="grpcOptions">The gRPC-specific worker options.</param>
27+
/// <param name="workerOptions">The generic worker options.</param>
2628
/// <param name="services">The service provider.</param>
2729
/// <param name="loggerFactory">The logger.</param>
2830
public GrpcDurableTaskWorker(
2931
string name,
3032
IDurableTaskFactory factory,
31-
IOptionsMonitor<GrpcDurableTaskWorkerOptions> options,
33+
IOptionsMonitor<GrpcDurableTaskWorkerOptions> grpcOptions,
34+
IOptionsMonitor<DurableTaskWorkerOptions> workerOptions,
3235
IServiceProvider services,
3336
ILoggerFactory loggerFactory)
3437
: base(name, factory)
3538
{
36-
this.options = Check.NotNull(options).Get(name);
39+
this.grpcOptions = Check.NotNull(grpcOptions).Get(name);
40+
this.workerOptions = Check.NotNull(workerOptions).Get(name);
3741
this.services = Check.NotNull(services);
3842
this.loggerFactory = Check.NotNull(loggerFactory);
3943
this.logger = loggerFactory.CreateLogger("Microsoft.DurableTask"); // TODO: use better category name.
@@ -73,19 +77,19 @@ static GrpcChannel GetChannel(string? address)
7377

7478
AsyncDisposable GetCallInvoker(out CallInvoker callInvoker)
7579
{
76-
if (this.options.Channel is GrpcChannel c)
80+
if (this.grpcOptions.Channel is GrpcChannel c)
7781
{
7882
callInvoker = c.CreateCallInvoker();
7983
return default;
8084
}
8185

82-
if (this.options.CallInvoker is CallInvoker invoker)
86+
if (this.grpcOptions.CallInvoker is CallInvoker invoker)
8387
{
8488
callInvoker = invoker;
8589
return default;
8690
}
8791

88-
c = GetChannel(this.options.Address);
92+
c = GetChannel(this.grpcOptions.Address);
8993
callInvoker = c.CreateCallInvoker();
9094
return new AsyncDisposable(() => new(c.ShutdownAsync()));
9195
}

src/Worker/Grpc/GrpcEntityRunner.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public static async Task<string> LoadAndRunAsync(
6969
TaskEntity entity = factory.CreateEntity(entityName, implementation, id);
7070
EntityBatchResult result = await entity.ExecuteOperationBatchAsync(batch);
7171

72-
P.EntityBatchResult response = result.ToEntityBatchResult();
72+
P.EntityBatchResult response = result.ToEntityBatchResult(completionToken: string.Empty /* doesn't apply */);
7373
byte[] responseBytes = response.ToByteArray();
7474
return Convert.ToBase64String(responseBytes);
7575
}

src/Worker/Grpc/GrpcOrchestrationRunner.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ public static string LoadAndRun(
115115
P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse(
116116
request.InstanceId,
117117
result.CustomStatus,
118-
result.Actions);
118+
result.Actions,
119+
completionToken: string.Empty /* doesn't apply */);
119120
byte[] responseBytes = response.ToByteArray();
120121
return Convert.ToBase64String(responseBytes);
121122
}

0 commit comments

Comments
 (0)