Skip to content

Commit 716e809

Browse files
authored
Merge pull request #819 from VKAlwaysWin/VKAlwaysWin/PersistantStorageWithCancellationToken
CancellationToken passed to PersistantStorage providers
2 parents b84c9ce + cb3c5be commit 716e809

File tree

15 files changed

+388
-324
lines changed

15 files changed

+388
-324
lines changed
Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,24 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using WorkflowCore.Models;
56

67
namespace WorkflowCore.Interface
78
{
89
public interface IEventRepository
910
{
10-
Task<string> CreateEvent(Event newEvent);
11+
Task<string> CreateEvent(Event newEvent, CancellationToken cancellationToken = default);
1112

12-
Task<Event> GetEvent(string id);
13+
Task<Event> GetEvent(string id, CancellationToken cancellationToken = default);
1314

14-
Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt);
15+
Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt, CancellationToken cancellationToken = default);
1516

16-
Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf);
17+
Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);
1718

18-
Task MarkEventProcessed(string id);
19+
Task MarkEventProcessed(string id, CancellationToken cancellationToken = default);
1920

20-
Task MarkEventUnprocessed(string id);
21+
Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default);
2122

2223
}
2324
}

src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using WorkflowCore.Models;
56

@@ -8,7 +9,7 @@ namespace WorkflowCore.Interface
89
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository
910
{
1011

11-
Task PersistErrors(IEnumerable<ExecutionError> errors);
12+
Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default);
1213

1314
void EnsureStoreExists();
1415

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using WorkflowCore.Models;
56

67
namespace WorkflowCore.Interface
78
{
89
public interface ISubscriptionRepository
910
{
10-
Task<string> CreateEventSubscription(EventSubscription subscription);
11+
Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default);
1112

12-
Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventName, string eventKey, DateTime asOf);
13+
Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);
1314

14-
Task TerminateSubscription(string eventSubscriptionId);
15+
Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);
1516

16-
Task<EventSubscription> GetSubscription(string eventSubscriptionId);
17+
Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);
1718

18-
Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf);
19+
Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);
1920

20-
Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry);
21+
Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default);
2122

22-
Task ClearSubscriptionToken(string eventSubscriptionId, string token);
23+
Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default);
2324

2425
}
2526
}
Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using WorkflowCore.Models;
56

67
namespace WorkflowCore.Interface
78
{
89
public interface IWorkflowRepository
910
{
10-
Task<string> CreateNewWorkflow(WorkflowInstance workflow);
11+
Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
1112

12-
Task PersistWorkflow(WorkflowInstance workflow);
13+
Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
1314

14-
Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt);
15+
Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default);
1516

1617
[Obsolete]
1718
Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take);
1819

19-
Task<WorkflowInstance> GetWorkflowInstance(string Id);
20+
Task<WorkflowInstance> GetWorkflowInstance(string Id, CancellationToken cancellationToken = default);
2021

21-
Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids);
22+
Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids, CancellationToken cancellationToken = default);
2223

2324
}
2425
}

