Skip to content

Commit 202e7d3

Browse files
Fixed reliability of WorkflowConsumer when persisting workflow
1 parent 8fa1fb7 commit 202e7d3

File tree

11 files changed

+223
-15
lines changed

11 files changed

+223
-15
lines changed

src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ public interface IWorkflowRepository
1212

1313
Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
1414

15+
Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default);
16+
1517
Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default);
1618

1719
[Obsolete]

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5555
finally
5656
{
5757
WorkflowActivity.Enrich(result);
58-
await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
58+
await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions, cancellationToken);
5959
await QueueProvider.QueueWork(itemId, QueueType.Index);
6060
_greylist.Remove($"wf:{itemId}");
6161
}
@@ -100,10 +100,6 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
100100

101101
private async Task SubscribeEvent(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
102102
{
103-
//TODO: move to own class
104-
Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);
105-
106-
await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
107103
if (subscription.EventName != Event.EventTypeActivity)
108104
{
109105
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);

src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,25 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _
4646
}
4747
}
4848

49+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
50+
{
51+
lock (_instances)
52+
{
53+
var existing = _instances.First(x => x.Id == workflow.Id);
54+
_instances.Remove(existing);
55+
_instances.Add(workflow);
56+
57+
lock (_subscriptions)
58+
{
59+
foreach (var subscription in subscriptions)
60+
{
61+
subscription.Id = Guid.NewGuid().ToString();
62+
_subscriptions.Add(subscription);
63+
}
64+
}
65+
}
66+
}
67+
4968
public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken _ = default)
5069
{
5170
lock (_instances)

src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
5050

5151
public Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.PersistWorkflow(workflow);
5252

53+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
54+
{
55+
await PersistWorkflow(workflow, cancellationToken);
56+
57+
foreach(var subscription in subscriptions)
58+
{
59+
await CreateEventSubscription(subscription, cancellationToken);
60+
}
61+
}
62+
5363
public Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.TerminateSubscription(eventSubscriptionId);
5464
public Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.GetSubscription(eventSubscriptionId);
5565

src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,32 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
151151
await db.SaveChangesAsync(cancellationToken);
152152
}
153153
}
154+
155+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
156+
{
157+
using (var db = ConstructDbContext())
158+
{
159+
var uid = new Guid(workflow.Id);
160+
var existingEntity = await db.Set<PersistedWorkflow>()
161+
.Where(x => x.InstanceId == uid)
162+
.Include(wf => wf.ExecutionPointers)
163+
.ThenInclude(ep => ep.ExtensionAttributes)
164+
.Include(wf => wf.ExecutionPointers)
165+
.AsTracking()
166+
.FirstAsync(cancellationToken);
167+
168+
var workflowPersistable = workflow.ToPersistable(existingEntity);
169+
170+
foreach (var subscription in subscriptions)
171+
{
172+
subscription.Id = Guid.NewGuid().ToString();
173+
var subscriptionPersistable = subscription.ToPersistable();
174+
db.Set<PersistedSubscription>().Add(subscriptionPersistable);
175+
}
176+
177+
await db.SaveChangesAsync(cancellationToken);
178+
}
179+
}
154180

155181
public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default)
156182
{

src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,17 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
149149
await WorkflowInstances.ReplaceOneAsync(x => x.Id == workflow.Id, workflow, cancellationToken: cancellationToken);
150150
}
151151

152+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
153+
{
154+
using (var session = await _database.Client.StartSessionAsync())
155+
{
156+
session.StartTransaction();
157+
await PersistWorkflow(workflow, cancellationToken);
158+
await EventSubscriptions.InsertManyAsync(subscriptions, cancellationToken: cancellationToken);
159+
await session.CommitTransactionAsync();
160+
}
161+
}
162+
152163
public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default)
153164
{
154165
var now = asAt.ToUniversalTime().Ticks;

src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using Raven.Client.Documents;
22
using Raven.Client.Documents.Linq;
33
using Raven.Client.Documents.Operations;
4+
using Raven.Client.Documents.Session;
45
using System;
56
using System.Collections.Generic;
67
using System.Linq;
@@ -56,21 +57,41 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
5657
{
5758
using (var session = _database.OpenAsyncSession())
5859
{
59-
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.WorkflowDefinitionId, workflow.WorkflowDefinitionId);
60-
session.Advanced.Patch<WorkflowInstance, int>(workflow.Id, x => x.Version, workflow.Version);
61-
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.Description, workflow.Description);
62-
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.Reference, workflow.Reference);
63-
session.Advanced.Patch<WorkflowInstance, ExecutionPointerCollection>(workflow.Id, x => x.ExecutionPointers, workflow.ExecutionPointers);
64-
session.Advanced.Patch<WorkflowInstance, long?>(workflow.Id, x => x.NextExecution, workflow.NextExecution);
65-
session.Advanced.Patch<WorkflowInstance, WorkflowStatus>(workflow.Id, x => x.Status, workflow.Status);
66-
session.Advanced.Patch<WorkflowInstance, object>(workflow.Id, x => x.Data, workflow.Data);
67-
session.Advanced.Patch<WorkflowInstance, DateTime>(workflow.Id, x => x.CreateTime, workflow.CreateTime);
68-
session.Advanced.Patch<WorkflowInstance, DateTime?>(workflow.Id, x => x.CompleteTime, workflow.CompleteTime);
60+
PatchSession(session, workflow);
61+
await session.SaveChangesAsync(cancellationToken);
62+
}
63+
}
64+
65+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
66+
{
67+
using (var session = _database.OpenAsyncSession())
68+
{
69+
PatchSession(session, workflow);
70+
71+
foreach (var subscription in subscriptions)
72+
{
73+
await session.StoreAsync(subscription, cancellationToken);
74+
}
6975

7076
await session.SaveChangesAsync(cancellationToken);
7177
}
7278
}
7379

