Skip to content

Commit b80fc13

Browse files
authored
Cosmos DB support (#649)
1 parent a95c36e commit b80fc13

File tree

11 files changed

+455
-4
lines changed

11 files changed

+455
-4
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ There are several persistence providers available as separate Nuget packages.
132132

133133
* MemoryPersistenceProvider *(Default provider, for demo and testing purposes)*
134134
* [MongoDB](src/providers/WorkflowCore.Persistence.MongoDB)
135+
* [Cosmos DB](src/providers/WorkflowCore.Providers.Azure)
135136
* [Amazon DynamoDB](src/providers/WorkflowCore.Providers.AWS)
136137
* [SQL Server](src/providers/WorkflowCore.Persistence.SqlServer)
137138
* [PostgreSQL](src/providers/WorkflowCore.Persistence.PostgreSQL)

docs/persistence.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ There are several persistence providers available as separate Nuget packages.
99
* [PostgreSQL](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Persistence.PostgreSQL)
1010
* [Sqlite](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Persistence.Sqlite)
1111
* [Amazon DynamoDB](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Providers.AWS)
12+
* [Cosmos DB](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Providers.Azure)
1213
* [Redis](https://github.com/danielgerlag/workflow-core/tree/master/src/providers/WorkflowCore.Providers.Redis)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using System.Threading.Tasks;
2+
3+
namespace WorkflowCore.Providers.Azure.Interface
4+
{
5+
public interface ICosmosDbProvisioner
6+
{
7+
Task Provision(string dbId);
8+
}
9+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using Newtonsoft.Json;
2+
using System;
3+
using System.Collections.Generic;
4+
using WorkflowCore.Models;
5+
6+
namespace WorkflowCore.Providers.Azure.Models
7+
{
8+
public class PersistedEvent
9+
{
10+
public string id { get; set; }
11+
12+
public string EventName { get; set; }
13+
14+
public string EventKey { get; set; }
15+
16+
public string EventData { get; set; }
17+
18+
public DateTime EventTime { get; set; }
19+
20+
public bool IsProcessed { get; set; }
21+
22+
private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.All };
23+
24+
public static PersistedEvent FromInstance(Event instance)
25+
{
26+
return new PersistedEvent()
27+
{
28+
id = instance.Id,
29+
EventKey = instance.EventKey,
30+
EventName = instance.EventName,
31+
EventTime = instance.EventTime,
32+
IsProcessed = instance.IsProcessed,
33+
EventData = JsonConvert.SerializeObject(instance.EventData, SerializerSettings),
34+
};
35+
}
36+
37+
public static Event ToInstance(PersistedEvent instance)
38+
{
39+
return new Event()
40+
{
41+
Id = instance.id,
42+
EventKey = instance.EventKey,
43+
EventName = instance.EventName,
44+
EventTime = instance.EventTime,
45+
IsProcessed = instance.IsProcessed,
46+
EventData = JsonConvert.DeserializeObject(instance.EventData, SerializerSettings),
47+
};
48+
}
49+
}
50+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using Newtonsoft.Json;
4+
using WorkflowCore.Models;
5+
6+
namespace WorkflowCore.Providers.Azure.Models
7+
{
8+
public class PersistedSubscription
9+
{
10+
public string id { get; set; }
11+
12+
public string WorkflowId { get; set; }
13+
14+
public int StepId { get; set; }
15+
16+
public string ExecutionPointerId { get; set; }
17+
18+
public string EventName { get; set; }
19+
20+
public string EventKey { get; set; }
21+
22+
public DateTime SubscribeAsOf { get; set; }
23+
24+
public string SubscriptionData { get; set; }
25+
26+
public string ExternalToken { get; set; }
27+
28+
public string ExternalWorkerId { get; set; }
29+
30+
public DateTime? ExternalTokenExpiry { get; set; }
31+
32+
private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.All };
33+
34+
public static PersistedSubscription FromInstance(EventSubscription instance)
35+
{
36+
return new PersistedSubscription()
37+
{
38+
id = instance.Id,
39+
EventKey = instance.EventKey,
40+
EventName = instance.EventName,
41+
ExecutionPointerId = instance.ExecutionPointerId,
42+
ExternalToken = instance.ExternalToken,
43+
ExternalTokenExpiry = instance.ExternalTokenExpiry,
44+
ExternalWorkerId = instance.ExternalWorkerId,
45+
StepId = instance.StepId,
46+
SubscribeAsOf = instance.SubscribeAsOf,
47+
WorkflowId = instance.WorkflowId,
48+
SubscriptionData = JsonConvert.SerializeObject(instance.SubscriptionData, SerializerSettings),
49+
};
50+
}
51+
52+
public static EventSubscription ToInstance(PersistedSubscription instance)
53+
{
54+
return new EventSubscription()
55+
{
56+
Id = instance.id,
57+
EventKey = instance.EventKey,
58+
EventName = instance.EventName,
59+
ExecutionPointerId = instance.ExecutionPointerId,
60+
ExternalToken = instance.ExternalToken,
61+
ExternalTokenExpiry = instance.ExternalTokenExpiry,
62+
ExternalWorkerId = instance.ExternalWorkerId,
63+
StepId = instance.StepId,
64+
SubscribeAsOf = instance.SubscribeAsOf,
65+
WorkflowId = instance.WorkflowId,
66+
SubscriptionData = JsonConvert.DeserializeObject(instance.SubscriptionData, SerializerSettings),
67+
};
68+
}
69+
}
70+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
using Newtonsoft.Json;
2+
using System;
3+
using System.Collections.Generic;
4+
using WorkflowCore.Models;
5+
6+
namespace WorkflowCore.Providers.Azure.Models
7+
{
8+
public class PersistedWorkflow
9+
{
10+
public string id { get; set; }
11+
12+
public string WorkflowDefinitionId { get; set; }
13+
14+
public int Version { get; set; }
15+
16+
public string Description { get; set; }
17+
18+
public string Reference { get; set; }
19+
20+
public string ExecutionPointers { get; set; }
21+
22+
public long? NextExecution { get; set; }
23+
24+
public WorkflowStatus Status { get; set; }
25+
26+
public string Data { get; set; }
27+
28+
public DateTime CreateTime { get; set; }
29+
30+
public DateTime? CompleteTime { get; set; }
31+
32+
private static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.All };
33+
34+
public static PersistedWorkflow FromInstance(WorkflowInstance instance)
35+
{
36+
var result = new PersistedWorkflow()
37+
{
38+
id = instance.Id,
39+
CompleteTime = instance.CompleteTime,
40+
CreateTime = instance.CreateTime,
41+
Description = instance.Description,
42+
NextExecution = instance.NextExecution,
43+
Reference = instance.Reference,
44+
Status = instance.Status,
45+
Version = instance.Version,
46+
WorkflowDefinitionId = instance.WorkflowDefinitionId,
47+
Data = JsonConvert.SerializeObject(instance.Data, SerializerSettings),
48+
ExecutionPointers = JsonConvert.SerializeObject(instance.ExecutionPointers, SerializerSettings),
49+
};
50+
51+
return result;
52+
}
53+
54+
public static WorkflowInstance ToInstance(PersistedWorkflow instance)
55+
{
56+
var result = new WorkflowInstance()
57+
{
58+
Id = instance.id,
59+
CompleteTime = instance.CompleteTime,
60+
CreateTime = instance.CreateTime,
61+
Description = instance.Description,
62+
NextExecution = instance.NextExecution,
63+
Reference = instance.Reference,
64+
Status = instance.Status,
65+
Version = instance.Version,
66+
WorkflowDefinitionId = instance.WorkflowDefinitionId,
67+
Data = JsonConvert.DeserializeObject(instance.Data, SerializerSettings),
68+
ExecutionPointers = JsonConvert.DeserializeObject<ExecutionPointerCollection>(instance.ExecutionPointers, SerializerSettings),
69+
};
70+
71+
return result;
72+
}
73+
74+
}
75+
}

src/providers/WorkflowCore.Providers.Azure/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* Provides [DLM](https://en.wikipedia.org/wiki/Distributed_lock_manager) support on [Workflow Core](../../README.md) using Azure Blob Storage leases.
44
* Provides Queueing support on [Workflow Core](../../README.md) using Azure Storage queues.
55
* Provides event hub support on [Workflow Core](../../README.md) backed by Azure Service Bus.
6+
* Provides persistence on [Workflow Core](../../README.md) backed by Azure Cosmos DB.
67

78
This makes it possible to have a cluster of nodes processing your workflows.
89

@@ -30,5 +31,6 @@ services.AddWorkflow(options =>
3031
{
3132
options.UseAzureSynchronization("azure storage connection string");
3233
options.UseAzureServiceBusEventHub("service bus connection string", "topic name", "subscription name");
34+
options.UseCosmosDbPersistence("connection string");
3335
});
3436
```

src/providers/WorkflowCore.Providers.Azure/ServiceCollectionExtensions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Microsoft.Extensions.Logging;
22
using WorkflowCore.Models;
3+
using WorkflowCore.Providers.Azure.Interface;
34
using WorkflowCore.Providers.Azure.Services;
45

56
namespace Microsoft.Extensions.DependencyInjection
@@ -24,5 +25,12 @@ public static WorkflowOptions UseAzureServiceBusEventHub(
2425

2526
return options;
2627
}
28+
29+
public static WorkflowOptions UseCosmosDbPersistence(this WorkflowOptions options, string connectionString, string databaseId)
30+
{
31+
options.Services.AddTransient<ICosmosDbProvisioner>(sp => new CosmosDbProvisioner(connectionString, sp.GetService<ILoggerFactory>()));
32+
options.UsePersistence(sp => new CosmosDbPersistenceProvider(connectionString, databaseId, sp.GetService<ICosmosDbProvisioner>()));
33+
return options;
34+
}
2735
}
2836
}

0 commit comments

Comments
 (0)