Skip to content

Commit e3a49d2

Browse files
1 parent 200f16f commit e3a49d2

File tree

15 files changed

+384
-327
lines changed

15 files changed

+384
-327
lines changed
Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,24 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using WorkflowCore.Models;
56

67
namespace WorkflowCore.Interface
78
{
89
public interface IEventRepository
910
{
10-
Task<string> CreateEvent(Event newEvent);
11+
Task<string> CreateEvent(Event newEvent, CancellationToken cancellationToken = default);
1112

12-
Task<Event> GetEvent(string id);
13+
Task<Event> GetEvent(string id, CancellationToken cancellationToken = default);
1314

14-
Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt);
15+
Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt, CancellationToken cancellationToken = default);
1516

16-
Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf);
17+
Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);
1718

18-
Task MarkEventProcessed(string id);
19+
Task MarkEventProcessed(string id, CancellationToken cancellationToken = default);
1920

20-
Task MarkEventUnprocessed(string id);
21+
Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default);
2122

2223
}
2324
}

src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using WorkflowCore.Models;
56

@@ -8,7 +9,7 @@ namespace WorkflowCore.Interface
89
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository
910
{
1011

11-
Task PersistErrors(IEnumerable<ExecutionError> errors);
12+
Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default);
1213

1314
void EnsureStoreExists();
1415

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using WorkflowCore.Models;
56

67
namespace WorkflowCore.Interface
78
{
89
public interface ISubscriptionRepository
910
{
10-
Task<string> CreateEventSubscription(EventSubscription subscription);
11+
Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default);
1112

12-
Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventName, string eventKey, DateTime asOf);
13+
Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);
1314

14-
Task TerminateSubscription(string eventSubscriptionId);
15+
Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);
1516

16-
Task<EventSubscription> GetSubscription(string eventSubscriptionId);
17+
Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);
1718

18-
Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf);
19+
Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);
1920

20-
Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry);
21+
Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default);
2122

22-
Task ClearSubscriptionToken(string eventSubscriptionId, string token);
23+
Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default);
2324

2425
}
2526
}
Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using WorkflowCore.Models;
56

67
namespace WorkflowCore.Interface
78
{
89
public interface IWorkflowRepository
910
{
10-
Task<string> CreateNewWorkflow(WorkflowInstance workflow);
11+
Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
1112

12-
Task PersistWorkflow(WorkflowInstance workflow);
13+
Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
1314

14-
Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt);
15+
Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default);
1516

1617
[Obsolete]
1718
Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take);
1819

19-
Task<WorkflowInstance> GetWorkflowInstance(string Id);
20+
Task<WorkflowInstance> GetWorkflowInstance(string Id, CancellationToken cancellationToken = default);
2021

21-
Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids);
22+
Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids, CancellationToken cancellationToken = default);
2223

2324
}
2425
}

