Skip to content

Commit 96a6eda

Browse files
authored
Merge branch 'danielgerlag:master' into master
2 parents 66b6e45 + 06470ce commit 96a6eda

File tree

18 files changed

+174
-57
lines changed

18 files changed

+174
-57
lines changed

WorkflowCore.sln

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{EF47161E-E39
77
EndProject
88
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{F6AC9AEB-24EF-475A-B190-AA4D9E01270A}"
99
ProjectSection(SolutionItems) = preProject
10-
readme.md = readme.md
10+
README.md = README.md
1111
EndProjectSection
1212
EndProject
1313
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{5080DB09-CBE8-4C45-9957-C3BB7651755E}"

src/Directory.Build.props

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
<PackageLicenseUrl>https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md</PackageLicenseUrl>
55
<RepositoryType>git</RepositoryType>
66
<RepositoryUrl>https://github.com/danielgerlag/workflow-core.git</RepositoryUrl>
7-
<Version>3.5.0</Version>
8-
<AssemblyVersion>3.5.0.0</AssemblyVersion>
9-
<FileVersion>3.5.0.0</FileVersion>
7+
<Version>3.5.1</Version>
8+
<AssemblyVersion>3.5.1.0</AssemblyVersion>
9+
<FileVersion>3.5.1.0</FileVersion>
1010
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
11-
<PackageVersion>3.5.0</PackageVersion>
11+
<PackageVersion>3.5.1</PackageVersion>
1212
</PropertyGroup>
1313
</Project>

src/WorkflowCore/Interface/IDistributedLockProvider.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ namespace WorkflowCore.Interface
99
/// </remarks>
1010
public interface IDistributedLockProvider
1111
{
12+
/// <summary>
13+
/// Acquire a lock on the specified resource.
14+
/// </summary>
15+
/// <param name="Id">Resource ID to lock.</param>
16+
/// <param name="cancellationToken"></param>
17+
/// <returns>`true`, if the lock was acquired.</returns>
1218
Task<bool> AcquireLock(string Id, CancellationToken cancellationToken);
1319

1420
Task ReleaseLock(string Id);

src/WorkflowCore/Services/LifeCycleEventPublisher.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public class LifeCycleEventPublisher : ILifeCycleEventPublisher, IDisposable
1111
{
1212
private readonly ILifeCycleEventHub _eventHub;
1313
private readonly ILogger _logger;
14-
private readonly BlockingCollection<LifeCycleEvent> _outbox;
14+
private BlockingCollection<LifeCycleEvent> _outbox;
1515
private Task _dispatchTask;
1616

1717
public LifeCycleEventPublisher(ILifeCycleEventHub eventHub, ILoggerFactory loggerFactory)
@@ -36,6 +36,11 @@ public void Start()
3636
throw new InvalidOperationException();
3737
}
3838

39+
if (_outbox.IsAddingCompleted)
40+
{
41+
_outbox = new BlockingCollection<LifeCycleEvent>();
42+
}
43+
3944
_dispatchTask = new Task(Execute);
4045
_dispatchTask.Start();
4146
}

src/WorkflowCore/Services/WorkflowHost.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ public WorkflowHost(IPersistenceProvider persistenceStore, IQueueProvider queueP
4747
_searchIndex = searchIndex;
4848
_activityController = activityController;
4949
_lifeCycleEventHub = lifeCycleEventHub;
50-
_lifeCycleEventHub.Subscribe(HandleLifeCycleEvent);
5150
}
5251

5352
public Task<string> StartWorkflow(string workflowId, object data = null, string reference=null)
@@ -91,6 +90,10 @@ public async Task StartAsync(CancellationToken cancellationToken)
9190
await _lifeCycleEventHub.Start();
9291
await _searchIndex.Start();
9392

93+
// Event subscriptions are removed when stopping the event hub.
94+
// Add them when starting.
95+
AddEventSubscriptions();
96+
9497
Logger.LogInformation("Starting background tasks");
9598

9699
foreach (var task in _backgroundTasks)
@@ -181,5 +184,10 @@ public Task SubmitActivityFailure(string token, object result)
181184
{
182185
return _activityController.SubmitActivityFailure(token, result);
183186
}
187+
188+
private void AddEventSubscriptions()
189+
{
190+
_lifeCycleEventHub.Subscribe(HandleLifeCycleEvent);
191+
}
184192
}
185193
}
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
using System.Threading.Tasks;
1+
using System.Threading;
2+
using System.Threading.Tasks;
23