80+
private void PatchSession(IAsyncDocumentSession session, WorkflowInstance workflow)
81+
{
82+
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.WorkflowDefinitionId, workflow.WorkflowDefinitionId);
83+
session.Advanced.Patch<WorkflowInstance, int>(workflow.Id, x => x.Version, workflow.Version);
84+
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.Description, workflow.Description);
85+
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.Reference, workflow.Reference);
86+
session.Advanced.Patch<WorkflowInstance, ExecutionPointerCollection>(workflow.Id, x => x.ExecutionPointers, workflow.ExecutionPointers);
87+
session.Advanced.Patch<WorkflowInstance, long?>(workflow.Id, x => x.NextExecution, workflow.NextExecution);
88+
session.Advanced.Patch<WorkflowInstance, WorkflowStatus>(workflow.Id, x => x.Status, workflow.Status);
89+
session.Advanced.Patch<WorkflowInstance, object>(workflow.Id, x => x.Data, workflow.Data);
90+
session.Advanced.Patch<WorkflowInstance, DateTime>(workflow.Id, x => x.CreateTime, workflow.CreateTime);
91+
session.Advanced.Patch<WorkflowInstance, DateTime?>(workflow.Id, x => x.CompleteTime, workflow.CompleteTime);
92+
93+
}
94+
7495
public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default)
7596
{
7697
var now = asAt.ToUniversalTime().Ticks;

src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,43 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
6161
var response = await _client.PutItemAsync(request, cancellationToken);
6262
}
6363

64+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
65+
{
66+
var transactionWriteItemsRequest = new TransactWriteItemsRequest()
67+
{
68+
TransactItems = new List<TransactWriteItem>()
69+
{
70+
{
71+
new TransactWriteItem()
72+
{
73+
Put = new Put()
74+
{
75+
TableName = $"{_tablePrefix}-{WORKFLOW_TABLE}",
76+
Item = workflow.ToDynamoMap()
77+
}
78+
}
79+
}
80+
}
81+
};
82+
83+
foreach(var subscription in subscriptions)
84+
{
85+
subscription.Id = Guid.NewGuid().ToString();
86+
87+
transactionWriteItemsRequest.TransactItems.Add(new TransactWriteItem()
88+
{
89+
Put = new Put()
90+
{
91+
TableName = $"{_tablePrefix}-{SUBCRIPTION_TABLE}",
92+
Item = subscription.ToDynamoMap(),
93+
ConditionExpression = "attribute_not_exists(id)"
94+
}
95+
});
96+
}
97+
98+
await _client.TransactWriteItemsAsync(transactionWriteItemsRequest, cancellationToken);
99+
}
100+
64101
public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default)
65102
{
66103
var result = new List<string>();

src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,16 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
227227
await _workflowContainer.Value.UpsertItemAsync(PersistedWorkflow.FromInstance(workflow), cancellationToken: cancellationToken);
228228
}
229229

230+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
231+
{
232+
await PersistWorkflow(workflow, cancellationToken);
233+
234+
foreach(var subscription in subscriptions)
235+
{
236+
await CreateEventSubscription(subscription, cancellationToken);
237+
}
238+
}
239+
230240
public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
231241
{
232242
throw new NotImplementedException();

src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,16 @@ public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, Cancellat
4747
return workflow.Id;
4848
}
4949

50+
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
51+
{
52+
await PersistWorkflow(workflow, cancellationToken);
53+
54+
foreach (var subscription in subscriptions)
55+
{
56+
await CreateEventSubscription(subscription, cancellationToken);
57+
}
58+
}
59+
5060
public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
5161
{
5262
var str = JsonConvert.SerializeObject(workflow, _serializerSettings);

0 commit comments

Comments
 (0)