Skip to content

Commit b4660fb

Browse files
authored
Merge branch 'master' into copilot/fix-1375984f-ce0b-4c23-bedb-b0cdd97306e2
2 parents 21e5629 + 926f4c4 commit b4660fb

File tree

7 files changed

+30
-13
lines changed

7 files changed

+30
-13
lines changed

src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ private async Task Execute()
122122
catch (Exception ex)
123123
{
124124
Logger.LogError(ex, ex.Message);
125-
activity?.RecordException(ex);
125+
activity?.AddException(ex);
126126
}
127127
finally
128128
{
@@ -158,7 +158,7 @@ private async Task ExecuteItem(string itemId, EventWaitHandle waitHandle, Activi
158158
catch (Exception ex)
159159
{
160160
Logger.LogError(default(EventId), ex, $"Error executing item {itemId} - {ex.Message}");
161-
activity?.RecordException(ex);
161+
activity?.AddException(ex);
162162
}
163163
finally
164164
{

src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
8686
catch (Exception ex)
8787
{
8888
_logger.LogError(ex, ex.Message);
89-
activity?.RecordException(ex);
89+
activity?.AddException(ex);
9090
}
9191
}
9292
if (_greylist.Contains($"wf:{item}"))
@@ -108,7 +108,7 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
108108
catch (Exception ex)
109109
{
110110
_logger.LogError(ex, ex.Message);
111-
activity?.RecordException(ex);
111+
activity?.AddException(ex);
112112
}
113113
finally
114114
{
@@ -145,7 +145,7 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
145145
catch (Exception ex)
146146
{
147147
_logger.LogError(ex, ex.Message);
148-
activity?.RecordException(ex);
148+
activity?.AddException(ex);
149149
}
150150
}
151151
if (_greylist.Contains($"evt:{item}"))
@@ -167,7 +167,7 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
167167
catch (Exception ex)
168168
{
169169
_logger.LogError(ex, ex.Message);
170-
activity?.RecordException(ex);
170+
activity?.AddException(ex);
171171
}
172172
finally
173173
{
@@ -210,7 +210,7 @@ await _persistenceStore.ProcessCommands(new DateTimeOffset(_dateTimeProvider.Utc
210210
catch (Exception ex)
211211
{
212212
_logger.LogError(ex, ex.Message);
213-
activity?.RecordException(ex);
213+
activity?.AddException(ex);
214214
}
215215
finally
216216
{

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@
88

99
namespace WorkflowCore.Services.BackgroundTasks
1010
{
11+
/// <summary>
12+
/// Background task responsible for consuming workflow items from the queue and processing them.
13+
/// This consumer ensures that workflows are removed from the greylist after processing,
14+
/// regardless of their status, to prevent workflows from getting stuck in "Pending" state.
15+
/// </summary>
1116
internal class WorkflowConsumer : QueueConsumer, IBackgroundTask
1217
{
1318
private readonly IDistributedLockProvider _lockProvider;
@@ -57,12 +62,25 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5762
WorkflowActivity.Enrich(result);
5863
await _persistenceStore.PersistWorkflow(workflow, result?.Subscriptions, cancellationToken);
5964
await QueueProvider.QueueWork(itemId, QueueType.Index);
60-
_greylist.Remove($"wf:{itemId}");
6165
}
6266
}
67+
else
68+
{
69+
Logger.LogDebug("Workflow {ItemId} is not runnable, status: {Status}", itemId, workflow.Status);
70+
}
71+
}
72+
catch (Exception ex)
73+
{
74+
Logger.LogError(ex, "Error processing workflow {ItemId}", itemId);
75+
throw;
6376
}
6477
finally
6578
{
79+
// Always remove from greylist regardless of workflow status
80+
// This prevents workflows from being stuck in greylist when they can't be processed
81+
Logger.LogDebug("Removing workflow {ItemId} from greylist", itemId);
82+
_greylist.Remove($"wf:{itemId}");
83+
6684
await _lockProvider.ReleaseLock(itemId);
6785
if ((workflow != null) && (result != null))
6886
{

src/WorkflowCore/Services/WorkflowActivity.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ internal static void Enrich(WorkflowExecutorResult result)
8383

8484
if (result?.Errors?.Count > 0)
8585
{
86-
activity.SetStatus(Status.Error);
8786
activity.SetStatus(ActivityStatusCode.Error);
8887
}
8988
}

src/WorkflowCore/Services/WorkflowHost.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
105105
}
106106
catch (Exception ex)
107107
{
108-
activity.RecordException(ex);
108+
activity.AddException(ex);
109109
throw;
110110
}
111111
finally

src/WorkflowCore/WorkflowCore.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
<PackageReference Include="System.Reflection.TypeExtensions" Version="4.7.0" />
2727
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
2828
<PackageReference Include="System.Linq.Queryable" Version="4.3.0" />
29-
<PackageReference Include="OpenTelemetry.Api" Version="1.1.0" />
30-
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
29+
<PackageReference Include="OpenTelemetry.Api" Version="1.12.0" />
30+
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="9.0.0" />
3131
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleTo">
3232
<_Parameter1>WorkflowCore.IntegrationTests</_Parameter1>
3333
</AssemblyAttribute>

test/WorkflowCore.Tests.MySQL/WorkflowCore.Tests.MySQL.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
4-
<TargetFrameworks>net6.0</TargetFrameworks>
4+
<TargetFrameworks>net6.0;net8.0</TargetFrameworks>
55
</PropertyGroup>
66

77
<ItemGroup>

0 commit comments

Comments
 (0)