|
8 | 8 |
|
9 | 9 | namespace WorkflowCore.Services.BackgroundTasks
|
10 | 10 | {
|
| 11 | + /// <summary> |
| 12 | + /// Background task responsible for consuming workflow items from the queue and processing them. |
| 13 | + /// This consumer ensures that workflows are removed from the greylist after processing, |
| 14 | + /// regardless of their status, to prevent workflows from getting stuck in "Pending" state. |
| 15 | + /// </summary> |
11 | 16 | internal class WorkflowConsumer : QueueConsumer, IBackgroundTask
|
12 | 17 | {
|
13 | 18 | private readonly IDistributedLockProvider _lockProvider;
|
@@ -57,12 +62,25 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
|
57 | 62 | WorkflowActivity.Enrich(result);
|
58 | 63 | await _persistenceStore.PersistWorkflow(workflow, result?.Subscriptions, cancellationToken);
|
59 | 64 | await QueueProvider.QueueWork(itemId, QueueType.Index);
|
60 |
| - _greylist.Remove($"wf:{itemId}"); |
61 | 65 | }
|
62 | 66 | }
|
| 67 | + else |
| 68 | + { |
| 69 | + Logger.LogDebug("Workflow {ItemId} is not runnable, status: {Status}", itemId, workflow.Status); |
| 70 | + } |
| 71 | + } |
| 72 | + catch (Exception ex) |
| 73 | + { |
| 74 | + Logger.LogError(ex, "Error processing workflow {ItemId}", itemId); |
| 75 | + throw; |
63 | 76 | }
|
64 | 77 | finally
|
65 | 78 | {
|
| 79 | + // Always remove from greylist regardless of workflow status |
| 80 | + // This prevents workflows from being stuck in greylist when they can't be processed |
| 81 | + Logger.LogDebug("Removing workflow {ItemId} from greylist", itemId); |
| 82 | + _greylist.Remove($"wf:{itemId}"); |
| 83 | + |
66 | 84 | await _lockProvider.ReleaseLock(itemId);
|
67 | 85 | if ((workflow != null) && (result != null))
|
68 | 86 | {
|
|
0 commit comments