src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
4242
try
4343
{
4444
cancellationToken.ThrowIfCancellationRequested();
45-
var evt = await _eventRepository.GetEvent(itemId);
45+
var evt = await _eventRepository.GetEvent(itemId, cancellationToken);
4646
if (evt.IsProcessed)
4747
{
4848
_greylist.Add($"evt:{evt.Id}");
@@ -53,18 +53,18 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5353
IEnumerable<EventSubscription> subs = null;
5454
if (evt.EventData is ActivityResult)
5555
{
56-
var activity = await _subscriptionRepository.GetSubscription((evt.EventData as ActivityResult).SubscriptionId);
56+
var activity = await _subscriptionRepository.GetSubscription((evt.EventData as ActivityResult).SubscriptionId, cancellationToken);
5757
if (activity == null)
5858
{
5959
Logger.LogWarning($"Activity already processed - {(evt.EventData as ActivityResult).SubscriptionId}");
60-
await _eventRepository.MarkEventProcessed(itemId);
60+
await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
6161
return;
6262
}
6363
subs = new List<EventSubscription>() { activity };
6464
}
6565
else
6666
{
67-
subs = await _subscriptionRepository.GetSubscriptions(evt.EventName, evt.EventKey, evt.EventTime);
67+
subs = await _subscriptionRepository.GetSubscriptions(evt.EventName, evt.EventKey, evt.EventTime, cancellationToken);
6868
}
6969

7070
var toQueue = new HashSet<string>();
@@ -74,7 +74,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
7474
complete = complete && await SeedSubscription(evt, sub, toQueue, cancellationToken);
7575

7676
if (complete)
77-
await _eventRepository.MarkEventProcessed(itemId);
77+
await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
7878

7979
foreach (var eventId in toQueue)
8080
await QueueProvider.QueueWork(eventId, QueueType.Event);
@@ -88,12 +88,12 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
8888

8989
private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, HashSet<string> toQueue, CancellationToken cancellationToken)
9090
{
91-
foreach (var eventId in await _eventRepository.GetEvents(sub.EventName, sub.EventKey, sub.SubscribeAsOf))
91+
foreach (var eventId in await _eventRepository.GetEvents(sub.EventName, sub.EventKey, sub.SubscribeAsOf, cancellationToken))
9292
{
9393
if (eventId == evt.Id)
9494
continue;
9595

96-
var siblingEvent = await _eventRepository.GetEvent(eventId);
96+
var siblingEvent = await _eventRepository.GetEvent(eventId, cancellationToken);
9797
if ((!siblingEvent.IsProcessed) && (siblingEvent.EventTime < evt.EventTime))
9898
{
9999
await QueueProvider.QueueWork(eventId, QueueType.Event);
@@ -112,7 +112,7 @@ private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, Hash
112112

113113
try
114114
{
115-
var workflow = await _workflowRepository.GetWorkflowInstance(sub.WorkflowId);
115+
var workflow = await _workflowRepository.GetWorkflowInstance(sub.WorkflowId, cancellationToken);
116116
IEnumerable<ExecutionPointer> pointers = null;
117117

118118
if (!string.IsNullOrEmpty(sub.ExecutionPointerId))
@@ -127,8 +127,8 @@ private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, Hash
127127
p.Active = true;
128128
}
129129
workflow.NextExecution = 0;
130-
await _workflowRepository.PersistWorkflow(workflow);
131-
await _subscriptionRepository.TerminateSubscription(sub.Id);
130+
await _workflowRepository.PersistWorkflow(workflow, cancellationToken);
131+
await _subscriptionRepository.TerminateSubscription(sub.Id, cancellationToken);
132132
return true;
133133
}
134134
catch (Exception ex)

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
4343
try
4444
{
4545
cancellationToken.ThrowIfCancellationRequested();
46-
workflow = await _persistenceStore.GetWorkflowInstance(itemId);
46+
workflow = await _persistenceStore.GetWorkflowInstance(itemId, cancellationToken);
4747
if (workflow.Status == WorkflowStatus.Runnable)
4848
{
4949
try
@@ -52,7 +52,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5252
}
5353
finally
5454
{
55-
await _persistenceStore.PersistWorkflow(workflow);
55+
await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
5656
await QueueProvider.QueueWork(itemId, QueueType.Index);
5757
_greylist.Remove($"wf:{itemId}");
5858
}
@@ -65,10 +65,10 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
6565
{
6666
foreach (var sub in result.Subscriptions)
6767
{
68-
await SubscribeEvent(sub, _persistenceStore);
68+
await SubscribeEvent(sub, _persistenceStore, cancellationToken);
6969
}
7070

71-
await _persistenceStore.PersistErrors(result.Errors);
71+
await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
7272

7373
var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;
7474

@@ -81,18 +81,18 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
8181

8282
}
8383

84-
private async Task SubscribeEvent(EventSubscription subscription, IPersistenceProvider persistenceStore)
84+
private async Task SubscribeEvent(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
8585
{
8686
//TODO: move to own class
8787
Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);
8888

89-
await persistenceStore.CreateEventSubscription(subscription);
89+
await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
9090
if (subscription.EventName != Event.EventTypeActivity)
9191
{
92-
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf);
92+
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);
9393
foreach (var evt in events)
9494
{
95-
await persistenceStore.MarkEventUnprocessed(evt);
95+
await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
9696
await QueueProvider.QueueWork(evt, QueueType.Event);
9797
}
9898
}

0 commit comments

Comments
 (0)