@@ -42,6 +42,8 @@ internal class SqlReceiver : IDisposable
4242 private bool _disposed ;
4343 private readonly string _maxIdSql = "SELECT [PayloadId] FROM [{0}].[{1}_Id]" ;
4444 private readonly string _selectSql = "SELECT [PayloadId], [Payload], [InsertedOn] FROM [{0}].[{1}] WHERE [PayloadId] > @PayloadId" ;
45+ private readonly TimeSpan _activityMaxDuration = TimeSpan . FromMinutes ( 10 ) ;
46+
4547
4648 public SqlReceiver ( SqlServerOptions options , ILogger logger , string tableName , string tracePrefix )
4749 {
@@ -73,14 +75,26 @@ private async Task StartLoop()
7375 {
7476 if ( _cts . IsCancellationRequested ) return ;
7577
76- if ( ! _lastPayloadId . HasValue )
78+ bool useBroker = _options . Mode . HasFlag ( SqlServerMessageMode . ServiceBroker ) ;
79+
80+ using ( var activity = SqlServerOptions . ActivitySource . StartActivity ( "SignalR.SqlServer.Start" ) )
7781 {
78- _lastPayloadId = await GetLastPayloadId ( ) ;
82+ activity ? . SetTag ( "signalr.hub" , _tracePrefix ) ;
83+ activity ? . SetTag ( "signalr.sql.mode" , _options . Mode . ToString ( ) ) ;
84+
85+ if ( ! _lastPayloadId . HasValue )
86+ {
87+ _lastPayloadId = await GetLastPayloadId ( ) ;
88+ }
89+ if ( useBroker )
90+ {
91+ useBroker = StartSqlDependencyListener ( ) ;
92+ }
7993 }
8094
8195 if ( _cts . IsCancellationRequested ) return ;
8296
83- if ( _options . Mode . HasFlag ( SqlServerMessageMode . ServiceBroker ) && StartSqlDependencyListener ( ) )
97+ if ( useBroker )
8498 {
8599 await NotificationLoop ( _cts . Token ) ;
86100 }
@@ -103,6 +117,9 @@ private async Task NotificationLoop(CancellationToken cancellationToken)
103117 {
104118 if ( cancellationToken . IsCancellationRequested ) return ;
105119
120+ using var activity = SqlServerOptions . ActivitySource . StartActivity ( "SignalR.SqlServer.Listen" ) ;
121+ activity ? . SetTag ( "signalr.hub" , _tracePrefix ) ;
122+
106123 try
107124 {
108125 _logger . LogDebug ( "{HubStream}: Setting up SQL notification" , _tracePrefix ) ;
@@ -170,6 +187,9 @@ private async Task NotificationLoop(CancellationToken cancellationToken)
170187 else
171188 {
172189 // Unknown subscription error, let's stop using query notifications
190+ activity ? . SetStatus ( ActivityStatusCode . Error ) ;
191+ // Dispose so this activity doesn't become the parent of the next loop.
192+ activity ? . Dispose ( ) ;
173193 _notificationsDisabled = true ;
174194 await StartLoop ( ) ;
175195 return ;
@@ -180,6 +200,7 @@ private async Task NotificationLoop(CancellationToken cancellationToken)
180200 catch ( TaskCanceledException ) { return ; }
181201 catch ( Exception ex )
182202 {
203+ activity ? . SetStatus ( ActivityStatusCode . Error ) ;
183204 _logger . LogError ( ex , "{HubStream}: Error in SQL notification loop" , _tracePrefix ) ;
184205
185206 await Task . Delay ( 1000 , cancellationToken ) ;
@@ -196,56 +217,75 @@ private async Task NotificationLoop(CancellationToken cancellationToken)
196217 /// </summary>
197218 private async Task PollingLoop ( CancellationToken cancellationToken )
198219 {
199- var delays = _updateLoopRetryDelays ;
200- for ( var retrySetIndex = 0 ; retrySetIndex < delays . Length ; retrySetIndex ++ )
220+ var activity = SqlServerOptions . ActivitySource . StartActivity ( "SignalR.SqlServer.Poll" ) ;
221+ activity ? . SetTag ( "signalr.hub" , _tracePrefix ) ;
222+
223+ try
201224 {
202- Tuple < int , int > retry = delays [ retrySetIndex ] ;
203- var retryDelay = retry . Item1 ;
204- var numRetries = retry . Item2 ;
205225
206- for ( var retryIndex = 0 ; retryIndex < numRetries ; retryIndex ++ )
226+ var delays = _updateLoopRetryDelays ;
227+ for ( var retrySetIndex = 0 ; retrySetIndex < delays . Length ; retrySetIndex ++ )
207228 {
208- if ( cancellationToken . IsCancellationRequested ) return ;
229+ Tuple < int , int > retry = delays [ retrySetIndex ] ;
230+ var retryDelay = retry . Item1 ;
231+ var numRetries = retry . Item2 ;
209232
210- var recordCount = 0 ;
211- try
233+ for ( var retryIndex = 0 ; retryIndex < numRetries ; retryIndex ++ )
212234 {
213- if ( retryDelay > 0 )
214- {
215- _logger . LogTrace ( "{HubStream}: Waiting {1}ms before checking for messages again" , _tracePrefix , retryDelay ) ;
235+ if ( cancellationToken . IsCancellationRequested ) return ;
216236
217- await Task . Delay ( retryDelay , cancellationToken ) ;
237+ // Restart activity every 10 minutes to prevent long-running activities
238+ if ( activity != null && DateTime . UtcNow - activity . StartTimeUtc > _activityMaxDuration )
239+ {
240+ activity ? . Dispose ( ) ;
241+ activity = SqlServerOptions . ActivitySource . StartActivity ( "SignalR.SqlServer.Poll" ) ;
242+ activity ? . SetTag ( "signalr.hub" , _tracePrefix ) ;
218243 }
219244
220- recordCount = await ReadRows ( null ) ;
221- }
222- catch ( TaskCanceledException ) { return ; }
223- catch ( Exception ex )
224- {
225- _logger . LogError ( ex , "{HubStream}: Error in SQL polling loop" , _tracePrefix ) ;
226- }
245+ var recordCount = 0 ;
246+ try
247+ {
248+ if ( retryDelay > 0 )
249+ {
250+ _logger . LogTrace ( "{HubStream}: Waiting {Delay}ms before checking for messages again" , _tracePrefix , retryDelay ) ;
227251
228- if ( recordCount > 0 )
229- {
230- _logger . LogDebug ( "{HubStream}: {RecordCount} records received" , _tracePrefix , recordCount ) ;
252+ await Task . Delay ( retryDelay , cancellationToken ) ;
253+ }
231254
232- // We got records so start the retry loop again
233- // at the lowest delay.
234- retrySetIndex = - 1 ;
235- break ;
236- }
255+ recordCount = await ReadRows ( null ) ;
256+ }
257+ catch ( TaskCanceledException ) { return ; }
258+ catch ( Exception ex )
259+ {
260+ _logger . LogError ( ex , "{HubStream}: Error in SQL polling loop" , _tracePrefix ) ;
261+ }
237262
238- _logger . LogTrace ( "{HubStream}: No records received" , _tracePrefix ) ;
263+ if ( recordCount > 0 )
264+ {
265+ _logger . LogDebug ( "{HubStream}: {RecordCount} records received" , _tracePrefix , recordCount ) ;
239266
240- var isLastRetry = retrySetIndex == delays . Length - 1 && retryIndex == numRetries - 1 ;
267+ // We got records so start the retry loop again
268+ // at the lowest delay.
269+ retrySetIndex = - 1 ;
270+ break ;
271+ }
241272
242- if ( isLastRetry )
243- {
244- // Last retry loop so just stay looping on the last retry delay
245- retryIndex -- ;
273+ _logger . LogTrace ( "{HubStream}: No records received" , _tracePrefix ) ;
274+
275+ var isLastRetry = retrySetIndex == delays . Length - 1 && retryIndex == numRetries - 1 ;
276+
277+ if ( isLastRetry )
278+ {
279+ // Last retry loop so just stay looping on the last retry delay
280+ retryIndex -- ;
281+ }
246282 }
247283 }
248284 }
285+ finally
286+ {
287+ activity ? . Dispose ( ) ;
288+ }
249289
250290 _logger . LogDebug ( "{HubStream}: SQL polling loop fell out" , _tracePrefix ) ;
251291 await StartLoop ( ) ;
0 commit comments