Skip to content

Commit 2c46deb

Browse files
committed
Add entity support to taskhub shim
1 parent 3bd76fa commit 2c46deb

File tree

6 files changed

+245
-34
lines changed

6 files changed

+245
-34
lines changed

src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,20 @@ namespace Microsoft.DurableTask.Worker;
1010
/// <summary>
1111
/// The default builder for durable task.
1212
/// </summary>
13-
public class DefaultDurableTaskWorkerBuilder : IDurableTaskWorkerBuilder
13+
/// <remarks>
14+
/// Initializes a new instance of the <see cref="DefaultDurableTaskWorkerBuilder" /> class.
15+
/// </remarks>
16+
/// <param name="services">The service collection for this builder.</param>
17+
/// <param name="name">The name for this builder.</param>
18+
public class DefaultDurableTaskWorkerBuilder(string? name, IServiceCollection services) : IDurableTaskWorkerBuilder
1419
{
1520
Type? buildTarget;
1621

17-
/// <summary>
18-
/// Initializes a new instance of the <see cref="DefaultDurableTaskWorkerBuilder" /> class.
19-
/// </summary>
20-
/// <param name="services">The service collection for this builder.</param>
21-
/// <param name="name">The name for this builder.</param>
22-
public DefaultDurableTaskWorkerBuilder(string? name, IServiceCollection services)
23-
{
24-
this.Name = name ?? Extensions.Options.Options.DefaultName;
25-
this.Services = Check.NotNull(services);
26-
}
27-
2822
/// <inheritdoc/>
29-
public string Name { get; }
23+
public string Name { get; } = name ?? Extensions.Options.Options.DefaultName;
3024

3125
/// <inheritdoc/>
32-
public IServiceCollection Services { get; }
26+
public IServiceCollection Services { get; } = Check.NotNull(services);
3327

3428
/// <inheritdoc/>
3529
public Type? BuildTarget

src/Worker/Core/Hosting/DurableTaskWorker.cs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,21 @@ namespace Microsoft.DurableTask.Worker.Hosting;
88
/// <summary>
99
/// Base class for durable workers.
1010
/// </summary>
11-
public abstract class DurableTaskWorker : BackgroundService
11+
/// <remarks>
12+
/// Initializes a new instance of the <see cref="DurableTaskWorker" /> class.
13+
/// </remarks>
14+
/// <param name="name">The name of the worker.</param>
15+
/// <param name="factory">The durable factory.</param>
16+
public abstract class DurableTaskWorker(string? name, IDurableTaskFactory factory) : BackgroundService
1217
{
13-
/// <summary>
14-
/// Initializes a new instance of the <see cref="DurableTaskWorker" /> class.
15-
/// </summary>
16-
/// <param name="name">The name of the worker.</param>
17-
/// <param name="factory">The durable factory.</param>
18-
protected DurableTaskWorker(string? name, IDurableTaskFactory factory)
19-
{
20-
this.Name = name ?? Microsoft.Extensions.Options.Options.DefaultName;
21-
this.Factory = Check.NotNull(factory);
22-
}
23-
2418
/// <summary>
2519
/// Gets the name of this worker.
2620
/// </summary>
27-
protected virtual string Name { get; }
21+
protected virtual string Name { get; } = name ?? Microsoft.Extensions.Options.Options.DefaultName;
2822

2923
/// <summary>
3024
/// Gets the <see cref="IDurableTaskFactory" /> which has been initialized from
3125
/// the configured tasks during host construction.
3226
/// </summary>
33-
protected virtual IDurableTaskFactory Factory { get; }
27+
protected virtual IDurableTaskFactory Factory { get; } = Check.NotNull(factory);
3428
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using DurableTask.Core;
5+
6+
namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core;
7+
8+
/// <summary>
9+
/// An <see cref="IOrchestrationService"/> that does not support entities.
10+
/// </summary>
11+
/// <remarks>
12+
/// This is used to suppress entity support when <see cref="DurableTaskWorkerOptions.EnableEntitySupport"/> is false.
13+
/// </remarks>
14+
sealed class OrchestrationServiceNoEntities(IOrchestrationService service) : IOrchestrationService
15+
{
16+
/// <inheritdoc/>
17+
public int TaskOrchestrationDispatcherCount => service.TaskOrchestrationDispatcherCount;
18+
19+
/// <inheritdoc/>
20+
public int MaxConcurrentTaskOrchestrationWorkItems => service.MaxConcurrentTaskOrchestrationWorkItems;
21+
22+
/// <inheritdoc/>
23+
public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew => service.EventBehaviourForContinueAsNew;
24+
25+
/// <inheritdoc/>
26+
public int TaskActivityDispatcherCount => service.TaskActivityDispatcherCount;
27+
28+
/// <inheritdoc/>
29+
public int MaxConcurrentTaskActivityWorkItems => service.MaxConcurrentTaskActivityWorkItems;
30+
31+
/// <inheritdoc/>
32+
public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
33+
=> service.AbandonTaskActivityWorkItemAsync(workItem);
34+
35+
/// <inheritdoc/>
36+
public Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
37+
=> service.AbandonTaskOrchestrationWorkItemAsync(workItem);
38+
39+
/// <inheritdoc/>
40+
public Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage)
41+
=> service.CompleteTaskActivityWorkItemAsync(workItem, responseMessage);
42+
43+
/// <inheritdoc/>
44+
public Task CompleteTaskOrchestrationWorkItemAsync(
45+
TaskOrchestrationWorkItem workItem,
46+
OrchestrationRuntimeState newOrchestrationRuntimeState,
47+
IList<TaskMessage> outboundMessages,
48+
IList<TaskMessage> orchestratorMessages,
49+
IList<TaskMessage> timerMessages,
50+
TaskMessage continuedAsNewMessage,
51+
OrchestrationState orchestrationState)
52+
=> service.CompleteTaskOrchestrationWorkItemAsync(
53+
workItem,
54+
newOrchestrationRuntimeState,
55+
outboundMessages,
56+
orchestratorMessages,
57+
timerMessages,
58+
continuedAsNewMessage,
59+
orchestrationState);
60+
61+
/// <inheritdoc/>
62+
public Task CreateAsync() => service.CreateAsync();
63+
64+
/// <inheritdoc/>
65+
public Task CreateAsync(bool recreateInstanceStore) => service.CreateAsync(recreateInstanceStore);
66+
67+
/// <inheritdoc/>
68+
public Task CreateIfNotExistsAsync() => service.CreateIfNotExistsAsync();
69+
70+
/// <inheritdoc/>
71+
public Task DeleteAsync() => service.DeleteAsync();
72+
73+
/// <inheritdoc/>
74+
public Task DeleteAsync(bool deleteInstanceStore) => service.DeleteAsync(deleteInstanceStore);
75+
76+
/// <inheritdoc/>
77+
public int GetDelayInSecondsAfterOnFetchException(Exception exception)
78+
=> service.GetDelayInSecondsAfterOnFetchException(exception);
79+
80+
/// <inheritdoc/>
81+
public int GetDelayInSecondsAfterOnProcessException(Exception exception)
82+
=> service.GetDelayInSecondsAfterOnProcessException(exception);
83+
84+
/// <inheritdoc/>
85+
public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState)
86+
=> service.IsMaxMessageCountExceeded(currentMessageCount, runtimeState);
87+
88+
/// <inheritdoc/>
89+
public Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(
90+
TimeSpan receiveTimeout, CancellationToken cancellationToken)
91+
=> service.LockNextTaskActivityWorkItem(receiveTimeout, cancellationToken);
92+
93+
/// <inheritdoc/>
94+
public Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
95+
TimeSpan receiveTimeout, CancellationToken cancellationToken)
96+
=> service.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
97+
98+
/// <inheritdoc/>
99+
public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
100+
=> service.ReleaseTaskOrchestrationWorkItemAsync(workItem);
101+
102+
/// <inheritdoc/>
103+
public Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem)
104+
=> service.RenewTaskActivityWorkItemLockAsync(workItem);
105+
106+
/// <inheritdoc/>
107+
public Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem)
108+
=> service.RenewTaskOrchestrationWorkItemLockAsync(workItem);
109+
110+
/// <inheritdoc/>
111+
public Task StartAsync() => service.StartAsync();
112+
113+
/// <inheritdoc/>
114+
public Task StopAsync() => service.StopAsync();
115+
116+
/// <inheritdoc/>
117+
public Task StopAsync(bool isForced) => service.StopAsync(isForced);
118+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using DurableTask.Core;
5+
using DurableTask.Core.Entities;
6+
7+
namespace Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core;
8+
9+
/// <summary>
10+
/// A shim activity manager which allows for creating the actual activity in the middleware.
11+
/// </summary>
12+
sealed class ShimEntityManager : INameVersionObjectManager<TaskEntity>
13+
{
14+
/// <inheritdoc/>
15+
public void Add(ObjectCreator<TaskEntity> creator) => throw new NotSupportedException();
16+
17+
/// <inheritdoc/>
18+
public TaskEntity? GetObject(string name, string? version) => null;
19+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using Microsoft.DurableTask.Entities;
5+
using Microsoft.Extensions.Logging;
6+
7+
namespace Microsoft.DurableTask
8+
{
9+
/// <summary>
10+
/// Log messages.
11+
/// </summary>
12+
static partial class Logs
13+
{
14+
[LoggerMessage(EventId = 0, Level = LogLevel.Information, Message = "Entity support not enabled via options. Entities will be disabled.")]
15+
public static partial void EntitiesDisabled(this ILogger logger);
16+
17+
[LoggerMessage(EventId = 1, Level = LogLevel.Warning, Message = "Entity support is enabled, but the IDurableTaskFactory does not support entities.")]
18+
public static partial void TaskFactoryDoesNotSupportEntities(this ILogger logger);
19+
}
20+
}

src/Worker/OrchestrationServiceShim/ShimDurableTaskWorker.cs

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using System.Diagnostics;
45
using DurableTask.Core;
6+
using DurableTask.Core.Entities;
7+
using DurableTask.Core.Entities.OperationFormat;
58
using DurableTask.Core.History;
69
using DurableTask.Core.Middleware;
10+
using Microsoft.DurableTask.Entities;
711
using Microsoft.DurableTask.Worker.Hosting;
812
using Microsoft.DurableTask.Worker.OrchestrationServiceShim.Core;
913
using Microsoft.DurableTask.Worker.Shims;
@@ -21,8 +25,9 @@ class ShimDurableTaskWorker : DurableTaskWorker
2125
{
2226
readonly ShimDurableTaskWorkerOptions options;
2327
readonly IServiceProvider services;
24-
readonly TaskHubWorker worker;
2528
readonly DurableTaskShimFactory shimFactory;
29+
readonly ILogger logger;
30+
readonly TaskHubWorker worker;
2631

2732
/// <summary>
2833
/// Initializes a new instance of the <see cref="ShimDurableTaskWorker" /> class.
@@ -42,13 +47,12 @@ public ShimDurableTaskWorker(
4247
this.options = Check.NotNull(options).Get(name);
4348
this.services = Check.NotNull(services);
4449
this.shimFactory = new(this.options, loggerFactory);
50+
this.logger = loggerFactory.CreateLogger<ShimDurableTaskWorker>();
4551

4652
// This should already be validated by options.
4753
IOrchestrationService service = Verify.NotNull(this.options.Service);
48-
this.worker = new TaskHubWorker(
49-
service, new ShimOrchestrationManager(), new ShimActivityManager(), loggerFactory);
50-
this.worker.AddActivityDispatcherMiddleware(this.InvokeActivityAsync);
51-
this.worker.AddOrchestrationDispatcherMiddleware(this.InvokeOrchestrationAsync);
54+
this.worker = service is IEntityOrchestrationService entity
55+
? this.CreateWorker(entity, loggerFactory) : this.CreateWorker(service, loggerFactory);
5256
}
5357

5458
/// <inheritdoc/>
@@ -64,6 +68,43 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken)
6468
return this.worker.StartAsync().WaitAsync(stoppingToken);
6569
}
6670

