@@ -25,6 +25,8 @@ public sealed class BackgroundQueue : IBackgroundQueue
2525 private readonly ILogger < BackgroundQueue > _logger ;
2626 private readonly IQueueInformationUtil _queueInformationUtil ;
2727
28+ private long _lastWarnTicks ;
29+
2830 private readonly bool _log ;
2931
3032 public BackgroundQueue ( IConfiguration config , ILogger < BackgroundQueue > logger , IQueueInformationUtil queueInformationUtil )
@@ -51,7 +53,10 @@ public BackgroundQueue(IConfiguration config, ILogger<BackgroundQueue> logger, I
5153
5254 var options = new BoundedChannelOptions ( _queueLimit )
5355 {
54- FullMode = BoundedChannelFullMode . Wait
56+ FullMode = BoundedChannelFullMode . Wait ,
57+ SingleReader = true ,
58+ SingleWriter = false ,
59+ AllowSynchronousContinuations = false
5560 } ;
5661
5762 _valueTaskChannel = Channel . CreateBounded < Func < CancellationToken , ValueTask > > ( options ) ;
@@ -62,34 +67,58 @@ public async ValueTask QueueValueTask(Func<CancellationToken, ValueTask> workIte
6267 {
6368 // TODO: need to redo this, we're going to get too many warnings
6469
65- int count = await _queueInformationUtil . IncrementValueTaskCounter ( cancellationToken ) . NoSync ( ) ;
70+ int count = await _queueInformationUtil . IncrementValueTaskCounter ( cancellationToken )
71+ . NoSync ( ) ;
6672
67- if ( count > _queueWarning )
73+ try
74+ {
75+ await _valueTaskChannel . Writer . WriteAsync ( workItem , cancellationToken )
76+ . NoSync ( ) ;
77+ }
78+ catch
6879 {
69- _logger . LogWarning ( "ValueTask queue length ({length}) is currently greater than the warning ({_queueWarning}), and will wait after hitting limit ({_queueLimit})" , count ,
70- _queueWarning , _queueLimit ) ;
80+ await _queueInformationUtil . DecrementValueTaskCounter ( CancellationToken . None )
81+ . NoSync ( ) ;
82+ throw ;
7183 }
7284
73- if ( _log )
74- _logger . LogDebug ( "Queuing ValueTask: {name}" , workItem . ToString ( ) ) ;
85+ if ( count > _queueWarning && ShouldWarn ( ) )
86+ {
87+ _logger . LogWarning (
88+ "ValueTask queue length ({length}) is currently greater than the warning ({_queueWarning}), and will wait after hitting limit ({_queueLimit})" ,
89+ count , _queueWarning , _queueLimit ) ;
90+ }
7591
76- await _valueTaskChannel . Writer . WriteAsync ( workItem , cancellationToken ) . NoSync ( ) ;
92+ if ( _log )
93+ _logger . LogDebug ( "Queuing ValueTask: {name}" , workItem . Method . GetSignature ( ) ) ;
7794 }
7895
7996 public async ValueTask QueueTask ( Func < CancellationToken , Task > workItem , CancellationToken cancellationToken = default )
8097 {
81- int count = await _queueInformationUtil . IncrementTaskCounter ( cancellationToken ) . NoSync ( ) ;
98+ int count = await _queueInformationUtil . IncrementTaskCounter ( cancellationToken )
99+ . NoSync ( ) ;
82100
83- if ( count > _queueWarning )
101+ try
84102 {
85- _logger . LogWarning ( "ValueTask queue length ({length}) is currently greater than the warning ({_queueWarning}), and will wait after hitting limit ({_queueLimit})" , count ,
86- _queueWarning , _queueLimit ) ;
103+ await _taskChannel . Writer . WriteAsync ( workItem , cancellationToken )
104+ . NoSync ( ) ;
105+ }
106+ catch
107+ {
108+ await _queueInformationUtil . DecrementTaskCounter ( CancellationToken . None )
109+ . NoSync ( ) ;
110+ throw ;
111+ }
112+
113+ if ( count > _queueWarning && ShouldWarn ( ) )
114+ {
115+ _logger . LogWarning (
116+ "Task queue length ({length}) is currently greater than the warning ({_queueWarning}), and will wait after hitting limit ({_queueLimit})" ,
117+ count , _queueWarning , _queueLimit ) ;
87118 }
88119
89120 if ( _log )
90121 _logger . LogDebug ( "Queuing Task: {name}" , workItem . Method . GetSignature ( ) ) ;
91-
92- await _taskChannel . Writer . WriteAsync ( workItem , cancellationToken ) . NoSync ( ) ;
93122 }
94123
95124 public ValueTask < Func < CancellationToken , ValueTask > > DequeueValueTask ( CancellationToken cancellationToken = default )
@@ -110,7 +139,8 @@ public async ValueTask WaitUntilEmpty(CancellationToken cancellationToken = defa
110139
111140 do
112141 {
113- isProcessing = await _queueInformationUtil . IsProcessing ( cancellationToken ) . ConfigureAwait ( false ) ;
142+ isProcessing = await _queueInformationUtil . IsProcessing ( cancellationToken )
143+ . ConfigureAwait ( false ) ;
114144
115145 if ( isProcessing )
116146 {
@@ -119,12 +149,25 @@ public async ValueTask WaitUntilEmpty(CancellationToken cancellationToken = defa
119149 _logger . LogDebug ( "Delaying for {ms}ms (Background queue emptying)..." , delayMs ) ;
120150 }
121151
122- await DelayUtil . Delay ( delayMs , null , cancellationToken ) . NoSync ( ) ;
152+ await DelayUtil . Delay ( delayMs , null , cancellationToken )
153+ . NoSync ( ) ;
123154 }
124155 else
125156 {
126157 _logger . LogDebug ( "Background queue is empty; continuing" ) ;
127158 }
128- } while ( isProcessing ) ;
159+ }
160+ while ( isProcessing ) ;
161+ }
162+
163+ private bool ShouldWarn ( )
164+ {
165+ long now = Environment . TickCount64 ;
166+ long last = Volatile . Read ( ref _lastWarnTicks ) ;
167+
168+ if ( now - last < 10_000 ) // 10s
169+ return false ;
170+
171+ return Interlocked . CompareExchange ( ref _lastWarnTicks , now , last ) == last ;
129172 }
130173}
0 commit comments