Skip to content

Commit 6877096

Browse files
committed
async activity results
1 parent a1b1dd3 commit 6877096

File tree

3 files changed

+55
-26
lines changed

3 files changed

+55
-26
lines changed

src/WorkflowCore/Models/ActivityResult.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ public class ActivityResult
99
{
1010
public enum StatusType { Success, Fail }
1111
public StatusType Status { get; set; }
12+
public string SubscriptionId { get; set; }
1213
public object Data { get; set; }
1314
}
1415
}

src/WorkflowCore/Services/ActivityController.cs

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@ public class ActivityController : IActivityController
1818
private readonly IDistributedLockProvider _lockProvider;
1919
private readonly IDateTimeProvider _dateTimeProvider;
2020
private readonly IQueueProvider _queueProvider;
21+
private readonly IWorkflowController _workflowController;
2122

22-
public ActivityController(ISubscriptionRepository subscriptionRepository, IWorkflowRepository persistenceStore, IDateTimeProvider dateTimeProvider, IDistributedLockProvider lockProvider, IQueueProvider queueProvider)
23+
public ActivityController(ISubscriptionRepository subscriptionRepository, IWorkflowRepository persistenceStore, IWorkflowController workflowController, IDateTimeProvider dateTimeProvider, IDistributedLockProvider lockProvider, IQueueProvider queueProvider)
2324
{
2425
_persistenceStore = persistenceStore;
2526
_subscriptionRepository = subscriptionRepository;
2627
_dateTimeProvider = dateTimeProvider;
2728
_lockProvider = lockProvider;
2829
_queueProvider = queueProvider;
30+
_workflowController = workflowController;
2931
}
3032

3133
public async Task<PendingActivity> GetPendingActivity(string activityName, string workerId, TimeSpan? timeout = null)
@@ -93,7 +95,7 @@ public async Task SubmitActivityFailure(string token, object result)
9395
});
9496
}
9597

96-
private async Task SubmitActivityResult(string token, object result)
98+
private async Task SubmitActivityResult(string token, ActivityResult result)
9799
{
98100
var tokenObj = Token.Decode(token);
99101
var sub = await _subscriptionRepository.GetSubscription(tokenObj.SubscriptionId);
@@ -102,28 +104,32 @@ private async Task SubmitActivityResult(string token, object result)
102104

103105
if (sub.ExternalToken != token)
104106
throw new NotFoundException("Token mismatch");
105-
106-
if (!await _lockProvider.AcquireLock(sub.WorkflowId, CancellationToken.None))
107-
throw new WorkflowLockedException();
108-
109-
try
110-
{
111-
var workflow = await _persistenceStore.GetWorkflowInstance(sub.WorkflowId);
112-
var pointer = workflow.ExecutionPointers.Single(p => p.Id == sub.ExecutionPointerId);
113-
114-
pointer.EventData = result;
115-
pointer.EventPublished = true;
116-
pointer.Active = true;
117-
118-
workflow.NextExecution = 0;
119-
await _persistenceStore.PersistWorkflow(workflow);
120-
await _subscriptionRepository.TerminateSubscription(sub.Id);
121-
}
122-
finally
123-
{
124-
await _lockProvider.ReleaseLock(sub.WorkflowId);
125-
await _queueProvider.QueueWork(sub.WorkflowId, QueueType.Workflow);
126-
}
107+
108+
result.SubscriptionId = sub.Id;
109+
110+
await _workflowController.PublishEvent(sub.EventName, sub.EventKey, result);
111+
112+
//if (!await _lockProvider.AcquireLock(sub.WorkflowId, CancellationToken.None))
113+
// throw new WorkflowLockedException();
114+
115+
//try
116+
//{
117+
// var workflow = await _persistenceStore.GetWorkflowInstance(sub.WorkflowId);
118+
// var pointer = workflow.ExecutionPointers.Single(p => p.Id == sub.ExecutionPointerId);
119+
120+
// pointer.EventData = result;
121+
// pointer.EventPublished = true;
122+
// pointer.Active = true;
123+
124+
// workflow.NextExecution = 0;
125+
// await _persistenceStore.PersistWorkflow(workflow);
126+
// await _subscriptionRepository.TerminateSubscription(sub.Id);
127+
//}
128+
//finally
129+
//{
130+
// await _lockProvider.ReleaseLock(sub.WorkflowId);
131+
// await _queueProvider.QueueWork(sub.WorkflowId, QueueType.Workflow);
132+
//}
127133
}
128134

129135
class Token

src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,23 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
4343
var evt = await _eventRepository.GetEvent(itemId);
4444
if (evt.EventTime <= _datetimeProvider.UtcNow)
4545
{
46-
var subs = await _subscriptionRepository.GetSubscriptions(evt.EventName, evt.EventKey, evt.EventTime);
46+
IEnumerable<EventSubscription> subs = null;
47+
if (evt.EventData is ActivityResult)
48+
{
49+
var activity = await _subscriptionRepository.GetSubscription((evt.EventData as ActivityResult).SubscriptionId);
50+
if (activity == null)
51+
{
52+
Logger.LogWarning($"Activity already processed - {(evt.EventData as ActivityResult).SubscriptionId}");
53+
await _eventRepository.MarkEventProcessed(itemId);
54+
return;
55+
}
56+
subs = new List<EventSubscription>() { activity };
57+
}
58+
else
59+
{
60+
subs = await _subscriptionRepository.GetSubscriptions(evt.EventName, evt.EventKey, evt.EventTime);
61+
}
62+
4763
var toQueue = new List<string>();
4864
var complete = true;
4965

@@ -90,7 +106,13 @@ private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, List
90106
try
91107
{
92108
var workflow = await _workflowRepository.GetWorkflowInstance(sub.WorkflowId);
93-
var pointers = workflow.ExecutionPointers.Where(p => p.EventName == sub.EventName && p.EventKey == sub.EventKey && !p.EventPublished && p.EndTime == null);
109+
IEnumerable<ExecutionPointer> pointers = null;
110+
111+
if (!string.IsNullOrEmpty(sub.ExecutionPointerId))
112+
pointers = workflow.ExecutionPointers.Where(p => p.Id == sub.ExecutionPointerId && !p.EventPublished && p.EndTime == null);
113+
else
114+
pointers = workflow.ExecutionPointers.Where(p => p.EventName == sub.EventName && p.EventKey == sub.EventKey && !p.EventPublished && p.EndTime == null);
115+
94116
foreach (var p in pointers)
95117
{
96118
p.EventData = evt.EventData;

0 commit comments

Comments
 (0)