Skip to content

Commit 6645b54

Browse files
committed
feat: Add more logs on subscriber pull stream retries
Stemming from b/375005996
1 parent 1d02a02 commit 6645b54

File tree

1 file changed

+24
-7
lines changed

1 file changed

+24
-7
lines changed

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ internal SingleChannel(SubscriberClientImpl subscriber,
209209
_eventReceiptModAckForExactlyOnceDelivery = new AsyncAutoResetEvent(subscriber._taskHelper);
210210
_continuationQueue = new AsyncSingleRecvQueue<TaskNextAction>(subscriber._taskHelper);
211211
_logger = subscriber.Logger;
212-
_retryState = new RetryState(clock, _logger, s_defaultPullRetryTiming, TimeSpan.FromSeconds(0.5));
212+
_retryState = new RetryState(clock, _logger, s_defaultPullRetryTiming, TimeSpan.FromSeconds(0.5), _clientIndex);
213213
}
214214

215215
internal async Task StartAsync()
@@ -351,7 +351,7 @@ private void RestartPullOrThrow(Exception e)
351351
}
352352
else
353353
{
354-
_logger?.LogError(e, "Unrecoverable error in streaming pull; aborting subscriber.");
354+
_logger?.LogError(e, "Unrecoverable error in streaming pull for client {index}; aborting subscriber.", _clientIndex);
355355
// Unrecoverable error; throw it.
356356
throw e.FlattenIfPossible();
357357
}
@@ -1126,26 +1126,33 @@ private class RetryState
11261126
/// We fail after this many consecutive failures, regardless of what the failure was.
11271127
/// With the backoffs involved, this will be after a pretty significant amount of time anyway.
11281128
/// </summary>
1129-
private const int ConcurrentFailureLimit = 100;
1129+
private const int ConsecutiveFailureLimit = 100;
1130+
/// <summary>
1131+
/// We log the status code of the latest <see cref="ConsecutiveFailureLimit"/> - <see cref="SkipFailuresInLogs"/>
1132+
/// exceptions.
1133+
/// </summary>
1134+
private const int SkipFailuresInLogs = 90;
11301135

11311136
private readonly IClock _clock;
11321137
private readonly ILogger _logger;
11331138
private readonly RetrySettings _backoffTiming;
11341139
private readonly TimeSpan _disconnectBackoff;
1140+
private readonly int _clientIndex;
11351141

11361142
private DateTime _currentStartTimestamp;
11371143
private readonly List<RpcException> _exceptions;
11381144
private DateTime? _firstExceptionTimestamp;
11391145
internal TimeSpan? Backoff { get; private set; }
11401146

1141-
internal RetryState(IClock clock, ILogger logger, RetrySettings backoffTiming, TimeSpan disconnectBackoff)
1147+
internal RetryState(IClock clock, ILogger logger, RetrySettings backoffTiming, TimeSpan disconnectBackoff, int clientIndex)
11421148
{
11431149
_clock = clock;
11441150
_logger = logger;
11451151
_backoffTiming = backoffTiming;
11461152
_disconnectBackoff = disconnectBackoff;
11471153
_exceptions = new();
11481154
_firstExceptionTimestamp = null;
1155+
_clientIndex = clientIndex;
11491156
Backoff = null;
11501157
}
11511158

@@ -1162,14 +1169,23 @@ internal bool RecordFailureAndCheckForRetry(Exception exception)
11621169
// Don't retry any non-RpcExceptions
11631170
if (exception.As<RpcException>() is not RpcException rpcEx)
11641171
{
1172+
_logger?.LogDebug(exception, "Can't recover from non-RpcException on stream for client {index}.", _clientIndex);
11651173
return false;
11661174
}
11671175

11681176
_exceptions.Add(rpcEx);
11691177

11701178
// If we've reached our limit, fail regardless.
1171-
if (_exceptions.Count == ConcurrentFailureLimit)
1179+
if (_exceptions.Count == ConsecutiveFailureLimit)
11721180
{
1181+
var latestStatuses = _exceptions.
1182+
Skip(SkipFailuresInLogs).
1183+
GroupBy(ex => ex.StatusCode).
1184+
Select(group => $"{group.Count()} errors with status {group.Key}");
1185+
_logger?.LogDebug(exception,
1186+
"Can't recover after reaching the consecutive error limit on stream for client {index}. " +
1187+
"The last errors were {lastErrors}.",
1188+
_clientIndex, string.Join(", ", latestStatuses));
11731189
return false;
11741190
}
11751191

@@ -1179,14 +1195,15 @@ internal bool RecordFailureAndCheckForRetry(Exception exception)
11791195
// deem that to be successful and retry with no backoff.
11801196
if (code == StatusCode.Unavailable && (now - _currentStartTimestamp) >= s_streamingPullSuccessThreshold)
11811197
{
1182-
_logger?.LogDebug("Pull stream terminated with no messages, but after success assumption threshold time. Retrying with no backoff.");
1198+
_logger?.LogDebug("Pull stream for client {index} terminated with no messages, but after success assumption threshold time. Retrying with no backoff.", _clientIndex);
11831199
OnSuccess();
11841200
return true;
11851201
}
11861202

11871203
// If the exception isn't generally recoverable, don't retry.
11881204
if (!rpcEx.IsRecoverable())
11891205
{
1206+
_logger?.LogDebug(exception, "Can't recover from generally unrecoverable RpcException on stream for client {index}.", _clientIndex);
11901207
return false;
11911208
}
11921209

@@ -1199,7 +1216,7 @@ internal bool RecordFailureAndCheckForRetry(Exception exception)
11991216
bool retry = count <= MaxAuthExceptionsBeforeFailing;
12001217
if (!retry)
12011218
{
1202-
_logger?.LogWarning("Failing pull request due to auth-based failures.");
1219+
_logger?.LogWarning(exception, "Failing pull stream for client {index} due to auth-based failures.", _clientIndex);
12031220
}
12041221
return retry;
12051222
}

0 commit comments

Comments
 (0)