Skip to content

Commit 7dea069

Browse files
authored
Redis persistence provider (#253)
1 parent 6ba5b71 commit 7dea069

29 files changed

+417
-20
lines changed

WorkflowCore.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Providers.Elas
130130
EndProject
131131
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Tests.Elasticsearch", "test\WorkflowCore.Tests.Elasticsearch\WorkflowCore.Tests.Elasticsearch.csproj", "{44644716-0CE8-4837-B189-AB65AE2106AA}"
132132
EndProject
133+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Tests.Redis", "test\WorkflowCore.Tests.Redis\WorkflowCore.Tests.Redis.csproj", "{78217204-B873-40B9-8875-E3925B2FBCEC}"
134+
EndProject
133135
Global
134136
GlobalSection(SolutionConfigurationPlatforms) = preSolution
135137
Debug|Any CPU = Debug|Any CPU
@@ -328,6 +330,10 @@ Global
328330
{44644716-0CE8-4837-B189-AB65AE2106AA}.Debug|Any CPU.Build.0 = Debug|Any CPU
329331
{44644716-0CE8-4837-B189-AB65AE2106AA}.Release|Any CPU.ActiveCfg = Release|Any CPU
330332
{44644716-0CE8-4837-B189-AB65AE2106AA}.Release|Any CPU.Build.0 = Release|Any CPU
333+
{78217204-B873-40B9-8875-E3925B2FBCEC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
334+
{78217204-B873-40B9-8875-E3925B2FBCEC}.Debug|Any CPU.Build.0 = Debug|Any CPU
335+
{78217204-B873-40B9-8875-E3925B2FBCEC}.Release|Any CPU.ActiveCfg = Release|Any CPU
336+
{78217204-B873-40B9-8875-E3925B2FBCEC}.Release|Any CPU.Build.0 = Release|Any CPU
331337
EndGlobalSection
332338
GlobalSection(SolutionProperties) = preSolution
333339
HideSolutionNode = FALSE
@@ -384,6 +390,7 @@ Global
384390
{435C6263-C6F8-4E93-B417-D861E9C22E18} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
385391
{F6348170-B695-4D97-BAE6-4F0F643F3BEF} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
386392
{44644716-0CE8-4837-B189-AB65AE2106AA} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
393+
{78217204-B873-40B9-8875-E3925B2FBCEC} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
387394
EndGlobalSection
388395
GlobalSection(ExtensibilityGlobals) = postSolution
389396
SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4}

src/providers/WorkflowCore.Providers.AWS/Services/DynamoDbProvisioner.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace WorkflowCore.Providers.AWS.Services
1313
public class DynamoDbProvisioner : IDynamoDbProvisioner
1414
{
1515
private readonly ILogger _logger;
16-
private readonly AmazonDynamoDBClient _client;
16+
private readonly IAmazonDynamoDB _client;
1717
private readonly string _tablePrefix;
1818

1919
public DynamoDbProvisioner(AWSCredentials credentials, AmazonDynamoDBConfig config, string tablePrefix, ILoggerFactory logFactory)

src/providers/WorkflowCore.Providers.AWS/Services/DynamoLockProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace WorkflowCore.Providers.AWS.Services
1313
public class DynamoLockProvider : IDistributedLockProvider
1414
{
1515
private readonly ILogger _logger;
16-
private readonly AmazonDynamoDBClient _client;
16+
private readonly IAmazonDynamoDB _client;
1717
private readonly string _tableName;
1818
private readonly string _nodeId;
1919
private readonly long _ttl = 30000;

src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace WorkflowCore.Providers.AWS.Services
1515
public class DynamoPersistenceProvider : IPersistenceProvider
1616
{
1717
private readonly ILogger _logger;
18-
private readonly AmazonDynamoDBClient _client;
18+
private readonly IAmazonDynamoDB _client;
1919
private readonly string _tablePrefix;
2020
private readonly IDynamoDbProvisioner _provisioner;
2121

src/providers/WorkflowCore.Providers.Redis/README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Redis providers for Workflow Core
22

3+
* Provides Persistence support on [Workflow Core](../../README.md) backed by Redis.
34
* Provides Queueing support on [Workflow Core](../../README.md) backed by Redis.
45
* Provides Distributed locking support on [Workflow Core](../../README.md) backed by Redis.
56
* Provides event hub support on [Workflow Core](../../README.md) backed by Redis.
@@ -23,15 +24,17 @@ dotnet add package WorkflowCore.Providers.Redis
2324
## Usage
2425

2526
Use the `IServiceCollection` extension methods when building your service provider
27+
* .UseRedisPersistence
2628
* .UseRedisQueues
2729
* .UseRedisLocking
2830
* .UseRedisEventHub
2931

3032
```C#
3133
services.AddWorkflow(cfg =>
3234
{
35+
cfg.UseRedisPersistence("localhost:6379", "app-name");
3336
cfg.UseRedisLocking("localhost:6379");
34-
cfg.UseRedisQueues("localhost:6379", "my-app");
35-
cfg.UseRedisEventHub("localhost:6379", "my-channel")
37+
cfg.UseRedisQueues("localhost:6379", "app-name");
38+
cfg.UseRedisEventHub("localhost:6379", "channel-name")
3639
});
3740
```

src/providers/WorkflowCore.Providers.Redis/ServiceCollectionExtensions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ public static WorkflowOptions UseRedisLocking(this WorkflowOptions options, stri
2020
return options;
2121
}
2222

23+
public static WorkflowOptions UseRedisPersistence(this WorkflowOptions options, string connectionString, string prefix)
24+
{
25+
options.UsePersistence(sp => new RedisPersistenceProvider(connectionString, prefix, sp.GetService<ILoggerFactory>()));
26+
return options;
27+
}
28+
2329
public static WorkflowOptions UseRedisEventHub(this WorkflowOptions options, string connectionString, string channel)
2430
{
2531
options.UseEventHub(sp => new RedisLifeCycleEventHub(connectionString, channel, sp.GetService<ILoggerFactory>()));
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading.Tasks;
5+
using Microsoft.Extensions.Logging;
6+
using Newtonsoft.Json;
7+
using StackExchange.Redis;
8+
using WorkflowCore.Interface;
9+
using WorkflowCore.Models;
10+
11+
namespace WorkflowCore.Providers.Redis.Services
12+
{
13+
public class RedisPersistenceProvider : IPersistenceProvider
14+
{
15+
private readonly ILogger _logger;
16+
private readonly string _connectionString;
17+
private readonly string _prefix;
18+
private const string WORKFLOW_SET = "workflows";
19+
private const string SUBSCRIPTION_SET = "subscriptions";
20+
private const string EVENT_SET = "events";
21+
private const string RUNNABLE_INDEX = "runnable";
22+
private const string EVENTSLUG_INDEX = "eventslug";
23+
private readonly IConnectionMultiplexer _multiplexer;
24+
private readonly IDatabase _redis;
25+
26+
private readonly JsonSerializerSettings _serializerSettings = new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.All };
27+
28+
public RedisPersistenceProvider(string connectionString, string prefix, ILoggerFactory logFactory)
29+
{
30+
_connectionString = connectionString;
31+
_prefix = prefix;
32+
_logger = logFactory.CreateLogger(GetType());
33+
_multiplexer = ConnectionMultiplexer.Connect(_connectionString);
34+
_redis = _multiplexer.GetDatabase();
35+
}
36+
37+
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
38+
{
39+
workflow.Id = Guid.NewGuid().ToString();
40+
await PersistWorkflow(workflow);
41+
return workflow.Id;
42+
}
43+
44+
public async Task PersistWorkflow(WorkflowInstance workflow)
45+
{
46+
var str = JsonConvert.SerializeObject(workflow, _serializerSettings);
47+
await _redis.HashSetAsync($"{_prefix}.{WORKFLOW_SET}", workflow.Id, str);
48+
49+
if ((workflow.Status == WorkflowStatus.Runnable) && (workflow.NextExecution.HasValue))
50+
await _redis.SortedSetAddAsync($"{_prefix}.{WORKFLOW_SET}.{RUNNABLE_INDEX}", workflow.Id, workflow.NextExecution.Value);
51+
else
52+
await _redis.SortedSetRemoveAsync($"{_prefix}.{WORKFLOW_SET}.{RUNNABLE_INDEX}", workflow.Id);
53+
}
54+
55+
public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt)
56+
{
57+
var result = new List<string>();
58+
var data = await _redis.SortedSetRangeByScoreAsync($"{_prefix}.{WORKFLOW_SET}.{RUNNABLE_INDEX}", -1, DateTime.UtcNow.Ticks);
59+
60+
foreach (var item in data)
61+
result.Add(item);
62+
63+
return result;
64+
}
65+
66+
public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip,
67+
int take)
68+
{
69+
throw new NotImplementedException();
70+
}
71+
72+
public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
73+
{
74+
var raw = await _redis.HashGetAsync($"{_prefix}.{WORKFLOW_SET}", Id);
75+
return JsonConvert.DeserializeObject<WorkflowInstance>(raw, _serializerSettings);
76+
}
77+
78+
public async Task<string> CreateEventSubscription(EventSubscription subscription)
79+
{
80+
subscription.Id = Guid.NewGuid().ToString();
81+
var str = JsonConvert.SerializeObject(subscription, _serializerSettings);
82+
await _redis.HashSetAsync($"{_prefix}.{SUBSCRIPTION_SET}", subscription.Id, str);
83+
await _redis.SortedSetAddAsync($"{_prefix}.{SUBSCRIPTION_SET}.{EVENTSLUG_INDEX}.{subscription.EventName}-{subscription.EventKey}", subscription.Id, subscription.SubscribeAsOf.Ticks);
84+
85+
return subscription.Id;
86+
}
87+
88+
public async Task<IEnumerable<EventSubscription>> GetSubcriptions(string eventName, string eventKey, DateTime asOf)
89+
{
90+
var result = new List<EventSubscription>();
91+
var data = await _redis.SortedSetRangeByScoreAsync($"{_prefix}.{SUBSCRIPTION_SET}.{EVENTSLUG_INDEX}.{eventName}-{eventKey}", -1, asOf.Ticks);
92+
93+
foreach (var id in data)
94+
{
95+
var raw = await _redis.HashGetAsync($"{_prefix}.{SUBSCRIPTION_SET}", id);
96+
if (raw.HasValue)
97+
result.Add(JsonConvert.DeserializeObject<EventSubscription>(raw, _serializerSettings));
98+
}
99+
100+
return result;
101+
}
102+
103+
public async Task TerminateSubscription(string eventSubscriptionId)
104+
{
105+
var existingRaw = await _redis.HashGetAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId);
106+
var existing = JsonConvert.DeserializeObject<EventSubscription>(existingRaw, _serializerSettings);
107+
await _redis.HashDeleteAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId);
108+
await _redis.SortedSetRemoveAsync($"{_prefix}.{SUBSCRIPTION_SET}.{EVENTSLUG_INDEX}.{existing.EventName}-{existing.EventKey}", eventSubscriptionId);
109+
}
110+
111+
public async Task<string> CreateEvent(Event newEvent)
112+
{
113+
newEvent.Id = Guid.NewGuid().ToString();
114+
var str = JsonConvert.SerializeObject(newEvent, _serializerSettings);
115+
await _redis.HashSetAsync($"{_prefix}.{EVENT_SET}", newEvent.Id, str);
116+
await _redis.SortedSetAddAsync($"{_prefix}.{EVENT_SET}.{EVENTSLUG_INDEX}.{newEvent.EventName}-{newEvent.EventKey}", newEvent.Id, newEvent.EventTime.Ticks);
117+
118+
if (newEvent.IsProcessed)
119+
await _redis.SortedSetRemoveAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", newEvent.Id);
120+
else
121+
await _redis.SortedSetAddAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", newEvent.Id, newEvent.EventTime.Ticks);
122+
123+
return newEvent.Id;
124+
}
125+
126+
public async Task<Event> GetEvent(string id)
127+
{
128+
var raw = await _redis.HashGetAsync($"{_prefix}.{EVENT_SET}", id);
129+
return JsonConvert.DeserializeObject<Event>(raw, _serializerSettings);
130+
}
131+
132+
public async Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt)
133+
{
134+
var result = new List<string>();
135+
var data = await _redis.SortedSetRangeByScoreAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", -1, asAt.Ticks);
136+
137+
foreach (var item in data)
138+
result.Add(item);
139+
140+
return result;
141+
}
142+
143+
public async Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf)
144+
{
145+
var result = new List<string>();
146+
var data = await _redis.SortedSetRangeByScoreAsync($"{_prefix}.{EVENT_SET}.{EVENTSLUG_INDEX}.{eventName}-{eventKey}", asOf.Ticks);
147+
148+
foreach (var id in data)
149+
result.Add(id);
150+
151+
return result;
152+
}
153+
154+
public async Task MarkEventProcessed(string id)
155+
{
156+
var evt = await GetEvent(id);
157+
evt.IsProcessed = true;
158+
var str = JsonConvert.SerializeObject(evt, _serializerSettings);
159+
await _redis.HashSetAsync($"{_prefix}.{EVENT_SET}", evt.Id, str);
160+
await _redis.SortedSetRemoveAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", id);
161+
}
162+
163+
public async Task MarkEventUnprocessed(string id)
164+
{
165+
var evt = await GetEvent(id);
166+
evt.IsProcessed = false;
167+
var str = JsonConvert.SerializeObject(evt, _serializerSettings);
168+
await _redis.HashSetAsync($"{_prefix}.{EVENT_SET}", evt.Id, str);
169+
await _redis.SortedSetAddAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", evt.Id, evt.EventTime.Ticks);
170+
}
171+
172+
public Task PersistErrors(IEnumerable<ExecutionError> errors)
173+
{
174+
return Task.CompletedTask;
175+
}
176+
177+
public void EnsureStoreExists()
178+
{
179+
}
180+
}
181+
}

