Skip to content

Commit 2746d05

Browse files
Use Lock on event subscription
1 parent 66b6e45 commit 2746d05

File tree

1 file changed

+21
-9
lines changed

1 file changed

+21
-9
lines changed

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,18 +92,30 @@ private async Task SubscribeEvent(EventSubscription subscription, IPersistencePr
9292

9393
foreach (var evt in events)
9494
{
95-
var locked = await _lockProvider.AcquireLock($"evt:{evt}", cancellationToken);
96-
int attempt = 0;
97-
while (locked && attempt < 10)
95+
var eventKey = $"evt:{evt}";
96+
bool acquiredLock = false;
97+
try
9898
{
99-
locked = await _lockProvider.AcquireLock($"evt:{evt}", cancellationToken);
100-
await Task.Delay(Options.IdleTime);
99+
acquiredLock = await _lockProvider.AcquireLock(eventKey, cancellationToken);
100+
int attempt = 0;
101+
while (!acquiredLock && attempt < 10)
102+
{
103+
acquiredLock = await _lockProvider.AcquireLock(eventKey, cancellationToken);
104+
await Task.Delay(Options.IdleTime);
101105

102-
attempt++;
103-
}
106+
attempt++;
107+
}
104108

105-
await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
106-
await QueueProvider.QueueWork(evt, QueueType.Event);
109+
await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
110+
await QueueProvider.QueueWork(evt, QueueType.Event);
111+
}
112+
finally
113+
{
114+
if (acquiredLock)
115+
{
116+
await _lockProvider.ReleaseLock(eventKey);
117+
}
118+
}
107119
}
108120
}
109121
}

0 commit comments

Comments
 (0)