@@ -24,6 +24,8 @@ internal abstract class QueueConsumer : IBackgroundTask
24
24
protected Task DispatchTask ;
25
25
private CancellationTokenSource _cancellationTokenSource ;
26
26
private Dictionary < string , EventWaitHandle > _activeTasks ;
27
+ private List < Task > _runningTasks ;
28
+ private readonly object _runningTasksLock = new object ( ) ;
27
29
private ConcurrentHashSet < string > _secondPasses ;
28
30
29
31
protected QueueConsumer ( IQueueProvider queueProvider , ILoggerFactory loggerFactory , WorkflowOptions options )
@@ -33,6 +35,7 @@ protected QueueConsumer(IQueueProvider queueProvider, ILoggerFactory loggerFacto
33
35
Logger = loggerFactory . CreateLogger ( GetType ( ) ) ;
34
36
35
37
_activeTasks = new Dictionary < string , EventWaitHandle > ( ) ;
38
+ _runningTasks = new List < Task > ( ) ;
36
39
_secondPasses = new ConcurrentHashSet < string > ( ) ;
37
40
}
38
41
@@ -115,6 +118,10 @@ private async Task Execute()
115
118
_activeTasks . Add ( item , waitHandle ) ;
116
119
}
117
120
var task = ExecuteItem ( item , waitHandle , activity ) ;
121
+ lock ( _runningTasksLock )
122
+ {
123
+ _runningTasks . Add ( task ) ;
124
+ }
118
125
}
119
126
catch ( OperationCanceledException )
120
127
{
@@ -138,6 +145,25 @@ private async Task Execute()
138
145
139
146
foreach ( var handle in toComplete )
140
147
handle . WaitOne ( ) ;
148
+
149
+ // Also await all running tasks to ensure proper async completion
150
+ Task [ ] tasksToAwait ;
151
+ lock ( _runningTasksLock )
152
+ {
153
+ tasksToAwait = _runningTasks . ToArray ( ) ;
154
+ }
155
+
156
+ if ( tasksToAwait . Length > 0 )
157
+ {
158
+ try
159
+ {
160
+ await Task . WhenAll ( tasksToAwait ) ;
161
+ }
162
+ catch
163
+ {
164
+ // Individual task exceptions are already logged in ExecuteItem
165
+ }
166
+ }
141
167
}
142
168
143
169
private async Task ExecuteItem ( string itemId , EventWaitHandle waitHandle , Activity activity )
0 commit comments