src/providers/WorkflowCore.Providers.Redis/WorkflowCore.Providers.Redis.csproj

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@
22

33
<PropertyGroup>
44
<TargetFramework>netstandard2.0</TargetFramework>
5-
<Version>1.7.0</Version>
5+
<Version>1.8.0</Version>
66
<PackageLicenseUrl>https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md</PackageLicenseUrl>
77
<RepositoryUrl>https://github.com/danielgerlag/workflow-core.git</RepositoryUrl>
88
<RepositoryType>git</RepositoryType>
99
<PackageProjectUrl>https://github.com/danielgerlag/workflow-core</PackageProjectUrl>
10-
<Description>Redis providers for Workflow Core
11-
12-
- Provides Queueing support on Workflow Core
13-
- Provides distributed locking support on Workflow Core</Description>
10+
<Description>Redis providers for Workflow Core (Persistence, queueing, distributed locking and event hubs)
11+
</Description>
1412
</PropertyGroup>
1513

1614
<ItemGroup>

src/samples/WorkflowCore.Sample04/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ private static IServiceProvider ConfigureServices()
7272

7373
//services.AddWorkflow(cfg =>
7474
//{
75+
// cfg.UseRedisPersistence("localhost:6379", "sample4");
7576
// cfg.UseRedisLocking("localhost:6379");
7677
// cfg.UseRedisQueues("localhost:6379", "sample4");
7778
// cfg.UseRedisEventHub("localhost:6379", "channel1");

test/WorkflowCore.Tests.DynamoDB/Scenarios/DynamoBasicScenario.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
using WorkflowCore.Tests.DynamoDB;
99
using Xunit;
1010

11-
namespace WorkflowCore.Tests.MongoDB.Scenarios
11+
namespace WorkflowCore.Tests.DynamoDB.Scenarios
1212
{
1313
[Collection("DynamoDb collection")]
1414
public class DynamoBasicScenario : BasicScenario

0 commit comments

Comments
 (0)