34
namespace WorkflowCore.Providers.Azure.Interface
45
{
56
public interface ICosmosDbProvisioner
67
{
7-
Task Provision(string dbId);
8+
Task Provision(string dbId, CancellationToken cancellationToken = default);
89
}
910
}

src/providers/WorkflowCore.Providers.Azure/ServiceCollectionExtensions.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,20 @@ public static WorkflowOptions UseAzureServiceBusEventHub(
2626
return options;
2727
}
2828

29-
public static WorkflowOptions UseCosmosDbPersistence(this WorkflowOptions options, string connectionString, string databaseId)
29+
public static WorkflowOptions UseCosmosDbPersistence(
30+
this WorkflowOptions options,
31+
string connectionString,
32+
string databaseId,
33+
CosmosDbStorageOptions cosmosDbStorageOptions = null)
3034
{
35+
if (cosmosDbStorageOptions == null)
36+
{
37+
cosmosDbStorageOptions = new CosmosDbStorageOptions();
38+
}
39+
3140
options.Services.AddSingleton<ICosmosClientFactory>(sp => new CosmosClientFactory(connectionString));
32-
options.Services.AddTransient<ICosmosDbProvisioner>(sp => new CosmosDbProvisioner(sp.GetService<ICosmosClientFactory>(), sp.GetService<ILoggerFactory>()));
33-
options.UsePersistence(sp => new CosmosDbPersistenceProvider(sp.GetService<ICosmosClientFactory>(), databaseId, sp.GetService<ICosmosDbProvisioner>()));
41+
options.Services.AddTransient<ICosmosDbProvisioner>(sp => new CosmosDbProvisioner(sp.GetService<ICosmosClientFactory>(), cosmosDbStorageOptions));
42+
options.UsePersistence(sp => new CosmosDbPersistenceProvider(sp.GetService<ICosmosClientFactory>(), databaseId, sp.GetService<ICosmosDbProvisioner>(), cosmosDbStorageOptions));
3443
return options;
3544
}
3645
}