71+
TaskHubWorker CreateWorker(IOrchestrationService service, ILoggerFactory loggerFactory)
72+
{
73+
TaskHubWorker worker = new(
74+
service, new ShimOrchestrationManager(), new ShimActivityManager(), loggerFactory);
75+
worker.AddActivityDispatcherMiddleware(this.InvokeActivityAsync);
76+
worker.AddOrchestrationDispatcherMiddleware(this.InvokeOrchestrationAsync);
77+
78+
return worker;
79+
}
80+
81+
TaskHubWorker CreateWorker(IEntityOrchestrationService service, ILoggerFactory loggerFactory)
82+
{
83+
if (!this.options.EnableEntitySupport)
84+
{
85+
this.logger.EntitiesDisabled();
86+
return this.CreateWorker(new OrchestrationServiceNoEntities(service), loggerFactory);
87+
}
88+
89+
if (this.Factory is not IDurableTaskFactory2)
90+
{
91+
this.logger.TaskFactoryDoesNotSupportEntities();
92+
return this.CreateWorker(new OrchestrationServiceNoEntities(service), loggerFactory);
93+
}
94+
95+
TaskHubWorker worker = new(
96+
service,
97+
new ShimOrchestrationManager(),
98+
new ShimActivityManager(),
99+
new ShimEntityManager(),
100+
loggerFactory);
101+
worker.AddActivityDispatcherMiddleware(this.InvokeActivityAsync);
102+
worker.AddOrchestrationDispatcherMiddleware(this.InvokeOrchestrationAsync);
103+
worker.AddEntityDispatcherMiddleware(this.InvokeEntityAsync);
104+
105+
return worker;
106+
}
107+
67108
async Task InvokeActivityAsync(DispatchMiddlewareContext context, Func<Task> next)
68109
{
69110
Check.NotNull(context);
@@ -97,9 +138,9 @@ async Task InvokeOrchestrationAsync(DispatchMiddlewareContext context, Func<Task
97138
Check.NotNull(next);
98139

99140
OrchestrationRuntimeState runtimeState = context.GetProperty<OrchestrationRuntimeState>();
100-
await using AsyncServiceScope scope = this.services.CreateAsyncScope();
101141

102142
TaskName name = new(runtimeState.Name);
143+
await using AsyncServiceScope scope = this.services.CreateAsyncScope();
103144
if (!this.Factory.TryCreateOrchestrator(name, scope.ServiceProvider, out ITaskOrchestrator? orchestrator))
104145
{
105146
throw new InvalidOperationException($"Orchestrator not found: {name}");
@@ -118,4 +159,29 @@ async Task InvokeOrchestrationAsync(DispatchMiddlewareContext context, Func<Task
118159
context.SetProperty(result);
119160
await next();
120161
}
162+
163+
async Task InvokeEntityAsync(DispatchMiddlewareContext context, Func<Task> next)
164+
{
165+
Check.NotNull(context);
166+
Check.NotNull(next);
167+
168+
EntityBatchRequest request = context.GetProperty<EntityBatchRequest>();
169+
if (request?.InstanceId is null)
170+
{
171+
throw new InvalidOperationException("EntityBatchRequest.InstanceId is not set.");
172+
}
173+
174+
EntityId entityId = EntityId.FromString(request.InstanceId);
175+
IDurableTaskFactory2 factory = (IDurableTaskFactory2)this.Factory; // verified castable at startup.
176+
await using AsyncServiceScope scope = this.services.CreateAsyncScope();
177+
if (!factory.TryCreateEntity(entityId.Name, this.services, out ITaskEntity? entity))
178+
{
179+
throw new InvalidOperationException($"Entity not found: {entityId.Name}");
180+
}
181+
182+
TaskEntity shim = this.shimFactory.CreateEntity(entityId.Name, entity, entityId);
183+
EntityBatchResult result = await shim.ExecuteOperationBatchAsync(request);
184+
context.SetProperty(result);
185+
await next();
186+
}
121187
}

0 commit comments

Comments
 (0)