src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
4242
try
4343
{
4444
cancellationToken.ThrowIfCancellationRequested();
45-
var evt = await _eventRepository.GetEvent(itemId);
45+
var evt = await _eventRepository.GetEvent(itemId, cancellationToken);
4646
if (evt.IsProcessed)
4747
{
4848
_greylist.Add($"evt:{evt.Id}");
@@ -53,18 +53,18 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5353
IEnumerable<EventSubscription> subs = null;
5454
if (evt.EventData is ActivityResult)
5555
{
56-
var activity = await _subscriptionRepository.GetSubscription((evt.EventData as ActivityResult).SubscriptionId);
56+
var activity = await _subscriptionRepository.GetSubscription((evt.EventData as ActivityResult).SubscriptionId, cancellationToken);
5757
if (activity == null)
5858
{
5959
Logger.LogWarning($"Activity already processed - {(evt.EventData as ActivityResult).SubscriptionId}");
60-
await _eventRepository.MarkEventProcessed(itemId);
60+
await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
6161
return;
6262
}
6363
subs = new List<EventSubscription> { activity };
6464
}
6565
else
6666
{
67-
subs = await _subscriptionRepository.GetSubscriptions(evt.EventName, evt.EventKey, evt.EventTime);
67+
subs = await _subscriptionRepository.GetSubscriptions(evt.EventName, evt.EventKey, evt.EventTime, cancellationToken);
6868
}
6969

7070
var toQueue = new HashSet<string>();
@@ -74,7 +74,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
7474
complete = complete && await SeedSubscription(evt, sub, toQueue, cancellationToken);
7575

7676
if (complete)
77-
await _eventRepository.MarkEventProcessed(itemId);
77+
await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
7878

7979
foreach (var eventId in toQueue)
8080
await QueueProvider.QueueWork(eventId, QueueType.Event);
@@ -88,12 +88,12 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
8888

8989
private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, HashSet<string> toQueue, CancellationToken cancellationToken)
9090
{
91-
foreach (var eventId in await _eventRepository.GetEvents(sub.EventName, sub.EventKey, sub.SubscribeAsOf))
91+
foreach (var eventId in await _eventRepository.GetEvents(sub.EventName, sub.EventKey, sub.SubscribeAsOf, cancellationToken))
9292
{
9393
if (eventId == evt.Id)
9494
continue;
9595

96-
var siblingEvent = await _eventRepository.GetEvent(eventId);
96+
var siblingEvent = await _eventRepository.GetEvent(eventId, cancellationToken);
9797
if ((!siblingEvent.IsProcessed) && (siblingEvent.EventTime < evt.EventTime))
9898
{
9999
await QueueProvider.QueueWork(eventId, QueueType.Event);
@@ -112,7 +112,7 @@ private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, Hash
112112

113113
try
114114
{
115-
var workflow = await _workflowRepository.GetWorkflowInstance(sub.WorkflowId);
115+
var workflow = await _workflowRepository.GetWorkflowInstance(sub.WorkflowId, cancellationToken);
116116
IEnumerable<ExecutionPointer> pointers = null;
117117

118118
if (!string.IsNullOrEmpty(sub.ExecutionPointerId))
@@ -127,8 +127,8 @@ private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, Hash
127127
p.Active = true;
128128
}
129129
workflow.NextExecution = 0;
130-
await _workflowRepository.PersistWorkflow(workflow);
131-
await _subscriptionRepository.TerminateSubscription(sub.Id);
130+
await _workflowRepository.PersistWorkflow(workflow, cancellationToken);
131+
await _subscriptionRepository.TerminateSubscription(sub.Id, cancellationToken);
132132
return true;
133133
}
134134
catch (Exception ex)

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
4242
try
4343
{
4444
cancellationToken.ThrowIfCancellationRequested();
45-
workflow = await _persistenceStore.GetWorkflowInstance(itemId);
45+
workflow = await _persistenceStore.GetWorkflowInstance(itemId, cancellationToken);
4646
if (workflow.Status == WorkflowStatus.Runnable)
4747
{
4848
try
@@ -51,7 +51,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5151
}
5252
finally
5353
{
54-
await _persistenceStore.PersistWorkflow(workflow);
54+
await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
5555
await QueueProvider.QueueWork(itemId, QueueType.Index);
5656
_greylist.Remove($"wf:{itemId}");
5757
}
@@ -64,10 +64,10 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
6464
{
6565
foreach (var sub in result.Subscriptions)
6666
{
67-
await SubscribeEvent(sub, _persistenceStore);
67+
await SubscribeEvent(sub, _persistenceStore, cancellationToken);
6868
}
6969

70-
await _persistenceStore.PersistErrors(result.Errors);
70+
await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
7171

7272
var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;
7373

@@ -80,18 +80,18 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
8080

8181
}
8282

83-
private async Task SubscribeEvent(EventSubscription subscription, IPersistenceProvider persistenceStore)
83+
private async Task SubscribeEvent(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
8484
{
8585
//TODO: move to own class
8686
Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);
8787

88-
await persistenceStore.CreateEventSubscription(subscription);
88+
await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
8989
if (subscription.EventName != Event.EventTypeActivity)
9090
{
91-
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf);
91+
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);
9292
foreach (var evt in events)
9393
{
94-
await persistenceStore.MarkEventUnprocessed(evt);
94+
await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
9595
await QueueProvider.QueueWork(evt, QueueType.Event);
9696
}
9797
}

0 commit comments

Comments
 (0)