src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,25 @@ namespace WorkflowCore.Providers.Azure.Services
1414
{
1515
public class CosmosDbPersistenceProvider : IPersistenceProvider
1616
{
17-
public const string WorkflowContainerName = "workflows";
18-
public const string EventContainerName = "events";
19-
public const string SubscriptionContainerName = "subscriptions";
20-
21-
private ICosmosDbProvisioner _provisioner;
22-
private string _dbId;
23-
private ICosmosClientFactory _clientFactory;
24-
private Lazy<Container> _workflowContainer;
25-
private Lazy<Container> _eventContainer;
26-
private Lazy<Container> _subscriptionContainer;
27-
28-
public CosmosDbPersistenceProvider(ICosmosClientFactory clientFactory, string dbId, ICosmosDbProvisioner provisioner)
17+
private readonly ICosmosDbProvisioner _provisioner;
18+
private readonly string _dbId;
19+
private readonly ICosmosClientFactory _clientFactory;
20+
private readonly Lazy<Container> _workflowContainer;
21+
private readonly Lazy<Container> _eventContainer;
22+
private readonly Lazy<Container> _subscriptionContainer;
23+
24+
public CosmosDbPersistenceProvider(
25+
ICosmosClientFactory clientFactory,
26+
string dbId,
27+
ICosmosDbProvisioner provisioner,
28+
CosmosDbStorageOptions cosmosDbStorageOptions)
2929
{
3030
_provisioner = provisioner;
3131
_dbId = dbId;
3232
_clientFactory = clientFactory;
33-
_workflowContainer = new Lazy<Container>(() => _clientFactory.GetCosmosClient().GetDatabase(_dbId).GetContainer(WorkflowContainerName));
34-
_eventContainer = new Lazy<Container>(() => _clientFactory.GetCosmosClient().GetDatabase(_dbId).GetContainer(EventContainerName));
35-
_subscriptionContainer = new Lazy<Container>(() => _clientFactory.GetCosmosClient().GetDatabase(_dbId).GetContainer(SubscriptionContainerName));
33+
_workflowContainer = new Lazy<Container>(() => _clientFactory.GetCosmosClient().GetDatabase(_dbId).GetContainer(cosmosDbStorageOptions.WorkflowContainerName));
34+
_eventContainer = new Lazy<Container>(() => _clientFactory.GetCosmosClient().GetDatabase(_dbId).GetContainer(cosmosDbStorageOptions.EventContainerName));
35+
_subscriptionContainer = new Lazy<Container>(() => _clientFactory.GetCosmosClient().GetDatabase(_dbId).GetContainer(cosmosDbStorageOptions.SubscriptionContainerName));
3636
}
3737

3838
public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default)

src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbProvisioner.cs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,37 @@
1-
using System;
1+
using System.Threading;
22
using System.Threading.Tasks;
33
using Microsoft.Azure.Cosmos;
4-
using Microsoft.Extensions.Logging;
54
using WorkflowCore.Providers.Azure.Interface;
65

76
namespace WorkflowCore.Providers.Azure.Services
87
{
98
public class CosmosDbProvisioner : ICosmosDbProvisioner
109
{
10+
private readonly ICosmosClientFactory _clientFactory;
11+
private readonly CosmosDbStorageOptions _cosmosDbStorageOptions;
1112

12-
private ICosmosClientFactory _clientFactory;
13-
14-
public CosmosDbProvisioner(ICosmosClientFactory clientFactory, ILoggerFactory loggerFactory)
13+
public CosmosDbProvisioner(
14+
ICosmosClientFactory clientFactory,
15+
CosmosDbStorageOptions cosmosDbStorageOptions)
1516
{
1617
_clientFactory = clientFactory;
18+
_cosmosDbStorageOptions = cosmosDbStorageOptions;
1719
}
1820

19-
public async Task Provision(string dbId)
21+
public async Task Provision(string dbId, CancellationToken cancellationToken = default)
2022
{
21-
var dbResp = await _clientFactory.GetCosmosClient().CreateDatabaseIfNotExistsAsync(dbId);
23+
var dbResp = await _clientFactory.GetCosmosClient().CreateDatabaseIfNotExistsAsync(dbId, cancellationToken: cancellationToken);
2224
var wfIndexPolicy = new IndexingPolicy();
2325
wfIndexPolicy.IncludedPaths.Add(new IncludedPath { Path = @"/*" });
2426
wfIndexPolicy.ExcludedPaths.Add(new ExcludedPath { Path = @"/ExecutionPointers/?" });
2527

2628
Task.WaitAll(
27-
dbResp.Database.CreateContainerIfNotExistsAsync(new ContainerProperties(CosmosDbPersistenceProvider.WorkflowContainerName, @"/id")
29+
dbResp.Database.CreateContainerIfNotExistsAsync(new ContainerProperties(_cosmosDbStorageOptions.WorkflowContainerName, @"/id")
2830
{
2931
IndexingPolicy = wfIndexPolicy
3032
}),
31-
dbResp.Database.CreateContainerIfNotExistsAsync(new ContainerProperties(CosmosDbPersistenceProvider.EventContainerName, @"/id")),
32-
dbResp.Database.CreateContainerIfNotExistsAsync(new ContainerProperties(CosmosDbPersistenceProvider.SubscriptionContainerName, @"/id"))
33+
dbResp.Database.CreateContainerIfNotExistsAsync(new ContainerProperties(_cosmosDbStorageOptions.EventContainerName, @"/id")),
34+
dbResp.Database.CreateContainerIfNotExistsAsync(new ContainerProperties(_cosmosDbStorageOptions.SubscriptionContainerName, @"/id"))
3335
);
3436
}
3537
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
namespace WorkflowCore.Providers.Azure.Services
2+
{
3+
public sealed class CosmosDbStorageOptions
4+
{
5+
/// <summary>
6+
/// The default name of workflow container.
7+
/// </summary>
8+
public const string DefaultWorkflowContainerName = "workflows";
9+
10+
/// <summary>
11+
/// The name of Workflow container in Cosmos DB.
12+
/// </summary>
13+
public string WorkflowContainerName { get; set; } = DefaultWorkflowContainerName;
14+
15+
/// <summary>
16+
/// The default name of event container.
17+
/// </summary>
18+
public const string DefaultEventContainerName = "events";
19+
20+
/// <summary>
21+
/// The name of Event container in Cosmos DB.
22+
/// </summary>
23+
public string EventContainerName { get; set; } = DefaultEventContainerName;
24+
25+
/// <summary>
26+
/// The default name of subscription container.
27+
/// </summary>
28+
public const string DefaultSubscriptionContainerName = "subscriptions";
29+
30+
/// <summary>
31+
/// The name of Subscription container in Cosmos DB.
32+
/// </summary>
33+
public string SubscriptionContainerName { get; set; } = DefaultSubscriptionContainerName;
34+
}
35+
}

0 commit comments

Comments
 (0)